diff --git a/BUILD b/BUILD index 914a0b9bf92..6bea6557904 100644 --- a/BUILD +++ b/BUILD @@ -526,6 +526,17 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc++_internal_hdrs_only", + hdrs = [ + "include/grpcpp/impl/codegen/sync.h", + ], + language = "c++", + deps = [ + "gpr_codegen", + ], +) + grpc_cc_library( name = "gpr_base", srcs = [ @@ -591,8 +602,8 @@ grpc_cc_library( "src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/map.h", "src/core/lib/gprpp/memory.h", - "src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/pair.h", + "src/core/lib/gprpp/sync.h", "src/core/lib/gprpp/thd.h", "src/core/lib/profiling/timers.h", ], @@ -2146,6 +2157,7 @@ grpc_cc_library( "include/grpcpp/impl/codegen/time.h", ], deps = [ + "grpc++_internal_hdrs_only", "grpc_codegen", ], ) diff --git a/BUILD.gn b/BUILD.gn index 0813898bcb7..57e15800e20 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -186,8 +186,8 @@ config("grpc_config") { "src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/map.h", "src/core/lib/gprpp/memory.h", - "src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/pair.h", + "src/core/lib/gprpp/sync.h", "src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd_posix.cc", "src/core/lib/gprpp/thd_windows.cc", @@ -1064,6 +1064,7 @@ config("grpc_config") { "include/grpcpp/impl/codegen/status_code_enum.h", "include/grpcpp/impl/codegen/string_ref.h", "include/grpcpp/impl/codegen/stub_options.h", + "include/grpcpp/impl/codegen/sync.h", "include/grpcpp/impl/codegen/sync_stream.h", "include/grpcpp/impl/codegen/time.h", "include/grpcpp/impl/grpc_library.h", @@ -1159,12 +1160,12 @@ config("grpc_config") { "src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/map.h", "src/core/lib/gprpp/memory.h", - "src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/optional.h", "src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted_ptr.h", + "src/core/lib/gprpp/sync.h", "src/core/lib/gprpp/thd.h", "src/core/lib/http/format_request.h", "src/core/lib/http/httpcli.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 89bd64743ff..72d2a76d64d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3181,6 +3181,7 @@ foreach(_hdr include/grpcpp/impl/codegen/stub_options.h include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/time.h + include/grpcpp/impl/codegen/sync.h include/grpc++/impl/codegen/proto_utils.h include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -3784,6 +3785,7 @@ foreach(_hdr include/grpcpp/impl/codegen/stub_options.h include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/time.h + include/grpcpp/impl/codegen/sync.h include/grpc/census.h ) string(REPLACE "include/" "" _path ${_hdr}) @@ -4238,6 +4240,7 @@ foreach(_hdr include/grpc/impl/codegen/sync_generic.h include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_windows.h + include/grpcpp/impl/codegen/sync.h include/grpc++/impl/codegen/proto_utils.h include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -4434,6 +4437,7 @@ foreach(_hdr include/grpc/impl/codegen/sync_generic.h include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_windows.h + include/grpcpp/impl/codegen/sync.h include/grpc++/impl/codegen/proto_utils.h include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_writer.h @@ -4761,6 +4765,7 @@ foreach(_hdr include/grpcpp/impl/codegen/stub_options.h 
include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/time.h + include/grpcpp/impl/codegen/sync.h ) string(REPLACE "include/" "" _path ${_hdr}) get_filename_component(_path ${_path} PATH) diff --git a/Makefile b/Makefile index e777f69e921..3acdcb256d3 100644 --- a/Makefile +++ b/Makefile @@ -5512,6 +5512,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ + include/grpcpp/impl/codegen/sync.h \ include/grpc++/impl/codegen/proto_utils.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \ @@ -6123,6 +6124,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ + include/grpcpp/impl/codegen/sync.h \ include/grpc/census.h \ LIBGRPC++_CRONET_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_CRONET_SRC)))) @@ -6549,6 +6551,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc/impl/codegen/sync_generic.h \ include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_windows.h \ + include/grpcpp/impl/codegen/sync.h \ include/grpc++/impl/codegen/proto_utils.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \ @@ -6716,6 +6719,7 @@ PUBLIC_HEADERS_CXX += \ include/grpc/impl/codegen/sync_generic.h \ include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_windows.h \ + include/grpcpp/impl/codegen/sync.h \ include/grpc++/impl/codegen/proto_utils.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \ @@ -7049,6 +7053,7 @@ PUBLIC_HEADERS_CXX += \ include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ + include/grpcpp/impl/codegen/sync.h \ LIBGRPC++_UNSECURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_UNSECURE_SRC)))) diff --git a/build.yaml b/build.yaml index a539f062d2e..99f9ef35f63 100644 --- a/build.yaml +++ b/build.yaml @@ -196,8 +196,8 @@ filegroups: - src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/map.h - src/core/lib/gprpp/memory.h - - src/core/lib/gprpp/mutex_lock.h - src/core/lib/gprpp/pair.h + - src/core/lib/gprpp/sync.h - src/core/lib/gprpp/thd.h - src/core/lib/profiling/timers.h uses: @@ -1276,6 +1276,7 @@ filegroups: - include/grpcpp/impl/codegen/time.h uses: - grpc_codegen + - grpc++_internal_hdrs_only - name: grpc++_codegen_base_src language: c++ src: @@ -1451,6 +1452,7 @@ filegroups: - grpc_base_headers - grpc_transport_inproc_headers - grpc++_codegen_base + - grpc++_internal_hdrs_only - nanopb_headers - health_proto - name: grpc++_config_proto @@ -1458,6 +1460,10 @@ filegroups: public_headers: - include/grpc++/impl/codegen/config_protobuf.h - include/grpcpp/impl/codegen/config_protobuf.h +- name: grpc++_internal_hdrs_only + language: c++ + public_headers: + - include/grpcpp/impl/codegen/sync.h - name: grpc++_reflection_proto language: c++ src: diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 58a4c82253b..7f1ae557537 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -184,7 +184,8 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/codegen/string_ref.h', 'include/grpcpp/impl/codegen/stub_options.h', 'include/grpcpp/impl/codegen/sync_stream.h', - 'include/grpcpp/impl/codegen/time.h' + 'include/grpcpp/impl/codegen/time.h', + 
'include/grpcpp/impl/codegen/sync.h' end s.subspec 'Implementation' do |ss| @@ -267,8 +268,8 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/memory.h', - 'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/pair.h', + 'src/core/lib/gprpp/sync.h', 'src/core/lib/gprpp/thd.h', 'src/core/lib/profiling/timers.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h', @@ -584,8 +585,8 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/memory.h', - 'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/pair.h', + 'src/core/lib/gprpp/sync.h', 'src/core/lib/gprpp/thd.h', 'src/core/lib/profiling/timers.h', 'src/core/lib/avl/avl.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index bab7ce4378e..c58152cf7f8 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -210,8 +210,8 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/memory.h', - 'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/pair.h', + 'src/core/lib/gprpp/sync.h', 'src/core/lib/gprpp/thd.h', 'src/core/lib/profiling/timers.h', 'src/core/lib/gpr/alloc.cc', @@ -889,8 +889,8 @@ Pod::Spec.new do |s| 'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/memory.h', - 'src/core/lib/gprpp/mutex_lock.h', 'src/core/lib/gprpp/pair.h', + 'src/core/lib/gprpp/sync.h', 'src/core/lib/gprpp/thd.h', 'src/core/lib/profiling/timers.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h', diff --git a/grpc.gemspec b/grpc.gemspec index b9d2107ee48..c41db6f6293 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -104,8 +104,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/gprpp/manual_constructor.h ) s.files += %w( src/core/lib/gprpp/map.h ) s.files += %w( src/core/lib/gprpp/memory.h ) - s.files += %w( src/core/lib/gprpp/mutex_lock.h ) s.files += %w( src/core/lib/gprpp/pair.h ) + s.files += %w( src/core/lib/gprpp/sync.h ) s.files += %w( src/core/lib/gprpp/thd.h ) s.files += %w( src/core/lib/profiling/timers.h ) s.files += %w( src/core/lib/gpr/alloc.cc ) diff --git a/include/grpcpp/channel.h b/include/grpcpp/channel.h index ee833960698..c4d5ab1177b 100644 --- a/include/grpcpp/channel.h +++ b/include/grpcpp/channel.h @@ -28,6 +28,7 @@ #include #include #include +#include struct grpc_channel; @@ -97,7 +98,7 @@ class Channel final : public ChannelInterface, grpc_channel* const c_channel_; // owned // mu_ protects callback_cq_ (the per-channel callbackable completion queue) - std::mutex mu_; + grpc::internal::Mutex mu_; // callback_cq_ references the callbackable completion queue associated // with this channel (if any). It is set on the first call to CallbackCQ(). 
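The hunks above switch the C++ API layer from std::mutex to the new grpc::internal::Mutex declared in include/grpcpp/impl/codegen/sync.h (the matching call-site change, std::lock_guard → grpc::internal::MutexLock, appears in src/cpp/client/channel_cc.cc further down). A minimal sketch of the lock-and-lazily-initialize pattern Channel::CallbackCQ() ends up using is below; LazyCache and its int member are hypothetical stand-ins for Channel and callback_cq_, not real gRPC types.

```cpp
#include <grpcpp/impl/codegen/sync.h>

// Hypothetical illustration only: a member created at most once, under mu_,
// mirroring how Channel::CallbackCQ() guards callback_cq_ after this diff.
class LazyCache {
 public:
  int* Get() {
    grpc::internal::MutexLock lock(&mu_);  // was: std::lock_guard<std::mutex> l(mu_);
    if (value_ == nullptr) {
      value_ = new int(42);  // first caller creates the value
    }
    return value_;
  }

 private:
  grpc::internal::Mutex mu_;  // was: std::mutex mu_;
  int* value_ = nullptr;      // guarded by mu_
};
```

In the codegen header every operation routes through g_core_codegen_interface, which is why the comment at the top of the new header (added below) tells core code to prefer src/core/lib/gprpp/sync.h instead.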
diff --git a/include/grpcpp/impl/codegen/client_context.h b/include/grpcpp/impl/codegen/client_context.h index 5946488566e..85bbf36f06d 100644 --- a/include/grpcpp/impl/codegen/client_context.h +++ b/include/grpcpp/impl/codegen/client_context.h @@ -51,6 +51,7 @@ #include #include #include +#include #include struct census_context; @@ -457,7 +458,7 @@ class ClientContext { bool idempotent_; bool cacheable_; std::shared_ptr channel_; - std::mutex mu_; + grpc::internal::Mutex mu_; grpc_call* call_; bool call_canceled_; gpr_timespec deadline_; diff --git a/include/grpcpp/impl/codegen/sync.h b/include/grpcpp/impl/codegen/sync.h new file mode 100644 index 00000000000..2ed71eeb9f2 --- /dev/null +++ b/include/grpcpp/impl/codegen/sync.h @@ -0,0 +1,138 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_IMPL_CODEGEN_SYNC_H +#define GRPCPP_IMPL_CODEGEN_SYNC_H + +#include +#include +#include + +#include + +// The core library is not accessible in C++ codegen headers, and vice versa. +// Thus, we need to have duplicate headers with similar functionality. +// Make sure any change to this file is also reflected in +// src/core/lib/gprpp/sync.h too. +// +// Whenever possible, prefer "src/core/lib/gprpp/sync.h" over this file, +// since in core we do not rely on g_core_codegen_interface and hence do not +// pay the costs of virtual function calls. 
+ +namespace grpc { +namespace internal { + +class Mutex { + public: + Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); } + ~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); } + + Mutex(const Mutex&) = delete; + Mutex& operator=(const Mutex&) = delete; + + gpr_mu* get() { return &mu_; } + const gpr_mu* get() const { return &mu_; } + + private: + gpr_mu mu_; +}; + +// MutexLock is a std:: +class MutexLock { + public: + explicit MutexLock(Mutex* mu) : mu_(mu->get()) { + g_core_codegen_interface->gpr_mu_lock(mu_); + } + explicit MutexLock(gpr_mu* mu) : mu_(mu) { + g_core_codegen_interface->gpr_mu_lock(mu_); + } + ~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); } + + MutexLock(const MutexLock&) = delete; + MutexLock& operator=(const MutexLock&) = delete; + + private: + gpr_mu* const mu_; +}; + +class ReleasableMutexLock { + public: + explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { + g_core_codegen_interface->gpr_mu_lock(mu_); + } + explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { + g_core_codegen_interface->gpr_mu_lock(mu_); + } + ~ReleasableMutexLock() { + if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_); + } + + ReleasableMutexLock(const ReleasableMutexLock&) = delete; + ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; + + void Lock() { + GPR_DEBUG_ASSERT(released_); + g_core_codegen_interface->gpr_mu_lock(mu_); + released_ = false; + } + + void Unlock() { + GPR_DEBUG_ASSERT(!released_); + released_ = true; + g_core_codegen_interface->gpr_mu_unlock(mu_); + } + + private: + gpr_mu* const mu_; + bool released_ = false; +}; + +class CondVar { + public: + CondVar() { g_core_codegen_interface->gpr_cv_init(&cv_); } + ~CondVar() { g_core_codegen_interface->gpr_cv_destroy(&cv_); } + + CondVar(const CondVar&) = delete; + CondVar& operator=(const CondVar&) = delete; + + void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); } + void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); } + + int Wait(Mutex* mu) { + return Wait(mu, + g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); + } + int Wait(Mutex* mu, const gpr_timespec& deadline) { + return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline); + } + + template + void WaitUntil(Mutex* mu, Predicate pred) { + while (!pred()) { + Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME)); + } + } + + private: + gpr_cv cv_; +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_CODEGEN_SYNC_H diff --git a/include/grpcpp/server_impl.h b/include/grpcpp/server_impl.h index 771a3a10be9..14b16a06f4e 100644 --- a/include/grpcpp/server_impl.h +++ b/include/grpcpp/server_impl.h @@ -304,12 +304,12 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { experimental_registration_type experimental_registration_{this}; // Server status - std::mutex mu_; + grpc::internal::Mutex mu_; bool started_; bool shutdown_; bool shutdown_notified_; // Was notify called on the shutdown_cv_ - std::condition_variable shutdown_cv_; + grpc::internal::CondVar shutdown_cv_; // It is ok (but not required) to nest callback_reqs_mu_ under mu_ . 
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be @@ -318,8 +318,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { // during periods of increasing load; the decrement happens only when memory // is maxed out, during server shutdown, or (possibly in a future version) // during decreasing load, so it is less performance-critical. - std::mutex callback_reqs_mu_; - std::condition_variable callback_reqs_done_cv_; + grpc::internal::Mutex callback_reqs_mu_; + grpc::internal::CondVar callback_reqs_done_cv_; std::atomic_int callback_reqs_outstanding_{0}; std::shared_ptr global_callbacks_; diff --git a/package.xml b/package.xml index f3bbb449ac5..5be3d24365d 100644 --- a/package.xml +++ b/package.xml @@ -109,8 +109,8 @@ - + diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 86938a51d9b..cd552739732 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -51,7 +51,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index a22d6450cbd..a99f1e54062 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -27,7 +27,7 @@ #include "pb_encode.h" #include "src/core/ext/filters/client_channel/health/health.pb.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/status_metadata.h" @@ -69,7 +69,6 @@ HealthCheckClient::HealthCheckClient( } GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this, grpc_schedule_on_exec_ctx); - gpr_mu_init(&mu_); StartCall(); } @@ -78,7 +77,6 @@ HealthCheckClient::~HealthCheckClient() { gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this); } GRPC_ERROR_UNREF(error_); - gpr_mu_destroy(&mu_); } void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state, diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 1fa4487c403..6e0123e4925 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -31,6 +31,7 @@ #include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/polling_entity.h" @@ -157,7 +158,7 @@ class HealthCheckClient : public InternallyRefCounted { grpc_pollset_set* interested_parties_; // Do not own. 
RefCountedPtr channelz_node_; - gpr_mu mu_; + Mutex mu_; grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; grpc_error* error_ = GRPC_ERROR_NONE; grpc_connectivity_state* notify_state_ = nullptr; diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 2b1eb92bbd4..90a79843458 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -33,7 +33,7 @@ #include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/slice/slice_internal.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index aebd2fd3faa..ead15308f49 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -88,7 +88,6 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/combiner.h" diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc index 84b9c41a734..35123633feb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc @@ -25,7 +25,7 @@ #include #include -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_core { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h index fdebdf55c17..bcc6598c105 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -26,6 +26,7 @@ #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" namespace grpc_core { @@ -41,9 +42,6 @@ class GrpcLbClientStats : public RefCounted { typedef InlinedVector DroppedCallCounts; - GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); } - ~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); } - void AddCallStarted(); void AddCallFinished(bool finished_with_client_failed_to_send, bool finished_known_received); @@ -66,7 +64,7 @@ class GrpcLbClientStats : public RefCounted { gpr_atm num_calls_finished_ = 0; gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; gpr_atm num_calls_finished_known_received_ = 0; - gpr_mu drop_count_mu_; // Guards drop_token_counts_. + Mutex drop_count_mu_; // Guards drop_token_counts_. 
UniquePtr drop_token_counts_; }; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 332f66808e0..892472e1c9b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -27,7 +27,7 @@ #include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -154,13 +154,12 @@ class PickFirst : public LoadBalancingPolicy { /// Lock and data used to capture snapshots of this channels child /// channels and subchannels. This data is consumed by channelz. - gpr_mu child_refs_mu_; + Mutex child_refs_mu_; channelz::ChildRefsList child_subchannels_; channelz::ChildRefsList child_channels_; }; PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { - gpr_mu_init(&child_refs_mu_); if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Pick First %p created.", this); } @@ -170,7 +169,6 @@ PickFirst::~PickFirst() { if (grpc_lb_pick_first_trace.enabled()) { gpr_log(GPR_INFO, "Destroying Pick First %p", this); } - gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index d3faaaddc98..29cfe972f78 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -36,8 +36,8 @@ #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -193,7 +193,7 @@ class RoundRobin : public LoadBalancingPolicy { bool shutdown_ = false; /// Lock and data used to capture snapshots of this channel's child /// channels and subchannels. This data is consumed by channelz. 
- gpr_mu child_refs_mu_; + Mutex child_refs_mu_; channelz::ChildRefsList child_subchannels_; channelz::ChildRefsList child_channels_; }; @@ -245,7 +245,6 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick, // RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { - gpr_mu_init(&child_refs_mu_); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Created", this); } @@ -255,7 +254,6 @@ RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } - gpr_mu_destroy(&child_refs_mu_); GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc index fe49e9aca03..443deb01274 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -89,9 +89,9 @@ #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -278,10 +278,8 @@ class XdsLb : public LoadBalancingPolicy { class LocalityEntry : public InternallyRefCounted { public: explicit LocalityEntry(RefCountedPtr parent) - : parent_(std::move(parent)) { - gpr_mu_init(&child_policy_mu_); - } - ~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); } + : parent_(std::move(parent)) {} + ~LocalityEntry() = default; void UpdateLocked(xds_grpclb_serverlist* serverlist, LoadBalancingPolicy::Config* child_policy_config, @@ -323,13 +321,10 @@ class XdsLb : public LoadBalancingPolicy { OrphanablePtr pending_child_policy_; // Lock held when modifying the value of child_policy_ or // pending_child_policy_. - gpr_mu child_policy_mu_; + Mutex child_policy_mu_; RefCountedPtr parent_; }; - LocalityMap() { gpr_mu_init(&child_refs_mu_); } - ~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); } - void UpdateLocked(const LocalityList& locality_list, LoadBalancingPolicy::Config* child_policy_config, const grpc_channel_args* args, XdsLb* parent); @@ -343,7 +338,7 @@ class XdsLb : public LoadBalancingPolicy { Map, OrphanablePtr, StringLess> map_; // Lock held while filling child refs for all localities // inside the map - gpr_mu child_refs_mu_; + Mutex child_refs_mu_; }; struct LocalityServerlistEntry { @@ -397,7 +392,7 @@ class XdsLb : public LoadBalancingPolicy { // Mutex to protect the channel to the LB server. This is used when // processing a channelz request. // TODO(juanlishen): Replace this with atomic. - gpr_mu lb_chand_mu_; + Mutex lb_chand_mu_; // Timeout in milliseconds for the LB call. 0 means no deadline. int lb_call_timeout_ms_ = 0; @@ -1090,7 +1085,6 @@ XdsLb::XdsLb(Args args) : LoadBalancingPolicy(std::move(args)), locality_map_(), locality_serverlist_() { - gpr_mu_init(&lb_chand_mu_); // Record server name. 
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const char* server_uri = grpc_channel_arg_get_string(arg); @@ -1114,7 +1108,6 @@ XdsLb::XdsLb(Args args) } XdsLb::~XdsLb() { - gpr_mu_destroy(&lb_chand_mu_); gpr_free((void*)server_name_); grpc_channel_args_destroy(args_); locality_serverlist_.clear(); diff --git a/src/core/ext/filters/client_channel/resolving_lb_policy.cc b/src/core/ext/filters/client_channel/resolving_lb_policy.cc index d15af908b3f..a2367e09d9b 100644 --- a/src/core/ext/filters/client_channel/resolving_lb_policy.cc +++ b/src/core/ext/filters/client_channel/resolving_lb_policy.cc @@ -48,7 +48,7 @@ #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 8bb0c4c3498..3d3815384aa 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -42,8 +42,8 @@ #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -457,13 +457,14 @@ struct Subchannel::ExternalStateWatcher { grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_, w->pollset_set); } - gpr_mu_lock(&w->subchannel->mu_); - if (w->subchannel->external_state_watcher_list_ == w) { - w->subchannel->external_state_watcher_list_ = w->next; + { + MutexLock lock(&w->subchannel->mu_); + if (w->subchannel->external_state_watcher_list_ == w) { + w->subchannel->external_state_watcher_list_ = w->next; + } + if (w->next != nullptr) w->next->prev = w->prev; + if (w->prev != nullptr) w->prev->next = w->next; } - if (w->next != nullptr) w->next->prev = w->prev; - if (w->prev != nullptr) w->prev->next = w->next; - gpr_mu_unlock(&w->subchannel->mu_); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done"); Delete(w); GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); @@ -585,7 +586,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, "subchannel"); grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE, "subchannel"); - gpr_mu_init(&mu_); // Check whether we should enable health checking. const char* service_config_json = grpc_channel_arg_get_string( grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG)); @@ -629,7 +629,6 @@ Subchannel::~Subchannel() { grpc_connector_unref(connector_); grpc_pollset_set_destroy(pollset_set_); Delete(key_); - gpr_mu_destroy(&mu_); } Subchannel* Subchannel::Create(grpc_connector* connector, @@ -905,7 +904,9 @@ void Subchannel::MaybeStartConnectingLocked() { void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { Subchannel* c = static_cast(arg); - gpr_mu_lock(&c->mu_); + // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use + // MutexLock instead of ReleasableMutexLock, here. 
+ ReleasableMutexLock lock(&c->mu_); c->have_retry_alarm_ = false; if (c->disconnected_) { error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", @@ -919,9 +920,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); c->ContinueConnectingLocked(); - gpr_mu_unlock(&c->mu_); + lock.Unlock(); } else { - gpr_mu_unlock(&c->mu_); + lock.Unlock(); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } GRPC_ERROR_UNREF(error); @@ -948,29 +949,30 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { auto* c = static_cast(arg); grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); - gpr_mu_lock(&c->mu_); - c->connecting_ = false; - if (c->connecting_result_.transport != nullptr && - c->PublishTransportLocked()) { - // Do nothing, transport was published. - } else if (c->disconnected_) { - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - } else { - const char* errmsg = grpc_error_string(error); - gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - error = - grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), "connect_failed"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, - GRPC_CHANNEL_TRANSIENT_FAILURE, error, - "connect_failed"); - c->MaybeStartConnectingLocked(); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + { + MutexLock lock(&c->mu_); + c->connecting_ = false; + if (c->connecting_result_.transport != nullptr && + c->PublishTransportLocked()) { + // Do nothing, transport was published. + } else if (c->disconnected_) { + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } else { + const char* errmsg = grpc_error_string(error); + gpr_log(GPR_INFO, "Connect failed: %s", errmsg); + error = grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Connect Failed", + &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), "connect_failed"); + grpc_connectivity_state_set(&c->state_and_health_tracker_, + GRPC_CHANNEL_TRANSIENT_FAILURE, error, + "connect_failed"); + c->MaybeStartConnectingLocked(); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } } - gpr_mu_unlock(&c->mu_); GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); grpc_channel_args_destroy(delete_channel_args); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 968fc74e22a..83b57dd7ff3 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -29,6 +29,7 @@ #include "src/core/lib/gpr/arena.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/connectivity_state.h" @@ -264,7 +265,7 @@ class Subchannel { // pollset_set tracking who's interested in a connection being setup. grpc_pollset_set* pollset_set_; // Protects the other members. - gpr_mu mu_; + Mutex mu_; // Refcount // - lower INTERNAL_REF_BITS bits are for internal references: // these do not keep the subchannel open. 
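The subchannel.cc changes above show the two lock-guard idioms this diff standardizes on: a plain MutexLock inside its own scope in OnConnectingFinished, and a ReleasableMutexLock in OnRetryAlarm for the path that must drop the lock before the final weak unref (the TODO notes a plain MutexLock would do once subchannel refcounting is simplified). Below is a rough sketch of the ReleasableMutexLock idiom, assuming the grpc_core wrappers added in src/core/lib/gprpp/sync.h later in this diff; RetryOrUnref is a hypothetical function, not gRPC code.

```cpp
#include "src/core/lib/gprpp/sync.h"

// Hypothetical sketch of the OnRetryAlarm-style control flow: an early return
// relies on the destructor to unlock, while the tail paths unlock explicitly
// so the final cleanup runs without the mutex held.
void RetryOrUnref(grpc_core::Mutex* mu, bool disconnected, bool retry_ok) {
  grpc_core::ReleasableMutexLock lock(mu);  // acquires *mu here
  if (disconnected) {
    return;  // destructor releases the lock
  }
  // ... update the state that *mu guards ...
  lock.Unlock();  // both tail paths below run without the lock held
  if (!retry_ok) {
    // the real code does GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting") here,
    // outside the lock
  }
}
```

MutexLock, by contrast, has no Unlock(), which is why OnConnectingFinished simply wraps the locked region in its own braces.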
diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 9f0169aeaab..b6a660b18fd 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -23,7 +23,7 @@ #include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include #include diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 912d524c8db..b68799b6e0e 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -27,8 +27,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/inlined_vector.h" -#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" diff --git a/src/core/lib/gprpp/mutex_lock.h b/src/core/lib/gprpp/mutex_lock.h deleted file mode 100644 index 54751d5fe46..00000000000 --- a/src/core/lib/gprpp/mutex_lock.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H -#define GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H - -#include - -#include - -namespace grpc_core { - -class MutexLock { - public: - explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu); } - ~MutexLock() { gpr_mu_unlock(mu_); } - - MutexLock(const MutexLock&) = delete; - MutexLock& operator=(const MutexLock&) = delete; - - private: - gpr_mu* const mu_; -}; - -} // namespace grpc_core - -#endif /* GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H */ diff --git a/src/core/lib/gprpp/sync.h b/src/core/lib/gprpp/sync.h new file mode 100644 index 00000000000..895ca60fec0 --- /dev/null +++ b/src/core/lib/gprpp/sync.h @@ -0,0 +1,126 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_GPRPP_SYNC_H +#define GRPC_CORE_LIB_GPRPP_SYNC_H + +#include + +#include +#include +#include +#include + +// The core library is not accessible in C++ codegen headers, and vice versa. +// Thus, we need to have duplicate headers with similar functionality. +// Make sure any change to this file is also reflected in +// include/grpcpp/impl/codegen/sync.h. 
+// +// Whenever possible, prefer using this file over +// since this file doesn't rely on g_core_codegen_interface and hence does not +// pay the costs of virtual function calls. + +namespace grpc_core { + +class Mutex { + public: + Mutex() { gpr_mu_init(&mu_); } + ~Mutex() { gpr_mu_destroy(&mu_); } + + Mutex(const Mutex&) = delete; + Mutex& operator=(const Mutex&) = delete; + + gpr_mu* get() { return &mu_; } + const gpr_mu* get() const { return &mu_; } + + private: + gpr_mu mu_; +}; + +// MutexLock is a std:: +class MutexLock { + public: + explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); } + explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); } + ~MutexLock() { gpr_mu_unlock(mu_); } + + MutexLock(const MutexLock&) = delete; + MutexLock& operator=(const MutexLock&) = delete; + + private: + gpr_mu* const mu_; +}; + +class ReleasableMutexLock { + public: + explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); } + explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); } + ~ReleasableMutexLock() { + if (!released_) gpr_mu_unlock(mu_); + } + + ReleasableMutexLock(const ReleasableMutexLock&) = delete; + ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete; + + void Lock() { + GPR_DEBUG_ASSERT(released_); + gpr_mu_lock(mu_); + released_ = false; + } + + void Unlock() { + GPR_DEBUG_ASSERT(!released_); + released_ = true; + gpr_mu_unlock(mu_); + } + + private: + gpr_mu* const mu_; + bool released_ = false; +}; + +class CondVar { + public: + CondVar() { gpr_cv_init(&cv_); } + ~CondVar() { gpr_cv_destroy(&cv_); } + + CondVar(const CondVar&) = delete; + CondVar& operator=(const CondVar&) = delete; + + void Signal() { gpr_cv_signal(&cv_); } + void Broadcast() { gpr_cv_broadcast(&cv_); } + + int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } + int Wait(Mutex* mu, const gpr_timespec& deadline) { + return gpr_cv_wait(&cv_, mu->get(), deadline); + } + + template + void WaitUntil(Mutex* mu, Predicate pred) { + while (!pred()) { + Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + } + + private: + gpr_cv cv_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_GPRPP_SYNC_H */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 01be46c9f68..c387f8359a0 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -47,7 +47,7 @@ #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/manual_constructor.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h" diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index b67d4f12252..57d975564b1 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -33,7 +33,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/fork.h" -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc index f9184bcc34f..ba0745a2359 100644 --- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc 
+++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc @@ -18,7 +18,7 @@ #include -#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h" #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h" diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index db59d4d8416..2d5e74163a9 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -232,7 +232,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor { CompletionQueue* Channel::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-channel CQ registered - std::lock_guard l(mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ == nullptr) { auto* shutdown_callback = new ShutdownCallback; callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index efb59c71a8c..b4fce79b99a 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -84,7 +85,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key, void ClientContext::set_call(grpc_call* call, const std::shared_ptr& channel) { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); GPR_ASSERT(call_ == nullptr); call_ = call; channel_ = channel; @@ -114,7 +115,7 @@ void ClientContext::set_compression_algorithm( } void ClientContext::TryCancel() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); if (call_) { SendCancelToInterceptors(); grpc_call_cancel(call_, nullptr); diff --git a/src/cpp/server/dynamic_thread_pool.cc b/src/cpp/server/dynamic_thread_pool.cc index ef99d6459f7..c8bdbdea7e6 100644 --- a/src/cpp/server/dynamic_thread_pool.cc +++ b/src/cpp/server/dynamic_thread_pool.cc @@ -21,6 +21,7 @@ #include #include +#include #include "src/core/lib/gprpp/thd.h" @@ -40,27 +41,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); } void DynamicThreadPool::DynamicThread::ThreadFunc() { pool_->ThreadFunc(); // Now that we have killed ourselves, we should reduce the thread count - std::unique_lock lock(pool_->mu_); + grpc_core::MutexLock lock(&pool_->mu_); pool_->nthreads_--; // Move ourselves to dead list pool_->dead_threads_.push_back(this); if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { - pool_->shutdown_cv_.notify_one(); + pool_->shutdown_cv_.Signal(); } } void DynamicThreadPool::ThreadFunc() { for (;;) { // Wait until work is available or we are shutting down. 
- std::unique_lock lock(mu_); + grpc_core::ReleasableMutexLock lock(&mu_); if (!shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread if (threads_waiting_ >= reserve_threads_) { break; } threads_waiting_++; - cv_.wait(lock); + cv_.Wait(&mu_); threads_waiting_--; } // Drain callbacks before considering shutdown to ensure all work @@ -68,7 +69,7 @@ void DynamicThreadPool::ThreadFunc() { if (!callbacks_.empty()) { auto cb = callbacks_.front(); callbacks_.pop(); - lock.unlock(); + lock.Unlock(); cb(); } else if (shutdown_) { break; @@ -82,7 +83,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads) nthreads_(0), threads_waiting_(0) { for (int i = 0; i < reserve_threads_; i++) { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); nthreads_++; new DynamicThread(this); } @@ -95,17 +96,17 @@ void DynamicThreadPool::ReapThreads(std::list* tlist) { } DynamicThreadPool::~DynamicThreadPool() { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); shutdown_ = true; - cv_.notify_all(); + cv_.Broadcast(); while (nthreads_ != 0) { - shutdown_cv_.wait(lock); + shutdown_cv_.Wait(&mu_); } ReapThreads(&dead_threads_); } void DynamicThreadPool::Add(const std::function& callback) { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); // Add works to the callbacks list callbacks_.push(callback); // Increase pool size or notify as needed @@ -114,7 +115,7 @@ void DynamicThreadPool::Add(const std::function& callback) { nthreads_++; new DynamicThread(this); } else { - cv_.notify_one(); + cv_.Signal(); } // Also use this chance to harvest dead threads if (!dead_threads_.empty()) { diff --git a/src/cpp/server/dynamic_thread_pool.h b/src/cpp/server/dynamic_thread_pool.h index 5df8cf2b043..4ae0257d40b 100644 --- a/src/cpp/server/dynamic_thread_pool.h +++ b/src/cpp/server/dynamic_thread_pool.h @@ -27,6 +27,7 @@ #include +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/cpp/server/thread_pool_interface.h" @@ -50,9 +51,9 @@ class DynamicThreadPool final : public ThreadPoolInterface { grpc_core::Thread thd_; void ThreadFunc(); }; - std::mutex mu_; - std::condition_variable cv_; - std::condition_variable shutdown_cv_; + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + grpc_core::CondVar shutdown_cv_; bool shutdown_; std::queue> callbacks_; int reserve_threads_; diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc index 44aebd2f9d9..01bc51aa213 100644 --- a/src/cpp/server/health/default_health_check_service.cc +++ b/src/cpp/server/health/default_health_check_service.cc @@ -41,7 +41,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() { void DefaultHealthCheckService::SetServingStatus( const grpc::string& service_name, bool serving) { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { // Set to NOT_SERVING in case service_name is not in the map. serving = false; @@ -51,7 +51,7 @@ void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(bool serving) { const ServingStatus status = serving ? 
SERVING : NOT_SERVING; - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { return; } @@ -62,7 +62,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) { } void DefaultHealthCheckService::Shutdown() { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); if (shutdown_) { return; } @@ -76,7 +76,7 @@ void DefaultHealthCheckService::Shutdown() { DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::GetServingStatus( const grpc::string& service_name) const { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) { return NOT_FOUND; @@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus( void DefaultHealthCheckService::RegisterCallHandler( const grpc::string& service_name, std::shared_ptr handler) { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); ServiceData& service_data = services_map_[service_name]; service_data.AddCallHandler(handler /* copies ref */); HealthCheckServiceImpl::CallHandler* h = handler.get(); @@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler( void DefaultHealthCheckService::UnregisterCallHandler( const grpc::string& service_name, const std::shared_ptr& handler) { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); auto it = services_map_.find(service_name); if (it == services_map_.end()) return; ServiceData& service_data = it->second; @@ -166,7 +166,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() { // We will reach here after the server starts shutting down. shutdown_ = true; { - std::unique_lock lock(cq_shutdown_mu_); + grpc_core::MutexLock lock(&cq_shutdown_mu_); cq_->Shutdown(); } thread_->Join(); @@ -266,7 +266,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: std::make_shared(cq, database, service); CheckCallHandler* handler = static_cast(self.get()); { - std::unique_lock lock(service->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); if (service->shutdown_) return; // Request a Check() call. handler->next_ = @@ -311,7 +311,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler:: } // Send response. { - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); if (!service_->shutdown_) { next_ = CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, @@ -347,7 +347,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: std::make_shared(cq, database, service); WatchCallHandler* handler = static_cast(self.get()); { - std::unique_lock lock(service->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); if (service->shutdown_) return; // Request AsyncNotifyWhenDone(). handler->on_done_notified_ = @@ -402,7 +402,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendHealth(std::shared_ptr self, ServingStatus status) { - std::unique_lock lock(send_mu_); + grpc_core::MutexLock lock(&send_mu_); // If there's already a send in flight, cache the new status, and // we'll start a new send for it when the one in flight completes. if (send_in_flight_) { @@ -420,7 +420,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: ByteBuffer response; bool success = service_->EncodeResponse(status, &response); // Grab shutdown lock and send response. 
- std::unique_lock cq_lock(service_->cq_shutdown_mu_); + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { SendFinishLocked(std::move(self), Status::CANCELLED); return; @@ -442,7 +442,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendFinish(std::move(self), Status::CANCELLED); return; } - std::unique_lock lock(send_mu_); + grpc_core::MutexLock lock(&send_mu_); send_in_flight_ = false; // If we got a new status since we started the last send, start a // new send for it. @@ -456,7 +456,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: SendFinish(std::shared_ptr self, const Status& status) { if (finish_called_) return; - std::unique_lock cq_lock(service_->cq_shutdown_mu_); + grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) return; SendFinishLocked(std::move(self), status); } diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h index 9551cd2e2cf..4b926efdfe8 100644 --- a/src/cpp/server/health/default_health_check_service.h +++ b/src/cpp/server/health/default_health_check_service.h @@ -31,6 +31,7 @@ #include #include +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" namespace grpc { @@ -197,7 +198,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { GenericServerAsyncWriter stream_; ServerContext ctx_; - std::mutex send_mu_; + grpc_core::Mutex send_mu_; bool send_in_flight_ = false; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. @@ -226,7 +227,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { // To synchronize the operations related to shutdown state of cq_, so that // we don't enqueue new tags into cq_ after it is already shut down. - std::mutex cq_shutdown_mu_; + grpc_core::Mutex cq_shutdown_mu_; std::atomic_bool shutdown_{false}; std::unique_ptr<::grpc_core::Thread> thread_; }; @@ -273,7 +274,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface { const grpc::string& service_name, const std::shared_ptr& handler); - mutable std::mutex mu_; + mutable grpc_core::Mutex mu_; bool shutdown_ = false; // Guarded by mu_. std::map services_map_; // Guarded by mu_. std::unique_ptr impl_; diff --git a/src/cpp/server/load_reporter/load_reporter.cc b/src/cpp/server/load_reporter/load_reporter.cc index 464063a13ff..422ea62efd5 100644 --- a/src/cpp/server/load_reporter/load_reporter.cc +++ b/src/cpp/server/load_reporter/load_reporter.cc @@ -239,7 +239,7 @@ grpc::string LoadReporter::GenerateLbId() { ::grpc::lb::v1::LoadBalancingFeedback LoadReporter::GenerateLoadBalancingFeedback() { - std::unique_lock lock(feedback_mu_); + grpc_core::ReleasableMutexLock lock(&feedback_mu_); auto now = std::chrono::system_clock::now(); // Discard records outside the window until there is only one record // outside the window, which is used as the base for difference. 
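Besides the mutexes, the diff also replaces std::condition_variable with the new CondVar wrappers (grpc_core::CondVar in dynamic_thread_pool.cc above, grpc::internal::CondVar for Server::shutdown_cv_ in server_impl.h). A minimal sketch of the Mutex + CondVar pairing, assuming the API from src/core/lib/gprpp/sync.h; the Counter class is purely illustrative and not part of gRPC.

```cpp
#include "src/core/lib/gprpp/sync.h"

// Illustrative only: a counter whose waiters block on a grpc_core::CondVar,
// mirroring how ~DynamicThreadPool() waits for nthreads_ to reach zero.
class Counter {
 public:
  void Increment() {
    grpc_core::MutexLock lock(&mu_);
    ++count_;
    cv_.Signal();  // wake one waiter; Broadcast() would wake them all
  }

  void WaitForAtLeast(int n) {
    grpc_core::MutexLock lock(&mu_);
    while (count_ < n) {  // classic predicate loop around Wait()
      cv_.Wait(&mu_);     // atomically releases mu_ while blocked
    }
    // cv_.WaitUntil(&mu_, [&] { return count_ >= n; }); is the packaged form
  }

 private:
  grpc_core::Mutex mu_;
  grpc_core::CondVar cv_;
  int count_ = 0;
};
```

Unlike std::condition_variable, Wait() takes the Mutex directly rather than a unique_lock, which is what lets the converted call sites keep using MutexLock/ReleasableMutexLock around the wait.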
@@ -277,7 +277,7 @@ LoadReporter::GenerateLoadBalancingFeedback() { double cpu_limit = newest->cpu_limit - oldest->cpu_limit; std::chrono::duration duration_seconds = newest->end_time - oldest->end_time; - lock.unlock(); + lock.Unlock(); ::grpc::lb::v1::LoadBalancingFeedback feedback; feedback.set_server_utilization(static_cast(cpu_usage / cpu_limit)); feedback.set_calls_per_second( @@ -290,7 +290,7 @@ LoadReporter::GenerateLoadBalancingFeedback() { ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> LoadReporter::GenerateLoads(const grpc::string& hostname, const grpc::string& lb_id) { - std::lock_guard lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id); GPR_ASSERT(assigned_stores != nullptr); GPR_ASSERT(!assigned_stores->empty()); @@ -371,7 +371,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) { // This will make the load balancing feedback generation a no-op. cpu_stats = {0, 0}; } - std::unique_lock lock(feedback_mu_); + grpc_core::MutexLock lock(&feedback_mu_); feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors, cpu_stats.first, cpu_stats.second); } @@ -379,7 +379,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) { void LoadReporter::ReportStreamCreated(const grpc::string& hostname, const grpc::string& lb_id, const grpc::string& load_key) { - std::lock_guard lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); load_data_store_.ReportStreamCreated(hostname, lb_id, load_key); gpr_log(GPR_INFO, "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).", @@ -388,7 +388,7 @@ void LoadReporter::ReportStreamCreated(const grpc::string& hostname, void LoadReporter::ReportStreamClosed(const grpc::string& hostname, const grpc::string& lb_id) { - std::lock_guard lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); load_data_store_.ReportStreamClosed(hostname, lb_id); gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this, hostname.c_str(), lb_id.c_str()); @@ -407,7 +407,7 @@ void LoadReporter::ProcessViewDataCallStart( LoadRecordKey key(client_ip_and_token, user_id); LoadRecordValue value = LoadRecordValue(start_count); { - std::unique_lock lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); load_data_store_.MergeRow(host, key, value); } } @@ -459,7 +459,7 @@ void LoadReporter::ProcessViewDataCallEnd( LoadRecordValue value = LoadRecordValue( 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms); { - std::unique_lock lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); load_data_store_.MergeRow(host, key, value); } } @@ -486,7 +486,7 @@ void LoadReporter::ProcessViewDataOtherCallMetrics( LoadRecordValue value = LoadRecordValue( metric_name, static_cast(num_calls), total_metric_value); { - std::unique_lock lock(store_mu_); + grpc_core::MutexLock lock(&store_mu_); load_data_store_.MergeRow(host, key, value); } } diff --git a/src/cpp/server/load_reporter/load_reporter.h b/src/cpp/server/load_reporter/load_reporter.h index b2254d56016..766e02a407a 100644 --- a/src/cpp/server/load_reporter/load_reporter.h +++ b/src/cpp/server/load_reporter/load_reporter.h @@ -29,6 +29,7 @@ #include #include +#include "src/core/lib/gprpp/sync.h" #include "src/cpp/server/load_reporter/load_data_store.h" #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" @@ -212,11 +213,11 @@ class LoadReporter { std::atomic next_lb_id_{0}; const std::chrono::seconds 
feedback_sample_window_seconds_; - std::mutex feedback_mu_; + grpc_core::Mutex feedback_mu_; std::deque feedback_records_; // TODO(juanlishen): Lock in finer grain. Locking the whole store may be // too expensive. - std::mutex store_mu_; + grpc_core::Mutex store_mu_; LoadDataStore load_data_store_; std::unique_ptr census_view_provider_; std::unique_ptr cpu_stats_provider_; diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc index 859ad9946c8..9eaab5d6366 100644 --- a/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc +++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.cc @@ -48,7 +48,7 @@ LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() { // We will reach here after the server starts shutting down. shutdown_ = true; { - std::unique_lock lock(cq_shutdown_mu_); + grpc_core::MutexLock lock(&cq_shutdown_mu_); cq_->Shutdown(); } if (next_fetch_and_sample_alarm_ != nullptr) @@ -62,7 +62,7 @@ void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() { gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000, GPR_TIMESPAN)); { - std::unique_lock lock(cq_shutdown_mu_); + grpc_core::MutexLock lock(&cq_shutdown_mu_); if (shutdown_) return; // TODO(juanlishen): Improve the Alarm implementation to reuse a single // instance for multiple events. @@ -119,7 +119,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart( std::make_shared(cq, service, load_reporter); ReportLoadHandler* p = handler.get(); { - std::unique_lock lock(service->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service->cq_shutdown_mu_); if (service->shutdown_) return; p->on_done_notified_ = CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p, @@ -164,9 +164,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered( // instance will deallocate itself when it's done. CreateAndStart(cq_, service_, load_reporter_); { - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.release()->unlock(); + lock.Unlock(); Shutdown(std::move(self), "OnRequestDelivered"); return; } @@ -222,9 +222,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone( SendReport(self, true /* ok */); // Expect this read to fail. 
{ - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.release()->unlock(); + lock.Unlock(); Shutdown(std::move(self), "OnReadDone"); return; } @@ -254,9 +254,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN)); { - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.release()->unlock(); + lock.Unlock(); Shutdown(std::move(self), "ScheduleNextReport"); return; } @@ -294,9 +294,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport( call_status_ = INITIAL_RESPONSE_SENT; } { - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_); if (service_->shutdown_) { - lock.release()->unlock(); + lock.Unlock(); Shutdown(std::move(self), "SendReport"); return; } @@ -342,7 +342,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown( // OnRequestDelivered() may be called after OnDoneNotified(), so we need to // try to Finish() every time we are in Shutdown(). if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) { - std::unique_lock lock(service_->cq_shutdown_mu_); + grpc_core::MutexLock lock(&service_->cq_shutdown_mu_); if (!service_->shutdown_) { on_finish_done_ = CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this, diff --git a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h index 6fc577ff493..3087cbfc04d 100644 --- a/src/cpp/server/load_reporter/load_reporter_async_service_impl.h +++ b/src/cpp/server/load_reporter/load_reporter_async_service_impl.h @@ -25,6 +25,7 @@ #include #include +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/cpp/server/load_reporter/load_reporter.h" @@ -181,7 +182,7 @@ class LoadReporterAsyncServiceImpl std::unique_ptr cq_; // To synchronize the operations related to shutdown state of cq_, so that we // don't enqueue new tags into cq_ after it is already shut down. - std::mutex cq_shutdown_mu_; + grpc_core::Mutex cq_shutdown_mu_; std::atomic_bool shutdown_{false}; std::unique_ptr<::grpc_core::Thread> thread_; std::unique_ptr load_reporter_; diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 734940c248d..4f0fdefceda 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -561,9 +561,9 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase { // The counter of outstanding requests must be decremented // under a lock in case it causes the server shutdown. 
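Where the old code used lock.release()->unlock() to drop a std::unique_lock before calling Shutdown(), the new code uses grpc_core::ReleasableMutexLock, whose Unlock() releases the mutex early so the rest of the scope runs unlocked. A hedged sketch of that shape; DoUnlockedWork() stands in for whatever must not run under the lock and is not a real gRPC function:

#include "src/core/lib/gprpp/sync.h"

void DoUnlockedWork();  // illustrative placeholder

void MaybeBail(grpc_core::Mutex* mu, const bool* shutdown) {
  grpc_core::ReleasableMutexLock lock(mu);
  if (*shutdown) {
    lock.Unlock();     // drop the lock now; the destructor won't unlock again
    DoUnlockedWork();  // safe to block or take other locks here
    return;
  }
  // Otherwise keep holding the lock until `lock` goes out of scope.
}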
- std::lock_guard l(server_->callback_reqs_mu_); + grpc::internal::MutexLock l(&server_->callback_reqs_mu_); if (--server_->callback_reqs_outstanding_ == 0) { - server_->callback_reqs_done_cv_.notify_one(); + server_->callback_reqs_done_cv_.Signal(); } } @@ -992,12 +992,12 @@ Server::Server( Server::~Server() { { - std::unique_lock lock(mu_); + grpc::internal::ReleasableMutexLock lock(&mu_); if (callback_cq_ != nullptr) { callback_cq_->Shutdown(); } if (started_ && !shutdown_) { - lock.unlock(); + lock.Unlock(); Shutdown(); } else if (!started_) { // Shutdown the completion queues @@ -1233,7 +1233,7 @@ void Server::Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) { } void Server::ShutdownInternal(gpr_timespec deadline) { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); if (shutdown_) { return; } @@ -1284,9 +1284,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) { // will report a failure, indicating a shutdown and again we won't end // up incrementing the counter. { - std::unique_lock cblock(callback_reqs_mu_); - callback_reqs_done_cv_.wait( - cblock, [this] { return callback_reqs_outstanding_ == 0; }); + grpc::internal::MutexLock cblock(&callback_reqs_mu_); + callback_reqs_done_cv_.WaitUntil( + &callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; }); } // Drain the shutdown queue (if the previous call to AsyncNext() timed out @@ -1296,13 +1296,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) { } shutdown_notified_ = true; - shutdown_cv_.notify_all(); + shutdown_cv_.Broadcast(); } void Server::Wait() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); while (started_ && !shutdown_notified_) { - shutdown_cv_.wait(lock); + shutdown_cv_.Wait(&mu_); } } @@ -1342,7 +1342,7 @@ grpc::ServerInitializer* Server::initializer() { grpc::CompletionQueue* Server::CallbackCQ() { // TODO(vjpai): Consider using a single global CQ for the default CQ // if there is no explicit per-server CQ registered - std::lock_guard l(mu_); + grpc::internal::MutexLock l(&mu_); if (callback_cq_ == nullptr) { auto* shutdown_callback = new grpc::ShutdownCallback; callback_cq_ = new grpc::CompletionQueue(grpc_completion_queue_attributes{ diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 73fd6a62c48..70c5cd8861e 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -33,6 +33,7 @@ #include #include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/surface/call.h" namespace grpc { @@ -96,7 +97,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } void SetCancelCallback(std::function callback) { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); if (finalized_ && (cancelled_ != 0)) { callback(); @@ -107,7 +108,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { } void ClearCancelCallback() { - std::lock_guard g(mu_); + grpc_core::MutexLock g(&mu_); cancel_callback_ = nullptr; } @@ -144,7 +145,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { private: bool CheckCancelledNoPluck() { - std::lock_guard g(mu_); + grpc_core::MutexLock lock(&mu_); return finalized_ ? 
(cancelled_ != 0) : false; } @@ -154,7 +155,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface { void* tag_; void* core_cq_tag_; grpc_core::RefCount refs_; - std::mutex mu_; + grpc_core::Mutex mu_; bool finalized_; int cancelled_; // This is an int (not bool) because it is passed to core std::function cancel_callback_; @@ -186,7 +187,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) { bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { bool ret = false; - std::unique_lock lock(mu_); + grpc_core::ReleasableMutexLock lock(&mu_); if (done_intercepting_) { /* We are done intercepting. */ if (has_tag_) { @@ -216,12 +217,11 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { // Release the lock since we are going to be calling a callback and // interceptors now - lock.unlock(); + lock.Unlock(); if (call_cancel && reactor_ != nullptr) { reactor_->OnCancel(); } - /* Add interception point and run through interceptors */ interceptor_methods_.AddInterceptionHookPoint( experimental::InterceptionHookPoints::POST_RECV_CLOSE); diff --git a/src/cpp/thread_manager/thread_manager.cc b/src/cpp/thread_manager/thread_manager.cc index 3e8606a76fd..2b65352f797 100644 --- a/src/cpp/thread_manager/thread_manager.cc +++ b/src/cpp/thread_manager/thread_manager.cc @@ -62,7 +62,7 @@ ThreadManager::ThreadManager(const char* name, ThreadManager::~ThreadManager() { { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); GPR_ASSERT(num_threads_ == 0); } @@ -72,38 +72,38 @@ ThreadManager::~ThreadManager() { } void ThreadManager::Wait() { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); while (num_threads_ != 0) { - shutdown_cv_.wait(lock); + shutdown_cv_.Wait(&mu_); } } void ThreadManager::Shutdown() { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); shutdown_ = true; } bool ThreadManager::IsShutdown() { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); return shutdown_; } int ThreadManager::GetMaxActiveThreadsSoFar() { - std::lock_guard list_lock(list_mu_); + grpc_core::MutexLock list_lock(&list_mu_); return max_active_threads_sofar_; } void ThreadManager::MarkAsCompleted(WorkerThread* thd) { { - std::lock_guard list_lock(list_mu_); + grpc_core::MutexLock list_lock(&list_mu_); completed_threads_.push_back(thd); } { - std::lock_guard lock(mu_); + grpc_core::MutexLock lock(&mu_); num_threads_--; if (num_threads_ == 0) { - shutdown_cv_.notify_one(); + shutdown_cv_.Signal(); } } @@ -116,7 +116,7 @@ void ThreadManager::CleanupCompletedThreads() { { // swap out the completed threads list: allows other threads to clean up // more quickly - std::unique_lock lock(list_mu_); + grpc_core::MutexLock lock(&list_mu_); completed_threads.swap(completed_threads_); } for (auto thd : completed_threads) delete thd; @@ -132,7 +132,7 @@ void ThreadManager::Initialize() { } { - std::unique_lock lock(mu_); + grpc_core::MutexLock lock(&mu_); num_pollers_ = min_pollers_; num_threads_ = min_pollers_; max_active_threads_sofar_ = min_pollers_; @@ -149,7 +149,7 @@ void ThreadManager::MainWorkLoop() { bool ok; WorkStatus work_status = PollForWork(&tag, &ok); - std::unique_lock lock(mu_); + grpc_core::ReleasableMutexLock lock(&mu_); // Reduce the number of pollers by 1 and check what happened with the poll num_pollers_--; bool done = false; @@ -176,30 +176,30 @@ void ThreadManager::MainWorkLoop() { max_active_threads_sofar_ = num_threads_; } // Drop lock before spawning thread to avoid contention 
- lock.unlock(); + lock.Unlock(); new WorkerThread(this); } else if (num_pollers_ > 0) { // There is still at least some thread polling, so we can go on // even though we are below the number of pollers that we would // like to have (min_pollers_) - lock.unlock(); + lock.Unlock(); } else { // There are no pollers to spare and we couldn't allocate // a new thread, so resources are exhausted! - lock.unlock(); + lock.Unlock(); resource_exhausted = true; } } else { // There are a sufficient number of pollers available so we can do // the work and continue polling with our existing poller threads - lock.unlock(); + lock.Unlock(); } // Lock is always released at this point - do the application work // or return resource exhausted if there is new work but we couldn't // get a thread in which to do it. DoWork(tag, ok, !resource_exhausted); // Take the lock again to check post conditions - lock.lock(); + lock.Lock(); // If we're shutdown, we should finish at this point. if (shutdown_) done = true; break; diff --git a/src/cpp/thread_manager/thread_manager.h b/src/cpp/thread_manager/thread_manager.h index 6f0bd17c5fe..2fbf309d421 100644 --- a/src/cpp/thread_manager/thread_manager.h +++ b/src/cpp/thread_manager/thread_manager.h @@ -26,6 +26,7 @@ #include +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/resource_quota.h" @@ -140,10 +141,10 @@ class ThreadManager { // Protects shutdown_, num_pollers_, num_threads_ and // max_active_threads_sofar_ - std::mutex mu_; + grpc_core::Mutex mu_; bool shutdown_; - std::condition_variable shutdown_cv_; + grpc_core::CondVar shutdown_cv_; // The resource user object to use when requesting quota to create threads // @@ -169,7 +170,7 @@ class ThreadManager { // ever set so far int max_active_threads_sofar_; - std::mutex list_mu_; + grpc_core::Mutex list_mu_; std::list completed_threads_; }; diff --git a/test/cpp/client/client_channel_stress_test.cc b/test/cpp/client/client_channel_stress_test.cc index 91419cb257b..d326b2ed37e 100644 --- a/test/cpp/client/client_channel_stress_test.cc +++ b/test/cpp/client/client_channel_stress_test.cc @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -168,24 +169,24 @@ class ClientChannelStressTest { explicit ServerThread(const grpc::string& type, const grpc::string& server_host, T* service) : type_(type), service_(service) { - std::mutex mu; + grpc::internal::Mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Start from firing before the wait below is hit. - std::unique_lock lock(mu); + grpc::internal::MutexLock lock(&mu); port_ = grpc_pick_unused_port_or_die(); gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); - std::condition_variable cond; + grpc::internal::CondVar cond; thread_.reset(new std::thread( std::bind(&ServerThread::Start, this, server_host, &mu, &cond))); - cond.wait(lock); + cond.Wait(&mu); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Start(const grpc::string& server_host, std::mutex* mu, - std::condition_variable* cond) { + void Start(const grpc::string& server_host, grpc::internal::Mutex* mu, + grpc::internal::CondVar* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
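The ThreadManager changes above also show the condition-variable replacement: grpc_core::CondVar::Wait() takes the Mutex pointer directly, and Signal()/Broadcast() stand in for notify_one()/notify_all(). A small wait/notify pair in that style, assuming only the CondVar calls this diff already uses (the Tracker struct is illustrative):

#include "src/core/lib/gprpp/sync.h"

struct Tracker {
  grpc_core::Mutex mu;
  grpc_core::CondVar done_cv;
  int pending = 0;  // guarded by mu
};

void WaitForAll(Tracker* t) {
  grpc_core::MutexLock lock(&t->mu);
  while (t->pending != 0) {
    t->done_cv.Wait(&t->mu);  // releases mu while blocked, reacquires on wakeup
  }
}

void MarkOneDone(Tracker* t) {
  grpc_core::MutexLock lock(&t->mu);
  if (--t->pending == 0) {
    t->done_cv.Signal();  // was cv.notify_one()
  }
}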
- std::lock_guard lock(*mu); + grpc::internal::MutexLock lock(mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -193,7 +194,7 @@ class ClientChannelStressTest { InsecureServerCredentials()); builder.RegisterService(service_); server_ = builder.BuildAndStart(); - cond->notify_one(); + cond->Signal(); } void Shutdown() { diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index 77f9db94acc..6623a2ff55f 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -98,7 +99,7 @@ class MyTestServiceImpl : public TestServiceImpl { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); ++request_count_; } AddClient(context->peer()); @@ -106,29 +107,29 @@ class MyTestServiceImpl : public TestServiceImpl { } int request_count() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return request_count_; } void ResetCounters() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); request_count_ = 0; } std::set clients() { - std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); clients_.insert(client); } - std::mutex mu_; + grpc::internal::Mutex mu_; int request_count_; - std::mutex clients_mu_; + grpc::internal::Mutex clients_mu_; std::set clients_; }; @@ -293,18 +294,18 @@ class ClientLbEnd2endTest : public ::testing::Test { void Start(const grpc::string& server_host) { gpr_log(GPR_INFO, "starting server on port %d", port_); started_ = true; - std::mutex mu; - std::unique_lock lock(mu); - std::condition_variable cond; + grpc::internal::Mutex mu; + grpc::internal::MutexLock lock(&mu); + grpc::internal::CondVar cond; thread_.reset(new std::thread( std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); - cond.wait(lock, [this] { return server_ready_; }); + cond.WaitUntil(&mu, [this] { return server_ready_; }); server_ready_ = false; gpr_log(GPR_INFO, "server startup complete"); } - void Serve(const grpc::string& server_host, std::mutex* mu, - std::condition_variable* cond) { + void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, + grpc::internal::CondVar* cond) { std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -313,9 +314,9 @@ class ClientLbEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), std::move(creds)); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - std::lock_guard lock(*mu); + grpc::internal::MutexLock lock(mu); server_ready_ = true; - cond->notify_one(); + cond->Signal(); } void Shutdown() { @@ -1374,7 +1375,7 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { void TearDown() override { ClientLbEnd2endTest::TearDown(); } int trailers_intercepted() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return trailers_intercepted_; } @@ -1382,11 +1383,11 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest { static void ReportTrailerIntercepted(void* arg) { ClientLbInterceptTrailingMetadataTest* self = static_cast(arg); - 
std::unique_lock lock(self->mu_); + grpc::internal::MutexLock lock(&self->mu_); self->trailers_intercepted_++; } - std::mutex mu_; + grpc::internal::Mutex mu_; int trailers_intercepted_ = 0; }; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index ee8b3e1e52b..6d470785ec2 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -85,32 +86,32 @@ template class CountedService : public ServiceType { public: size_t request_count() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return request_count_; } size_t response_count() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return response_count_; } void IncreaseResponseCount() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); ++response_count_; } void IncreaseRequestCount() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); ++request_count_; } void ResetCounters() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); request_count_ = 0; response_count_ = 0; } protected: - std::mutex mu_; + grpc::internal::Mutex mu_; private: size_t request_count_ = 0; @@ -148,18 +149,18 @@ class BackendServiceImpl : public BackendService { void Shutdown() {} std::set clients() { - std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); clients_.insert(client); } - std::mutex mu_; - std::mutex clients_mu_; + grpc::internal::Mutex mu_; + grpc::internal::Mutex clients_mu_; std::set clients_; }; @@ -210,7 +211,7 @@ class BalancerServiceImpl : public BalancerService { Status BalanceLoad(ServerContext* context, Stream* stream) override { gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); if (serverlist_done_) goto done; } { @@ -237,7 +238,7 @@ class BalancerServiceImpl : public BalancerService { } { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { @@ -245,8 +246,8 @@ class BalancerServiceImpl : public BalancerService { response_and_delay.second); } { - std::unique_lock lock(mu_); - serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); + grpc::internal::MutexLock lock(&mu_); + serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); } if (client_load_reporting_interval_seconds_ > 0) { @@ -257,7 +258,7 @@ class BalancerServiceImpl : public BalancerService { GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
- std::lock_guard lock(mu_); + grpc::internal::MutexLock lock(&mu_); client_stats_.num_calls_started += request.client_stats().num_calls_started(); client_stats_.num_calls_finished += @@ -274,7 +275,7 @@ class BalancerServiceImpl : public BalancerService { drop_token_count.num_calls(); } load_report_ready_ = true; - load_report_cond_.notify_one(); + load_report_cond_.Signal(); } } } @@ -284,12 +285,12 @@ class BalancerServiceImpl : public BalancerService { } void add_response(const LoadBalanceResponse& response, int send_after_ms) { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } void Start() { - std::lock_guard lock(mu_); + grpc::internal::MutexLock lock(&mu_); serverlist_done_ = false; load_report_ready_ = false; responses_and_delays_.clear(); @@ -326,17 +327,17 @@ class BalancerServiceImpl : public BalancerService { } const ClientStats& WaitForLoadReport() { - std::unique_lock lock(mu_); - load_report_cond_.wait(lock, [this] { return load_report_ready_; }); + grpc::internal::MutexLock lock(&mu_); + load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; }); load_report_ready_ = false; return client_stats_; } void NotifyDoneWithServerlists() { - std::lock_guard lock(mu_); + grpc::internal::MutexLock lock(&mu_); if (!serverlist_done_) { serverlist_done_ = true; - serverlist_cond_.notify_all(); + serverlist_cond_.Broadcast(); } } @@ -355,10 +356,10 @@ class BalancerServiceImpl : public BalancerService { const int client_load_reporting_interval_seconds_; std::vector responses_and_delays_; - std::mutex mu_; - std::condition_variable load_report_cond_; + grpc::internal::Mutex mu_; + grpc::internal::CondVar load_report_cond_; bool load_report_ready_ = false; - std::condition_variable serverlist_cond_; + grpc::internal::CondVar serverlist_cond_; bool serverlist_done_ = false; ClientStats client_stats_; }; @@ -622,22 +623,22 @@ class GrpclbEnd2endTest : public ::testing::Test { GPR_ASSERT(!running_); running_ = true; service_.Start(); - std::mutex mu; + grpc::internal::Mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Serve from firing before the wait below is hit. - std::unique_lock lock(mu); - std::condition_variable cond; + grpc::internal::MutexLock lock(&mu); + grpc::internal::CondVar cond; thread_.reset(new std::thread( std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); - cond.wait(lock); + cond.Wait(&mu); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Serve(const grpc::string& server_host, std::mutex* mu, - std::condition_variable* cond) { + void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, + grpc::internal::CondVar* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. 
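Test code switches to the grpc::internal:: aliases exported by the new include/grpcpp/impl/codegen/sync.h header; CondVar::WaitUntil(&mu, pred) replaces condition_variable::wait(lock, pred) and keeps the predicate loop internal. A sketch of the WaitForLoadReport()-style handoff, with the include path and the ReportState type assumed for illustration:

#include <grpcpp/impl/codegen/sync.h>

struct ReportState {
  grpc::internal::Mutex mu;
  grpc::internal::CondVar cv;
  bool ready = false;  // guarded by mu
};

void WaitForReport(ReportState* s) {
  grpc::internal::MutexLock lock(&s->mu);
  s->cv.WaitUntil(&s->mu, [s] { return s->ready; });  // handles spurious wakeups
  s->ready = false;
}

void PublishReport(ReportState* s) {
  grpc::internal::MutexLock lock(&s->mu);
  s->ready = true;
  s->cv.Signal();
}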
- std::lock_guard lock(*mu); + grpc::internal::MutexLock lock(mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -646,7 +647,7 @@ class GrpclbEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), creds); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - cond->notify_one(); + cond->Signal(); } void Shutdown() { diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index e30ce0dbcbf..5b8af61ee33 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -188,7 +189,7 @@ class CommonStressTestAsyncServer : public BaseClass { } void TearDown() override { { - std::unique_lock l(mu_); + grpc::internal::MutexLock l(&mu_); this->TearDownStart(); shutting_down_ = true; cq_->Shutdown(); @@ -229,7 +230,7 @@ class CommonStressTestAsyncServer : public BaseClass { } } void RefreshContext(int i) { - std::unique_lock l(mu_); + grpc::internal::MutexLock l(&mu_); if (!shutting_down_) { contexts_[i].state = Context::READY; contexts_[i].srv_ctx.reset(new ServerContext); @@ -253,7 +254,7 @@ class CommonStressTestAsyncServer : public BaseClass { ::grpc::testing::EchoTestService::AsyncService service_; std::unique_ptr cq_; bool shutting_down_; - std::mutex mu_; + grpc::internal::Mutex mu_; std::vector server_threads_; }; @@ -341,9 +342,9 @@ class AsyncClientEnd2endTest : public ::testing::Test { } void Wait() { - std::unique_lock l(mu_); + grpc::internal::MutexLock l(&mu_); while (rpcs_outstanding_ != 0) { - cv_.wait(l); + cv_.Wait(&mu_); } cq_.Shutdown(); @@ -366,7 +367,7 @@ class AsyncClientEnd2endTest : public ::testing::Test { call->response_reader->Finish(&call->response, &call->status, (void*)call); - std::unique_lock l(mu_); + grpc::internal::MutexLock l(&mu_); rpcs_outstanding_++; } } @@ -384,20 +385,20 @@ class AsyncClientEnd2endTest : public ::testing::Test { bool notify; { - std::unique_lock l(mu_); + grpc::internal::MutexLock l(&mu_); rpcs_outstanding_--; notify = (rpcs_outstanding_ == 0); } if (notify) { - cv_.notify_all(); + cv_.Signal(); } } } Common common_; CompletionQueue cq_; - std::mutex mu_; - std::condition_variable cv_; + grpc::internal::Mutex mu_; + grpc::internal::CondVar cv_; int rpcs_outstanding_; }; diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 11a64264494..85753f1016c 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -84,32 +84,32 @@ template class CountedService : public ServiceType { public: size_t request_count() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return request_count_; } size_t response_count() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); return response_count_; } void IncreaseResponseCount() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); ++response_count_; } void IncreaseRequestCount() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); ++request_count_; } void ResetCounters() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); request_count_ = 0; response_count_ = 0; } protected: - std::mutex mu_; + grpc::internal::Mutex mu_; private: size_t request_count_ = 0; @@ -145,18 +145,18 @@ class BackendServiceImpl : public BackendService { void Shutdown() {} std::set clients() { - 
std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); return clients_; } private: void AddClient(const grpc::string& client) { - std::unique_lock lock(clients_mu_); + grpc::internal::MutexLock lock(&clients_mu_); clients_.insert(client); } - std::mutex mu_; - std::mutex clients_mu_; + grpc::internal::Mutex mu_; + grpc::internal::Mutex clients_mu_; std::set clients_; }; @@ -208,7 +208,7 @@ class BalancerServiceImpl : public BalancerService { // TODO(juanlishen): Clean up the scoping. gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); if (serverlist_done_) goto done; } { @@ -234,7 +234,7 @@ class BalancerServiceImpl : public BalancerService { } { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); responses_and_delays = responses_and_delays_; } for (const auto& response_and_delay : responses_and_delays) { @@ -242,8 +242,8 @@ class BalancerServiceImpl : public BalancerService { response_and_delay.second); } { - std::unique_lock lock(mu_); - serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); + grpc::internal::MutexLock lock(&mu_); + serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; }); } if (client_load_reporting_interval_seconds_ > 0) { @@ -254,7 +254,7 @@ class BalancerServiceImpl : public BalancerService { GPR_ASSERT(request.has_client_stats()); // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. - std::lock_guard lock(mu_); + grpc::internal::MutexLock lock(&mu_); client_stats_.num_calls_started += request.client_stats().num_calls_started(); client_stats_.num_calls_finished += @@ -271,7 +271,7 @@ class BalancerServiceImpl : public BalancerService { drop_token_count.num_calls(); } load_report_ready_ = true; - load_report_cond_.notify_one(); + load_report_cond_.Signal(); } } } @@ -281,12 +281,12 @@ class BalancerServiceImpl : public BalancerService { } void add_response(const LoadBalanceResponse& response, int send_after_ms) { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); } void Shutdown() { - std::unique_lock lock(mu_); + grpc::internal::MutexLock lock(&mu_); NotifyDoneWithServerlistsLocked(); responses_and_delays_.clear(); client_stats_.Reset(); @@ -318,21 +318,21 @@ class BalancerServiceImpl : public BalancerService { } const ClientStats& WaitForLoadReport() { - std::unique_lock lock(mu_); - load_report_cond_.wait(lock, [this] { return load_report_ready_; }); + grpc::internal::MutexLock lock(&mu_); + load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; }); load_report_ready_ = false; return client_stats_; } void NotifyDoneWithServerlists() { - std::lock_guard lock(mu_); + grpc::internal::MutexLock lock(&mu_); NotifyDoneWithServerlistsLocked(); } void NotifyDoneWithServerlistsLocked() { if (!serverlist_done_) { serverlist_done_ = true; - serverlist_cond_.notify_all(); + serverlist_cond_.Broadcast(); } } @@ -351,10 +351,10 @@ class BalancerServiceImpl : public BalancerService { const int client_load_reporting_interval_seconds_; std::vector responses_and_delays_; - std::mutex mu_; - std::condition_variable load_report_cond_; + grpc::internal::Mutex mu_; + grpc::internal::CondVar load_report_cond_; bool load_report_ready_ = false; - std::condition_variable serverlist_cond_; + grpc::internal::CondVar serverlist_cond_; bool serverlist_done_ = false; 
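The ServerThread helpers repeated in these tests use the same primitives for a startup handshake: the launching thread holds the mutex before spawning the server thread and then blocks in Wait(), so the spawned thread's Signal() cannot fire before the corresponding wait. A rough sketch of that handshake, assuming the grpc::internal types from this diff; the immediate join and the function name are for illustration only:

#include <thread>

#include <grpcpp/impl/codegen/sync.h>

void StartAndWait() {
  grpc::internal::Mutex mu;
  grpc::internal::CondVar cond;
  std::thread server;
  {
    grpc::internal::MutexLock lock(&mu);  // taken before the thread is spawned
    server = std::thread([&mu, &cond] {
      // ... build and start the server here ...
      grpc::internal::MutexLock thread_lock(&mu);  // blocks until Wait() releases mu
      cond.Signal();
    });
    cond.Wait(&mu);  // releases mu while blocked, letting the thread signal
  }
  server.join();  // the real helpers keep the thread alive until Shutdown()
}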
ClientStats client_stats_; }; @@ -633,22 +633,22 @@ class XdsEnd2endTest : public ::testing::Test { gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); GPR_ASSERT(!running_); running_ = true; - std::mutex mu; + grpc::internal::Mutex mu; // We need to acquire the lock here in order to prevent the notify_one // by ServerThread::Serve from firing before the wait below is hit. - std::unique_lock lock(mu); - std::condition_variable cond; + grpc::internal::MutexLock lock(&mu); + grpc::internal::CondVar cond; thread_.reset(new std::thread( std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); - cond.wait(lock); + cond.Wait(&mu); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); } - void Serve(const grpc::string& server_host, std::mutex* mu, - std::condition_variable* cond) { + void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu, + grpc::internal::CondVar* cond) { // We need to acquire the lock here in order to prevent the notify_one // below from firing before its corresponding wait is executed. - std::lock_guard lock(*mu); + grpc::internal::MutexLock lock(mu); std::ostringstream server_address; server_address << server_host << ":" << port_; ServerBuilder builder; @@ -657,7 +657,7 @@ class XdsEnd2endTest : public ::testing::Test { builder.AddListeningPort(server_address.str(), creds); builder.RegisterService(&service_); server_ = builder.BuildAndStart(); - cond->notify_one(); + cond->Signal(); } void Shutdown() { diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 07e44b39bb6..67bbb4efa6b 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -987,6 +987,7 @@ include/grpcpp/impl/codegen/status.h \ include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/stub_options.h \ +include/grpcpp/impl/codegen/sync.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/grpc_library.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 0301818b9db..438cb98dbae 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -989,6 +989,7 @@ include/grpcpp/impl/codegen/status.h \ include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/stub_options.h \ +include/grpcpp/impl/codegen/sync.h \ include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/grpc_library.h \ @@ -1086,12 +1087,12 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/map.h \ src/core/lib/gprpp/memory.h \ -src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/pair.h \ src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted_ptr.h \ +src/core/lib/gprpp/sync.h \ src/core/lib/gprpp/thd.h \ src/core/lib/http/format_request.h \ src/core/lib/http/httpcli.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index e68e758c64e..1817be44bc3 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1166,12 +1166,12 @@ src/core/lib/gprpp/inlined_vector.h \ src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/map.h \ src/core/lib/gprpp/memory.h \ -src/core/lib/gprpp/mutex_lock.h \ src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/orphanable.h \ 
src/core/lib/gprpp/pair.h \ src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted_ptr.h \ +src/core/lib/gprpp/sync.h \ src/core/lib/gprpp/thd.h \ src/core/lib/gprpp/thd_posix.cc \ src/core/lib/gprpp/thd_windows.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 4afbb767b66..5cf7bf3fc8c 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -8015,8 +8015,8 @@ "src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/map.h", "src/core/lib/gprpp/memory.h", - "src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/pair.h", + "src/core/lib/gprpp/sync.h", "src/core/lib/gprpp/thd.h", "src/core/lib/profiling/timers.h" ], @@ -8063,8 +8063,8 @@ "src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/map.h", "src/core/lib/gprpp/memory.h", - "src/core/lib/gprpp/mutex_lock.h", "src/core/lib/gprpp/pair.h", + "src/core/lib/gprpp/sync.h", "src/core/lib/gprpp/thd.h", "src/core/lib/profiling/timers.h" ], @@ -9842,6 +9842,7 @@ }, { "deps": [ + "grpc++_internal_hdrs_only", "grpc_codegen" ], "headers": [ @@ -10038,6 +10039,7 @@ "gpr", "gpr_base_headers", "grpc++_codegen_base", + "grpc++_internal_hdrs_only", "grpc_base_headers", "grpc_transport_inproc_headers", "health_proto", @@ -10334,6 +10336,20 @@ "third_party": false, "type": "filegroup" }, + { + "deps": [], + "headers": [ + "include/grpcpp/impl/codegen/sync.h" + ], + "is_filegroup": true, + "language": "c++", + "name": "grpc++_internal_hdrs_only", + "src": [ + "include/grpcpp/impl/codegen/sync.h" + ], + "third_party": false, + "type": "filegroup" + }, { "deps": [], "headers": [