wip. cq refactored

pull/5270/head
David Garcia Quintas 9 years ago
parent 6848c4e145
commit e1ce31eda3
  1. 3
      include/grpc++/impl/codegen/client_context.h
  2. 36
      include/grpc++/impl/codegen/completion_queue.h
  3. 21
      include/grpc++/impl/codegen/core_codegen_interface.h
  4. 5
      include/grpc++/impl/codegen/grpc_library.h
  5. 25
      include/grpc++/impl/codegen/impl/async_stream.h
  6. 5
      include/grpc++/impl/codegen/server_interface.h
  7. 10
      include/grpc++/impl/codegen/service_type.h
  8. 31
      include/grpc/impl/codegen/time.h
  9. 24
      src/core/support/time.c
  10. 34
      src/cpp/codegen/core_codegen.cc
  11. 33
      src/cpp/common/completion_queue.cc

@ -54,6 +54,7 @@
#include <string>
#include <grpc++/impl/codegen/config.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/security/auth_context.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/string_ref.h>
@ -192,7 +193,7 @@ class ClientContext {
/// \return A multimap of initial metadata key-value pairs from the server.
const std::multimap<grpc::string_ref, grpc::string_ref>&
GetServerInitialMetadata() {
GPR_ASSERT(initial_metadata_received_);
GPR_CODEGEN_ASSERT(initial_metadata_received_);
return recv_initial_metadata_;
}

@ -36,6 +36,9 @@
#ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
#define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H
#include <grpc/impl/codegen/time.h>
#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/time.h>
@ -76,13 +79,17 @@ class Server;
class ServerBuilder;
class ServerContext;
extern CoreCodegenInterface* g_core_codegen_interface;
/// A thin wrapper around \a grpc_completion_queue (see / \a
/// src/core/surface/completion_queue.h).
class CompletionQueue : private GrpcLibrary {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
CompletionQueue();
CompletionQueue() {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr);
}
/// Wrap \a take, taking ownership of the instance.
///
@ -90,7 +97,9 @@ class CompletionQueue : private GrpcLibrary {
explicit CompletionQueue(grpc_completion_queue* take);
/// Destructor. Destroys the owned wrapped completion queue / instance.
~CompletionQueue();
~CompletionQueue() {
g_core_codegen_interface->grpc_completion_queue_destroy(cq_);
}
/// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
enum NextStatus {
@ -181,10 +190,29 @@ class CompletionQueue : private GrpcLibrary {
/// Wraps \a grpc_completion_queue_pluck.
/// \warning Must not be mixed with calls to \a Next.
bool Pluck(CompletionQueueTag* tag);
bool Pluck(CompletionQueueTag* tag) {
auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
bool ok = ev.success != 0;
void* ignored = tag;
GPR_CODEGEN_ASSERT(tag->FinalizeResult(&ignored, &ok));
GPR_CODEGEN_ASSERT(ignored == tag);
// Ignore mutations by FinalizeResult: Pluck returns the C API status
return ev.success != 0;
}
/// Performs a single polling pluck on \a tag.
void TryPluck(CompletionQueueTag* tag);
void TryPluck(CompletionQueueTag* tag) {
auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0;
void* ignored = tag;
// the tag must be swallowed if using TryPluck
GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
}
grpc_completion_queue* cq_; // owned
};

@ -43,9 +43,13 @@ namespace grpc {
class CoreCodegenInterface {
public:
virtual grpc_completion_queue* CompletionQueueCreate() = 0;
virtual grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create(
void* reserved) = 0;
virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0;
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq,
void* tag,
gpr_timespec deadline,
void* reserved) = 0;
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization
@ -70,11 +74,12 @@ class CoreCodegenInterface {
};
/* XXX */
#define GPR_CODEGEN_ASSERT(x) \
do { \
if (!(x)) { \
g_core_codegen_interface->assert_fail(#x); \
} \
#define GPR_CODEGEN_ASSERT(x) \
do { \
if (!(x)) { \
extern CoreCodegenInterface* g_core_codegen_interface; \
g_core_codegen_interface->assert_fail(#x); \
} \
} while (0)
} // namespace grpc

@ -35,6 +35,7 @@
#define GRPCXX_IMPL_CODEGEN_GRPC_LIBRARY_H
#include <grpc/impl/codegen/log.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
namespace grpc {
@ -49,13 +50,13 @@ extern GrpcLibraryInterface* g_glip;
class GrpcLibrary {
public:
GrpcLibrary() {
GPR_ASSERT(g_glip &&
GPR_CODEGEN_ASSERT(g_glip &&
"gRPC library not initialized. See "
"grpc::internal::GrpcLibraryInitializer.");
g_glip->init();
}
virtual ~GrpcLibrary() {
GPR_ASSERT(g_glip &&
GPR_CODEGEN_ASSERT(g_glip &&
"gRPC library not initialized. See "
"grpc::internal::GrpcLibraryInitializer.");
g_glip->shutdown();

@ -36,6 +36,7 @@
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/codegen/call.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/service_type.h>
#include <grpc++/impl/codegen/server_context.h>
#include <grpc++/impl/codegen/status.h>
@ -109,13 +110,13 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
init_ops_.set_output_tag(tag);
init_ops_.SendInitialMetadata(context->send_initial_metadata_);
// TODO(ctiller): don't assert
GPR_ASSERT(init_ops_.SendMessage(request).ok());
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
init_ops_.ClientSendClose();
call_.PerformOps(&init_ops_);
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@ -177,7 +178,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@ -187,7 +188,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@ -243,7 +244,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
}
void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!context_->initial_metadata_received_);
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
meta_ops_.set_output_tag(tag);
meta_ops_.RecvInitialMetadata(context_);
@ -262,7 +263,7 @@ class ClientAsyncReaderWriter GRPC_FINAL
void Write(const W& msg, void* tag) GRPC_OVERRIDE {
write_ops_.set_output_tag(tag);
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@ -300,7 +301,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -331,7 +332,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
}
void FinishWithError(const Status& status, void* tag) {
GPR_ASSERT(!status.ok());
GPR_CODEGEN_ASSERT(!status.ok());
finish_ops_.set_output_tag(tag);
if (!ctx_->sent_initial_metadata_) {
finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -360,7 +361,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -375,7 +376,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}
@ -409,7 +410,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
GPR_ASSERT(!ctx_->sent_initial_metadata_);
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
meta_ops_.set_output_tag(tag);
meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
@ -430,7 +431,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
ctx_->sent_initial_metadata_ = true;
}
// TODO(ctiller): don't assert
GPR_ASSERT(write_ops_.SendMessage(msg).ok());
GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
call_.PerformOps(&write_ops_);
}

@ -37,6 +37,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc++/impl/codegen/call_hook.h>
#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/rpc_service_method.h>
namespace grpc {
@ -223,7 +224,7 @@ class ServerInterface : public CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
GPR_ASSERT(method);
GPR_CODEGEN_ASSERT(method);
new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
stream, call_cq, notification_cq, tag,
message);
@ -233,7 +234,7 @@ class ServerInterface : public CallHook {
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_ASSERT(method);
GPR_CODEGEN_ASSERT(method);
new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
call_cq, notification_cq, tag);
}

@ -35,6 +35,7 @@
#define GRPCXX_IMPL_CODEGEN_SERVICE_TYPE_H
#include <grpc++/impl/codegen/config.h>
#include <grpc++/impl/codegen/core_codegen_interface.h>
#include <grpc++/impl/codegen/rpc_service_method.h>
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/server_interface.h>
@ -131,21 +132,16 @@ class Service {
void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
void MarkMethodAsync(int index) {
if (methods_[index].get() == nullptr) {
gpr_log(GPR_ERROR,
GPR_CODEGEN_ASSERT(methods_[index].get() != nullptr &&
"Cannot mark the method as 'async' because it has already been "
"marked as 'generic'.");
return;
}
methods_[index]->ResetHandler();
}
void MarkMethodGeneric(int index) {
if (methods_[index]->handler() == nullptr) {
gpr_log(GPR_ERROR,
GPR_CODEGEN_ASSERT(methods_[index]->handler() != nullptr &&
"Cannot mark the method as 'generic' because it has already been "
"marked as 'async'.");
}
methods_[index].reset();
}

@ -69,10 +69,33 @@ typedef struct gpr_timespec {
} gpr_timespec;
/* Time constants. */
GPRAPI gpr_timespec
gpr_time_0(gpr_clock_type type); /* The zero time interval. */
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type); /* The far future */
GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type); /* The far past. */
/* The zero time interval. */
GPRAPI static inline gpr_timespec gpr_time_0(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = 0;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
/* The far future */
GPRAPI static inline gpr_timespec gpr_inf_future(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = INT64_MAX;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
/* The far past. */
GPRAPI static inline gpr_timespec gpr_inf_past(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = INT64_MIN;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
#define GPR_MS_PER_SEC 1000
#define GPR_US_PER_SEC 1000000

@ -56,30 +56,6 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) {
return gpr_time_cmp(a, b) > 0 ? a : b;
}
gpr_timespec gpr_time_0(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = 0;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
gpr_timespec gpr_inf_future(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = INT64_MAX;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
gpr_timespec gpr_inf_past(gpr_clock_type type) {
gpr_timespec out;
out.tv_sec = INT64_MIN;
out.tv_nsec = 0;
out.clock_type = type;
return out;
}
/* TODO(ctiller): consider merging _nanos, _micros, _millis into a single
function for maintainability. Similarly for _seconds, _minutes, and _hours */

@ -54,8 +54,7 @@ const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size)
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
@ -170,22 +169,23 @@ namespace grpc {
class CoreCodegen : public CoreCodegenInterface {
private:
grpc_completion_queue* CompletionQueueCreate() override {
return grpc_completion_queue_create(nullptr);
grpc_completion_queue* grpc_completion_queue_create(void* reserved) override {
return ::grpc_completion_queue_create(reserved);
}
grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline) override {
return grpc_completion_queue_pluck(cq, tag, deadline, nullptr);
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override {
::grpc_completion_queue_destroy(cq);
}
void* gpr_malloc(size_t size) override {
return ::gpr_malloc(size);
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline,
void* reserved) override {
return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
}
void gpr_free(void* p) override {
return ::gpr_free(p);
}
void* gpr_malloc(size_t size) override { return ::gpr_malloc(size); }
void gpr_free(void* p) override { return ::gpr_free(p); }
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override {
::grpc_byte_buffer_destroy(bb);
@ -205,13 +205,14 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp) override {
grpc_byte_buffer** bp) override {
GPR_TIMER_SCOPE("SerializeProto", 0);
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = gpr_slice_malloc(byte_size);
GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
GPR_ASSERT(
GPR_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
*bp = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return Status::OK;
@ -224,7 +225,8 @@ class CoreCodegen : public CoreCodegenInterface {
}
Status DeserializeProto(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg, int max_message_size) override {
grpc::protobuf::Message* msg,
int max_message_size) override {
GPR_TIMER_SCOPE("DeserializeProto", 0);
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");

@ -34,7 +34,6 @@
#include <memory>
#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
@ -43,16 +42,13 @@
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
CompletionQueue::CompletionQueue() {
g_gli_initializer.summon();
cq_ = grpc_completion_queue_create(nullptr);
}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
void CompletionQueue::Shutdown() {
g_gli_initializer.summon();
grpc_completion_queue_shutdown(cq_);
}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
bool ok = ev.success != 0;
void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
GPR_ASSERT(ignored == tag);
// Ignore mutations by FinalizeResult: Pluck returns the C API status
return ev.success != 0;
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0;
void* ignored = tag;
// the tag must be swallowed if using TryPluck
GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
}
} // namespace grpc

Loading…
Cancel
Save