Merge pull request #21806 from grpc/revert-21767-revert-21725-revert-21680-cq_ordering

Revert "Revert "Revert "Log an error (in dbg mode) if CQ is Shutdown before its Server(s)"""
pull/21705/head
Lidi Zheng 5 years ago committed by GitHub
commit 312b31c264
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      include/grpcpp/impl/codegen/completion_queue_impl.h
  2. 6
      include/grpcpp/server_impl.h
  3. 6
      src/cpp/common/completion_queue_cc.cc
  4. 15
      src/cpp/server/server_builder.cc
  5. 12
      src/cpp/server/server_cc.cc

@ -32,14 +32,11 @@
#ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H
#define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_IMPL_H
#include <list>
#include <grpc/impl/codegen/atm.h> #include <grpc/impl/codegen/atm.h>
#include <grpcpp/impl/codegen/completion_queue_tag.h> #include <grpcpp/impl/codegen/completion_queue_tag.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h> #include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/grpc_library.h> #include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/time.h> #include <grpcpp/impl/codegen/time.h>
struct grpc_completion_queue; struct grpc_completion_queue;
@ -253,11 +250,6 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen {
} }
private: private:
// Friends for access to server registration lists that enable checking and
// logging on shutdown
friend class ::grpc_impl::ServerBuilder;
friend class ::grpc_impl::Server;
// Friend synchronous wrappers so that they can access Pluck(), which is // Friend synchronous wrappers so that they can access Pluck(), which is
// a semi-private API geared towards the synchronous implementation. // a semi-private API geared towards the synchronous implementation.
template <class R> template <class R>
@ -282,6 +274,7 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen {
friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler; friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler;
template <::grpc::StatusCode code> template <::grpc::StatusCode code>
friend class ::grpc_impl::internal::ErrorMethodHandler; friend class ::grpc_impl::internal::ErrorMethodHandler;
friend class ::grpc_impl::Server;
friend class ::grpc_impl::ServerContextBase; friend class ::grpc_impl::ServerContextBase;
friend class ::grpc::ServerInterface; friend class ::grpc::ServerInterface;
template <class InputMessage, class OutputMessage> template <class InputMessage, class OutputMessage>
@ -386,38 +379,13 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen {
} }
} }
void RegisterServer(const Server* server) {
#ifndef NDEBUG
grpc::internal::MutexLock l(&server_list_mutex_);
server_list_.push_back(server);
#endif
}
void UnregisterServer(const Server* server) {
#ifndef NDEBUG
grpc::internal::MutexLock l(&server_list_mutex_);
server_list_.remove(server);
#endif
}
bool ServerListEmpty() const {
#ifndef NDEBUG
grpc::internal::MutexLock l(&server_list_mutex_);
return server_list_.empty();
#endif
return true;
}
#ifndef NDEBUG
mutable grpc::internal::Mutex server_list_mutex_;
std::list<const Server*> server_list_ /* GUARDED_BY(server_list_mutex_) */;
#endif
grpc_completion_queue* cq_; // owned grpc_completion_queue* cq_; // owned
gpr_atm avalanches_in_flight_; gpr_atm avalanches_in_flight_;
}; };
/// A specific type of completion queue used by the processing of notifications /// A specific type of completion queue used by the processing of notifications
/// by servers. Instantiated by \a ServerBuilder or Server (for health checker). /// by servers. Instantiated by \a ServerBuilder.
class ServerCompletionQueue : public CompletionQueue { class ServerCompletionQueue : public CompletionQueue {
public: public:
bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; } bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }

@ -385,12 +385,6 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
// shutdown callback tag (invoked when the CQ is fully shutdown). // shutdown callback tag (invoked when the CQ is fully shutdown).
// It is protected by mu_ // It is protected by mu_
CompletionQueue* callback_cq_ = nullptr; CompletionQueue* callback_cq_ = nullptr;
#ifndef NDEBUG
// List of CQs passed in by user that must be Shutdown only after Server is
// Shutdown.
std::vector<CompletionQueue*> cq_list_;
#endif
}; };
} // namespace grpc_impl } // namespace grpc_impl

@ -39,12 +39,6 @@ CompletionQueue::CompletionQueue(grpc_completion_queue* take)
void CompletionQueue::Shutdown() { void CompletionQueue::Shutdown() {
g_gli_initializer.summon(); g_gli_initializer.summon();
#ifndef NDEBUG
if (!ServerListEmpty()) {
gpr_log(GPR_ERROR,
"CompletionQueue shutdown being shutdown before its server.");
}
#endif
CompleteAvalanching(); CompleteAvalanching();
} }

@ -354,8 +354,9 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// server // server
// 2. cqs_: Completion queues added via AddCompletionQueue() call // 2. cqs_: Completion queues added via AddCompletionQueue() call
for (const auto& cq : *sync_server_cqs) { for (const auto& value : *sync_server_cqs) {
grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr); grpc_server_register_completion_queue(server->server_, value->cq(),
nullptr);
has_frequently_polled_cqs = true; has_frequently_polled_cqs = true;
} }
@ -368,12 +369,10 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
// AddCompletionQueue() API. Some of them may not be frequently polled (i.e by // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
// calling Next() or AsyncNext()) and hence are not safe to be used for // calling Next() or AsyncNext()) and hence are not safe to be used for
// listening to incoming channels. Such completion queues must be registered // listening to incoming channels. Such completion queues must be registered
// as non-listening queues. In debug mode, these should have their server list // as non-listening queues
// tracked since these are provided the user and must be Shutdown by the user for (const auto& value : cqs_) {
// after the server is shutdown. grpc_server_register_completion_queue(server->server_, value->cq(),
for (const auto& cq : cqs_) { nullptr);
grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
cq->RegisterServer(server.get());
} }
if (!has_frequently_polled_cqs) { if (!has_frequently_polled_cqs) {

@ -1249,9 +1249,6 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) {
} }
for (size_t i = 0; i < num_cqs; i++) { for (size_t i = 0; i < num_cqs; i++) {
#ifndef NDEBUG
cq_list_.push_back(cqs[i]);
#endif
if (cqs[i]->IsFrequentlyPolled()) { if (cqs[i]->IsFrequentlyPolled()) {
new UnimplementedAsyncRequest(this, cqs[i]); new UnimplementedAsyncRequest(this, cqs[i]);
} }
@ -1363,15 +1360,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
shutdown_notified_ = true; shutdown_notified_ = true;
shutdown_cv_.Broadcast(); shutdown_cv_.Broadcast();
#ifndef NDEBUG
// Unregister this server with the CQs passed into it by the user so that
// those can be checked for properly-ordered shutdown.
for (auto* cq : cq_list_) {
cq->UnregisterServer(this);
}
cq_list_.clear();
#endif
} }
void Server::Wait() { void Server::Wait() {

Loading…
Cancel
Save