New completion queue

pull/431/head
Craig Tiller 10 years ago
parent 2a2bacccf3
commit 6135107e4a
  1. 52
      include/grpc++/completion_queue.h
  2. 70
      src/cpp/common/completion_queue.cc

@ -34,6 +34,8 @@
#ifndef __GRPCPP_COMPLETION_QUEUE_H__
#define __GRPCPP_COMPLETION_QUEUE_H__
#include <functional>
struct grpc_completion_queue;
namespace grpc {
@ -44,41 +46,37 @@ class CompletionQueue {
CompletionQueue();
~CompletionQueue();
enum CompletionType {
QUEUE_CLOSED = 0, // Shutting down.
RPC_END = 1, // An RPC finished. Either at client or server.
CLIENT_READ_OK = 2, // A client-side read has finished successfully.
CLIENT_READ_ERROR = 3, // A client-side read has finished with error.
CLIENT_WRITE_OK = 4,
CLIENT_WRITE_ERROR = 5,
SERVER_RPC_NEW = 6, // A new RPC just arrived at the server.
SERVER_READ_OK = 7, // A server-side read has finished successfully.
SERVER_READ_ERROR = 8, // A server-side read has finished with error.
SERVER_WRITE_OK = 9,
SERVER_WRITE_ERROR = 10,
// Client or server has sent half close successfully.
HALFCLOSE_OK = 11,
// New CompletionTypes may be added in the future, so user code should
// always
// handle the default case of a CompletionType that appears after such code
// was
// written.
DO_NOT_USE = 20,
};
// Blocking read from queue.
// For QUEUE_CLOSED, *tag is not changed.
// For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext.
// For others, *tag will be the AsyncServerContext of this rpc.
CompletionType Next(void** tag);
// Returns true if an event was received, false if the queue is ready
// for destruction.
bool Next(void** tag);
// Prepare a tag for the C api
// Given a tag we'd like to receive from Next, what tag should we pass
// down to the C api?
// Usage example:
// grpc_call_start_batch(..., cq.PrepareTagForC(tag));
// Allows attaching some work to be executed before the original tag
// is returned.
// MUST be used for all events that could be surfaced through this
// wrapping API
template <class F>
void *PrepareTagForC(void *user_tag, F on_ready) {
return new std::function<void*()>([user_tag, on_ready]() {
on_ready();
return user_tag;
});
}
// Shutdown has to be called, and the CompletionQueue can only be
// destructed when the QUEUE_CLOSED message has been read with Next().
// destructed when false is returned from Next().
void Shutdown();
grpc_completion_queue* cq() { return cq_; }
private:
typedef std::function<void*()> FinishFunc;
grpc_completion_queue* cq_; // owned
};

@ -33,6 +33,8 @@
#include <grpc++/completion_queue.h>
#include <memory>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@ -47,66 +49,26 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
CompletionQueue::CompletionType CompletionQueue::Next(void **tag) {
grpc_event *ev;
CompletionType return_type;
bool success;
// Helper class so we can declare a unique_ptr with grpc_event
class EventDeleter {
public:
void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); }
};
bool CompletionQueue::Next(void **tag) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev = grpc_completion_queue_next(cq_, gpr_inf_future);
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
if (!ev) {
gpr_log(GPR_ERROR, "no next event in queue");
abort();
}
switch (ev->type) {
case GRPC_QUEUE_SHUTDOWN:
return_type = QUEUE_CLOSED;
break;
case GRPC_READ:
*tag = ev->tag;
if (ev->data.read) {
success = static_cast<AsyncServerContext *>(ev->tag)
->ParseRead(ev->data.read);
return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR;
} else {
return_type = SERVER_READ_ERROR;
}
break;
case GRPC_WRITE_ACCEPTED:
*tag = ev->tag;
if (ev->data.write_accepted != GRPC_OP_ERROR) {
return_type = SERVER_WRITE_OK;
} else {
return_type = SERVER_WRITE_ERROR;
}
break;
case GRPC_SERVER_RPC_NEW:
GPR_ASSERT(!ev->tag);
// Finishing the pending new rpcs after the server has been shutdown.
if (!ev->call) {
*tag = nullptr;
} else {
*tag = new AsyncServerContext(
ev->call, ev->data.server_rpc_new.method,
ev->data.server_rpc_new.host,
Timespec2Timepoint(ev->data.server_rpc_new.deadline));
}
return_type = SERVER_RPC_NEW;
break;
case GRPC_FINISHED:
*tag = ev->tag;
return_type = RPC_END;
break;
case GRPC_FINISH_ACCEPTED:
*tag = ev->tag;
return_type = HALFCLOSE_OK;
break;
default:
// We do not handle client side messages now
gpr_log(GPR_ERROR, "client-side messages aren't supported yet");
abort();
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
}
grpc_event_finish(ev);
return return_type;
std::unique_ptr<FinishFunc> func(static_cast<FinishFunc*>(ev->tag));
*tag = (*func)();
return true;
}
} // namespace grpc

Loading…
Cancel
Save