From d09c9f8e20363918d6b1aa92ee977e6f62551b48 Mon Sep 17 00:00:00 2001
From: Alexander Polcyn
Date: Thu, 11 Apr 2019 16:52:54 -0700
Subject: [PATCH 1/3] Revert "Introduce C++ wrappers for gpr_mu and gpr_cv."

This reverts commit a26c09dd258a0d4dc3023e874b13f92e29174a43.
---
 BUILD                                         |  14 +-
 BUILD.gn                                      |   5 +-
 CMakeLists.txt                                |   5 -
 Makefile                                      |   5 -
 build.yaml                                    |   8 +-
 gRPC-C++.podspec                              |   7 +-
 gRPC-Core.podspec                             |   4 +-
 grpc.gemspec                                  |   2 +-
 include/grpcpp/channel.h                      |   3 +-
 include/grpcpp/impl/codegen/client_context.h  |   3 +-
 include/grpcpp/impl/codegen/sync.h            | 138 ------------------
 include/grpcpp/server_impl.h                  |   8 +-
 package.xml                                   |   2 +-
 .../filters/client_channel/client_channel.cc  |   2 +-
 .../health/health_check_client.cc             |   4 +-
 .../health/health_check_client.h              |   3 +-
 .../client_channel/http_connect_handshaker.cc |   2 +-
 .../client_channel/lb_policy/grpclb/grpclb.cc |   1 +
 .../lb_policy/grpclb/grpclb_client_stats.cc   |   2 +-
 .../lb_policy/grpclb/grpclb_client_stats.h    |   6 +-
 .../lb_policy/pick_first/pick_first.cc        |   6 +-
 .../lb_policy/round_robin/round_robin.cc      |   6 +-
 .../client_channel/lb_policy/xds/xds.cc       |  19 ++-
 .../client_channel/resolving_lb_policy.cc     |   2 +-
 .../ext/filters/client_channel/subchannel.cc  |  70 +++++----
 .../ext/filters/client_channel/subchannel.h   |   3 +-
 src/core/lib/channel/channelz_registry.cc     |   2 +-
 src/core/lib/channel/handshaker.h             |   2 +-
 src/core/lib/gprpp/mutex_lock.h               |  42 ++++++
 src/core/lib/gprpp/sync.h                     | 126 ----------------
 src/core/lib/iomgr/ev_epollex_linux.cc        |   2 +-
 src/core/lib/surface/init.cc                  |   2 +-
 .../ssl/session_cache/ssl_session_cache.cc    |   2 +-
 src/cpp/client/channel_cc.cc                  |   2 +-
 src/cpp/client/client_context.cc              |   5 +-
 src/cpp/server/dynamic_thread_pool.cc         |  23 ++-
 src/cpp/server/dynamic_thread_pool.h          |   7 +-
 .../health/default_health_check_service.cc    |  28 ++--
 .../health/default_health_check_service.h     |   7 +-
 src/cpp/server/load_reporter/load_reporter.cc |  18 +--
 src/cpp/server/load_reporter/load_reporter.h  |   5 +-
 .../load_reporter_async_service_impl.cc       |  24 +--
 .../load_reporter_async_service_impl.h        |   3 +-
 src/cpp/server/server_cc.cc                   |  24 +--
 src/cpp/server/server_context.cc              |  17 ++-
 src/cpp/thread_manager/thread_manager.cc      |  34 ++---
 src/cpp/thread_manager/thread_manager.h       |   7 +-
 test/cpp/client/client_channel_stress_test.cc |  17 +--
 test/cpp/end2end/client_lb_end2end_test.cc    |  37 +++--
 test/cpp/end2end/grpclb_end2end_test.cc       |  67 +++++----
 test/cpp/end2end/thread_stress_test.cc        |  21 ++-
 test/cpp/end2end/xds_end2end_test.cc          |  66 ++++-----
 tools/doxygen/Doxyfile.c++                    |   1 -
 tools/doxygen/Doxyfile.c++.internal           |   3 +-
 tools/doxygen/Doxyfile.core.internal          |   2 +-
 .../generated/sources_and_headers.json        |  20 +-
 56 files changed, 338 insertions(+), 608 deletions(-)
 delete mode 100644 include/grpcpp/impl/codegen/sync.h
 create mode 100644 src/core/lib/gprpp/mutex_lock.h
 delete mode 100644 src/core/lib/gprpp/sync.h

diff --git a/BUILD b/BUILD
index b0c501455d1..8837ed8075f 100644
--- a/BUILD
+++ b/BUILD
@@ -525,17 +525,6 @@ grpc_cc_library(
     ],
 )

-grpc_cc_library(
-    name = "grpc++_internal_hdrs_only",
-    hdrs = [
-        "include/grpcpp/impl/codegen/sync.h",
-    ],
-    language = "c++",
-    deps = [
-        "gpr_codegen",
-    ],
-)
-
 grpc_cc_library(
     name = "gpr_base",
     srcs = [
@@ -601,8 +590,8 @@ grpc_cc_library(
         "src/core/lib/gprpp/manual_constructor.h",
         "src/core/lib/gprpp/map.h",
         "src/core/lib/gprpp/memory.h",
+        "src/core/lib/gprpp/mutex_lock.h",
         "src/core/lib/gprpp/pair.h",
-        "src/core/lib/gprpp/sync.h",
         "src/core/lib/gprpp/thd.h",
         "src/core/lib/profiling/timers.h",
     ],
@@ -2156,7 +2145,6 @@ grpc_cc_library(
         "include/grpcpp/impl/codegen/time.h",
     ],
     deps = [
-        "grpc++_internal_hdrs_only",
        "grpc_codegen",
     ],
 )

diff --git a/BUILD.gn b/BUILD.gn
index 7f5157377a7..1417f1db5ea 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -186,8 +186,8 @@ config("grpc_config") {
     "src/core/lib/gprpp/manual_constructor.h",
     "src/core/lib/gprpp/map.h",
     "src/core/lib/gprpp/memory.h",
+    "src/core/lib/gprpp/mutex_lock.h",
     "src/core/lib/gprpp/pair.h",
-    "src/core/lib/gprpp/sync.h",
     "src/core/lib/gprpp/thd.h",
     "src/core/lib/gprpp/thd_posix.cc",
     "src/core/lib/gprpp/thd_windows.cc",
@@ -1064,7 +1064,6 @@ config("grpc_config") {
     "include/grpcpp/impl/codegen/status_code_enum.h",
     "include/grpcpp/impl/codegen/string_ref.h",
     "include/grpcpp/impl/codegen/stub_options.h",
-    "include/grpcpp/impl/codegen/sync.h",
     "include/grpcpp/impl/codegen/sync_stream.h",
     "include/grpcpp/impl/codegen/time.h",
     "include/grpcpp/impl/grpc_library.h",
@@ -1159,12 +1158,12 @@ config("grpc_config") {
     "src/core/lib/gprpp/manual_constructor.h",
     "src/core/lib/gprpp/map.h",
     "src/core/lib/gprpp/memory.h",
+    "src/core/lib/gprpp/mutex_lock.h",
     "src/core/lib/gprpp/optional.h",
     "src/core/lib/gprpp/orphanable.h",
     "src/core/lib/gprpp/pair.h",
     "src/core/lib/gprpp/ref_counted.h",
     "src/core/lib/gprpp/ref_counted_ptr.h",
-    "src/core/lib/gprpp/sync.h",
     "src/core/lib/gprpp/thd.h",
     "src/core/lib/http/format_request.h",
     "src/core/lib/http/httpcli.h",

diff --git a/CMakeLists.txt b/CMakeLists.txt
index cce21957659..5b99a57633a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -3181,7 +3181,6 @@ foreach(_hdr
   include/grpcpp/impl/codegen/stub_options.h
   include/grpcpp/impl/codegen/sync_stream.h
   include/grpcpp/impl/codegen/time.h
-  include/grpcpp/impl/codegen/sync.h
   include/grpc++/impl/codegen/proto_utils.h
   include/grpcpp/impl/codegen/proto_buffer_reader.h
   include/grpcpp/impl/codegen/proto_buffer_writer.h
@@ -3784,7 +3783,6 @@ foreach(_hdr
   include/grpcpp/impl/codegen/stub_options.h
   include/grpcpp/impl/codegen/sync_stream.h
   include/grpcpp/impl/codegen/time.h
-  include/grpcpp/impl/codegen/sync.h
   include/grpc/census.h
 )
 string(REPLACE "include/" "" _path ${_hdr})
@@ -4239,7 +4237,6 @@ foreach(_hdr
   include/grpc/impl/codegen/sync_generic.h
   include/grpc/impl/codegen/sync_posix.h
   include/grpc/impl/codegen/sync_windows.h
-  include/grpcpp/impl/codegen/sync.h
   include/grpc++/impl/codegen/proto_utils.h
   include/grpcpp/impl/codegen/proto_buffer_reader.h
   include/grpcpp/impl/codegen/proto_buffer_writer.h
@@ -4436,7 +4433,6 @@ foreach(_hdr
   include/grpc/impl/codegen/sync_generic.h
   include/grpc/impl/codegen/sync_posix.h
   include/grpc/impl/codegen/sync_windows.h
-  include/grpcpp/impl/codegen/sync.h
   include/grpc++/impl/codegen/proto_utils.h
   include/grpcpp/impl/codegen/proto_buffer_reader.h
   include/grpcpp/impl/codegen/proto_buffer_writer.h
@@ -4763,7 +4759,6 @@ foreach(_hdr
   include/grpcpp/impl/codegen/stub_options.h
   include/grpcpp/impl/codegen/sync_stream.h
   include/grpcpp/impl/codegen/time.h
-  include/grpcpp/impl/codegen/sync.h
 )
 string(REPLACE "include/" "" _path ${_hdr})
 get_filename_component(_path ${_path} PATH)

diff --git a/Makefile b/Makefile
index d972215250a..2e6e6070c43 100644
--- a/Makefile
+++ b/Makefile
@@ -5516,7 +5516,6 @@ PUBLIC_HEADERS_CXX += \
     include/grpcpp/impl/codegen/stub_options.h \
     include/grpcpp/impl/codegen/sync_stream.h \
     include/grpcpp/impl/codegen/time.h \
-    include/grpcpp/impl/codegen/sync.h \
     include/grpc++/impl/codegen/proto_utils.h \
     include/grpcpp/impl/codegen/proto_buffer_reader.h \
     include/grpcpp/impl/codegen/proto_buffer_writer.h \
@@ -6127,7 +6126,6 @@ PUBLIC_HEADERS_CXX += \
     include/grpcpp/impl/codegen/stub_options.h \
     include/grpcpp/impl/codegen/sync_stream.h \
     include/grpcpp/impl/codegen/time.h \
-    include/grpcpp/impl/codegen/sync.h \
     include/grpc/census.h \

 LIBGRPC++_CRONET_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_CRONET_SRC))))
@@ -6554,7 +6552,6 @@ PUBLIC_HEADERS_CXX += \
     include/grpc/impl/codegen/sync_generic.h \
     include/grpc/impl/codegen/sync_posix.h \
     include/grpc/impl/codegen/sync_windows.h \
-    include/grpcpp/impl/codegen/sync.h \
     include/grpc++/impl/codegen/proto_utils.h \
     include/grpcpp/impl/codegen/proto_buffer_reader.h \
     include/grpcpp/impl/codegen/proto_buffer_writer.h \
@@ -6722,7 +6719,6 @@ PUBLIC_HEADERS_CXX += \
     include/grpc/impl/codegen/sync_generic.h \
     include/grpc/impl/codegen/sync_posix.h \
     include/grpc/impl/codegen/sync_windows.h \
-    include/grpcpp/impl/codegen/sync.h \
     include/grpc++/impl/codegen/proto_utils.h \
     include/grpcpp/impl/codegen/proto_buffer_reader.h \
     include/grpcpp/impl/codegen/proto_buffer_writer.h \
@@ -7055,7 +7051,6 @@ PUBLIC_HEADERS_CXX += \
     include/grpcpp/impl/codegen/stub_options.h \
     include/grpcpp/impl/codegen/sync_stream.h \
     include/grpcpp/impl/codegen/time.h \
-    include/grpcpp/impl/codegen/sync.h \

 LIBGRPC++_UNSECURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_UNSECURE_SRC))))

diff --git a/build.yaml b/build.yaml
index 9e78ec4efa3..5e8c803e56f 100644
--- a/build.yaml
+++ b/build.yaml
@@ -196,8 +196,8 @@ filegroups:
   - src/core/lib/gprpp/manual_constructor.h
   - src/core/lib/gprpp/map.h
   - src/core/lib/gprpp/memory.h
+  - src/core/lib/gprpp/mutex_lock.h
   - src/core/lib/gprpp/pair.h
-  - src/core/lib/gprpp/sync.h
   - src/core/lib/gprpp/thd.h
   - src/core/lib/profiling/timers.h
   uses:
@@ -1276,7 +1276,6 @@ filegroups:
   - include/grpcpp/impl/codegen/time.h
   uses:
   - grpc_codegen
-  - grpc++_internal_hdrs_only
- name: grpc++_codegen_base_src
  language: c++
  src:
@@ -1451,7 +1450,6 @@ filegroups:
  - grpc_base_headers
  - grpc_transport_inproc_headers
  - grpc++_codegen_base
-  - grpc++_internal_hdrs_only
  - nanopb_headers
  - health_proto
- name: grpc++_config_proto
@@ -1459,10 +1457,6 @@ filegroups:
  public_headers:
  - include/grpc++/impl/codegen/config_protobuf.h
  - include/grpcpp/impl/codegen/config_protobuf.h
-- name: grpc++_internal_hdrs_only
-  language: c++
-  public_headers:
-  - include/grpcpp/impl/codegen/sync.h
- name: grpc++_reflection_proto
  language: c++
  src:

diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index b8e4d80b838..b8f3f56a83f 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -183,8 +183,7 @@ Pod::Spec.new do |s|
                       'include/grpcpp/impl/codegen/string_ref.h',
                       'include/grpcpp/impl/codegen/stub_options.h',
                       'include/grpcpp/impl/codegen/sync_stream.h',
-                      'include/grpcpp/impl/codegen/time.h',
-                      'include/grpcpp/impl/codegen/sync.h'
+                      'include/grpcpp/impl/codegen/time.h'
   end

   s.subspec 'Implementation' do |ss|
@@ -267,8 +266,8 @@ Pod::Spec.new do |s|
                       'src/core/lib/gprpp/manual_constructor.h',
                       'src/core/lib/gprpp/map.h',
                       'src/core/lib/gprpp/memory.h',
+                      'src/core/lib/gprpp/mutex_lock.h',
                       'src/core/lib/gprpp/pair.h',
-                      'src/core/lib/gprpp/sync.h',
                       'src/core/lib/gprpp/thd.h',
                       'src/core/lib/profiling/timers.h',
                       'src/core/ext/transport/chttp2/transport/bin_decoder.h',
@@ -584,8 +583,8 @@ Pod::Spec.new do |s|
                       'src/core/lib/gprpp/manual_constructor.h',
                       'src/core/lib/gprpp/map.h',
                       'src/core/lib/gprpp/memory.h',
+                      'src/core/lib/gprpp/mutex_lock.h',
                       'src/core/lib/gprpp/pair.h',
-                      'src/core/lib/gprpp/sync.h',
                       'src/core/lib/gprpp/thd.h',
                       'src/core/lib/profiling/timers.h',
                       'src/core/lib/avl/avl.h',

diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index c58152cf7f8..bab7ce4378e 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -210,8 +210,8 @@ Pod::Spec.new do |s|
                       'src/core/lib/gprpp/manual_constructor.h',
                       'src/core/lib/gprpp/map.h',
                       'src/core/lib/gprpp/memory.h',
+                      'src/core/lib/gprpp/mutex_lock.h',
                       'src/core/lib/gprpp/pair.h',
-                      'src/core/lib/gprpp/sync.h',
                       'src/core/lib/gprpp/thd.h',
                       'src/core/lib/profiling/timers.h',
                       'src/core/lib/gpr/alloc.cc',
@@ -889,8 +889,8 @@ Pod::Spec.new do |s|
                       'src/core/lib/gprpp/manual_constructor.h',
                       'src/core/lib/gprpp/map.h',
                       'src/core/lib/gprpp/memory.h',
+                      'src/core/lib/gprpp/mutex_lock.h',
                       'src/core/lib/gprpp/pair.h',
-                      'src/core/lib/gprpp/sync.h',
                       'src/core/lib/gprpp/thd.h',
                       'src/core/lib/profiling/timers.h',
                       'src/core/ext/transport/chttp2/transport/bin_decoder.h',

diff --git a/grpc.gemspec b/grpc.gemspec
index c41db6f6293..b9d2107ee48 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -104,8 +104,8 @@ Gem::Specification.new do |s|
   s.files += %w( src/core/lib/gprpp/manual_constructor.h )
   s.files += %w( src/core/lib/gprpp/map.h )
   s.files += %w( src/core/lib/gprpp/memory.h )
+  s.files += %w( src/core/lib/gprpp/mutex_lock.h )
   s.files += %w( src/core/lib/gprpp/pair.h )
-  s.files += %w( src/core/lib/gprpp/sync.h )
   s.files += %w( src/core/lib/gprpp/thd.h )
   s.files += %w( src/core/lib/profiling/timers.h )
   s.files += %w( src/core/lib/gpr/alloc.cc )

diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h
index c4d5ab1177b..ee833960698 100644
--- a/include/grpcpp/channel.h
+++ b/include/grpcpp/channel.h
@@ -28,7 +28,6 @@
 #include
 #include
 #include
-#include

 struct grpc_channel;

@@ -98,7 +97,7 @@ class Channel final : public ChannelInterface,
   grpc_channel* const c_channel_;  // owned

   // mu_ protects callback_cq_ (the per-channel callbackable completion queue)
-  grpc::internal::Mutex mu_;
+  std::mutex mu_;

   // callback_cq_ references the callbackable completion queue associated
   // with this channel (if any). It is set on the first call to CallbackCQ().

diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h
index 85bbf36f06d..5946488566e 100644
--- a/include/grpcpp/impl/codegen/client_context.h
+++ b/include/grpcpp/impl/codegen/client_context.h
@@ -51,7 +51,6 @@
 #include
 #include
 #include
-#include
 #include

 struct census_context;
@@ -458,7 +457,7 @@ class ClientContext {
   bool idempotent_;
   bool cacheable_;
   std::shared_ptr channel_;
-  grpc::internal::Mutex mu_;
+  std::mutex mu_;
   grpc_call* call_;
   bool call_canceled_;
   gpr_timespec deadline_;

diff --git a/include/grpcpp/impl/codegen/sync.h b/include/grpcpp/impl/codegen/sync.h
deleted file mode 100644
index 2ed71eeb9f2..00000000000
--- a/include/grpcpp/impl/codegen/sync.h
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPCPP_IMPL_CODEGEN_SYNC_H
-#define GRPCPP_IMPL_CODEGEN_SYNC_H
-
-#include
-#include
-#include
-
-#include
-
-// The core library is not accessible in C++ codegen headers, and vice versa.
-// Thus, we need to have duplicate headers with similar functionality.
-// Make sure any change to this file is also reflected in
-// src/core/lib/gprpp/sync.h too.
-//
-// Whenever possible, prefer "src/core/lib/gprpp/sync.h" over this file,
-// since in core we do not rely on g_core_codegen_interface and hence do not
-// pay the costs of virtual function calls.
-
-namespace grpc {
-namespace internal {
-
-class Mutex {
- public:
-  Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); }
-  ~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); }
-
-  Mutex(const Mutex&) = delete;
-  Mutex& operator=(const Mutex&) = delete;
-
-  gpr_mu* get() { return &mu_; }
-  const gpr_mu* get() const { return &mu_; }
-
- private:
-  gpr_mu mu_;
-};
-
-// MutexLock is a std::
-class MutexLock {
- public:
-  explicit MutexLock(Mutex* mu) : mu_(mu->get()) {
-    g_core_codegen_interface->gpr_mu_lock(mu_);
-  }
-  explicit MutexLock(gpr_mu* mu) : mu_(mu) {
-    g_core_codegen_interface->gpr_mu_lock(mu_);
-  }
-  ~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); }
-
-  MutexLock(const MutexLock&) = delete;
-  MutexLock& operator=(const MutexLock&) = delete;
-
- private:
-  gpr_mu* const mu_;
-};
-
-class ReleasableMutexLock {
- public:
-  explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) {
-    g_core_codegen_interface->gpr_mu_lock(mu_);
-  }
-  explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) {
-    g_core_codegen_interface->gpr_mu_lock(mu_);
-  }
-  ~ReleasableMutexLock() {
-    if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_);
-  }
-
-  ReleasableMutexLock(const ReleasableMutexLock&) = delete;
-  ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
-
-  void Lock() {
-    GPR_DEBUG_ASSERT(released_);
-    g_core_codegen_interface->gpr_mu_lock(mu_);
-    released_ = false;
-  }
-
-  void Unlock() {
-    GPR_DEBUG_ASSERT(!released_);
-    released_ = true;
-    g_core_codegen_interface->gpr_mu_unlock(mu_);
-  }
-
- private:
-  gpr_mu* const mu_;
-  bool released_ = false;
-};
-
-class CondVar {
- public:
-  CondVar() { g_core_codegen_interface->gpr_cv_init(&cv_); }
-  ~CondVar() { g_core_codegen_interface->gpr_cv_destroy(&cv_); }
-
-  CondVar(const CondVar&) = delete;
-  CondVar& operator=(const CondVar&) = delete;
-
-  void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); }
-  void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
-
-  int Wait(Mutex* mu) {
-    return Wait(mu,
-                g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
-  }
-  int Wait(Mutex* mu, const gpr_timespec& deadline) {
-    return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline);
-  }
-
-  template
-  void WaitUntil(Mutex* mu, Predicate pred) {
-    while (!pred()) {
-      Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
-    }
-  }
-
- private:
-  gpr_cv cv_;
-};
-
-}  // namespace internal
-}  // namespace grpc
-
-#endif  // GRPCPP_IMPL_CODEGEN_SYNC_H

diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h
index 14b16a06f4e..771a3a10be9 100644
--- a/include/grpcpp/server_impl.h
+++ b/include/grpcpp/server_impl.h
@@ -304,12 +304,12 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
   experimental_registration_type experimental_registration_{this};

   // Server status
-  grpc::internal::Mutex mu_;
+  std::mutex mu_;
   bool started_;
   bool shutdown_;
   bool shutdown_notified_;  // Was notify called on the shutdown_cv_

-  grpc::internal::CondVar shutdown_cv_;
+  std::condition_variable shutdown_cv_;

   // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
   // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@@ -318,8 +318,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
   // during periods of increasing load; the decrement happens only when memory
   // is maxed out, during server shutdown, or (possibly in a future version)
   // during decreasing load, so it is less performance-critical.
-  grpc::internal::Mutex callback_reqs_mu_;
-  grpc::internal::CondVar callback_reqs_done_cv_;
+  std::mutex callback_reqs_mu_;
+  std::condition_variable callback_reqs_done_cv_;
   std::atomic_int callback_reqs_outstanding_{0};

   std::shared_ptr global_callbacks_;

diff --git a/package.xml b/package.xml
index 5be3d24365d..f3bbb449ac5 100644
--- a/package.xml
+++ b/package.xml
@@ -109,8 +109,8 @@
+
-

diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index cd552739732..86938a51d9b 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -51,7 +51,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/iomgr/polling_entity.h"

diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc
index a99f1e54062..a22d6450cbd 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.cc
+++ b/src/core/ext/filters/client_channel/health/health_check_client.cc
@@ -27,7 +27,7 @@
 #include "pb_encode.h"
 #include "src/core/ext/filters/client_channel/health/health.pb.h"
 #include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/transport/error_utils.h"
 #include "src/core/lib/transport/status_metadata.h"
@@ -69,6 +69,7 @@ HealthCheckClient::HealthCheckClient(
   }
   GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                     grpc_schedule_on_exec_ctx);
+  gpr_mu_init(&mu_);
   StartCall();
 }

@@ -77,6 +78,7 @@ HealthCheckClient::~HealthCheckClient() {
     gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
   }
   GRPC_ERROR_UNREF(error_);
+  gpr_mu_destroy(&mu_);
 }

 void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state,

diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h
index 6e0123e4925..1fa4487c403 100644
--- a/src/core/ext/filters/client_channel/health/health_check_client.h
+++ b/src/core/ext/filters/client_channel/health/health_check_client.h
@@ -31,7 +31,6 @@
 #include "src/core/lib/gprpp/atomic.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/call_combiner.h"
 #include "src/core/lib/iomgr/closure.h"
 #include "src/core/lib/iomgr/polling_entity.h"
@@ -158,7 +157,7 @@ class HealthCheckClient : public InternallyRefCounted {
   grpc_pollset_set* interested_parties_;  // Do not own.
   RefCountedPtr channelz_node_;

-  Mutex mu_;
+  gpr_mu mu_;
   grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
   grpc_error* error_ = GRPC_ERROR_NONE;
   grpc_connectivity_state* notify_state_ = nullptr;

diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
index 90a79843458..2b1eb92bbd4 100644
--- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc
+++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc
@@ -33,7 +33,7 @@
 #include "src/core/lib/channel/handshaker_registry.h"
 #include "src/core/lib/gpr/env.h"
 #include "src/core/lib/gpr/string.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/http/format_request.h"
 #include "src/core/lib/http/parser.h"
 #include "src/core/lib/slice/slice_internal.h"

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index ead15308f49..aebd2fd3faa 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -88,6 +88,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
 #include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/combiner.h"

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
index 35123633feb..84b9c41a734 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
@@ -25,7 +25,7 @@
 #include
 #include

-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"

 namespace grpc_core {

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index bcc6598c105..fdebdf55c17 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -26,7 +26,6 @@
 #include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/gprpp/memory.h"
 #include "src/core/lib/gprpp/ref_counted.h"
-#include "src/core/lib/gprpp/sync.h"

 namespace grpc_core {

@@ -42,6 +41,9 @@ class GrpcLbClientStats : public RefCounted {

   typedef InlinedVector DroppedCallCounts;

+  GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); }
+  ~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); }
+
   void AddCallStarted();
   void AddCallFinished(bool finished_with_client_failed_to_send,
                        bool finished_known_received);
@@ -64,7 +66,7 @@ class GrpcLbClientStats : public RefCounted {
   gpr_atm num_calls_finished_ = 0;
   gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
   gpr_atm num_calls_finished_known_received_ = 0;
-  Mutex drop_count_mu_;  // Guards drop_token_counts_.
+  gpr_mu drop_count_mu_;  // Guards drop_token_counts_.
   UniquePtr drop_token_counts_;
 };

diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 892472e1c9b..332f66808e0 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -27,7 +27,7 @@
 #include "src/core/ext/filters/client_channel/server_address.h"
 #include "src/core/ext/filters/client_channel/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/transport/connectivity_state.h"
@@ -154,12 +154,13 @@ class PickFirst : public LoadBalancingPolicy {

   /// Lock and data used to capture snapshots of this channels child
   /// channels and subchannels. This data is consumed by channelz.
-  Mutex child_refs_mu_;
+  gpr_mu child_refs_mu_;
   channelz::ChildRefsList child_subchannels_;
   channelz::ChildRefsList child_channels_;
 };

 PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
+  gpr_mu_init(&child_refs_mu_);
   if (grpc_lb_pick_first_trace.enabled()) {
     gpr_log(GPR_INFO, "Pick First %p created.", this);
   }
@@ -169,6 +170,7 @@ PickFirst::~PickFirst() {
   if (grpc_lb_pick_first_trace.enabled()) {
     gpr_log(GPR_INFO, "Destroying Pick First %p", this);
   }
+  gpr_mu_destroy(&child_refs_mu_);
   GPR_ASSERT(subchannel_list_ == nullptr);
   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
 }

diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 29cfe972f78..d3faaaddc98 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -36,8 +36,8 @@
 #include "src/core/ext/filters/client_channel/subchannel.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/transport/connectivity_state.h"
@@ -193,7 +193,7 @@ class RoundRobin : public LoadBalancingPolicy {
   bool shutdown_ = false;
   /// Lock and data used to capture snapshots of this channel's child
   /// channels and subchannels. This data is consumed by channelz.
-  Mutex child_refs_mu_;
+  gpr_mu child_refs_mu_;
   channelz::ChildRefsList child_subchannels_;
   channelz::ChildRefsList child_channels_;
 };
@@ -245,6 +245,7 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
 //

 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
+  gpr_mu_init(&child_refs_mu_);
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_INFO, "[RR %p] Created", this);
   }
@@ -254,6 +255,7 @@ RoundRobin::~RoundRobin() {
   if (grpc_lb_round_robin_trace.enabled()) {
     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
   }
+  gpr_mu_destroy(&child_refs_mu_);
   GPR_ASSERT(subchannel_list_ == nullptr);
   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
 }

diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
index 443deb01274..fe49e9aca03 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
@@ -89,9 +89,9 @@
 #include "src/core/lib/gprpp/manual_constructor.h"
 #include "src/core/lib/gprpp/map.h"
 #include "src/core/lib/gprpp/memory.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/gprpp/orphanable.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -278,8 +278,10 @@ class XdsLb : public LoadBalancingPolicy {
     class LocalityEntry : public InternallyRefCounted {
      public:
       explicit LocalityEntry(RefCountedPtr parent)
-          : parent_(std::move(parent)) {}
-      ~LocalityEntry() = default;
+          : parent_(std::move(parent)) {
+        gpr_mu_init(&child_policy_mu_);
+      }
+      ~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); }

       void UpdateLocked(xds_grpclb_serverlist* serverlist,
                         LoadBalancingPolicy::Config* child_policy_config,
@@ -321,10 +323,13 @@ class XdsLb : public LoadBalancingPolicy {
       OrphanablePtr pending_child_policy_;
       // Lock held when modifying the value of child_policy_ or
       // pending_child_policy_.
-      Mutex child_policy_mu_;
+      gpr_mu child_policy_mu_;
       RefCountedPtr parent_;
     };

+    LocalityMap() { gpr_mu_init(&child_refs_mu_); }
+    ~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); }
+
     void UpdateLocked(const LocalityList& locality_list,
                       LoadBalancingPolicy::Config* child_policy_config,
                       const grpc_channel_args* args, XdsLb* parent);
@@ -338,7 +343,7 @@ class XdsLb : public LoadBalancingPolicy {
     Map, OrphanablePtr, StringLess> map_;
     // Lock held while filling child refs for all localities
     // inside the map
-    Mutex child_refs_mu_;
+    gpr_mu child_refs_mu_;
   };

   struct LocalityServerlistEntry {
@@ -392,7 +397,7 @@ class XdsLb : public LoadBalancingPolicy {
   // Mutex to protect the channel to the LB server. This is used when
   // processing a channelz request.
   // TODO(juanlishen): Replace this with atomic.
-  Mutex lb_chand_mu_;
+  gpr_mu lb_chand_mu_;

   // Timeout in milliseconds for the LB call. 0 means no deadline.
   int lb_call_timeout_ms_ = 0;
@@ -1085,6 +1090,7 @@ XdsLb::XdsLb(Args args)
     : LoadBalancingPolicy(std::move(args)),
       locality_map_(),
       locality_serverlist_() {
+  gpr_mu_init(&lb_chand_mu_);
   // Record server name.
   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
   const char* server_uri = grpc_channel_arg_get_string(arg);
@@ -1108,6 +1114,7 @@ XdsLb::XdsLb(Args args)
 }

 XdsLb::~XdsLb() {
+  gpr_mu_destroy(&lb_chand_mu_);
   gpr_free((void*)server_name_);
   grpc_channel_args_destroy(args_);
   locality_serverlist_.clear();

diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
index a2367e09d9b..d15af908b3f 100644
--- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc
+++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc
@@ -48,7 +48,7 @@
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/iomgr/polling_entity.h"

diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index b38bd4b701e..20a6023f71c 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -42,8 +42,8 @@
 #include "src/core/lib/gpr/alloc.h"
 #include "src/core/lib/gprpp/debug_location.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/profiling/timers.h"
 #include "src/core/lib/slice/slice_internal.h"
@@ -457,14 +457,13 @@ struct Subchannel::ExternalStateWatcher {
       grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_,
                                        w->pollset_set);
     }
-    {
-      MutexLock lock(&w->subchannel->mu_);
-      if (w->subchannel->external_state_watcher_list_ == w) {
-        w->subchannel->external_state_watcher_list_ = w->next;
-      }
-      if (w->next != nullptr) w->next->prev = w->prev;
-      if (w->prev != nullptr) w->prev->next = w->next;
+    gpr_mu_lock(&w->subchannel->mu_);
+    if (w->subchannel->external_state_watcher_list_ == w) {
+      w->subchannel->external_state_watcher_list_ = w->next;
     }
+    if (w->next != nullptr) w->next->prev = w->prev;
+    if (w->prev != nullptr) w->prev->next = w->next;
+    gpr_mu_unlock(&w->subchannel->mu_);
     GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done");
     Delete(w);
     GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
@@ -586,6 +585,7 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
                               "subchannel");
   grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
                                "subchannel");
+  gpr_mu_init(&mu_);
   // Check whether we should enable health checking.
   const char* service_config_json = grpc_channel_arg_get_string(
       grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG));
@@ -632,6 +632,7 @@ Subchannel::~Subchannel() {
   grpc_connector_unref(connector_);
   grpc_pollset_set_destroy(pollset_set_);
   Delete(key_);
+  gpr_mu_destroy(&mu_);
 }

 Subchannel* Subchannel::Create(grpc_connector* connector,
@@ -907,9 +908,7 @@ void Subchannel::MaybeStartConnectingLocked() {

 void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
   Subchannel* c = static_cast(arg);
-  // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use
-  // MutexLock instead of ReleasableMutexLock, here.
-  ReleasableMutexLock lock(&c->mu_);
+  gpr_mu_lock(&c->mu_);
   c->have_retry_alarm_ = false;
   if (c->disconnected_) {
     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
                                                              &error, 1);
@@ -923,9 +922,9 @@
   if (error == GRPC_ERROR_NONE) {
     gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
     c->ContinueConnectingLocked();
-    lock.Unlock();
+    gpr_mu_unlock(&c->mu_);
   } else {
-    lock.Unlock();
+    gpr_mu_unlock(&c->mu_);
     GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
   }
   GRPC_ERROR_UNREF(error);
@@ -952,30 +951,29 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
   auto* c = static_cast(arg);
   grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args;
   GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
-  {
-    MutexLock lock(&c->mu_);
-    c->connecting_ = false;
-    if (c->connecting_result_.transport != nullptr &&
-        c->PublishTransportLocked()) {
-      // Do nothing, transport was published.
-    } else if (c->disconnected_) {
-      GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
-    } else {
-      const char* errmsg = grpc_error_string(error);
-      gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
-      error = grpc_error_set_int(
-          GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Connect Failed",
                                                            &error, 1),
-          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
-      c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
-                                    GRPC_ERROR_REF(error), "connect_failed");
-      grpc_connectivity_state_set(&c->state_and_health_tracker_,
-                                  GRPC_CHANNEL_TRANSIENT_FAILURE, error,
-                                  "connect_failed");
-      c->MaybeStartConnectingLocked();
-      GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
-    }
+  gpr_mu_lock(&c->mu_);
+  c->connecting_ = false;
+  if (c->connecting_result_.transport != nullptr &&
+      c->PublishTransportLocked()) {
+    // Do nothing, transport was published.
+  } else if (c->disconnected_) {
+    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
+  } else {
+    const char* errmsg = grpc_error_string(error);
+    gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
+    error =
+        grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                "Connect Failed", &error, 1),
                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
+    c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
+                                  GRPC_ERROR_REF(error), "connect_failed");
+    grpc_connectivity_state_set(&c->state_and_health_tracker_,
+                                GRPC_CHANNEL_TRANSIENT_FAILURE, error,
+                                "connect_failed");
+    c->MaybeStartConnectingLocked();
+    GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
   }
+  gpr_mu_unlock(&c->mu_);
   GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
   grpc_channel_args_destroy(delete_channel_args);
 }

diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index 83b57dd7ff3..968fc74e22a 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -29,7 +29,6 @@
 #include "src/core/lib/gpr/arena.h"
 #include "src/core/lib/gprpp/ref_counted.h"
 #include "src/core/lib/gprpp/ref_counted_ptr.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/polling_entity.h"
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/transport/connectivity_state.h"
@@ -265,7 +264,7 @@ class Subchannel {
   // pollset_set tracking who's interested in a connection being setup.
   grpc_pollset_set* pollset_set_;
   // Protects the other members.
-  Mutex mu_;
+  gpr_mu mu_;
   // Refcount
   // - lower INTERNAL_REF_BITS bits are for internal references:
   //   these do not keep the subchannel open.

diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc
index b6a660b18fd..9f0169aeaab 100644
--- a/src/core/lib/channel/channelz_registry.cc
+++ b/src/core/lib/channel/channelz_registry.cc
@@ -23,7 +23,7 @@
 #include "src/core/lib/channel/channelz_registry.h"
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/memory.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"

 #include
 #include

diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index b68799b6e0e..912d524c8db 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -27,8 +27,8 @@

 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/gprpp/inlined_vector.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/gprpp/ref_counted.h"
-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/iomgr/closure.h"
 #include "src/core/lib/iomgr/endpoint.h"
 #include "src/core/lib/iomgr/exec_ctx.h"

diff --git a/src/core/lib/gprpp/mutex_lock.h b/src/core/lib/gprpp/mutex_lock.h
new file mode 100644
index 00000000000..54751d5fe46
--- /dev/null
+++ b/src/core/lib/gprpp/mutex_lock.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
+#define GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
+
+#include
+
+#include
+
+namespace grpc_core {
+
+class MutexLock {
+ public:
+  explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu); }
+  ~MutexLock() { gpr_mu_unlock(mu_); }
+
+  MutexLock(const MutexLock&) = delete;
+  MutexLock& operator=(const MutexLock&) = delete;
+
+ private:
+  gpr_mu* const mu_;
+};
+
+}  // namespace grpc_core
+
+#endif /* GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H */

diff --git a/src/core/lib/gprpp/sync.h b/src/core/lib/gprpp/sync.h
deleted file mode 100644
index 895ca60fec0..00000000000
--- a/src/core/lib/gprpp/sync.h
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Copyright 2019 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_GPRPP_SYNC_H
-#define GRPC_CORE_LIB_GPRPP_SYNC_H
-
-#include
-
-#include
-#include
-#include
-#include
-
-// The core library is not accessible in C++ codegen headers, and vice versa.
-// Thus, we need to have duplicate headers with similar functionality.
-// Make sure any change to this file is also reflected in
-// include/grpcpp/impl/codegen/sync.h.
-//
-// Whenever possible, prefer using this file over
-// since this file doesn't rely on g_core_codegen_interface and hence does not
-// pay the costs of virtual function calls.
-
-namespace grpc_core {
-
-class Mutex {
- public:
-  Mutex() { gpr_mu_init(&mu_); }
-  ~Mutex() { gpr_mu_destroy(&mu_); }
-
-  Mutex(const Mutex&) = delete;
-  Mutex& operator=(const Mutex&) = delete;
-
-  gpr_mu* get() { return &mu_; }
-  const gpr_mu* get() const { return &mu_; }
-
- private:
-  gpr_mu mu_;
-};
-
-// MutexLock is a std::
-class MutexLock {
- public:
-  explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
-  explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
-  ~MutexLock() { gpr_mu_unlock(mu_); }
-
-  MutexLock(const MutexLock&) = delete;
-  MutexLock& operator=(const MutexLock&) = delete;
-
- private:
-  gpr_mu* const mu_;
-};
-
-class ReleasableMutexLock {
- public:
-  explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
-  explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
-  ~ReleasableMutexLock() {
-    if (!released_) gpr_mu_unlock(mu_);
-  }
-
-  ReleasableMutexLock(const ReleasableMutexLock&) = delete;
-  ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
-
-  void Lock() {
-    GPR_DEBUG_ASSERT(released_);
-    gpr_mu_lock(mu_);
-    released_ = false;
-  }
-
-  void Unlock() {
-    GPR_DEBUG_ASSERT(!released_);
-    released_ = true;
-    gpr_mu_unlock(mu_);
-  }
-
- private:
-  gpr_mu* const mu_;
-  bool released_ = false;
-};
-
-class CondVar {
- public:
-  CondVar() { gpr_cv_init(&cv_); }
-  ~CondVar() { gpr_cv_destroy(&cv_); }
-
-  CondVar(const CondVar&) = delete;
-  CondVar& operator=(const CondVar&) = delete;
-
-  void Signal() { gpr_cv_signal(&cv_); }
-  void Broadcast() { gpr_cv_broadcast(&cv_); }
-
-  int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); }
-  int Wait(Mutex* mu, const gpr_timespec& deadline) {
-    return gpr_cv_wait(&cv_, mu->get(), deadline);
-  }
-
-  template
-  void WaitUntil(Mutex* mu, Predicate pred) {
-    while (!pred()) {
-      Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME));
-    }
-  }
-
- private:
-  gpr_cv cv_;
-};
-
-}  // namespace grpc_core
-
-#endif /* GRPC_CORE_LIB_GPRPP_SYNC_H */

diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index c387f8359a0..01be46c9f68 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -47,7 +47,7 @@
 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/gprpp/manual_constructor.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/iomgr/block_annotate.h"
 #include "src/core/lib/iomgr/iomgr_internal.h"
 #include "src/core/lib/iomgr/is_epollexclusive_available.h"

diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index 57d975564b1..b67d4f12252 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -33,7 +33,7 @@
 #include "src/core/lib/debug/stats.h"
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gprpp/fork.h"
-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/http/parser.h"
 #include "src/core/lib/iomgr/call_combiner.h"
 #include "src/core/lib/iomgr/combiner.h"

diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
index ba0745a2359..f9184bcc34f 100644
--- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
+++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
@@ -18,7 +18,7 @@

 #include

-#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/gprpp/mutex_lock.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/tsi/ssl/session_cache/ssl_session.h"
 #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"

diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 2d5e74163a9..db59d4d8416 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -232,7 +232,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
 CompletionQueue* Channel::CallbackCQ() {
   // TODO(vjpai): Consider using a single global CQ for the default CQ
   // if there is no explicit per-channel CQ registered
-  grpc::internal::MutexLock l(&mu_);
+  std::lock_guard l(mu_);
   if (callback_cq_ == nullptr) {
     auto* shutdown_callback = new ShutdownCallback;
     callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{

diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index b4fce79b99a..efb59c71a8c 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -25,7 +25,6 @@

 #include
 #include
-#include
 #include
 #include
 #include
@@ -85,7 +84,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key,

 void ClientContext::set_call(grpc_call* call,
                              const std::shared_ptr& channel) {
-  grpc::internal::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   GPR_ASSERT(call_ == nullptr);
   call_ = call;
   channel_ = channel;
@@ -115,7 +114,7 @@ void ClientContext::set_compression_algorithm(
 }

 void ClientContext::TryCancel() {
-  grpc::internal::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   if (call_) {
     SendCancelToInterceptors();
     grpc_call_cancel(call_, nullptr);

diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc
index c8bdbdea7e6..ef99d6459f7 100644
--- a/src/cpp/server/dynamic_thread_pool.cc
+++ b/src/cpp/server/dynamic_thread_pool.cc
@@ -21,7 +21,6 @@

 #include
 #include
-#include

 #include "src/core/lib/gprpp/thd.h"

@@ -41,27 +40,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
 void DynamicThreadPool::DynamicThread::ThreadFunc() {
   pool_->ThreadFunc();
   // Now that we have killed ourselves, we should reduce the thread count
-  grpc_core::MutexLock lock(&pool_->mu_);
+  std::unique_lock lock(pool_->mu_);
   pool_->nthreads_--;
   // Move ourselves to dead list
   pool_->dead_threads_.push_back(this);

   if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
-    pool_->shutdown_cv_.Signal();
+    pool_->shutdown_cv_.notify_one();
   }
 }

 void DynamicThreadPool::ThreadFunc() {
   for (;;) {
     // Wait until work is available or we are shutting down.
-    grpc_core::ReleasableMutexLock lock(&mu_);
+    std::unique_lock lock(mu_);
     if (!shutdown_ && callbacks_.empty()) {
       // If there are too many threads waiting, then quit this thread
       if (threads_waiting_ >= reserve_threads_) {
         break;
       }
       threads_waiting_++;
-      cv_.Wait(&mu_);
+      cv_.wait(lock);
       threads_waiting_--;
     }
     // Drain callbacks before considering shutdown to ensure all work
@@ -69,7 +68,7 @@ void DynamicThreadPool::ThreadFunc() {
     if (!callbacks_.empty()) {
       auto cb = callbacks_.front();
       callbacks_.pop();
-      lock.Unlock();
+      lock.unlock();
       cb();
     } else if (shutdown_) {
       break;
@@ -83,7 +82,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads)
       nthreads_(0),
       threads_waiting_(0) {
   for (int i = 0; i < reserve_threads_; i++) {
-    grpc_core::MutexLock lock(&mu_);
+    std::lock_guard lock(mu_);
     nthreads_++;
     new DynamicThread(this);
   }
@@ -96,17 +95,17 @@ void DynamicThreadPool::ReapThreads(std::list* tlist) {
 }

 DynamicThreadPool::~DynamicThreadPool() {
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   shutdown_ = true;
-  cv_.Broadcast();
+  cv_.notify_all();
   while (nthreads_ != 0) {
-    shutdown_cv_.Wait(&mu_);
+    shutdown_cv_.wait(lock);
   }
   ReapThreads(&dead_threads_);
 }

 void DynamicThreadPool::Add(const std::function& callback) {
-  grpc_core::MutexLock lock(&mu_);
+  std::lock_guard lock(mu_);
   // Add works to the callbacks list
   callbacks_.push(callback);
   // Increase pool size or notify as needed
@@ -115,7 +114,7 @@ void DynamicThreadPool::Add(const std::function& callback) {
     nthreads_++;
     new DynamicThread(this);
   } else {
-    cv_.Signal();
+    cv_.notify_one();
   }
   // Also use this chance to harvest dead threads
   if (!dead_threads_.empty()) {

diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h
index 4ae0257d40b..5df8cf2b043 100644
--- a/src/cpp/server/dynamic_thread_pool.h
+++ b/src/cpp/server/dynamic_thread_pool.h
@@ -27,7 +27,6 @@

 #include

-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/thd.h"
 #include "src/cpp/server/thread_pool_interface.h"

@@ -51,9 +50,9 @@ class DynamicThreadPool final : public ThreadPoolInterface {
     grpc_core::Thread thd_;
     void ThreadFunc();
   };
-  grpc_core::Mutex mu_;
-  grpc_core::CondVar cv_;
-  grpc_core::CondVar shutdown_cv_;
+  std::mutex mu_;
+  std::condition_variable cv_;
+  std::condition_variable shutdown_cv_;
   bool shutdown_;
   std::queue> callbacks_;
   int reserve_threads_;

diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index 01bc51aa213..44aebd2f9d9 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -41,7 +41,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {

 void DefaultHealthCheckService::SetServingStatus(
     const grpc::string& service_name, bool serving) {
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   if (shutdown_) {
     // Set to NOT_SERVING in case service_name is not in the map.
     serving = false;
@@ -51,7 +51,7 @@ void DefaultHealthCheckService::SetServingStatus(

 void DefaultHealthCheckService::SetServingStatus(bool serving) {
   const ServingStatus status = serving ? SERVING : NOT_SERVING;
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   if (shutdown_) {
     return;
   }
@@ -62,7 +62,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
 }

 void DefaultHealthCheckService::Shutdown() {
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   if (shutdown_) {
     return;
   }
@@ -76,7 +76,7 @@ void DefaultHealthCheckService::Shutdown() {
 DefaultHealthCheckService::ServingStatus
 DefaultHealthCheckService::GetServingStatus(
     const grpc::string& service_name) const {
-  grpc_core::MutexLock lock(&mu_);
+  std::lock_guard lock(mu_);
   auto it = services_map_.find(service_name);
   if (it == services_map_.end()) {
     return NOT_FOUND;
@@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus(
 void DefaultHealthCheckService::RegisterCallHandler(
     const grpc::string& service_name,
     std::shared_ptr handler) {
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   ServiceData& service_data = services_map_[service_name];
   service_data.AddCallHandler(handler /* copies ref */);
   HealthCheckServiceImpl::CallHandler* h = handler.get();
@@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler(
 void DefaultHealthCheckService::UnregisterCallHandler(
     const grpc::string& service_name,
     const std::shared_ptr& handler) {
-  grpc_core::MutexLock lock(&mu_);
+  std::unique_lock lock(mu_);
   auto it = services_map_.find(service_name);
   if (it == services_map_.end()) return;
   ServiceData& service_data = it->second;
@@ -166,7 +166,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
   // We will reach here after the server starts shutting down.
   shutdown_ = true;
   {
-    grpc_core::MutexLock lock(&cq_shutdown_mu_);
+    std::unique_lock lock(cq_shutdown_mu_);
     cq_->Shutdown();
   }
   thread_->Join();
@@ -266,7 +266,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
       std::make_shared(cq, database, service);
   CheckCallHandler* handler = static_cast(self.get());
   {
-    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
+    std::unique_lock lock(service->cq_shutdown_mu_);
     if (service->shutdown_) return;
     // Request a Check() call.
     handler->next_ =
@@ -311,7 +311,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
   }
   // Send response.
   {
-    grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
+    std::unique_lock lock(service_->cq_shutdown_mu_);
    if (!service_->shutdown_) {
       next_ =
           CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
@@ -347,7 +347,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
       std::make_shared(cq, database, service);
   WatchCallHandler* handler = static_cast(self.get());
   {
-    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
+    std::unique_lock lock(service->cq_shutdown_mu_);
     if (service->shutdown_) return;
     // Request AsyncNotifyWhenDone().
     handler->on_done_notified_ =
@@ -402,7 +402,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::

 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendHealth(std::shared_ptr self, ServingStatus status) {
-  grpc_core::MutexLock lock(&send_mu_);
+  std::unique_lock lock(send_mu_);
   // If there's already a send in flight, cache the new status, and
   // we'll start a new send for it when the one in flight completes.
   if (send_in_flight_) {
@@ -420,7 +420,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
   ByteBuffer response;
   bool success = service_->EncodeResponse(status, &response);
   // Grab shutdown lock and send response.
-  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
+  std::unique_lock cq_lock(service_->cq_shutdown_mu_);
   if (service_->shutdown_) {
     SendFinishLocked(std::move(self), Status::CANCELLED);
     return;
@@ -442,7 +442,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendFinish(std::move(self), Status::CANCELLED);
     return;
   }
-  grpc_core::MutexLock lock(&send_mu_);
+  std::unique_lock lock(send_mu_);
   send_in_flight_ = false;
   // If we got a new status since we started the last send, start a
   // new send for it.
@@ -456,7 +456,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
     SendFinish(std::shared_ptr self, const Status& status) {
   if (finish_called_) return;
-  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
+  std::unique_lock cq_lock(service_->cq_shutdown_mu_);
   if (service_->shutdown_) return;
   SendFinishLocked(std::move(self), status);
 }

diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 4b926efdfe8..9551cd2e2cf 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -31,7 +31,6 @@
 #include
 #include

-#include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/gprpp/thd.h"

 namespace grpc {
@@ -198,7 +197,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
       GenericServerAsyncWriter stream_;
       ServerContext ctx_;

-      grpc_core::Mutex send_mu_;
+      std::mutex send_mu_;
       bool send_in_flight_ = false;               // Guarded by mu_.
       ServingStatus pending_status_ = NOT_FOUND;  // Guarded by mu_.
@@ -227,7 +226,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {

     // To synchronize the operations related to shutdown state of cq_, so that
     // we don't enqueue new tags into cq_ after it is already shut down.
-    grpc_core::Mutex cq_shutdown_mu_;
+    std::mutex cq_shutdown_mu_;
     std::atomic_bool shutdown_{false};
     std::unique_ptr<::grpc_core::Thread> thread_;
   };
@@ -274,7 +273,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
       const grpc::string& service_name,
       const std::shared_ptr& handler);

-  mutable grpc_core::Mutex mu_;
+  mutable std::mutex mu_;
   bool shutdown_ = false;  // Guarded by mu_.
   std::map services_map_;  // Guarded by mu_.
   std::unique_ptr impl_;

diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc
index 422ea62efd5..464063a13ff 100644
--- a/src/cpp/server/load_reporter/load_reporter.cc
+++ b/src/cpp/server/load_reporter/load_reporter.cc
@@ -239,7 +239,7 @@ grpc::string LoadReporter::GenerateLbId() {

 ::grpc::lb::v1::LoadBalancingFeedback
 LoadReporter::GenerateLoadBalancingFeedback() {
-  grpc_core::ReleasableMutexLock lock(&feedback_mu_);
+  std::unique_lock lock(feedback_mu_);
   auto now = std::chrono::system_clock::now();
   // Discard records outside the window until there is only one record
   // outside the window, which is used as the base for difference.
@@ -277,7 +277,7 @@ LoadReporter::GenerateLoadBalancingFeedback()
   double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
   std::chrono::duration duration_seconds =
       newest->end_time - oldest->end_time;
-  lock.Unlock();
+  lock.unlock();
   ::grpc::lb::v1::LoadBalancingFeedback feedback;
   feedback.set_server_utilization(static_cast(cpu_usage / cpu_limit));
   feedback.set_calls_per_second(
@@ -290,7 +290,7 @@
 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
 LoadReporter::GenerateLoads(const grpc::string& hostname,
                             const grpc::string& lb_id) {
-  grpc_core::MutexLock lock(&store_mu_);
+  std::lock_guard lock(store_mu_);
   auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
   GPR_ASSERT(assigned_stores != nullptr);
   GPR_ASSERT(!assigned_stores->empty());
@@ -371,7 +371,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
     // This will make the load balancing feedback generation a no-op.
     cpu_stats = {0, 0};
   }
-  grpc_core::MutexLock lock(&feedback_mu_);
+  std::unique_lock lock(feedback_mu_);
   feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
                                  cpu_stats.first, cpu_stats.second);
 }
@@ -379,7 +379,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
 void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
                                        const grpc::string& lb_id,
                                        const grpc::string& load_key) {
-  grpc_core::MutexLock lock(&store_mu_);
+  std::lock_guard lock(store_mu_);
   load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
   gpr_log(GPR_INFO,
           "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
@@ -388,7 +388,7 @@ void LoadReporter::ReportStreamCreated(const grpc::string& hostname,

 void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
                                       const grpc::string& lb_id) {
-  grpc_core::MutexLock lock(&store_mu_);
+  std::lock_guard lock(store_mu_);
   load_data_store_.ReportStreamClosed(hostname, lb_id);
   gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).",
           this, hostname.c_str(), lb_id.c_str());
@@ -407,7 +407,7 @@ void LoadReporter::ProcessViewDataCallStart(
       LoadRecordKey key(client_ip_and_token, user_id);
       LoadRecordValue value = LoadRecordValue(start_count);
       {
-        grpc_core::MutexLock lock(&store_mu_);
+        std::unique_lock lock(store_mu_);
         load_data_store_.MergeRow(host, key, value);
       }
     }
@@ -459,7 +459,7 @@ void LoadReporter::ProcessViewDataCallEnd(
       LoadRecordValue value = LoadRecordValue(
           0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
       {
-        grpc_core::MutexLock lock(&store_mu_);
+        std::unique_lock lock(store_mu_);
         load_data_store_.MergeRow(host, key, value);
       }
     }
@@ -486,7 +486,7 @@ void LoadReporter::ProcessViewDataOtherCallMetrics(
       LoadRecordValue value = LoadRecordValue(
           metric_name, static_cast(num_calls), total_metric_value);
       {
-        grpc_core::MutexLock lock(&store_mu_);
+        std::unique_lock lock(store_mu_);
         load_data_store_.MergeRow(host, key, value);
       }
     }

diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h
index 766e02a407a..b2254d56016 100644
--- a/src/cpp/server/load_reporter/load_reporter.h
+++ b/src/cpp/server/load_reporter/load_reporter.h
@@ -29,7 +29,6 @@
 #include
 #include

-#include "src/core/lib/gprpp/sync.h"
 #include "src/cpp/server/load_reporter/load_data_store.h"
 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"

@@ -213,11 +212,11 @@ class LoadReporter {
   std::atomic next_lb_id_{0};
   const std::chrono::seconds feedback_sample_window_seconds_;

-  grpc_core::Mutex feedback_mu_;
+  std::mutex feedback_mu_;
   std::deque feedback_records_;
   // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
   // too expensive.
-  grpc_core::Mutex store_mu_;
+  std::mutex store_mu_;
   LoadDataStore load_data_store_;
   std::unique_ptr census_view_provider_;
   std::unique_ptr cpu_stats_provider_;

diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
index 9eaab5d6366..859ad9946c8 100644
--- a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
+++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
@@ -48,7 +48,7 @@ LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
   // We will reach here after the server starts shutting down.
   shutdown_ = true;
   {
-    grpc_core::MutexLock lock(&cq_shutdown_mu_);
+    std::unique_lock lock(cq_shutdown_mu_);
     cq_->Shutdown();
   }
   if (next_fetch_and_sample_alarm_ != nullptr)
@@ -62,7 +62,7 @@ void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
       gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
                            GPR_TIMESPAN));
   {
-    grpc_core::MutexLock lock(&cq_shutdown_mu_);
+    std::unique_lock lock(cq_shutdown_mu_);
     if (shutdown_) return;
     // TODO(juanlishen): Improve the Alarm implementation to reuse a single
     // instance for multiple events.
@@ -119,7 +119,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
       std::make_shared(cq, service, load_reporter);
   ReportLoadHandler* p = handler.get();
   {
-    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
+    std::unique_lock lock(service->cq_shutdown_mu_);
     if (service->shutdown_) return;
     p->on_done_notified_ =
         CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
@@ -164,9 +164,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
   // instance will deallocate itself when it's done.
   CreateAndStart(cq_, service_, load_reporter_);
   {
-    grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
+    std::unique_lock lock(service_->cq_shutdown_mu_);
     if (service_->shutdown_) {
-      lock.Unlock();
+      lock.release()->unlock();
       Shutdown(std::move(self), "OnRequestDelivered");
       return;
     }
@@ -222,9 +222,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
       SendReport(self, true /* ok */);
       // Expect this read to fail.
{ - grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); + std::unique_lock lock(service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.release()->unlock(); Shutdown(std::move(self), "OnReadDone"); return; } @@ -254,9 +254,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN)); { - grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); + std::unique_lock lock(service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.release()->unlock(); Shutdown(std::move(self), "ScheduleNextReport"); return; } @@ -294,9 +294,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport( call_status_ = INITIAL_RESPONSE_SENT; } { - grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); + std::unique_lock lock(service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.Unlock(); + lock.release()->unlock(); Shutdown(std::move(self), "SendReport"); return; } @@ -342,7 +342,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown( // OnRequestDelivered() may be called after OnDoneNotified(), so we need to // try to Finish() every time we are in Shutdown(). if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) { - grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); + std::unique_lock lock(service_->cq_shutdown_mu_); if (!service_->shutdown_) { on_finish_done_ = CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this, diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h index 3087cbfc04d..6fc577ff493 100644 --- a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h +++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h @@ -25,7 +25,6 @@ #include #include -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/cpp/server/load_reporter/load_reporter.h" @@ -182,7 +181,7 @@ class LoadReporterAsyncServiceImpl std::unique_ptr cq_; // To synchronize the operations related to shutdown state of cq_, so that we // don't enqueue new tags into cq_ after it is already shut down. - grpc_core::Mutex cq_shutdown_mu_; + std::mutex cq_shutdown_mu_; std::atomic_bool shutdown_{false}; std::unique_ptr<::grpc_core::Thread> thread_; std::unique_ptr load_reporter_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 64a6de97d7e..aa9fdac9bea 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -388,9 +388,9 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { // The counter of outstanding requests must be decremented // under a lock in case it causes the server shutdown. 
- grpc::internal::MutexLock l(&server_->callback_reqs_mu_); + std::lock_guard l(server_->callback_reqs_mu_); if (--server_->callback_reqs_outstanding_ == 0) { - server_->callback_reqs_done_cv_.Signal(); + server_->callback_reqs_done_cv_.notify_one(); } } @@ -814,12 +814,12 @@ Server::Server( Server::~Server() { { - grpc::internal::ReleasableMutexLock lock(&mu_); + std::unique_lock lock(mu_); if (callback_cq_ != nullptr) { callback_cq_->Shutdown(); } if (started_ && !shutdown_) { - lock.Unlock(); + lock.unlock(); Shutdown(); } else if (!started_) { // Shutdown the completion queues @@ -1051,7 +1051,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) { } void Server::ShutdownInternal(gpr_timespec deadline) { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); if (shutdown_) { return; } @@ -1102,9 +1102,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // will report a failure, indicating a shutdown and again we won't end // up incrementing the counter. { - grpc::internal::MutexLock cblock(&callback_reqs_mu_); - callback_reqs_done_cv_.WaitUntil( - &callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; }); + std::unique_lock cblock(callback_reqs_mu_); + callback_reqs_done_cv_.wait( + cblock, [this] { return callback_reqs_outstanding_ == 0; }); } // Drain the shutdown queue (if the previous call to AsyncNext() timed out @@ -1114,13 +1114,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.Broadcast(); + shutdown_cv_.notify_all(); } void Server::Wait() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); while (started_ && !shutdown_notified_) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.wait(lock); } } @@ -1322,7 +1322,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered - grpc::internal::MutexLock l(&mu_); + std::lock_guard l(mu_); if (callback_cq_ == nullptr) { auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index eced89d1a79..2bbf0e727a1 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -33,7 +33,6 @@ #include #include "src/core/lib/gprpp/ref_counted.h" -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/call.h" namespace grpc { @@ -97,7 +96,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } void SetCancelCallback(std::function callback) { - grpc_core::MutexLock lock(&mu_); + std::lock_guard lock(mu_); if (finalized_ && (cancelled_ != 0)) { callback(); @@ -108,7 +107,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } void ClearCancelCallback() { - grpc_core::MutexLock g(&mu_); + std::lock_guard g(mu_); cancel_callback_ = nullptr; } @@ -145,7 +144,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { private: bool CheckCancelledNoPluck() { - grpc_core::MutexLock lock(&mu_); + std::lock_guard g(mu_); return finalized_ ? 
(cancelled_ != 0) : false; } @@ -155,7 +154,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { void* tag_; void* core_cq_tag_; grpc_core::RefCount refs_; - grpc_core::Mutex mu_; + std::mutex mu_; bool finalized_; int cancelled_; // This is an int (not bool) because it is passed to core std::function cancel_callback_; @@ -187,7 +186,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { bool ret = false; - grpc_core::ReleasableMutexLock lock(&mu_); + std::unique_lock lock(mu_); if (done_intercepting_) { /* We are done intercepting. */ if (has_tag_) { @@ -219,12 +218,14 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { cancel_callback_(); } - // Release the lock since we may call a callback and interceptors now. - lock.Unlock(); + // Release the lock since we are going to be calling a callback and + // interceptors now + lock.unlock(); if (call_cancel && reactor_ != nullptr) { reactor_->MaybeCallOnCancel(); } + /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 2b65352f797..3e8606a76fd 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -62,7 +62,7 @@ ThreadManager::ThreadManager(const char* name, ThreadManager::~ThreadManager() { { - grpc_core::MutexLock lock(&mu_); + std::lock_guard lock(mu_); GPR_ASSERT(num_threads_ == 0); } @@ -72,38 +72,38 @@ ThreadManager::~ThreadManager() { } void ThreadManager::Wait() { - grpc_core::MutexLock lock(&mu_); + std::unique_lock lock(mu_); while (num_threads_ != 0) { - shutdown_cv_.Wait(&mu_); + shutdown_cv_.wait(lock); } } void ThreadManager::Shutdown() { - grpc_core::MutexLock lock(&mu_); + std::lock_guard lock(mu_); shutdown_ = true; } bool ThreadManager::IsShutdown() { - grpc_core::MutexLock lock(&mu_); + std::lock_guard lock(mu_); return shutdown_; } int ThreadManager::GetMaxActiveThreadsSoFar() { - grpc_core::MutexLock list_lock(&list_mu_); + std::lock_guard list_lock(list_mu_); return max_active_threads_sofar_; } void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { - grpc_core::MutexLock list_lock(&list_mu_); + std::lock_guard list_lock(list_mu_); completed_threads_.push_back(thd); } { - grpc_core::MutexLock lock(&mu_); + std::lock_guard lock(mu_); num_threads_--; if (num_threads_ == 0) { - shutdown_cv_.Signal(); + shutdown_cv_.notify_one(); } } @@ -116,7 +116,7 @@ void ThreadManager::CleanupCompletedThreads() { { // swap out the completed threads list: allows other threads to clean up // more quickly - grpc_core::MutexLock lock(&list_mu_); + std::unique_lock lock(list_mu_); completed_threads.swap(completed_threads_); } for (auto thd : completed_threads) delete thd; @@ -132,7 +132,7 @@ void ThreadManager::Initialize() { } { - grpc_core::MutexLock lock(&mu_); + std::unique_lock lock(mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; max_active_threads_sofar_ = min_pollers_; @@ -149,7 +149,7 @@ void ThreadManager::MainWorkLoop() { bool ok; WorkStatus work_status = PollForWork(&tag, &ok); - grpc_core::ReleasableMutexLock lock(&mu_); + std::unique_lock lock(mu_); // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; bool done = false; @@ -176,30 +176,30 @@ void 
ThreadManager::MainWorkLoop() { max_active_threads_sofar_ = num_threads_; } // Drop lock before spawning thread to avoid contention - lock.Unlock(); + lock.unlock(); new WorkerThread(this); } else if (num_pollers_ > 0) { // There is still at least some thread polling, so we can go on // even though we are below the number of pollers that we would // like to have (min_pollers_) - lock.Unlock(); + lock.unlock(); } else { // There are no pollers to spare and we couldn't allocate // a new thread, so resources are exhausted! - lock.Unlock(); + lock.unlock(); resource_exhausted = true; } } else { // There are a sufficient number of pollers available so we can do // the work and continue polling with our existing poller threads - lock.Unlock(); + lock.unlock(); } // Lock is always released at this point - do the application work // or return resource exhausted if there is new work but we couldn't // get a thread in which to do it. DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions - lock.Lock(); + lock.lock(); // If we're shutdown, we should finish at this point. if (shutdown_) done = true; break; diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 2fbf309d421..6f0bd17c5fe 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -26,7 +26,6 @@ #include -#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/resource_quota.h" @@ -141,10 +140,10 @@ class ThreadManager { // Protects shutdown_, num_pollers_, num_threads_ and // max_active_threads_sofar_ - grpc_core::Mutex mu_; + std::mutex mu_; bool shutdown_; - grpc_core::CondVar shutdown_cv_; + std::condition_variable shutdown_cv_; // The resource user object to use when requesting quota to create threads // @@ -170,7 +169,7 @@ class ThreadManager { // ever set so far int max_active_threads_sofar_; - grpc_core::Mutex list_mu_; + std::mutex list_mu_; std::list completed_threads_; }; diff --git a/test/cpp/client/client_channel_stress_test.cc b/test/cpp/client/client_channel_stress_test.cc index d326b2ed37e..91419cb257b 100644 --- a/test/cpp/client/client_channel_stress_test.cc +++ b/test/cpp/client/client_channel_stress_test.cc @@ -31,7 +31,6 @@ #include #include #include -#include #include #include @@ -169,24 +168,24 @@ class ClientChannelStressTest { explicit ServerThread(const grpc::string& type, const grpc::string& server_host, T* service) : type_(type), service_(service) { - grpc::internal::Mutex mu; + std::mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Start from firing before the wait below is hit. - grpc::internal::MutexLock lock(&mu); + std::unique_lock lock(mu); port_ = grpc_pick_unused_port_or_die(); gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); - grpc::internal::CondVar cond; + std::condition_variable cond; thread_.reset(new std::thread( std::bind(&ServerThread::Start, this, server_host, &mu, &cond))); - cond.Wait(&mu); + cond.wait(lock); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Start(const grpc::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Start(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
- grpc::internal::MutexLock lock(mu); + std::lock_guard lock(*mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -194,7 +193,7 @@ class ClientChannelStressTest { InsecureServerCredentials()); builder.RegisterService(service_); server_ = builder.BuildAndStart(); - cond->Signal(); + cond->notify_one(); } void Shutdown() { diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 6623a2ff55f..77f9db94acc 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -33,7 +33,6 @@ #include #include #include -#include #include #include @@ -99,7 +98,7 @@ class MyTestServiceImpl : public TestServiceImpl { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); ++request_count_; } AddClient(context->peer()); @@ -107,29 +106,29 @@ class MyTestServiceImpl : public TestServiceImpl { } int request_count() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return request_count_; } void ResetCounters() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); request_count_ = 0; } std::set clients() { - grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); clients_.insert(client); } - grpc::internal::Mutex mu_; + std::mutex mu_; int request_count_; - grpc::internal::Mutex clients_mu_; + std::mutex clients_mu_; std::set clients_; }; @@ -294,18 +293,18 @@ class ClientLbEnd2endTest : public ::testing::Test { void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); started_ = true; - grpc::internal::Mutex mu; - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; + std::mutex mu; + std::unique_lock lock(mu); + std::condition_variable cond; thread_.reset(new std::thread( std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); - cond.WaitUntil(&mu, [this] { return server_ready_; }); + cond.wait(lock, [this] { return server_ready_; }); server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } - void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Serve(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -314,9 +313,9 @@ class ClientLbEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), std::move(creds)); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - grpc::internal::MutexLock lock(mu); + std::lock_guard lock(*mu); server_ready_ = true; - cond->Signal(); + cond->notify_one(); } void Shutdown() { @@ -1375,7 +1374,7 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { void TearDown() override { ClientLbEnd2endTest::TearDown(); } int trailers_intercepted() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return trailers_intercepted_; } @@ -1383,11 +1382,11 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { static void ReportTrailerIntercepted(void* arg) { ClientLbInterceptTrailingMetadataTest* self = static_cast(arg); - 
grpc::internal::MutexLock lock(&self->mu_); + std::unique_lock lock(self->mu_); self->trailers_intercepted_++; } - grpc::internal::Mutex mu_; + std::mutex mu_; int trailers_intercepted_ = 0; }; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f8d887dd24d..0dc3746c2d7 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -30,7 +30,6 @@ #include #include #include -#include #include #include @@ -86,32 +85,32 @@ template class CountedService : public ServiceType { public: size_t request_count() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return request_count_; } size_t response_count() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return response_count_; } void IncreaseResponseCount() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); ++response_count_; } void IncreaseRequestCount() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); ++request_count_; } void ResetCounters() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); request_count_ = 0; response_count_ = 0; } protected: - grpc::internal::Mutex mu_; + std::mutex mu_; private: size_t request_count_ = 0; @@ -149,18 +148,18 @@ class BackendServiceImpl : public BackendService { void Shutdown() {} std::set clients() { - grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); clients_.insert(client); } - grpc::internal::Mutex mu_; - grpc::internal::Mutex clients_mu_; + std::mutex mu_; + std::mutex clients_mu_; std::set clients_; }; @@ -211,7 +210,7 @@ class BalancerServiceImpl : public BalancerService { Status BalanceLoad(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); if (serverlist_done_) goto done; } { @@ -238,7 +237,7 @@ class BalancerServiceImpl : public BalancerService { } { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { @@ -246,8 +245,8 @@ class BalancerServiceImpl : public BalancerService { response_and_delay.second); } { - grpc::internal::MutexLock lock(&mu_); - serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); + std::unique_lock lock(mu_); + serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); } if (client_load_reporting_interval_seconds_ > 0) { @@ -258,7 +257,7 @@ class BalancerServiceImpl : public BalancerService { GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
- grpc::internal::MutexLock lock(&mu_); + std::lock_guard lock(mu_); client_stats_.num_calls_started += request.client_stats().num_calls_started(); client_stats_.num_calls_finished += @@ -275,7 +274,7 @@ class BalancerServiceImpl : public BalancerService { drop_token_count.num_calls(); } load_report_ready_ = true; - load_report_cond_.Signal(); + load_report_cond_.notify_one(); } } } @@ -285,12 +284,12 @@ class BalancerServiceImpl : public BalancerService { } void add_response(const LoadBalanceResponse& response, int send_after_ms) { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } void Start() { - grpc::internal::MutexLock lock(&mu_); + std::lock_guard lock(mu_); serverlist_done_ = false; load_report_ready_ = false; responses_and_delays_.clear(); @@ -327,17 +326,17 @@ class BalancerServiceImpl : public BalancerService { } const ClientStats& WaitForLoadReport() { - grpc::internal::MutexLock lock(&mu_); - load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; }); + std::unique_lock lock(mu_); + load_report_cond_.wait(lock, [this] { return load_report_ready_; }); load_report_ready_ = false; return client_stats_; } void NotifyDoneWithServerlists() { - grpc::internal::MutexLock lock(&mu_); + std::lock_guard lock(mu_); if (!serverlist_done_) { serverlist_done_ = true; - serverlist_cond_.Broadcast(); + serverlist_cond_.notify_all(); } } @@ -356,10 +355,10 @@ class BalancerServiceImpl : public BalancerService { const int client_load_reporting_interval_seconds_; std::vector responses_and_delays_; - grpc::internal::Mutex mu_; - grpc::internal::CondVar load_report_cond_; + std::mutex mu_; + std::condition_variable load_report_cond_; bool load_report_ready_ = false; - grpc::internal::CondVar serverlist_cond_; + std::condition_variable serverlist_cond_; bool serverlist_done_ = false; ClientStats client_stats_; }; @@ -625,22 +624,22 @@ class GrpclbEnd2endTest : public ::testing::Test { GPR_ASSERT(!running_); running_ = true; service_.Start(); - grpc::internal::Mutex mu; + std::mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Serve from firing before the wait below is hit. - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; + std::unique_lock lock(mu); + std::condition_variable cond; thread_.reset(new std::thread( std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); - cond.Wait(&mu); + cond.wait(lock); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Serve(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
- grpc::internal::MutexLock lock(mu); + std::lock_guard lock(*mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -649,7 +648,7 @@ class GrpclbEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), creds); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - cond->Signal(); + cond->notify_one(); } void Shutdown() { diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index 5b8af61ee33..e30ce0dbcbf 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -25,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -189,7 +188,7 @@ class CommonStressTestAsyncServer : public BaseClass { } void TearDown() override { { - grpc::internal::MutexLock l(&mu_); + std::unique_lock l(mu_); this->TearDownStart(); shutting_down_ = true; cq_->Shutdown(); @@ -230,7 +229,7 @@ class CommonStressTestAsyncServer : public BaseClass { } } void RefreshContext(int i) { - grpc::internal::MutexLock l(&mu_); + std::unique_lock l(mu_); if (!shutting_down_) { contexts_[i].state = Context::READY; contexts_[i].srv_ctx.reset(new ServerContext); @@ -254,7 +253,7 @@ class CommonStressTestAsyncServer : public BaseClass { ::grpc::testing::EchoTestService::AsyncService service_; std::unique_ptr cq_; bool shutting_down_; - grpc::internal::Mutex mu_; + std::mutex mu_; std::vector server_threads_; }; @@ -342,9 +341,9 @@ class AsyncClientEnd2endTest : public ::testing::Test { } void Wait() { - grpc::internal::MutexLock l(&mu_); + std::unique_lock l(mu_); while (rpcs_outstanding_ != 0) { - cv_.Wait(&mu_); + cv_.wait(l); } cq_.Shutdown(); @@ -367,7 +366,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { call->response_reader->Finish(&call->response, &call->status, (void*)call); - grpc::internal::MutexLock l(&mu_); + std::unique_lock l(mu_); rpcs_outstanding_++; } } @@ -385,20 +384,20 @@ class AsyncClientEnd2endTest : public ::testing::Test { bool notify; { - grpc::internal::MutexLock l(&mu_); + std::unique_lock l(mu_); rpcs_outstanding_--; notify = (rpcs_outstanding_ == 0); } if (notify) { - cv_.Signal(); + cv_.notify_all(); } } } Common common_; CompletionQueue cq_; - grpc::internal::Mutex mu_; - grpc::internal::CondVar cv_; + std::mutex mu_; + std::condition_variable cv_; int rpcs_outstanding_; }; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index ee248239909..c767a4485ce 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -84,32 +84,32 @@ template class CountedService : public ServiceType { public: size_t request_count() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return request_count_; } size_t response_count() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); return response_count_; } void IncreaseResponseCount() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); ++response_count_; } void IncreaseRequestCount() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); ++request_count_; } void ResetCounters() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); request_count_ = 0; response_count_ = 0; } protected: - grpc::internal::Mutex mu_; + std::mutex mu_; private: size_t request_count_ = 0; @@ -145,18 +145,18 @@ class BackendServiceImpl : public BackendService { void Shutdown() {} std::set clients() { - 
grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - grpc::internal::MutexLock lock(&clients_mu_); + std::unique_lock lock(clients_mu_); clients_.insert(client); } - grpc::internal::Mutex mu_; - grpc::internal::Mutex clients_mu_; + std::mutex mu_; + std::mutex clients_mu_; std::set clients_; }; @@ -208,7 +208,7 @@ class BalancerServiceImpl : public BalancerService { // TODO(juanlishen): Clean up the scoping. gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); if (serverlist_done_) goto done; } { @@ -234,7 +234,7 @@ class BalancerServiceImpl : public BalancerService { } { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { @@ -242,8 +242,8 @@ class BalancerServiceImpl : public BalancerService { response_and_delay.second); } { - grpc::internal::MutexLock lock(&mu_); - serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); + std::unique_lock lock(mu_); + serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); } if (client_load_reporting_interval_seconds_ > 0) { @@ -254,7 +254,7 @@ class BalancerServiceImpl : public BalancerService { GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. - grpc::internal::MutexLock lock(&mu_); + std::lock_guard lock(mu_); client_stats_.num_calls_started += request.client_stats().num_calls_started(); client_stats_.num_calls_finished += @@ -271,7 +271,7 @@ class BalancerServiceImpl : public BalancerService { drop_token_count.num_calls(); } load_report_ready_ = true; - load_report_cond_.Signal(); + load_report_cond_.notify_one(); } } } @@ -281,12 +281,12 @@ class BalancerServiceImpl : public BalancerService { } void add_response(const LoadBalanceResponse& response, int send_after_ms) { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } void Shutdown() { - grpc::internal::MutexLock lock(&mu_); + std::unique_lock lock(mu_); NotifyDoneWithServerlistsLocked(); responses_and_delays_.clear(); client_stats_.Reset(); @@ -318,21 +318,21 @@ class BalancerServiceImpl : public BalancerService { } const ClientStats& WaitForLoadReport() { - grpc::internal::MutexLock lock(&mu_); - load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; }); + std::unique_lock lock(mu_); + load_report_cond_.wait(lock, [this] { return load_report_ready_; }); load_report_ready_ = false; return client_stats_; } void NotifyDoneWithServerlists() { - grpc::internal::MutexLock lock(&mu_); + std::lock_guard lock(mu_); NotifyDoneWithServerlistsLocked(); } void NotifyDoneWithServerlistsLocked() { if (!serverlist_done_) { serverlist_done_ = true; - serverlist_cond_.Broadcast(); + serverlist_cond_.notify_all(); } } @@ -351,10 +351,10 @@ class BalancerServiceImpl : public BalancerService { const int client_load_reporting_interval_seconds_; std::vector responses_and_delays_; - grpc::internal::Mutex mu_; - grpc::internal::CondVar load_report_cond_; + std::mutex mu_; + std::condition_variable load_report_cond_; bool load_report_ready_ = false; - grpc::internal::CondVar serverlist_cond_; + std::condition_variable serverlist_cond_; bool serverlist_done_ = false; 
ClientStats client_stats_; }; @@ -637,22 +637,22 @@ class XdsEnd2endTest : public ::testing::Test { gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); GPR_ASSERT(!running_); running_ = true; - grpc::internal::Mutex mu; + std::mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Serve from firing before the wait below is hit. - grpc::internal::MutexLock lock(&mu); - grpc::internal::CondVar cond; + std::unique_lock lock(mu); + std::condition_variable cond; thread_.reset(new std::thread( std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); - cond.Wait(&mu); + cond.wait(lock); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, - grpc::internal::CondVar* cond) { + void Serve(const grpc::string& server_host, std::mutex* mu, + std::condition_variable* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. - grpc::internal::MutexLock lock(mu); + std::lock_guard lock(*mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -661,7 +661,7 @@ class XdsEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), creds); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - cond->Signal(); + cond->notify_one(); } void Shutdown() { diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index a0a6e952409..7274f9588b3 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -987,7 +987,6 @@ include/grpcpp/impl/codegen/status.h \ include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/stub_options.h \ -include/grpcpp/impl/codegen/sync.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/grpc_library.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 21e5b49440f..f169199ae96 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -989,7 +989,6 @@ include/grpcpp/impl/codegen/status.h \ include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/stub_options.h \ -include/grpcpp/impl/codegen/sync.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/grpc_library.h \ @@ -1086,12 +1085,12 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/map.h \ src/core/lib/gprpp/memory.h \ +src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/pair.h \ src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted_ptr.h \ -src/core/lib/gprpp/sync.h \ src/core/lib/gprpp/thd.h \ src/core/lib/http/format_request.h \ src/core/lib/http/httpcli.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 1817be44bc3..e68e758c64e 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1166,12 +1166,12 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/map.h \ src/core/lib/gprpp/memory.h \ +src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ 
 src/core/lib/gprpp/pair.h \
 src/core/lib/gprpp/ref_counted.h \
 src/core/lib/gprpp/ref_counted_ptr.h \
-src/core/lib/gprpp/sync.h \
 src/core/lib/gprpp/thd.h \
 src/core/lib/gprpp/thd_posix.cc \
 src/core/lib/gprpp/thd_windows.cc \
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 29372824822..29fd6f32e59 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -8033,8 +8033,8 @@
       "src/core/lib/gprpp/manual_constructor.h",
       "src/core/lib/gprpp/map.h",
       "src/core/lib/gprpp/memory.h",
+      "src/core/lib/gprpp/mutex_lock.h",
       "src/core/lib/gprpp/pair.h",
-      "src/core/lib/gprpp/sync.h",
       "src/core/lib/gprpp/thd.h",
       "src/core/lib/profiling/timers.h"
     ],
@@ -8081,8 +8081,8 @@
       "src/core/lib/gprpp/manual_constructor.h",
       "src/core/lib/gprpp/map.h",
       "src/core/lib/gprpp/memory.h",
+      "src/core/lib/gprpp/mutex_lock.h",
       "src/core/lib/gprpp/pair.h",
-      "src/core/lib/gprpp/sync.h",
       "src/core/lib/gprpp/thd.h",
       "src/core/lib/profiling/timers.h"
     ],
@@ -9860,7 +9860,6 @@
   },
   {
     "deps": [
-      "grpc++_internal_hdrs_only",
       "grpc_codegen"
     ],
     "headers": [
@@ -10057,7 +10056,6 @@
       "gpr",
       "gpr_base_headers",
       "grpc++_codegen_base",
-      "grpc++_internal_hdrs_only",
       "grpc_base_headers",
       "grpc_transport_inproc_headers",
       "health_proto",
@@ -10352,20 +10350,6 @@
       "third_party": false,
       "type": "filegroup"
     },
-    {
-      "deps": [],
-      "headers": [
-        "include/grpcpp/impl/codegen/sync.h"
-      ],
-      "is_filegroup": true,
-      "language": "c++",
-      "name": "grpc++_internal_hdrs_only",
-      "src": [
-        "include/grpcpp/impl/codegen/sync.h"
-      ],
-      "third_party": false,
-      "type": "filegroup"
-    },
     {
       "deps": [],
       "headers": [

From a31e86b7806a7e79f1ca6a21dc9c0feee5aeaef3 Mon Sep 17 00:00:00 2001
From: Alexander Polcyn
Date: Thu, 11 Apr 2019 17:54:33 -0700
Subject: [PATCH 2/3] Revert remaining conversions

---
 include/grpcpp/server.h | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/include/grpcpp/server.h b/include/grpcpp/server.h
index 8aff0663fe2..2ae9d712012 100644
--- a/include/grpcpp/server.h
+++ b/include/grpcpp/server.h
@@ -297,12 +297,12 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   experimental_registration_type experimental_registration_{this};
 
   // Server status
-  grpc::internal::Mutex mu_;
+  std::mutex mu_;
   bool started_;
   bool shutdown_;
   bool shutdown_notified_;  // Was notify called on the shutdown_cv_
 
-  grpc::internal::CondVar shutdown_cv_;
+  std::condition_variable shutdown_cv_;
 
   // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
   // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@@ -311,8 +311,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
   // during periods of increasing load; the decrement happens only when memory
   // is maxed out, during server shutdown, or (possibly in a future version)
   // during decreasing load, so it is less performance-critical.
-  grpc::internal::Mutex callback_reqs_mu_;
-  grpc::internal::CondVar callback_reqs_done_cv_;
+  std::mutex callback_reqs_mu_;
+  std::condition_variable callback_reqs_done_cv_;
   std::atomic_int callback_reqs_outstanding_{0};
 
   std::shared_ptr<GlobalCallbacks> global_callbacks_;

From 880796e8ba097eaea8c3f030ea00a8ce4c3bfa0c Mon Sep 17 00:00:00 2001
From: Alexander Polcyn
Date: Thu, 11 Apr 2019 17:56:41 -0700
Subject: [PATCH 3/3] Revert "Ban std:: sync constructs"

This reverts commit bb8ba4547aa04f720fb418f6e8d76ffb9d4730ea.
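With the check gone, code under include/ and src/ may once again use the standard primitives directly. The grpc_core::CondVar::WaitUntil(&mu, predicate) calls unwound throughout PATCH 1/3 correspond to the predicate overload of std::condition_variable::wait, as in this minimal sketch (mu, cv, done and the function names are illustrative, not from the tree):

#include <condition_variable>
#include <mutex>

std::mutex mu;
std::condition_variable cv;
bool done = false;  // the condition, always read and written under mu

void WaitForDone() {
  std::unique_lock<std::mutex> lock(mu);
  // Counterpart of CondVar::WaitUntil(&mu, pred): the predicate is rechecked
  // after every wakeup, so spurious wakeups cannot cause an early return.
  cv.wait(lock, [] { return done; });
}

void SignalDone() {
  {
    std::lock_guard<std::mutex> lock(mu);  // publish the new state under mu
    done = true;
  }
  cv.notify_all();  // the Broadcast() analogue; notify_one() matches Signal()
}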
---
 .../run_tests/sanity/cpp_banned_constructs.sh | 29 -------------------
 tools/run_tests/sanity/sanity_tests.yaml      |  1 -
 2 files changed, 30 deletions(-)
 delete mode 100755 tools/run_tests/sanity/cpp_banned_constructs.sh

diff --git a/tools/run_tests/sanity/cpp_banned_constructs.sh b/tools/run_tests/sanity/cpp_banned_constructs.sh
deleted file mode 100755
index 68240d6f831..00000000000
--- a/tools/run_tests/sanity/cpp_banned_constructs.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/sh
-# Copyright 2019 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-set -e
-
-cd "$(dirname "$0")/../../.."
-
-#
-# Prevent the use of synchronization and threading constructs from std:: since
-# the code should be using grpc_core::Mutex, grpc::internal::Mutex,
-# grpc_core::Thread, etc.
-#
-
-egrep -Irn \
-  'std::(mutex|condition_variable|lock_guard|unique_lock|thread)' \
-  include/grpc include/grpcpp src/core src/cpp | diff - /dev/null
-
diff --git a/tools/run_tests/sanity/sanity_tests.yaml b/tools/run_tests/sanity/sanity_tests.yaml
index 40686e0f3ab..1913edd4257 100644
--- a/tools/run_tests/sanity/sanity_tests.yaml
+++ b/tools/run_tests/sanity/sanity_tests.yaml
@@ -10,7 +10,6 @@
 - script: tools/run_tests/sanity/check_tracer_sanity.py
 - script: tools/run_tests/sanity/core_banned_functions.py
 - script: tools/run_tests/sanity/core_untyped_structs.sh
-- script: tools/run_tests/sanity/cpp_banned_constructs.sh
 - script: tools/run_tests/sanity/check_deprecated_grpc++.py
 - script: tools/run_tests/sanity/check_port_platform.py
 - script: tools/buildgen/generate_projects.sh -j 3
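Several ServerThread helpers converted in PATCH 1/3 rely on the spawning thread holding the mutex while it launches the worker, so that the worker's notify_one() cannot fire before the matching wait() has started. A condensed sketch of that handshake under the same std:: types; ServerThread here is a stand-in for the test helpers, and the real tests also pass a predicate to wait() where they must tolerate spurious wakeups:

#include <condition_variable>
#include <mutex>
#include <thread>

class ServerThread {
 public:
  void Start() {
    std::mutex mu;
    std::condition_variable cond;
    // Holding mu here prevents Serve() from reaching notify_one() before
    // the wait() below has released mu and begun waiting.
    std::unique_lock<std::mutex> lock(mu);
    thread_ = std::thread(&ServerThread::Serve, this, &mu, &cond);
    cond.wait(lock);  // mu is released while blocked, reacquired on wakeup
  }

  void Join() { thread_.join(); }

 private:
  void Serve(std::mutex* mu, std::condition_variable* cond) {
    std::lock_guard<std::mutex> lock(*mu);  // blocks until Start() is waiting
    // ... build and start the server here ...
    cond->notify_one();  // wake Start(); mu is released at end of scope
  }

  std::thread thread_;
};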