diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h index eb014184e4a..9252599bac3 100644 --- a/include/grpcpp/generic/generic_stub.h +++ b/include/grpcpp/generic/generic_stub.h @@ -73,10 +73,15 @@ class GenericStub final { public: explicit experimental_type(GenericStub* stub) : stub_(stub) {} + /// Setup and start a unary call to a named method \a method using + /// \a context and specifying the \a request and \a response buffers. void UnaryCall(ClientContext* context, const grpc::string& method, const ByteBuffer* request, ByteBuffer* response, std::function on_completion); + /// Setup a call to a named method \a method using \a context and tied to + /// \a reactor . Like any other bidi streaming RPC, it will not be activated + /// until StartCall is invoked on its reactor. void PrepareBidiStreamingCall( ClientContext* context, const grpc::string& method, experimental::ClientBidiReactor* reactor); diff --git a/include/grpcpp/impl/codegen/async_generic_service.h b/include/grpcpp/impl/codegen/async_generic_service.h index 46489b135d7..759f6683bf4 100644 --- a/include/grpcpp/impl/codegen/async_generic_service.h +++ b/include/grpcpp/impl/codegen/async_generic_service.h @@ -85,13 +85,23 @@ class AsyncGenericService final { namespace experimental { +/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs +/// invoked on a CallbackGenericService. The API difference relative to +/// ServerBidiReactor is that the argument to OnStarted is a +/// GenericServerContext rather than a ServerContext. All other reaction and +/// operation initiation APIs are the same as ServerBidiReactor. class ServerGenericBidiReactor : public ServerBidiReactor { public: + /// Similar to ServerBidiReactor::OnStarted except for argument type. + /// + /// \param[in] context The context object associated with this RPC. + virtual void OnStarted(GenericServerContext* context) {} + + private: void OnStarted(ServerContext* ctx) final { OnStarted(static_cast(ctx)); } - virtual void OnStarted(GenericServerContext* ctx) {} }; } // namespace experimental @@ -108,10 +118,18 @@ class UnimplementedGenericBidiReactor } // namespace internal namespace experimental { + +/// \a CallbackGenericService is the base class for generic services implemented +/// using the callback API and registered through the ServerBuilder using +/// RegisterCallbackGenericService. class CallbackGenericService { public: CallbackGenericService() {} virtual ~CallbackGenericService() {} + + /// The "method handler" for the generic API. This function should be + /// overridden to return a ServerGenericBidiReactor that implements the + /// application-level interface for this RPC. virtual ServerGenericBidiReactor* CreateReactor() { return new internal::UnimplementedGenericBidiReactor; } diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index 89629c079af..53c57b55f7e 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -157,28 +157,69 @@ class ClientCallbackWriter { } }; -// The user must implement this reactor interface with reactions to each event -// type that gets called by the library. An empty reaction is provided by -// default +// The following classes are the reactor interfaces that are to be implemented +// by the user. They are passed in to the library as an argument to a call on a +// stub (either a codegen-ed call or a generic call). The streaming RPC is +// activated by calling StartCall, possibly after initiating StartRead, +// StartWrite, or AddHold operations on the streaming object. Note that none of +// the classes are pure; all reactions have a default empty reaction so that the +// user class only needs to override those classes that it cares about. + +/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC. template class ClientBidiReactor { public: virtual ~ClientBidiReactor() {} - virtual void OnDone(const Status& s) {} - virtual void OnReadInitialMetadataDone(bool ok) {} - virtual void OnReadDone(bool ok) {} - virtual void OnWriteDone(bool ok) {} - virtual void OnWritesDoneDone(bool ok) {} + /// Activate the RPC and initiate any reads or writes that have been Start'ed + /// before this call. All streaming RPCs issued by the client MUST have + /// StartCall invoked on them (even if they are canceled) as this call is the + /// activation of their lifecycle. void StartCall() { stream_->StartCall(); } + + /// Initiate a read operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[out] resp Where to eventually store the read message. Valid when + /// the library calls OnReadDone void StartRead(Response* resp) { stream_->Read(resp); } + + /// Initiate a write operation (or post it for later initiation if StartCall + /// has not yet been invoked). + /// + /// \param[in] req The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the application + /// regains ownership of msg. void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } + + /// Initiate/post a write operation with specified options. + /// + /// \param[in] req The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the application + /// regains ownership of msg. + /// \param[in] options The WriteOptions to use for writing this message void StartWrite(const Request* req, WriteOptions options) { stream_->Write(req, std::move(options)); } + + /// Initiate/post a write operation with specified options and an indication + /// that this is the last write (like StartWrite and StartWritesDone, merged). + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. + /// + /// \param[in] req The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the application + /// regains ownership of msg. + /// \param[in] options The WriteOptions to use for writing this message void StartWriteLast(const Request* req, WriteOptions options) { StartWrite(req, std::move(options.set_last_message())); } + + /// Indicate that the RPC will have no more write operations. This can only be + /// issued once for a given RPC. This is not required or allowed if + /// StartWriteLast is used since that already has the same implication. + /// Note that calling this means that no more calls to StartWrite, + /// StartWriteLast, or StartWritesDone are allowed. void StartWritesDone() { stream_->WritesDone(); } /// Holds are needed if (and only if) this stream has operations that take @@ -196,14 +237,51 @@ class ClientBidiReactor { /// AddHold or AddMultipleHolds before StartCall. If there is going to be, /// for example, a read-flow and a write-flow taking place outside the /// reactions, then call AddMultipleHolds(2) before StartCall. When the - /// application knows that it won't issue any more Read operations (such as + /// application knows that it won't issue any more read operations (such as /// when a read comes back as not ok), it should issue a RemoveHold(). It /// should also call RemoveHold() again after it does StartWriteLast or - /// StartWritesDone that indicates that there will be no more Write ops. + /// StartWritesDone that indicates that there will be no more write ops. + /// The number of RemoveHold calls must match the total number of AddHold + /// calls plus the number of holds added by AddMultipleHolds. void AddHold() { AddMultipleHolds(1); } void AddMultipleHolds(int holds) { stream_->AddHold(holds); } void RemoveHold() { stream_->RemoveHold(); } + /// Notifies the application that all operations associated with this RPC + /// have completed and provides the RPC status outcome. + /// + /// \param[in] s The status outcome of this RPC + virtual void OnDone(const Status& s) {} + + /// Notifies the application that a read of initial metadata from the + /// server is done. If the application chooses not to implement this method, + /// it can assume that the initial metadata has been read before the first + /// call of OnReadDone or OnDone. + /// + /// \param[in] ok Was the initial metadata read successfully? If false, no + /// further read-side operation will succeed. + virtual void OnReadInitialMetadataDone(bool ok) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no further read-side operation + /// will succeed. + virtual void OnReadDone(bool ok) {} + + /// Notifies the application that a StartWrite operation completed. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnWriteDone(bool ok) {} + + /// Notifies the application that a StartWritesDone operation completed. Note + /// that this is only used on explicit StartWritesDone operations and not for + /// those that are implicitly invoked as part of a StartWriteLast. + /// + /// \param[in] ok Was it successful? If false, the application will later see + /// the failure reflected as a bad status in OnDone. + virtual void OnWritesDoneDone(bool ok) {} + private: friend class ClientCallbackReaderWriter; void BindStream(ClientCallbackReaderWriter* stream) { @@ -212,13 +290,12 @@ class ClientBidiReactor { ClientCallbackReaderWriter* stream_; }; +/// \a ClientReadReactor is the interface for a server-streaming RPC. +/// All public methods behave as in ClientBidiReactor. template class ClientReadReactor { public: virtual ~ClientReadReactor() {} - virtual void OnDone(const Status& s) {} - virtual void OnReadInitialMetadataDone(bool ok) {} - virtual void OnReadDone(bool ok) {} void StartCall() { reader_->StartCall(); } void StartRead(Response* resp) { reader_->Read(resp); } @@ -227,20 +304,22 @@ class ClientReadReactor { void AddMultipleHolds(int holds) { reader_->AddHold(holds); } void RemoveHold() { reader_->RemoveHold(); } + virtual void OnDone(const Status& s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + private: friend class ClientCallbackReader; void BindReader(ClientCallbackReader* reader) { reader_ = reader; } ClientCallbackReader* reader_; }; +/// \a ClientWriteReactor is the interface for a client-streaming RPC. +/// All public methods behave as in ClientBidiReactor. template class ClientWriteReactor { public: virtual ~ClientWriteReactor() {} - virtual void OnDone(const Status& s) {} - virtual void OnReadInitialMetadataDone(bool ok) {} - virtual void OnWriteDone(bool ok) {} - virtual void OnWritesDoneDone(bool ok) {} void StartCall() { writer_->StartCall(); } void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } @@ -256,6 +335,11 @@ class ClientWriteReactor { void AddMultipleHolds(int holds) { writer_->AddHold(holds); } void RemoveHold() { writer_->RemoveHold(); } + virtual void OnDone(const Status& s) {} + virtual void OnReadInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + virtual void OnWritesDoneDone(bool ok) {} + private: friend class ClientCallbackWriter; void BindWriter(ClientCallbackWriter* writer) { writer_ = writer; } diff --git a/include/grpcpp/impl/codegen/server_callback.h b/include/grpcpp/impl/codegen/server_callback.h index 33988fb6c23..7421acc45bf 100644 --- a/include/grpcpp/impl/codegen/server_callback.h +++ b/include/grpcpp/impl/codegen/server_callback.h @@ -40,7 +40,12 @@ namespace internal { class ServerReactor { public: virtual ~ServerReactor() = default; + + /// Notifies the application that all operations associated with this RPC + /// have completed. virtual void OnDone() {} + + /// Notifies the application that this RPC has been cancelled. virtual void OnCancel() {} }; @@ -167,33 +172,110 @@ class ServerCallbackReaderWriter { } }; -// The following classes are reactors that are to be implemented -// by the user, returned as the result of the method handler for -// a callback method, and activated by the call to OnStarted +// The following classes are the reactor interfaces that are to be implemented +// by the user, returned as the result of the method handler for a callback +// method, and activated by the call to OnStarted. Note that none of the classes +// are pure; all reactions have a default empty reaction so that the user class +// only needs to override those classes that it cares about. + +/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. template class ServerBidiReactor : public internal::ServerReactor { public: ~ServerBidiReactor() = default; - virtual void OnStarted(ServerContext*) {} - virtual void OnSendInitialMetadataDone(bool ok) {} - virtual void OnReadDone(bool ok) {} - virtual void OnWriteDone(bool ok) {} + /// Send any initial metadata stored in the RPC context. If not invoked, + /// any initial metadata will be passed along with the first Write or the + /// Finish (if there are no writes). void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } - void StartRead(Request* msg) { stream_->Read(msg); } - void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } - void StartWrite(const Response* msg, WriteOptions options) { - stream_->Write(msg, std::move(options)); + + /// Initiate a read operation. + /// + /// \param[out] req Where to eventually store the read message. Valid when + /// the library calls OnReadDone + void StartRead(Request* req) { stream_->Read(req); } + + /// Initiate a write operation. + /// + /// \param[in] resp The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the + /// application regains ownership of resp. + void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); } + + /// Initiate a write operation with specified options. + /// + /// \param[in] resp The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the + /// application regains ownership of resp. + /// \param[in] options The WriteOptions to use for writing this message + void StartWrite(const Response* resp, WriteOptions options) { + stream_->Write(resp, std::move(options)); } - void StartWriteAndFinish(const Response* msg, WriteOptions options, + + /// Initiate a write operation with specified options and final RPC Status, + /// which also causes any trailing metadata for this RPC to be sent out. + /// StartWriteAndFinish is like merging StartWriteLast and Finish into a + /// single step. A key difference, though, is that this operation doesn't have + /// an OnWriteDone reaction - it is considered complete only when OnDone is + /// available. An RPC can either have StartWriteAndFinish or Finish, but not + /// both. + /// + /// \param[in] resp The message to be written. The library takes temporary + /// ownership until Onone, at which point the application + /// regains ownership of resp. + /// \param[in] options The WriteOptions to use for writing this message + /// \param[in] s The status outcome of this RPC + void StartWriteAndFinish(const Response* resp, WriteOptions options, Status s) { - stream_->WriteAndFinish(msg, std::move(options), std::move(s)); + stream_->WriteAndFinish(resp, std::move(options), std::move(s)); } - void StartWriteLast(const Response* msg, WriteOptions options) { - StartWrite(msg, std::move(options.set_last_message())); + + /// Inform system of a planned write operation with specified options, but + /// allow the library to schedule the actual write coalesced with the writing + /// of trailing metadata (which takes place on a Finish call). + /// + /// \param[in] resp The message to be written. The library takes temporary + /// ownership until OnWriteDone, at which point the + /// application regains ownership of resp. + /// \param[in] options The WriteOptions to use for writing this message + void StartWriteLast(const Response* resp, WriteOptions options) { + StartWrite(resp, std::move(options.set_last_message())); } + + /// Indicate that the stream is to be finished and the trailing metadata and + /// RPC status are to be sent. Every RPC MUST be finished using either Finish + /// or StartWriteAndFinish (but not both), even if the RPC is already + /// cancelled. + /// + /// \param[in] s The status outcome of this RPC void Finish(Status s) { stream_->Finish(std::move(s)); } + /// Notify the application that a streaming RPC has started + /// + /// \param[in] context The context object now associated with this RPC + virtual void OnStarted(ServerContext* context) {} + + /// Notifies the application that an explicit StartSendInitialMetadata + /// operation completed. Not used when the sending of initial metadata + /// piggybacks onto the first write. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnSendInitialMetadataDone(bool ok) {} + + /// Notifies the application that a StartRead operation completed. + /// + /// \param[in] ok Was it successful? If false, no further read-side operation + /// will succeed. + virtual void OnReadDone(bool ok) {} + + /// Notifies the application that a StartWrite (or StartWriteLast) operation + /// completed. + /// + /// \param[in] ok Was it successful? If false, no further write-side operation + /// will succeed. + virtual void OnWriteDone(bool ok) {} + private: friend class ServerCallbackReaderWriter; void BindStream(ServerCallbackReaderWriter* stream) { @@ -203,18 +285,29 @@ class ServerBidiReactor : public internal::ServerReactor { ServerCallbackReaderWriter* stream_; }; +/// \a ServerReadReactor is the interface for a client-streaming RPC. template class ServerReadReactor : public internal::ServerReactor { public: ~ServerReadReactor() = default; - virtual void OnStarted(ServerContext*, Response* resp) {} - virtual void OnSendInitialMetadataDone(bool ok) {} - virtual void OnReadDone(bool ok) {} + /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } - void StartRead(Request* msg) { reader_->Read(msg); } + void StartRead(Request* req) { reader_->Read(req); } void Finish(Status s) { reader_->Finish(std::move(s)); } + /// Similar to ServerBidiReactor::OnStarted, except that this also provides + /// the response object that the stream fills in before calling Finish. + /// (It must be filled in if status is OK, but it may be filled in otherwise.) + /// + /// \param[in] context The context object now associated with this RPC + /// \param[in] resp The response object to be used by this RPC + virtual void OnStarted(ServerContext* context, Response* resp) {} + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnReadDone(bool ok) {} + private: friend class ServerCallbackReader; void BindReader(ServerCallbackReader* reader) { reader_ = reader; } @@ -222,28 +315,38 @@ class ServerReadReactor : public internal::ServerReactor { ServerCallbackReader* reader_; }; +/// \a ServerReadReactor is the interface for a server-streaming RPC. template class ServerWriteReactor : public internal::ServerReactor { public: ~ServerWriteReactor() = default; - virtual void OnStarted(ServerContext*, const Request* req) {} - virtual void OnSendInitialMetadataDone(bool ok) {} - virtual void OnWriteDone(bool ok) {} + /// The following operation initiations are exactly like ServerBidiReactor. void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } - void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } - void StartWrite(const Response* msg, WriteOptions options) { - writer_->Write(msg, std::move(options)); + void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); } + void StartWrite(const Response* resp, WriteOptions options) { + writer_->Write(resp, std::move(options)); } - void StartWriteAndFinish(const Response* msg, WriteOptions options, + void StartWriteAndFinish(const Response* resp, WriteOptions options, Status s) { - writer_->WriteAndFinish(msg, std::move(options), std::move(s)); + writer_->WriteAndFinish(resp, std::move(options), std::move(s)); } - void StartWriteLast(const Response* msg, WriteOptions options) { - StartWrite(msg, std::move(options.set_last_message())); + void StartWriteLast(const Response* resp, WriteOptions options) { + StartWrite(resp, std::move(options.set_last_message())); } void Finish(Status s) { writer_->Finish(std::move(s)); } + /// Similar to ServerBidiReactor::OnStarted, except that this also provides + /// the request object sent by the client. + /// + /// \param[in] context The context object now associated with this RPC + /// \param[in] req The request object sent by the client + virtual void OnStarted(ServerContext* context, const Request* req) {} + + /// The following notifications are exactly like ServerBidiReactor. + virtual void OnSendInitialMetadataDone(bool ok) {} + virtual void OnWriteDone(bool ok) {} + private: friend class ServerCallbackWriter; void BindWriter(ServerCallbackWriter* writer) { writer_ = writer; } diff --git a/include/grpcpp/server_builder.h b/include/grpcpp/server_builder.h index 498e5b7bb31..18cfbb26c75 100644 --- a/include/grpcpp/server_builder.h +++ b/include/grpcpp/server_builder.h @@ -231,6 +231,10 @@ class ServerBuilder { builder_->interceptor_creators_ = std::move(interceptor_creators); } + /// Register a generic service that uses the callback API. + /// Matches requests with any :authority + /// This is mostly useful for writing generic gRPC Proxies where the exact + /// serialization format is unknown ServerBuilder& RegisterCallbackGenericService( experimental::CallbackGenericService* service);