Merge branch 'track_cq_finalize' of github.com:vjpai/grpc into fixit29

pull/8962/head
Craig Tiller 8 years ago
commit 749ccd99df
  1. 22
      include/grpc++/impl/codegen/completion_queue.h
  2. 2
      include/grpc++/impl/codegen/server_interface.h
  3. 12
      src/cpp/common/completion_queue_cc.cc
  4. 5
      src/cpp/server/server_cc.cc

@ -52,6 +52,7 @@
#include <grpc++/impl/codegen/grpc_library.h> #include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/time.h> #include <grpc++/impl/codegen/time.h>
#include <grpc/impl/codegen/atm.h>
struct grpc_completion_queue; struct grpc_completion_queue;
@ -101,6 +102,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// instance. /// instance.
CompletionQueue() { CompletionQueue() {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr);
InitialAvalanching(); // reserve this for the future shutdown
} }
/// Wrap \a take, taking ownership of the instance. /// Wrap \a take, taking ownership of the instance.
@ -151,7 +153,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// Request the shutdown of the queue. /// Request the shutdown of the queue.
/// ///
/// \warning This method must be called at some point. Once invoked, \a Next /// \warning This method must be called at some point if this completion queue
/// is accessed with Next or AsyncNext. Once invoked, \a Next
/// will start to return false and \a AsyncNext will return \a /// will start to return false and \a AsyncNext will return \a
/// NextStatus::SHUTDOWN. Only once either one of these methods does that /// NextStatus::SHUTDOWN. Only once either one of these methods does that
/// (that is, once the queue has been \em drained) can an instance of this /// (that is, once the queue has been \em drained) can an instance of this
@ -165,6 +168,21 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// owership is performed. /// owership is performed.
grpc_completion_queue* cq() { return cq_; } grpc_completion_queue* cq() { return cq_; }
/// Manage state of avalanching operations : completion queue tags that
/// trigger other completion queue operations. The underlying core completion
/// queue should not really shutdown until all avalanching operations have
/// been finalized. Note that we maintain the requirement that an avalanche
/// registration must take place before CQ shutdown (which must be maintained
/// elsehwere)
void InitialAvalanching() {
gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
}
void RegisterAvalanching() {
gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
static_cast<gpr_atm>(1));
};
void CompleteAvalanching();
private: private:
// Friend synchronous wrappers so that they can access Pluck(), which is // Friend synchronous wrappers so that they can access Pluck(), which is
// a semi-private API geared towards the synchronous implementation. // a semi-private API geared towards the synchronous implementation.
@ -229,6 +247,8 @@ class CompletionQueue : private GrpcLibraryCodegen {
} }
grpc_completion_queue* cq_; // owned grpc_completion_queue* cq_; // owned
gpr_atm avalanches_in_flight_;
}; };
/// A specific type of completion queue used by the processing of notifications /// A specific type of completion queue used by the processing of notifications

@ -140,7 +140,7 @@ class ServerInterface : public CallHook {
ServerAsyncStreamingInterface* stream, ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize); bool delete_on_finalize);
virtual ~BaseAsyncRequest() {} virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) override; bool FinalizeResult(void** tag, bool* status) override;

@ -43,12 +43,22 @@ namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer; static internal::GrpcLibraryInitializer g_gli_initializer;
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {
InitialAvalanching();
}
void CompletionQueue::Shutdown() { void CompletionQueue::Shutdown() {
g_gli_initializer.summon(); g_gli_initializer.summon();
CompleteAvalanching();
}
void CompletionQueue::CompleteAvalanching() {
// Check if this was the last avalanching operation
if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
static_cast<gpr_atm>(-1)) == 1) {
grpc_completion_queue_shutdown(cq_); grpc_completion_queue_shutdown(cq_);
} }
}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) { void** tag, bool* ok, gpr_timespec deadline) {

@ -575,9 +575,14 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
tag_(tag), tag_(tag),
delete_on_finalize_(delete_on_finalize), delete_on_finalize_(delete_on_finalize),
call_(nullptr) { call_(nullptr) {
call_cq_->RegisterAvalanching(); // This op will trigger more ops
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
} }
ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
call_cq_->CompleteAvalanching();
}
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) { bool* status) {
if (*status) { if (*status) {

Loading…
Cancel
Save