diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index 72f6253f8e8..4e8c1071c03 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -34,6 +34,8 @@ #ifndef __GRPCPP_COMPLETION_QUEUE_H__ #define __GRPCPP_COMPLETION_QUEUE_H__ +#include + struct grpc_completion_queue; namespace grpc { @@ -44,41 +46,37 @@ class CompletionQueue { CompletionQueue(); ~CompletionQueue(); - enum CompletionType { - QUEUE_CLOSED = 0, // Shutting down. - RPC_END = 1, // An RPC finished. Either at client or server. - CLIENT_READ_OK = 2, // A client-side read has finished successfully. - CLIENT_READ_ERROR = 3, // A client-side read has finished with error. - CLIENT_WRITE_OK = 4, - CLIENT_WRITE_ERROR = 5, - SERVER_RPC_NEW = 6, // A new RPC just arrived at the server. - SERVER_READ_OK = 7, // A server-side read has finished successfully. - SERVER_READ_ERROR = 8, // A server-side read has finished with error. - SERVER_WRITE_OK = 9, - SERVER_WRITE_ERROR = 10, - // Client or server has sent half close successfully. - HALFCLOSE_OK = 11, - // New CompletionTypes may be added in the future, so user code should - // always - // handle the default case of a CompletionType that appears after such code - // was - // written. - DO_NOT_USE = 20, - }; - // Blocking read from queue. - // For QUEUE_CLOSED, *tag is not changed. - // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext. - // For others, *tag will be the AsyncServerContext of this rpc. - CompletionType Next(void** tag); + // Returns true if an event was received, false if the queue is ready + // for destruction. + bool Next(void** tag); + + // Prepare a tag for the C api + // Given a tag we'd like to receive from Next, what tag should we pass + // down to the C api? + // Usage example: + // grpc_call_start_batch(..., cq.PrepareTagForC(tag)); + // Allows attaching some work to be executed before the original tag + // is returned. + // MUST be used for all events that could be surfaced through this + // wrapping API + template + void *PrepareTagForC(void *user_tag, F on_ready) { + return new std::function([user_tag, on_ready]() { + on_ready(); + return user_tag; + }); + } // Shutdown has to be called, and the CompletionQueue can only be - // destructed when the QUEUE_CLOSED message has been read with Next(). + // destructed when false is returned from Next(). void Shutdown(); grpc_completion_queue* cq() { return cq_; } private: + typedef std::function FinishFunc; + grpc_completion_queue* cq_; // owned }; diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index f06da9b04fe..a1a858ae2e0 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -33,6 +33,8 @@ #include +#include + #include #include #include @@ -47,66 +49,26 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } -CompletionQueue::CompletionType CompletionQueue::Next(void **tag) { - grpc_event *ev; - CompletionType return_type; - bool success; +// Helper class so we can declare a unique_ptr with grpc_event +class EventDeleter { + public: + void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); } +}; + +bool CompletionQueue::Next(void **tag) { + std::unique_ptr ev; - ev = grpc_completion_queue_next(cq_, gpr_inf_future); + ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future)); if (!ev) { gpr_log(GPR_ERROR, "no next event in queue"); abort(); } - switch (ev->type) { - case GRPC_QUEUE_SHUTDOWN: - return_type = QUEUE_CLOSED; - break; - case GRPC_READ: - *tag = ev->tag; - if (ev->data.read) { - success = static_cast(ev->tag) - ->ParseRead(ev->data.read); - return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR; - } else { - return_type = SERVER_READ_ERROR; - } - break; - case GRPC_WRITE_ACCEPTED: - *tag = ev->tag; - if (ev->data.write_accepted != GRPC_OP_ERROR) { - return_type = SERVER_WRITE_OK; - } else { - return_type = SERVER_WRITE_ERROR; - } - break; - case GRPC_SERVER_RPC_NEW: - GPR_ASSERT(!ev->tag); - // Finishing the pending new rpcs after the server has been shutdown. - if (!ev->call) { - *tag = nullptr; - } else { - *tag = new AsyncServerContext( - ev->call, ev->data.server_rpc_new.method, - ev->data.server_rpc_new.host, - Timespec2Timepoint(ev->data.server_rpc_new.deadline)); - } - return_type = SERVER_RPC_NEW; - break; - case GRPC_FINISHED: - *tag = ev->tag; - return_type = RPC_END; - break; - case GRPC_FINISH_ACCEPTED: - *tag = ev->tag; - return_type = HALFCLOSE_OK; - break; - default: - // We do not handle client side messages now - gpr_log(GPR_ERROR, "client-side messages aren't supported yet"); - abort(); + if (ev->type == GRPC_QUEUE_SHUTDOWN) { + return false; } - grpc_event_finish(ev); - return return_type; + std::unique_ptr func(static_cast(ev->tag)); + *tag = (*func)(); + return true; } } // namespace grpc