diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h index db2afe930c6..271d464583f 100644 --- a/include/grpc++/impl/codegen/client_context.h +++ b/include/grpc++/impl/codegen/client_context.h @@ -54,6 +54,7 @@ #include #include +#include #include #include #include @@ -192,7 +193,7 @@ class ClientContext { /// \return A multimap of initial metadata key-value pairs from the server. const std::multimap& GetServerInitialMetadata() { - GPR_ASSERT(initial_metadata_received_); + GPR_CODEGEN_ASSERT(initial_metadata_received_); return recv_initial_metadata_; } diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 102831e1c9b..6473494d86c 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -36,6 +36,9 @@ #ifndef GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H #define GRPCXX_IMPL_CODEGEN_COMPLETION_QUEUE_H +#include +#include +#include #include #include #include @@ -76,13 +79,17 @@ class Server; class ServerBuilder; class ServerContext; +extern CoreCodegenInterface* g_core_codegen_interface; + /// A thin wrapper around \a grpc_completion_queue (see / \a /// src/core/surface/completion_queue.h). class CompletionQueue : private GrpcLibrary { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue(); + CompletionQueue() { + cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); + } /// Wrap \a take, taking ownership of the instance. /// @@ -90,7 +97,9 @@ class CompletionQueue : private GrpcLibrary { explicit CompletionQueue(grpc_completion_queue* take); /// Destructor. Destroys the owned wrapped completion queue / instance. - ~CompletionQueue(); + ~CompletionQueue() { + g_core_codegen_interface->grpc_completion_queue_destroy(cq_); + } /// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT. enum NextStatus { @@ -181,10 +190,29 @@ class CompletionQueue : private GrpcLibrary { /// Wraps \a grpc_completion_queue_pluck. /// \warning Must not be mixed with calls to \a Next. - bool Pluck(CompletionQueueTag* tag); + bool Pluck(CompletionQueueTag* tag) { + auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + bool ok = ev.success != 0; + void* ignored = tag; + GPR_CODEGEN_ASSERT(tag->FinalizeResult(&ignored, &ok)); + GPR_CODEGEN_ASSERT(ignored == tag); + // Ignore mutations by FinalizeResult: Pluck returns the C API status + return ev.success != 0; + } /// Performs a single polling pluck on \a tag. - void TryPluck(CompletionQueueTag* tag); + void TryPluck(CompletionQueueTag* tag) { + auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); + auto ev = g_core_codegen_interface->grpc_completion_queue_pluck( + cq_, tag, deadline, nullptr); + if (ev.type == GRPC_QUEUE_TIMEOUT) return; + bool ok = ev.success != 0; + void* ignored = tag; + // the tag must be swallowed if using TryPluck + GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); + } grpc_completion_queue* cq_; // owned }; diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index b1b128cc4aa..f043f96072c 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -43,9 +43,13 @@ namespace grpc { class CoreCodegenInterface { public: - virtual grpc_completion_queue* CompletionQueueCreate() = 0; - virtual grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag, - gpr_timespec deadline) = 0; + virtual grpc_completion_queue* grpc_completion_queue_create( + void* reserved) = 0; + virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; + virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, + void* tag, + gpr_timespec deadline, + void* reserved) = 0; // Serialize the msg into a buffer created inside the function. The caller // should destroy the returned buffer when done with it. If serialization @@ -70,11 +74,12 @@ class CoreCodegenInterface { }; /* XXX */ -#define GPR_CODEGEN_ASSERT(x) \ - do { \ - if (!(x)) { \ - g_core_codegen_interface->assert_fail(#x); \ - } \ +#define GPR_CODEGEN_ASSERT(x) \ + do { \ + if (!(x)) { \ + extern CoreCodegenInterface* g_core_codegen_interface; \ + g_core_codegen_interface->assert_fail(#x); \ + } \ } while (0) } // namespace grpc diff --git a/include/grpc++/impl/codegen/grpc_library.h b/include/grpc++/impl/codegen/grpc_library.h index eb7152a2c60..ef076315f5a 100644 --- a/include/grpc++/impl/codegen/grpc_library.h +++ b/include/grpc++/impl/codegen/grpc_library.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CODEGEN_GRPC_LIBRARY_H #include +#include namespace grpc { @@ -49,13 +50,13 @@ extern GrpcLibraryInterface* g_glip; class GrpcLibrary { public: GrpcLibrary() { - GPR_ASSERT(g_glip && + GPR_CODEGEN_ASSERT(g_glip && "gRPC library not initialized. See " "grpc::internal::GrpcLibraryInitializer."); g_glip->init(); } virtual ~GrpcLibrary() { - GPR_ASSERT(g_glip && + GPR_CODEGEN_ASSERT(g_glip && "gRPC library not initialized. See " "grpc::internal::GrpcLibraryInitializer."); g_glip->shutdown(); diff --git a/include/grpc++/impl/codegen/impl/async_stream.h b/include/grpc++/impl/codegen/impl/async_stream.h index b0410485f85..fea935a362c 100644 --- a/include/grpc++/impl/codegen/impl/async_stream.h +++ b/include/grpc++/impl/codegen/impl/async_stream.h @@ -36,6 +36,7 @@ #include #include +#include #include #include #include @@ -109,13 +110,13 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface { init_ops_.set_output_tag(tag); init_ops_.SendInitialMetadata(context->send_initial_metadata_); // TODO(ctiller): don't assert - GPR_ASSERT(init_ops_.SendMessage(request).ok()); + GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); init_ops_.ClientSendClose(); call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -177,7 +178,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -187,7 +188,7 @@ class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface { void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } @@ -243,7 +244,7 @@ class ClientAsyncReaderWriter GRPC_FINAL } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!context_->initial_metadata_received_); + GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); meta_ops_.set_output_tag(tag); meta_ops_.RecvInitialMetadata(context_); @@ -262,7 +263,7 @@ class ClientAsyncReaderWriter GRPC_FINAL void Write(const W& msg, void* tag) GRPC_OVERRIDE { write_ops_.set_output_tag(tag); // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } @@ -300,7 +301,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -331,7 +332,7 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, } void FinishWithError(const Status& status, void* tag) { - GPR_ASSERT(!status.ok()); + GPR_CODEGEN_ASSERT(!status.ok()); finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -360,7 +361,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -375,7 +376,7 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } @@ -409,7 +410,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} void SendInitialMetadata(void* tag) GRPC_OVERRIDE { - GPR_ASSERT(!ctx_->sent_initial_metadata_); + GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); meta_ops_.set_output_tag(tag); meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); @@ -430,7 +431,7 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, ctx_->sent_initial_metadata_ = true; } // TODO(ctiller): don't assert - GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); call_.PerformOps(&write_ops_); } diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h index f934619c20b..1dcc01285aa 100644 --- a/include/grpc++/impl/codegen/server_interface.h +++ b/include/grpc++/impl/codegen/server_interface.h @@ -37,6 +37,7 @@ #include #include #include +#include #include namespace grpc { @@ -223,7 +224,7 @@ class ServerInterface : public CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag, Message* message) { - GPR_ASSERT(method); + GPR_CODEGEN_ASSERT(method); new PayloadAsyncRequest(method->server_tag(), this, context, stream, call_cq, notification_cq, tag, message); @@ -233,7 +234,7 @@ class ServerInterface : public CallHook { ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - GPR_ASSERT(method); + GPR_CODEGEN_ASSERT(method); new NoPayloadAsyncRequest(method->server_tag(), this, context, stream, call_cq, notification_cq, tag); } diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index deb91a41d96..901468e3eec 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CODEGEN_SERVICE_TYPE_H #include +#include #include #include #include @@ -131,21 +132,16 @@ class Service { void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); } void MarkMethodAsync(int index) { - if (methods_[index].get() == nullptr) { - gpr_log(GPR_ERROR, + GPR_CODEGEN_ASSERT(methods_[index].get() != nullptr && "Cannot mark the method as 'async' because it has already been " "marked as 'generic'."); - return; - } methods_[index]->ResetHandler(); } void MarkMethodGeneric(int index) { - if (methods_[index]->handler() == nullptr) { - gpr_log(GPR_ERROR, + GPR_CODEGEN_ASSERT(methods_[index]->handler() != nullptr && "Cannot mark the method as 'generic' because it has already been " "marked as 'async'."); - } methods_[index].reset(); } diff --git a/include/grpc/impl/codegen/time.h b/include/grpc/impl/codegen/time.h index c22bedfe77c..9776dabc11d 100644 --- a/include/grpc/impl/codegen/time.h +++ b/include/grpc/impl/codegen/time.h @@ -69,10 +69,33 @@ typedef struct gpr_timespec { } gpr_timespec; /* Time constants. */ -GPRAPI gpr_timespec -gpr_time_0(gpr_clock_type type); /* The zero time interval. */ -GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type); /* The far future */ -GPRAPI gpr_timespec gpr_inf_past(gpr_clock_type type); /* The far past. */ +/* The zero time interval. */ +GPRAPI static inline gpr_timespec gpr_time_0(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = 0; + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +/* The far future */ +GPRAPI static inline gpr_timespec gpr_inf_future(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = INT64_MAX; + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + +/* The far past. */ +GPRAPI static inline gpr_timespec gpr_inf_past(gpr_clock_type type) { + gpr_timespec out; + out.tv_sec = INT64_MIN; + out.tv_nsec = 0; + out.clock_type = type; + return out; +} + #define GPR_MS_PER_SEC 1000 #define GPR_US_PER_SEC 1000000 diff --git a/src/core/support/time.c b/src/core/support/time.c index 423d12ffc0f..fec3c7a2c5d 100644 --- a/src/core/support/time.c +++ b/src/core/support/time.c @@ -56,30 +56,6 @@ gpr_timespec gpr_time_max(gpr_timespec a, gpr_timespec b) { return gpr_time_cmp(a, b) > 0 ? a : b; } -gpr_timespec gpr_time_0(gpr_clock_type type) { - gpr_timespec out; - out.tv_sec = 0; - out.tv_nsec = 0; - out.clock_type = type; - return out; -} - -gpr_timespec gpr_inf_future(gpr_clock_type type) { - gpr_timespec out; - out.tv_sec = INT64_MAX; - out.tv_nsec = 0; - out.clock_type = type; - return out; -} - -gpr_timespec gpr_inf_past(gpr_clock_type type) { - gpr_timespec out; - out.tv_sec = INT64_MIN; - out.tv_nsec = 0; - out.clock_type = type; - return out; -} - /* TODO(ctiller): consider merging _nanos, _micros, _millis into a single function for maintainability. Similarly for _seconds, _minutes, and _hours */ diff --git a/src/cpp/codegen/core_codegen.cc b/src/cpp/codegen/core_codegen.cc index ccae2062914..00e8005f281 100644 --- a/src/cpp/codegen/core_codegen.cc +++ b/src/cpp/codegen/core_codegen.cc @@ -54,8 +54,7 @@ const int kGrpcBufferWriterMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, - int block_size) + explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { *bp = grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; @@ -170,22 +169,23 @@ namespace grpc { class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* CompletionQueueCreate() override { - return grpc_completion_queue_create(nullptr); + grpc_completion_queue* grpc_completion_queue_create(void* reserved) override { + return ::grpc_completion_queue_create(reserved); } - grpc_event CompletionQueuePluck(grpc_completion_queue* cq, void* tag, - gpr_timespec deadline) override { - return grpc_completion_queue_pluck(cq, tag, deadline, nullptr); + void grpc_completion_queue_destroy(grpc_completion_queue* cq) override { + ::grpc_completion_queue_destroy(cq); } - void* gpr_malloc(size_t size) override { - return ::gpr_malloc(size); + grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, + gpr_timespec deadline, + void* reserved) override { + return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved); } - void gpr_free(void* p) override { - return ::gpr_free(p); - } + void* gpr_malloc(size_t size) override { return ::gpr_malloc(size); } + + void gpr_free(void* p) override { return ::gpr_free(p); } void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override { ::grpc_byte_buffer_destroy(bb); @@ -205,13 +205,14 @@ class CoreCodegen : public CoreCodegenInterface { } Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) override { + grpc_byte_buffer** bp) override { GPR_TIMER_SCOPE("SerializeProto", 0); int byte_size = msg.ByteSize(); if (byte_size <= kGrpcBufferWriterMaxBufferLength) { gpr_slice slice = gpr_slice_malloc(byte_size); - GPR_ASSERT(GPR_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); + GPR_ASSERT( + GPR_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); *bp = grpc_raw_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); return Status::OK; @@ -224,7 +225,8 @@ class CoreCodegen : public CoreCodegenInterface { } Status DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, int max_message_size) override { + grpc::protobuf::Message* msg, + int max_message_size) override { GPR_TIMER_SCOPE("DeserializeProto", 0); if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 4f76dfff1d7..729dc33749a 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -34,7 +34,6 @@ #include -#include #include #include #include @@ -43,16 +42,13 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue() { - g_gli_initializer.summon(); - cq_ = grpc_completion_queue_create(nullptr); -} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} -CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } - -void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + grpc_completion_queue_shutdown(cq_); +} CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { @@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } -bool CompletionQueue::Pluck(CompletionQueueTag* tag) { - auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == tag); - // Ignore mutations by FinalizeResult: Pluck returns the C API status - return ev.success != 0; -} - -void CompletionQueue::TryPluck(CompletionQueueTag* tag) { - auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok)); -} - } // namespace grpc