Comments for all callback API methods

pull/18493/head
Vijay Pai 6 years ago
parent 59ffdd9929
commit 9169159f30
  1. 5
      include/grpcpp/generic/generic_stub.h
  2. 20
      include/grpcpp/impl/codegen/async_generic_service.h
  3. 118
      include/grpcpp/impl/codegen/client_callback.h
  4. 161
      include/grpcpp/impl/codegen/server_callback.h
  5. 4
      include/grpcpp/server_builder.h

@ -73,10 +73,15 @@ class GenericStub final {
public: public:
explicit experimental_type(GenericStub* stub) : stub_(stub) {} explicit experimental_type(GenericStub* stub) : stub_(stub) {}
/// Setup and start a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
void UnaryCall(ClientContext* context, const grpc::string& method, void UnaryCall(ClientContext* context, const grpc::string& method,
const ByteBuffer* request, ByteBuffer* response, const ByteBuffer* request, ByteBuffer* response,
std::function<void(Status)> on_completion); std::function<void(Status)> on_completion);
/// Setup a call to a named method \a method using \a context and tied to
/// \a reactor . Like any other bidi streaming RPC, it will not be activated
/// until StartCall is invoked on its reactor.
void PrepareBidiStreamingCall( void PrepareBidiStreamingCall(
ClientContext* context, const grpc::string& method, ClientContext* context, const grpc::string& method,
experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor); experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor);

@ -85,13 +85,23 @@ class AsyncGenericService final {
namespace experimental { namespace experimental {
/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs
/// invoked on a CallbackGenericService. The API difference relative to
/// ServerBidiReactor is that the argument to OnStarted is a
/// GenericServerContext rather than a ServerContext. All other reaction and
/// operation initiation APIs are the same as ServerBidiReactor.
class ServerGenericBidiReactor class ServerGenericBidiReactor
: public ServerBidiReactor<ByteBuffer, ByteBuffer> { : public ServerBidiReactor<ByteBuffer, ByteBuffer> {
public: public:
/// Similar to ServerBidiReactor::OnStarted except for argument type.
///
/// \param[in] context The context object associated with this RPC.
virtual void OnStarted(GenericServerContext* context) {}
private:
void OnStarted(ServerContext* ctx) final { void OnStarted(ServerContext* ctx) final {
OnStarted(static_cast<GenericServerContext*>(ctx)); OnStarted(static_cast<GenericServerContext*>(ctx));
} }
virtual void OnStarted(GenericServerContext* ctx) {}
}; };
} // namespace experimental } // namespace experimental
@ -108,10 +118,18 @@ class UnimplementedGenericBidiReactor
} // namespace internal } // namespace internal
namespace experimental { namespace experimental {
/// \a CallbackGenericService is the base class for generic services implemented
/// using the callback API and registered through the ServerBuilder using
/// RegisterCallbackGenericService.
class CallbackGenericService { class CallbackGenericService {
public: public:
CallbackGenericService() {} CallbackGenericService() {}
virtual ~CallbackGenericService() {} virtual ~CallbackGenericService() {}
/// The "method handler" for the generic API. This function should be
/// overridden to return a ServerGenericBidiReactor that implements the
/// application-level interface for this RPC.
virtual ServerGenericBidiReactor* CreateReactor() { virtual ServerGenericBidiReactor* CreateReactor() {
return new internal::UnimplementedGenericBidiReactor; return new internal::UnimplementedGenericBidiReactor;
} }

@ -157,28 +157,69 @@ class ClientCallbackWriter {
} }
}; };
// The user must implement this reactor interface with reactions to each event // The following classes are the reactor interfaces that are to be implemented
// type that gets called by the library. An empty reaction is provided by // by the user. They are passed in to the library as an argument to a call on a
// default // stub (either a codegen-ed call or a generic call). The streaming RPC is
// activated by calling StartCall, possibly after initiating StartRead,
// StartWrite, or AddHold operations on the streaming object. Note that none of
// the classes are pure; all reactions have a default empty reaction so that the
// user class only needs to override those classes that it cares about.
/// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
template <class Request, class Response> template <class Request, class Response>
class ClientBidiReactor { class ClientBidiReactor {
public: public:
virtual ~ClientBidiReactor() {} virtual ~ClientBidiReactor() {}
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {}
/// Activate the RPC and initiate any reads or writes that have been Start'ed
/// before this call. All streaming RPCs issued by the client MUST have
/// StartCall invoked on them (even if they are canceled) as this call is the
/// activation of their lifecycle.
void StartCall() { stream_->StartCall(); } void StartCall() { stream_->StartCall(); }
/// Initiate a read operation (or post it for later initiation if StartCall
/// has not yet been invoked).
///
/// \param[out] resp Where to eventually store the read message. Valid when
/// the library calls OnReadDone
void StartRead(Response* resp) { stream_->Read(resp); } void StartRead(Response* resp) { stream_->Read(resp); }
/// Initiate a write operation (or post it for later initiation if StartCall
/// has not yet been invoked).
///
/// \param[in] req The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the application
/// regains ownership of msg.
void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
/// Initiate/post a write operation with specified options.
///
/// \param[in] req The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the application
/// regains ownership of msg.
/// \param[in] options The WriteOptions to use for writing this message
void StartWrite(const Request* req, WriteOptions options) { void StartWrite(const Request* req, WriteOptions options) {
stream_->Write(req, std::move(options)); stream_->Write(req, std::move(options));
} }
/// Initiate/post a write operation with specified options and an indication
/// that this is the last write (like StartWrite and StartWritesDone, merged).
/// Note that calling this means that no more calls to StartWrite,
/// StartWriteLast, or StartWritesDone are allowed.
///
/// \param[in] req The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the application
/// regains ownership of msg.
/// \param[in] options The WriteOptions to use for writing this message
void StartWriteLast(const Request* req, WriteOptions options) { void StartWriteLast(const Request* req, WriteOptions options) {
StartWrite(req, std::move(options.set_last_message())); StartWrite(req, std::move(options.set_last_message()));
} }
/// Indicate that the RPC will have no more write operations. This can only be
/// issued once for a given RPC. This is not required or allowed if
/// StartWriteLast is used since that already has the same implication.
/// Note that calling this means that no more calls to StartWrite,
/// StartWriteLast, or StartWritesDone are allowed.
void StartWritesDone() { stream_->WritesDone(); } void StartWritesDone() { stream_->WritesDone(); }
/// Holds are needed if (and only if) this stream has operations that take /// Holds are needed if (and only if) this stream has operations that take
@ -196,14 +237,51 @@ class ClientBidiReactor {
/// AddHold or AddMultipleHolds before StartCall. If there is going to be, /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
/// for example, a read-flow and a write-flow taking place outside the /// for example, a read-flow and a write-flow taking place outside the
/// reactions, then call AddMultipleHolds(2) before StartCall. When the /// reactions, then call AddMultipleHolds(2) before StartCall. When the
/// application knows that it won't issue any more Read operations (such as /// application knows that it won't issue any more read operations (such as
/// when a read comes back as not ok), it should issue a RemoveHold(). It /// when a read comes back as not ok), it should issue a RemoveHold(). It
/// should also call RemoveHold() again after it does StartWriteLast or /// should also call RemoveHold() again after it does StartWriteLast or
/// StartWritesDone that indicates that there will be no more Write ops. /// StartWritesDone that indicates that there will be no more write ops.
/// The number of RemoveHold calls must match the total number of AddHold
/// calls plus the number of holds added by AddMultipleHolds.
void AddHold() { AddMultipleHolds(1); } void AddHold() { AddMultipleHolds(1); }
void AddMultipleHolds(int holds) { stream_->AddHold(holds); } void AddMultipleHolds(int holds) { stream_->AddHold(holds); }
void RemoveHold() { stream_->RemoveHold(); } void RemoveHold() { stream_->RemoveHold(); }
/// Notifies the application that all operations associated with this RPC
/// have completed and provides the RPC status outcome.
///
/// \param[in] s The status outcome of this RPC
virtual void OnDone(const Status& s) {}
/// Notifies the application that a read of initial metadata from the
/// server is done. If the application chooses not to implement this method,
/// it can assume that the initial metadata has been read before the first
/// call of OnReadDone or OnDone.
///
/// \param[in] ok Was the initial metadata read successfully? If false, no
/// further read-side operation will succeed.
virtual void OnReadInitialMetadataDone(bool ok) {}
/// Notifies the application that a StartRead operation completed.
///
/// \param[in] ok Was it successful? If false, no further read-side operation
/// will succeed.
virtual void OnReadDone(bool ok) {}
/// Notifies the application that a StartWrite operation completed.
///
/// \param[in] ok Was it successful? If false, no further write-side operation
/// will succeed.
virtual void OnWriteDone(bool ok) {}
/// Notifies the application that a StartWritesDone operation completed. Note
/// that this is only used on explicit StartWritesDone operations and not for
/// those that are implicitly invoked as part of a StartWriteLast.
///
/// \param[in] ok Was it successful? If false, the application will later see
/// the failure reflected as a bad status in OnDone.
virtual void OnWritesDoneDone(bool ok) {}
private: private:
friend class ClientCallbackReaderWriter<Request, Response>; friend class ClientCallbackReaderWriter<Request, Response>;
void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) { void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
@ -212,13 +290,12 @@ class ClientBidiReactor {
ClientCallbackReaderWriter<Request, Response>* stream_; ClientCallbackReaderWriter<Request, Response>* stream_;
}; };
/// \a ClientReadReactor is the interface for a server-streaming RPC.
/// All public methods behave as in ClientBidiReactor.
template <class Response> template <class Response>
class ClientReadReactor { class ClientReadReactor {
public: public:
virtual ~ClientReadReactor() {} virtual ~ClientReadReactor() {}
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
void StartCall() { reader_->StartCall(); } void StartCall() { reader_->StartCall(); }
void StartRead(Response* resp) { reader_->Read(resp); } void StartRead(Response* resp) { reader_->Read(resp); }
@ -227,20 +304,22 @@ class ClientReadReactor {
void AddMultipleHolds(int holds) { reader_->AddHold(holds); } void AddMultipleHolds(int holds) { reader_->AddHold(holds); }
void RemoveHold() { reader_->RemoveHold(); } void RemoveHold() { reader_->RemoveHold(); }
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
private: private:
friend class ClientCallbackReader<Response>; friend class ClientCallbackReader<Response>;
void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; } void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
ClientCallbackReader<Response>* reader_; ClientCallbackReader<Response>* reader_;
}; };
/// \a ClientWriteReactor is the interface for a client-streaming RPC.
/// All public methods behave as in ClientBidiReactor.
template <class Request> template <class Request>
class ClientWriteReactor { class ClientWriteReactor {
public: public:
virtual ~ClientWriteReactor() {} virtual ~ClientWriteReactor() {}
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {}
void StartCall() { writer_->StartCall(); } void StartCall() { writer_->StartCall(); }
void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); } void StartWrite(const Request* req) { StartWrite(req, WriteOptions()); }
@ -256,6 +335,11 @@ class ClientWriteReactor {
void AddMultipleHolds(int holds) { writer_->AddHold(holds); } void AddMultipleHolds(int holds) { writer_->AddHold(holds); }
void RemoveHold() { writer_->RemoveHold(); } void RemoveHold() { writer_->RemoveHold(); }
virtual void OnDone(const Status& s) {}
virtual void OnReadInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
virtual void OnWritesDoneDone(bool ok) {}
private: private:
friend class ClientCallbackWriter<Request>; friend class ClientCallbackWriter<Request>;
void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; } void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }

@ -40,7 +40,12 @@ namespace internal {
class ServerReactor { class ServerReactor {
public: public:
virtual ~ServerReactor() = default; virtual ~ServerReactor() = default;
/// Notifies the application that all operations associated with this RPC
/// have completed.
virtual void OnDone() {} virtual void OnDone() {}
/// Notifies the application that this RPC has been cancelled.
virtual void OnCancel() {} virtual void OnCancel() {}
}; };
@ -167,33 +172,110 @@ class ServerCallbackReaderWriter {
} }
}; };
// The following classes are reactors that are to be implemented // The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the result of the method handler for // by the user, returned as the result of the method handler for a callback
// a callback method, and activated by the call to OnStarted // method, and activated by the call to OnStarted. Note that none of the classes
// are pure; all reactions have a default empty reaction so that the user class
// only needs to override those classes that it cares about.
/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
template <class Request, class Response> template <class Request, class Response>
class ServerBidiReactor : public internal::ServerReactor { class ServerBidiReactor : public internal::ServerReactor {
public: public:
~ServerBidiReactor() = default; ~ServerBidiReactor() = default;
virtual void OnStarted(ServerContext*) {}
virtual void OnSendInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
/// Send any initial metadata stored in the RPC context. If not invoked,
/// any initial metadata will be passed along with the first Write or the
/// Finish (if there are no writes).
void StartSendInitialMetadata() { stream_->SendInitialMetadata(); } void StartSendInitialMetadata() { stream_->SendInitialMetadata(); }
void StartRead(Request* msg) { stream_->Read(msg); }
void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } /// Initiate a read operation.
void StartWrite(const Response* msg, WriteOptions options) { ///
stream_->Write(msg, std::move(options)); /// \param[out] req Where to eventually store the read message. Valid when
/// the library calls OnReadDone
void StartRead(Request* req) { stream_->Read(req); }
/// Initiate a write operation.
///
/// \param[in] resp The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the
/// application regains ownership of resp.
void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
/// Initiate a write operation with specified options.
///
/// \param[in] resp The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the
/// application regains ownership of resp.
/// \param[in] options The WriteOptions to use for writing this message
void StartWrite(const Response* resp, WriteOptions options) {
stream_->Write(resp, std::move(options));
} }
void StartWriteAndFinish(const Response* msg, WriteOptions options,
/// Initiate a write operation with specified options and final RPC Status,
/// which also causes any trailing metadata for this RPC to be sent out.
/// StartWriteAndFinish is like merging StartWriteLast and Finish into a
/// single step. A key difference, though, is that this operation doesn't have
/// an OnWriteDone reaction - it is considered complete only when OnDone is
/// available. An RPC can either have StartWriteAndFinish or Finish, but not
/// both.
///
/// \param[in] resp The message to be written. The library takes temporary
/// ownership until Onone, at which point the application
/// regains ownership of resp.
/// \param[in] options The WriteOptions to use for writing this message
/// \param[in] s The status outcome of this RPC
void StartWriteAndFinish(const Response* resp, WriteOptions options,
Status s) { Status s) {
stream_->WriteAndFinish(msg, std::move(options), std::move(s)); stream_->WriteAndFinish(resp, std::move(options), std::move(s));
} }
void StartWriteLast(const Response* msg, WriteOptions options) {
StartWrite(msg, std::move(options.set_last_message())); /// Inform system of a planned write operation with specified options, but
/// allow the library to schedule the actual write coalesced with the writing
/// of trailing metadata (which takes place on a Finish call).
///
/// \param[in] resp The message to be written. The library takes temporary
/// ownership until OnWriteDone, at which point the
/// application regains ownership of resp.
/// \param[in] options The WriteOptions to use for writing this message
void StartWriteLast(const Response* resp, WriteOptions options) {
StartWrite(resp, std::move(options.set_last_message()));
} }
/// Indicate that the stream is to be finished and the trailing metadata and
/// RPC status are to be sent. Every RPC MUST be finished using either Finish
/// or StartWriteAndFinish (but not both), even if the RPC is already
/// cancelled.
///
/// \param[in] s The status outcome of this RPC
void Finish(Status s) { stream_->Finish(std::move(s)); } void Finish(Status s) { stream_->Finish(std::move(s)); }
/// Notify the application that a streaming RPC has started
///
/// \param[in] context The context object now associated with this RPC
virtual void OnStarted(ServerContext* context) {}
/// Notifies the application that an explicit StartSendInitialMetadata
/// operation completed. Not used when the sending of initial metadata
/// piggybacks onto the first write.
///
/// \param[in] ok Was it successful? If false, no further write-side operation
/// will succeed.
virtual void OnSendInitialMetadataDone(bool ok) {}
/// Notifies the application that a StartRead operation completed.
///
/// \param[in] ok Was it successful? If false, no further read-side operation
/// will succeed.
virtual void OnReadDone(bool ok) {}
/// Notifies the application that a StartWrite (or StartWriteLast) operation
/// completed.
///
/// \param[in] ok Was it successful? If false, no further write-side operation
/// will succeed.
virtual void OnWriteDone(bool ok) {}
private: private:
friend class ServerCallbackReaderWriter<Request, Response>; friend class ServerCallbackReaderWriter<Request, Response>;
void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) { void BindStream(ServerCallbackReaderWriter<Request, Response>* stream) {
@ -203,18 +285,29 @@ class ServerBidiReactor : public internal::ServerReactor {
ServerCallbackReaderWriter<Request, Response>* stream_; ServerCallbackReaderWriter<Request, Response>* stream_;
}; };
/// \a ServerReadReactor is the interface for a client-streaming RPC.
template <class Request, class Response> template <class Request, class Response>
class ServerReadReactor : public internal::ServerReactor { class ServerReadReactor : public internal::ServerReactor {
public: public:
~ServerReadReactor() = default; ~ServerReadReactor() = default;
virtual void OnStarted(ServerContext*, Response* resp) {}
virtual void OnSendInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
/// The following operation initiations are exactly like ServerBidiReactor.
void StartSendInitialMetadata() { reader_->SendInitialMetadata(); } void StartSendInitialMetadata() { reader_->SendInitialMetadata(); }
void StartRead(Request* msg) { reader_->Read(msg); } void StartRead(Request* req) { reader_->Read(req); }
void Finish(Status s) { reader_->Finish(std::move(s)); } void Finish(Status s) { reader_->Finish(std::move(s)); }
/// Similar to ServerBidiReactor::OnStarted, except that this also provides
/// the response object that the stream fills in before calling Finish.
/// (It must be filled in if status is OK, but it may be filled in otherwise.)
///
/// \param[in] context The context object now associated with this RPC
/// \param[in] resp The response object to be used by this RPC
virtual void OnStarted(ServerContext* context, Response* resp) {}
/// The following notifications are exactly like ServerBidiReactor.
virtual void OnSendInitialMetadataDone(bool ok) {}
virtual void OnReadDone(bool ok) {}
private: private:
friend class ServerCallbackReader<Request>; friend class ServerCallbackReader<Request>;
void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; } void BindReader(ServerCallbackReader<Request>* reader) { reader_ = reader; }
@ -222,28 +315,38 @@ class ServerReadReactor : public internal::ServerReactor {
ServerCallbackReader<Request>* reader_; ServerCallbackReader<Request>* reader_;
}; };
/// \a ServerReadReactor is the interface for a server-streaming RPC.
template <class Request, class Response> template <class Request, class Response>
class ServerWriteReactor : public internal::ServerReactor { class ServerWriteReactor : public internal::ServerReactor {
public: public:
~ServerWriteReactor() = default; ~ServerWriteReactor() = default;
virtual void OnStarted(ServerContext*, const Request* req) {}
virtual void OnSendInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
/// The following operation initiations are exactly like ServerBidiReactor.
void StartSendInitialMetadata() { writer_->SendInitialMetadata(); } void StartSendInitialMetadata() { writer_->SendInitialMetadata(); }
void StartWrite(const Response* msg) { StartWrite(msg, WriteOptions()); } void StartWrite(const Response* resp) { StartWrite(resp, WriteOptions()); }
void StartWrite(const Response* msg, WriteOptions options) { void StartWrite(const Response* resp, WriteOptions options) {
writer_->Write(msg, std::move(options)); writer_->Write(resp, std::move(options));
} }
void StartWriteAndFinish(const Response* msg, WriteOptions options, void StartWriteAndFinish(const Response* resp, WriteOptions options,
Status s) { Status s) {
writer_->WriteAndFinish(msg, std::move(options), std::move(s)); writer_->WriteAndFinish(resp, std::move(options), std::move(s));
} }
void StartWriteLast(const Response* msg, WriteOptions options) { void StartWriteLast(const Response* resp, WriteOptions options) {
StartWrite(msg, std::move(options.set_last_message())); StartWrite(resp, std::move(options.set_last_message()));
} }
void Finish(Status s) { writer_->Finish(std::move(s)); } void Finish(Status s) { writer_->Finish(std::move(s)); }
/// Similar to ServerBidiReactor::OnStarted, except that this also provides
/// the request object sent by the client.
///
/// \param[in] context The context object now associated with this RPC
/// \param[in] req The request object sent by the client
virtual void OnStarted(ServerContext* context, const Request* req) {}
/// The following notifications are exactly like ServerBidiReactor.
virtual void OnSendInitialMetadataDone(bool ok) {}
virtual void OnWriteDone(bool ok) {}
private: private:
friend class ServerCallbackWriter<Response>; friend class ServerCallbackWriter<Response>;
void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; } void BindWriter(ServerCallbackWriter<Response>* writer) { writer_ = writer; }

@ -231,6 +231,10 @@ class ServerBuilder {
builder_->interceptor_creators_ = std::move(interceptor_creators); builder_->interceptor_creators_ = std::move(interceptor_creators);
} }
/// Register a generic service that uses the callback API.
/// Matches requests with any :authority
/// This is mostly useful for writing generic gRPC Proxies where the exact
/// serialization format is unknown
ServerBuilder& RegisterCallbackGenericService( ServerBuilder& RegisterCallbackGenericService(
experimental::CallbackGenericService* service); experimental::CallbackGenericService* service);

Loading…
Cancel
Save