Fix build errors

pull/18717/head
Authored by Soheil Hassas Yeganeh 6 years ago, committed by Karthik Ravi Shankar
parent f03b309787
commit e85dd2e644
57 changed files (changed-line counts in parentheses):

  1. BUILD (14)
  2. BUILD.gn (5)
  3. CMakeLists.txt (5)
  4. Makefile (5)
  5. build.yaml (8)
  6. gRPC-C++.podspec (7)
  7. gRPC-Core.podspec (4)
  8. grpc.gemspec (2)
  9. include/grpcpp/channel.h (3)
  10. include/grpcpp/impl/codegen/client_context.h (3)
  11. include/grpcpp/impl/codegen/sync.h (138)
  12. include/grpcpp/server.h (8)
  13. include/grpcpp/server_impl.h (8)
  14. package.xml (2)
  15. src/core/ext/filters/client_channel/client_channel.cc (2)
  16. src/core/ext/filters/client_channel/health/health_check_client.cc (4)
  17. src/core/ext/filters/client_channel/health/health_check_client.h (3)
  18. src/core/ext/filters/client_channel/http_connect_handshaker.cc (2)
  19. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc (1)
  20. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc (2)
  21. src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h (6)
  22. src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc (6)
  23. src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc (6)
  24. src/core/ext/filters/client_channel/lb_policy/xds/xds.cc (19)
  25. src/core/ext/filters/client_channel/resolving_lb_policy.cc (2)
  26. src/core/ext/filters/client_channel/subchannel.cc (70)
  27. src/core/ext/filters/client_channel/subchannel.h (3)
  28. src/core/lib/channel/channelz_registry.cc (2)
  29. src/core/lib/channel/handshaker.h (2)
  30. src/core/lib/gprpp/mutex_lock.h (42)
  31. src/core/lib/gprpp/sync.h (126)
  32. src/core/lib/iomgr/ev_epollex_linux.cc (2)
  33. src/core/lib/surface/init.cc (2)
  34. src/core/tsi/ssl/session_cache/ssl_session_cache.cc (2)
  35. src/cpp/client/channel_cc.cc (2)
  36. src/cpp/client/client_context.cc (5)
  37. src/cpp/server/dynamic_thread_pool.cc (23)
  38. src/cpp/server/dynamic_thread_pool.h (7)
  39. src/cpp/server/health/default_health_check_service.cc (28)
  40. src/cpp/server/health/default_health_check_service.h (7)
  41. src/cpp/server/load_reporter/load_reporter.cc (18)
  42. src/cpp/server/load_reporter/load_reporter.h (5)
  43. src/cpp/server/load_reporter/load_reporter_async_service_impl.cc (24)
  44. src/cpp/server/load_reporter/load_reporter_async_service_impl.h (3)
  45. src/cpp/server/server_cc.cc (24)
  46. src/cpp/server/server_context.cc (14)
  47. src/cpp/thread_manager/thread_manager.cc (34)
  48. src/cpp/thread_manager/thread_manager.h (7)
  49. test/cpp/client/client_channel_stress_test.cc (17)
  50. test/cpp/end2end/client_lb_end2end_test.cc (37)
  51. test/cpp/end2end/grpclb_end2end_test.cc (67)
  52. test/cpp/end2end/thread_stress_test.cc (21)
  53. test/cpp/end2end/xds_end2end_test.cc (66)
  54. tools/doxygen/Doxyfile.c++ (1)
  55. tools/doxygen/Doxyfile.c++.internal (3)
  56. tools/doxygen/Doxyfile.core.internal (2)
  57. tools/run_tests/generated/sources_and_headers.json (20)

BUILD (14)

@ -525,6 +525,17 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc++_internal_hdrs_only",
hdrs = [
"include/grpcpp/impl/codegen/sync.h",
],
language = "c++",
deps = [
"gpr_codegen",
],
)
grpc_cc_library(
name = "gpr_base",
srcs = [
@ -590,8 +601,8 @@ grpc_cc_library(
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h",
],
@ -2145,6 +2156,7 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/time.h",
],
deps = [
"grpc++_internal_hdrs_only",
"grpc_codegen",
],
)

@ -186,8 +186,8 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
"src/core/lib/gprpp/thd_posix.cc",
"src/core/lib/gprpp/thd_windows.cc",
@ -1064,6 +1064,7 @@ config("grpc_config") {
"include/grpcpp/impl/codegen/status_code_enum.h",
"include/grpcpp/impl/codegen/string_ref.h",
"include/grpcpp/impl/codegen/stub_options.h",
"include/grpcpp/impl/codegen/sync.h",
"include/grpcpp/impl/codegen/sync_stream.h",
"include/grpcpp/impl/codegen/time.h",
"include/grpcpp/impl/grpc_library.h",
@ -1158,12 +1159,12 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
"src/core/lib/http/format_request.h",
"src/core/lib/http/httpcli.h",

@ -3181,6 +3181,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -3783,6 +3784,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
include/grpc/census.h
)
string(REPLACE "include/" "" _path ${_hdr})
@ -4237,6 +4239,7 @@ foreach(_hdr
include/grpc/impl/codegen/sync_generic.h
include/grpc/impl/codegen/sync_posix.h
include/grpc/impl/codegen/sync_windows.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -4433,6 +4436,7 @@ foreach(_hdr
include/grpc/impl/codegen/sync_generic.h
include/grpc/impl/codegen/sync_posix.h
include/grpc/impl/codegen/sync_windows.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -4759,6 +4763,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
)
string(REPLACE "include/" "" _path ${_hdr})
get_filename_component(_path ${_path} PATH)

@ -5516,6 +5516,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -6126,6 +6127,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc/census.h \
LIBGRPC++_CRONET_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_CRONET_SRC))))
@ -6552,6 +6554,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc/impl/codegen/sync_generic.h \
include/grpc/impl/codegen/sync_posix.h \
include/grpc/impl/codegen/sync_windows.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -6719,6 +6722,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc/impl/codegen/sync_generic.h \
include/grpc/impl/codegen/sync_posix.h \
include/grpc/impl/codegen/sync_windows.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -7051,6 +7055,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
LIBGRPC++_UNSECURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_UNSECURE_SRC))))

@ -196,8 +196,8 @@ filegroups:
- src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/map.h
- src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mutex_lock.h
- src/core/lib/gprpp/pair.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h
- src/core/lib/profiling/timers.h
uses:
@ -1276,6 +1276,7 @@ filegroups:
- include/grpcpp/impl/codegen/time.h
uses:
- grpc_codegen
- grpc++_internal_hdrs_only
- name: grpc++_codegen_base_src
language: c++
src:
@ -1450,6 +1451,7 @@ filegroups:
- grpc_base_headers
- grpc_transport_inproc_headers
- grpc++_codegen_base
- grpc++_internal_hdrs_only
- nanopb_headers
- health_proto
- name: grpc++_config_proto
@ -1457,6 +1459,10 @@ filegroups:
public_headers:
- include/grpc++/impl/codegen/config_protobuf.h
- include/grpcpp/impl/codegen/config_protobuf.h
- name: grpc++_internal_hdrs_only
language: c++
public_headers:
- include/grpcpp/impl/codegen/sync.h
- name: grpc++_reflection_proto
language: c++
src:

@ -183,7 +183,8 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/string_ref.h',
'include/grpcpp/impl/codegen/stub_options.h',
'include/grpcpp/impl/codegen/sync_stream.h',
'include/grpcpp/impl/codegen/time.h'
'include/grpcpp/impl/codegen/time.h',
'include/grpcpp/impl/codegen/sync.h'
end
s.subspec 'Implementation' do |ss|
@ -266,8 +267,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',
@ -583,8 +584,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/avl/avl.h',

@ -210,8 +210,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h',
'src/core/lib/gpr/alloc.cc',
@ -889,8 +889,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h',

@ -104,8 +104,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/manual_constructor.h )
s.files += %w( src/core/lib/gprpp/map.h )
s.files += %w( src/core/lib/gprpp/memory.h )
s.files += %w( src/core/lib/gprpp/mutex_lock.h )
s.files += %w( src/core/lib/gprpp/pair.h )
s.files += %w( src/core/lib/gprpp/sync.h )
s.files += %w( src/core/lib/gprpp/thd.h )
s.files += %w( src/core/lib/profiling/timers.h )
s.files += %w( src/core/lib/gpr/alloc.cc )

@ -28,6 +28,7 @@
#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/sync.h>
struct grpc_channel;
@ -97,7 +98,7 @@ class Channel final : public ChannelInterface,
grpc_channel* const c_channel_; // owned
// mu_ protects callback_cq_ (the per-channel callbackable completion queue)
std::mutex mu_;
grpc::internal::Mutex mu_;
// callback_cq_ references the callbackable completion queue associated
// with this channel (if any). It is set on the first call to CallbackCQ().

@ -51,6 +51,7 @@
#include <grpcpp/impl/codegen/slice.h>
#include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/string_ref.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/time.h>
struct census_context;
@ -457,7 +458,7 @@ class ClientContext {
bool idempotent_;
bool cacheable_;
std::shared_ptr<Channel> channel_;
std::mutex mu_;
grpc::internal::Mutex mu_;
grpc_call* call_;
bool call_canceled_;
gpr_timespec deadline_;

@ -0,0 +1,138 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_IMPL_CODEGEN_SYNC_H
#define GRPCPP_IMPL_CODEGEN_SYNC_H
#include <grpc/impl/codegen/log.h>
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
// src/core/lib/gprpp/sync.h too.
//
// Whenever possible, prefer "src/core/lib/gprpp/sync.h" over this file,
// since in core we do not rely on g_core_codegen_interface and hence do not
// pay the costs of virtual function calls.
namespace grpc {
namespace internal {
class Mutex {
public:
Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); }
~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); }
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
private:
gpr_mu mu_;
};
// MutexLock is a std::lock_guard-style RAII lock for a Mutex (or a raw gpr_mu).
class MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit MutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~ReleasableMutexLock() {
if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_);
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
g_core_codegen_interface->gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
g_core_codegen_interface->gpr_mu_unlock(mu_);
}
private:
gpr_mu* const mu_;
bool released_ = false;
};
class CondVar {
public:
CondVar() { g_core_codegen_interface->gpr_cv_init(&cv_); }
~CondVar() { g_core_codegen_interface->gpr_cv_destroy(&cv_); }
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); }
void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) {
return Wait(mu,
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
}
private:
gpr_cv cv_;
};
} // namespace internal
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_SYNC_H
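A minimal sketch of how these grpc::internal primitives replace std::mutex members elsewhere in this commit (e.g. Channel::mu_, ClientContext::mu_). The class and member names below are invented for illustration, and the sketch assumes the grpc++ library is linked so g_core_codegen_interface is initialized:

#include <grpcpp/impl/codegen/sync.h>

class LazyCacheExample {
 public:
  // Lazily create the cached value under the lock, the same shape as
  // Channel::CallbackCQ() taking mu_ before touching callback_cq_.
  int* GetOrCreate() {
    grpc::internal::MutexLock lock(&mu_);  // was std::lock_guard<std::mutex>
    if (cached_ == nullptr) cached_ = new int(0);
    return cached_;
  }

 private:
  grpc::internal::Mutex mu_;  // was std::mutex
  int* cached_ = nullptr;     // guarded by mu_
};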

@ -297,12 +297,12 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
experimental_registration_type experimental_registration_{this};
// Server status
std::mutex mu_;
grpc::internal::Mutex mu_;
bool started_;
bool shutdown_;
bool shutdown_notified_; // Was notify called on the shutdown_cv_
std::condition_variable shutdown_cv_;
grpc::internal::CondVar shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@ -311,8 +311,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
// during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_;
grpc::internal::Mutex callback_reqs_mu_;
grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;

@ -304,12 +304,12 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
experimental_registration_type experimental_registration_{this};
// Server status
std::mutex mu_;
grpc::internal::Mutex mu_;
bool started_;
bool shutdown_;
bool shutdown_notified_; // Was notify called on the shutdown_cv_
std::condition_variable shutdown_cv_;
grpc::internal::CondVar shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@ -318,8 +318,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
// during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_;
grpc::internal::Mutex callback_reqs_mu_;
grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;
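The callback_reqs_mu_ / callback_reqs_done_cv_ pair above is used later in server_cc.cc to block shutdown until outstanding callback requests drain. A small sketch of that counter-plus-CondVar pattern, with an invented PendingWork class (not part of this commit) and the same grpc++-initialization assumption as the previous sketch:

#include <grpcpp/impl/codegen/sync.h>

class PendingWork {
 public:
  void Add() {
    grpc::internal::MutexLock lock(&mu_);
    ++outstanding_;
  }
  void Done() {
    // Decrement under the lock so a waiter cannot miss the final Signal().
    grpc::internal::MutexLock lock(&mu_);
    if (--outstanding_ == 0) done_cv_.Signal();
  }
  void WaitForAll() {
    grpc::internal::MutexLock lock(&mu_);
    done_cv_.WaitUntil(&mu_, [this] { return outstanding_ == 0; });
  }

 private:
  grpc::internal::Mutex mu_;
  grpc::internal::CondVar done_cv_;
  int outstanding_ = 0;  // guarded by mu_
};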

@ -109,8 +109,8 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mutex_lock.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/sync.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/profiling/timers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />

@ -51,7 +51,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"

@ -27,7 +27,7 @@
#include "pb_encode.h"
#include "src/core/ext/filters/client_channel/health/health.pb.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h"
@ -69,7 +69,6 @@ HealthCheckClient::HealthCheckClient(
}
GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
gpr_mu_init(&mu_);
StartCall();
}
@ -78,7 +77,6 @@ HealthCheckClient::~HealthCheckClient() {
gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
}
GRPC_ERROR_UNREF(error_);
gpr_mu_destroy(&mu_);
}
void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state,

@ -31,6 +31,7 @@
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -157,7 +158,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_pollset_set* interested_parties_; // Do not own.
RefCountedPtr<channelz::SubchannelNode> channelz_node_;
gpr_mu mu_;
Mutex mu_;
grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
grpc_error* error_ = GRPC_ERROR_NONE;
grpc_connectivity_state* notify_state_ = nullptr;

@ -33,7 +33,7 @@
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/slice/slice_internal.h"

@ -88,7 +88,6 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"

@ -25,7 +25,7 @@
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {

@ -26,6 +26,7 @@
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
@ -41,9 +42,6 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); }
~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); }
void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received);
@ -66,7 +64,7 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0;
gpr_mu drop_count_mu_; // Guards drop_token_counts_.
Mutex drop_count_mu_; // Guards drop_token_counts_.
UniquePtr<DroppedCallCounts> drop_token_counts_;
};

@ -27,7 +27,7 @@
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -154,13 +154,12 @@ class PickFirst : public LoadBalancingPolicy {
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&child_refs_mu_);
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
@ -170,7 +169,6 @@ PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}

@ -36,8 +36,8 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -193,7 +193,7 @@ class RoundRobin : public LoadBalancingPolicy {
bool shutdown_ = false;
/// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_;
Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_;
};
@ -245,7 +245,6 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
//
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&child_refs_mu_);
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created", this);
}
@ -255,7 +254,6 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
}
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
}

@ -89,9 +89,9 @@
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@ -278,10 +278,8 @@ class XdsLb : public LoadBalancingPolicy {
class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
public:
explicit LocalityEntry(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {
gpr_mu_init(&child_policy_mu_);
}
~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); }
: parent_(std::move(parent)) {}
~LocalityEntry() = default;
void UpdateLocked(xds_grpclb_serverlist* serverlist,
LoadBalancingPolicy::Config* child_policy_config,
@ -323,13 +321,10 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
gpr_mu child_policy_mu_;
Mutex child_policy_mu_;
RefCountedPtr<XdsLb> parent_;
};
LocalityMap() { gpr_mu_init(&child_refs_mu_); }
~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); }
void UpdateLocked(const LocalityList& locality_list,
LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent);
@ -343,7 +338,7 @@ class XdsLb : public LoadBalancingPolicy {
Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_;
// Lock held while filling child refs for all localities
// inside the map
gpr_mu child_refs_mu_;
Mutex child_refs_mu_;
};
struct LocalityServerlistEntry {
@ -397,7 +392,7 @@ class XdsLb : public LoadBalancingPolicy {
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
// TODO(juanlishen): Replace this with atomic.
gpr_mu lb_chand_mu_;
Mutex lb_chand_mu_;
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0;
@ -1090,7 +1085,6 @@ XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
locality_map_(),
locality_serverlist_() {
gpr_mu_init(&lb_chand_mu_);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1114,7 +1108,6 @@ XdsLb::XdsLb(Args args)
}
XdsLb::~XdsLb() {
gpr_mu_destroy(&lb_chand_mu_);
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
locality_serverlist_.clear();

@ -48,7 +48,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"

@ -42,8 +42,8 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -457,13 +457,14 @@ struct Subchannel::ExternalStateWatcher {
grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_,
w->pollset_set);
}
gpr_mu_lock(&w->subchannel->mu_);
if (w->subchannel->external_state_watcher_list_ == w) {
w->subchannel->external_state_watcher_list_ = w->next;
{
MutexLock lock(&w->subchannel->mu_);
if (w->subchannel->external_state_watcher_list_ == w) {
w->subchannel->external_state_watcher_list_ = w->next;
}
if (w->next != nullptr) w->next->prev = w->prev;
if (w->prev != nullptr) w->prev->next = w->next;
}
if (w->next != nullptr) w->next->prev = w->prev;
if (w->prev != nullptr) w->prev->next = w->next;
gpr_mu_unlock(&w->subchannel->mu_);
GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done");
Delete(w);
GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
@ -585,7 +586,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
"subchannel");
grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
"subchannel");
gpr_mu_init(&mu_);
// Check whether we should enable health checking.
const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG));
@ -632,7 +632,6 @@ Subchannel::~Subchannel() {
grpc_connector_unref(connector_);
grpc_pollset_set_destroy(pollset_set_);
Delete(key_);
gpr_mu_destroy(&mu_);
}
Subchannel* Subchannel::Create(grpc_connector* connector,
@ -908,7 +907,9 @@ void Subchannel::MaybeStartConnectingLocked() {
void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
Subchannel* c = static_cast<Subchannel*>(arg);
gpr_mu_lock(&c->mu_);
// TODO(soheilhy): Once subchannel refcounting is simplified, we can use
// MutexLock instead of ReleasableMutexLock here.
ReleasableMutexLock lock(&c->mu_);
c->have_retry_alarm_ = false;
if (c->disconnected_) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
@ -922,9 +923,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->ContinueConnectingLocked();
gpr_mu_unlock(&c->mu_);
lock.Unlock();
} else {
gpr_mu_unlock(&c->mu_);
lock.Unlock();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
}
GRPC_ERROR_UNREF(error);
@ -951,29 +952,30 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
auto* c = static_cast<Subchannel*>(arg);
grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
gpr_mu_lock(&c->mu_);
c->connecting_ = false;
if (c->connecting_result_.transport != nullptr &&
c->PublishTransportLocked()) {
// Do nothing, transport was published.
} else if (c->disconnected_) {
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} else {
const char* errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
error =
grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE, error,
"connect_failed");
c->MaybeStartConnectingLocked();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
{
MutexLock lock(&c->mu_);
c->connecting_ = false;
if (c->connecting_result_.transport != nullptr &&
c->PublishTransportLocked()) {
// Do nothing, transport was published.
} else if (c->disconnected_) {
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} else {
const char* errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
error = grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Connect Failed",
&error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE, error,
"connect_failed");
c->MaybeStartConnectingLocked();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
}
}
gpr_mu_unlock(&c->mu_);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
grpc_channel_args_destroy(delete_channel_args);
}
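The subchannel.cc hunks above show the two conversion patterns used throughout this commit: wrapping a gpr_mu_lock/gpr_mu_unlock pair in a scoped MutexLock block, and using ReleasableMutexLock when the lock must be dropped early (as in OnRetryAlarm). A compact sketch with placeholder names, assuming it is built inside the gRPC source tree:

#include "src/core/lib/gprpp/sync.h"

void UpdateThenMaybeCleanup(grpc_core::Mutex* mu, bool* flag) {
  {
    grpc_core::MutexLock lock(mu);  // replaces a gpr_mu_lock/gpr_mu_unlock pair
    *flag = true;
  }  // unlocked at the end of the block

  grpc_core::ReleasableMutexLock lock(mu);
  if (*flag) {
    lock.Unlock();  // drop the lock before work that must run unlocked,
                    // e.g. unreffing an object that may destroy the mutex's owner
    return;
  }
}  // if still held here, the destructor unlocks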

@ -29,6 +29,7 @@
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -264,7 +265,7 @@ class Subchannel {
// pollset_set tracking who's interested in a connection being setup.
grpc_pollset_set* pollset_set_;
// Protects the other members.
gpr_mu mu_;
Mutex mu_;
// Refcount
// - lower INTERNAL_REF_BITS bits are for internal references:
// these do not keep the subchannel open.

@ -23,7 +23,7 @@
#include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

@ -27,8 +27,8 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"

@ -1,42 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
#define GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
namespace grpc_core {
class MutexLock {
public:
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu); }
~MutexLock() { gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H */

@ -0,0 +1,126 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_SYNC_H
#define GRPC_CORE_LIB_GPRPP_SYNC_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/impl/codegen/log.h>
#include <grpc/impl/codegen/sync.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
// include/grpcpp/impl/codegen/sync.h.
//
// Whenever possible, prefer using this file over <grpcpp/impl/codegen/sync.h>
// since this file doesn't rely on g_core_codegen_interface and hence does not
// pay the costs of virtual function calls.
namespace grpc_core {
class Mutex {
public:
Mutex() { gpr_mu_init(&mu_); }
~Mutex() { gpr_mu_destroy(&mu_); }
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
private:
gpr_mu mu_;
};
// MutexLock is a std::lock_guard-style RAII lock for a Mutex (or a raw gpr_mu).
class MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLock() { gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~ReleasableMutexLock() {
if (!released_) gpr_mu_unlock(mu_);
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
gpr_mu_unlock(mu_);
}
private:
gpr_mu* const mu_;
bool released_ = false;
};
class CondVar {
public:
CondVar() { gpr_cv_init(&cv_); }
~CondVar() { gpr_cv_destroy(&cv_); }
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
void Signal() { gpr_cv_signal(&cv_); }
void Broadcast() { gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); }
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
}
private:
gpr_cv cv_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_SYNC_H */
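A short usage sketch for the grpc_core primitives defined above: a single-slot handoff built on CondVar::WaitUntil. The Slot type and both functions are illustrative only and assume the gRPC source tree for the include path:

#include "src/core/lib/gprpp/sync.h"

struct Slot {
  grpc_core::Mutex mu;
  grpc_core::CondVar cv;
  bool ready = false;  // guarded by mu
  int value = 0;       // guarded by mu
};

void Produce(Slot* s, int v) {
  grpc_core::MutexLock lock(&s->mu);
  s->value = v;
  s->ready = true;
  s->cv.Signal();
}

int Consume(Slot* s) {
  grpc_core::MutexLock lock(&s->mu);
  // WaitUntil loops on Wait() until the predicate holds.
  s->cv.WaitUntil(&s->mu, [s] { return s->ready; });
  s->ready = false;
  return s->value;
}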

@ -47,7 +47,7 @@
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"

@ -33,7 +33,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"

@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h"
#include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"

@ -232,7 +232,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered
std::lock_guard<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{

@ -25,6 +25,7 @@
#include <grpc/support/string_util.h>
#include <grpcpp/impl/codegen/interceptor_common.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
@ -84,7 +85,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key,
void ClientContext::set_call(grpc_call* call,
const std::shared_ptr<Channel>& channel) {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
GPR_ASSERT(call_ == nullptr);
call_ = call;
channel_ = channel;
@ -114,7 +115,7 @@ void ClientContext::set_compression_algorithm(
}
void ClientContext::TryCancel() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
if (call_) {
SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr);

@ -21,6 +21,7 @@
#include <mutex>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/sync.h>
#include "src/core/lib/gprpp/thd.h"
@ -40,27 +41,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->ThreadFunc();
// Now that we have killed ourselves, we should reduce the thread count
std::unique_lock<std::mutex> lock(pool_->mu_);
grpc_core::MutexLock lock(&pool_->mu_);
pool_->nthreads_--;
// Move ourselves to dead list
pool_->dead_threads_.push_back(this);
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
pool_->shutdown_cv_.notify_one();
pool_->shutdown_cv_.Signal();
}
}
void DynamicThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
std::unique_lock<std::mutex> lock(mu_);
grpc_core::ReleasableMutexLock lock(&mu_);
if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
break;
}
threads_waiting_++;
cv_.wait(lock);
cv_.Wait(&mu_);
threads_waiting_--;
}
// Drain callbacks before considering shutdown to ensure all work
@ -68,7 +69,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!callbacks_.empty()) {
auto cb = callbacks_.front();
callbacks_.pop();
lock.unlock();
lock.Unlock();
cb();
} else if (shutdown_) {
break;
@ -82,7 +83,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads)
nthreads_(0),
threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
nthreads_++;
new DynamicThread(this);
}
@ -95,17 +96,17 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
}
DynamicThreadPool::~DynamicThreadPool() {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_.notify_all();
cv_.Broadcast();
while (nthreads_ != 0) {
shutdown_cv_.wait(lock);
shutdown_cv_.Wait(&mu_);
}
ReapThreads(&dead_threads_);
}
void DynamicThreadPool::Add(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
// Add work to the callbacks list
callbacks_.push(callback);
// Increase pool size or notify as needed
@ -114,7 +115,7 @@ void DynamicThreadPool::Add(const std::function<void()>& callback) {
nthreads_++;
new DynamicThread(this);
} else {
cv_.notify_one();
cv_.Signal();
}
// Also use this chance to harvest dead threads
if (!dead_threads_.empty()) {

@ -27,6 +27,7 @@
#include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/thread_pool_interface.h"
@ -50,9 +51,9 @@ class DynamicThreadPool final : public ThreadPoolInterface {
grpc_core::Thread thd_;
void ThreadFunc();
};
std::mutex mu_;
std::condition_variable cv_;
std::condition_variable shutdown_cv_;
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
grpc_core::CondVar shutdown_cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
int reserve_threads_;
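The dynamic_thread_pool changes above follow a worker-loop shape worth spelling out: ReleasableMutexLock plus CondVar::Wait, with the lock released before a callback runs. A self-contained sketch (WorkQueue is a made-up name, not the gRPC class), again assuming the gRPC tree for the include:

#include <functional>
#include <queue>

#include "src/core/lib/gprpp/sync.h"

class WorkQueue {
 public:
  void Add(std::function<void()> cb) {
    grpc_core::MutexLock lock(&mu_);
    callbacks_.push(std::move(cb));
    cv_.Signal();
  }
  void Shutdown() {
    grpc_core::MutexLock lock(&mu_);
    shutdown_ = true;
    cv_.Broadcast();
  }
  // Runs callbacks until Shutdown(); drops the lock while a callback executes.
  void Worker() {
    for (;;) {
      grpc_core::ReleasableMutexLock lock(&mu_);
      while (!shutdown_ && callbacks_.empty()) cv_.Wait(&mu_);
      if (!callbacks_.empty()) {
        auto cb = std::move(callbacks_.front());
        callbacks_.pop();
        lock.Unlock();  // same move as DynamicThreadPool::ThreadFunc
        cb();
      } else if (shutdown_) {
        return;  // lock's destructor releases mu_
      }
    }
  }

 private:
  grpc_core::Mutex mu_;
  grpc_core::CondVar cv_;
  bool shutdown_ = false;                        // guarded by mu_
  std::queue<std::function<void()>> callbacks_;  // guarded by mu_
};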

@ -41,7 +41,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus(
const grpc::string& service_name, bool serving) {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map.
serving = false;
@ -51,7 +51,7 @@ void DefaultHealthCheckService::SetServingStatus(
void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING;
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
@ -62,7 +62,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
}
void DefaultHealthCheckService::Shutdown() {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
@ -76,7 +76,7 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
const grpc::string& service_name) const {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) {
return NOT_FOUND;
@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus(
void DefaultHealthCheckService::RegisterCallHandler(
const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name];
service_data.AddCallHandler(handler /* copies ref */);
HealthCheckServiceImpl::CallHandler* h = handler.get();
@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler(
void DefaultHealthCheckService::UnregisterCallHandler(
const grpc::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name);
if (it == services_map_.end()) return;
ServiceData& service_data = it->second;
@ -166,7 +166,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
// We will reach here after the server starts shutting down.
shutdown_ = true;
{
std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown();
}
thread_->Join();
@ -266,7 +266,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
{
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request a Check() call.
handler->next_ =
@ -311,7 +311,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
}
// Send response.
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
@ -347,7 +347,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
// Request AsyncNotifyWhenDone().
handler->on_done_notified_ =
@ -402,7 +402,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
std::unique_lock<std::mutex> lock(send_mu_);
grpc_core::MutexLock lock(&send_mu_);
// If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes.
if (send_in_flight_) {
@ -420,7 +420,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
ByteBuffer response;
bool success = service_->EncodeResponse(status, &response);
// Grab shutdown lock and send response.
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
SendFinishLocked(std::move(self), Status::CANCELLED);
return;
@ -442,7 +442,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::move(self), Status::CANCELLED);
return;
}
std::unique_lock<std::mutex> lock(send_mu_);
grpc_core::MutexLock lock(&send_mu_);
send_in_flight_ = false;
// If we got a new status since we started the last send, start a
// new send for it.
@ -456,7 +456,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
if (finish_called_) return;
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status);
}

@ -31,6 +31,7 @@
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc {
@ -197,7 +198,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
GenericServerAsyncWriter stream_;
ServerContext ctx_;
std::mutex send_mu_;
grpc_core::Mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
@ -226,7 +227,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_;
grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_;
};
@ -273,7 +274,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
const grpc::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable std::mutex mu_;
mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_.
std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_;
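cq_shutdown_mu_ above exists only to serialize completion-queue shutdown against new tag submission, the same idiom the load reporter service uses below. A minimal sketch of that check-under-lock pattern; TagSource and StartNextOp() are placeholders, not gRPC APIs:

#include "src/core/lib/gprpp/sync.h"

class TagSource {
 public:
  // Returns false instead of enqueueing once the queue has been shut down.
  bool MaybeStartNextOp() {
    grpc_core::MutexLock lock(&cq_shutdown_mu_);
    if (shutdown_) return false;
    // StartNextOp();  // would enqueue a completion-queue tag here
    return true;
  }
  void Shutdown() {
    grpc_core::MutexLock lock(&cq_shutdown_mu_);
    shutdown_ = true;
    // cq_->Shutdown();  // done while holding the lock, as in the dtors above
  }

 private:
  grpc_core::Mutex cq_shutdown_mu_;
  bool shutdown_ = false;  // guarded by cq_shutdown_mu_
};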

@ -239,7 +239,7 @@ grpc::string LoadReporter::GenerateLbId() {
::grpc::lb::v1::LoadBalancingFeedback
LoadReporter::GenerateLoadBalancingFeedback() {
std::unique_lock<std::mutex> lock(feedback_mu_);
grpc_core::ReleasableMutexLock lock(&feedback_mu_);
auto now = std::chrono::system_clock::now();
// Discard records outside the window until there is only one record
// outside the window, which is used as the base for difference.
@ -277,7 +277,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
std::chrono::duration<double> duration_seconds =
newest->end_time - oldest->end_time;
lock.unlock();
lock.Unlock();
::grpc::lb::v1::LoadBalancingFeedback feedback;
feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
feedback.set_calls_per_second(
@ -290,7 +290,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
LoadReporter::GenerateLoads(const grpc::string& hostname,
const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
GPR_ASSERT(assigned_stores != nullptr);
GPR_ASSERT(!assigned_stores->empty());
@ -371,7 +371,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
// This will make the load balancing feedback generation a no-op.
cpu_stats = {0, 0};
}
std::unique_lock<std::mutex> lock(feedback_mu_);
grpc_core::MutexLock lock(&feedback_mu_);
feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
cpu_stats.first, cpu_stats.second);
}
@ -379,7 +379,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
const grpc::string& lb_id,
const grpc::string& load_key) {
std::lock_guard<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
gpr_log(GPR_INFO,
"[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
@ -388,7 +388,7 @@ void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
load_data_store_.ReportStreamClosed(hostname, lb_id);
gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
hostname.c_str(), lb_id.c_str());
@ -407,7 +407,7 @@ void LoadReporter::ProcessViewDataCallStart(
LoadRecordKey key(client_ip_and_token, user_id);
LoadRecordValue value = LoadRecordValue(start_count);
{
std::unique_lock<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}
@ -459,7 +459,7 @@ void LoadReporter::ProcessViewDataCallEnd(
LoadRecordValue value = LoadRecordValue(
0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
{
std::unique_lock<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}
@ -486,7 +486,7 @@ void LoadReporter::ProcessViewDataOtherCallMetrics(
LoadRecordValue value = LoadRecordValue(
metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
{
std::unique_lock<std::mutex> lock(store_mu_);
grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value);
}
}

@ -29,6 +29,7 @@
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
@ -212,11 +213,11 @@ class LoadReporter {
std::atomic<int64_t> next_lb_id_{0};
const std::chrono::seconds feedback_sample_window_seconds_;
std::mutex feedback_mu_;
grpc_core::Mutex feedback_mu_;
std::deque<LoadBalancingFeedbackRecord> feedback_records_;
// TODO(juanlishen): Lock in finer grain. Locking the whole store may be
// too expensive.
std::mutex store_mu_;
grpc_core::Mutex store_mu_;
LoadDataStore load_data_store_;
std::unique_ptr<CensusViewProvider> census_view_provider_;
std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;

@ -48,7 +48,7 @@ LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
// We will reach here after the server starts shutting down.
shutdown_ = true;
{
std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown();
}
if (next_fetch_and_sample_alarm_ != nullptr)
@ -62,7 +62,7 @@ void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
GPR_TIMESPAN));
{
std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
grpc_core::MutexLock lock(&cq_shutdown_mu_);
if (shutdown_) return;
// TODO(juanlishen): Improve the Alarm implementation to reuse a single
// instance for multiple events.
@ -119,7 +119,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
ReportLoadHandler* p = handler.get();
{
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return;
p->on_done_notified_ =
CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
@ -164,9 +164,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
// instance will deallocate itself when it's done.
CreateAndStart(cq_, service_, load_reporter_);
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
lock.Unlock();
Shutdown(std::move(self), "OnRequestDelivered");
return;
}
@ -222,9 +222,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
SendReport(self, true /* ok */);
// Expect this read to fail.
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
lock.Unlock();
Shutdown(std::move(self), "OnReadDone");
return;
}
@ -254,9 +254,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
lock.Unlock();
Shutdown(std::move(self), "ScheduleNextReport");
return;
}
@ -294,9 +294,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
call_status_ = INITIAL_RESPONSE_SENT;
}
{
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) {
lock.release()->unlock();
lock.Unlock();
Shutdown(std::move(self), "SendReport");
return;
}
@ -342,7 +342,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
// OnRequestDelivered() may be called after OnDoneNotified(), so we need to
// try to Finish() every time we are in Shutdown().
if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) {
on_finish_done_ =
CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,
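Note (not part of the diff): the handlers in this file take cq_shutdown_mu_, check shutdown_, and must drop the lock before calling Shutdown(), which is why they switch to grpc_core::ReleasableMutexLock and call lock.Unlock() explicitly. Below is a hedged sketch of that shape; the ReleasableMutexLock here is a simplified stand-in over std::mutex, and OnReadDoneLike is a hypothetical handler, not the real service code.

// Illustrative stand-in for grpc_core::ReleasableMutexLock: a scoped lock
// that can be dropped early, mirroring the "unlock before Shutdown()" shape above.
#include <mutex>

class ReleasableMutexLock {
 public:
  explicit ReleasableMutexLock(std::mutex* mu) : mu_(mu) { mu_->lock(); }
  ~ReleasableMutexLock() {
    if (!released_) mu_->unlock();
  }
  void Unlock() {
    mu_->unlock();
    released_ = true;
  }
  void Lock() {
    mu_->lock();
    released_ = false;
  }
 private:
  std::mutex* const mu_;
  bool released_ = false;
};

// Hypothetical handler showing the call-site pattern (names are placeholders).
void OnReadDoneLike(std::mutex* cq_shutdown_mu, const bool* shutdown) {
  ReleasableMutexLock lock(cq_shutdown_mu);
  if (*shutdown) {
    lock.Unlock();  // Must not hold the lock while tearing the handler down.
    // Shutdown(std::move(self), "OnReadDone") would run here in the real code.
    return;
  }
  // Still holding the lock: safe to enqueue the next completion-queue tag.
}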

@ -25,6 +25,7 @@
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/load_reporter/load_reporter.h"
@ -181,7 +182,7 @@ class LoadReporterAsyncServiceImpl
std::unique_ptr<ServerCompletionQueue> cq_;
// To synchronize the operations related to shutdown state of cq_, so that we
// don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_;
grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_;
std::unique_ptr<LoadReporter> load_reporter_;

@ -388,9 +388,9 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
// The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown.
std::lock_guard<std::mutex> l(server_->callback_reqs_mu_);
grpc::internal::MutexLock l(&server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.notify_one();
server_->callback_reqs_done_cv_.Signal();
}
}
@ -814,12 +814,12 @@ Server::Server(
Server::~Server() {
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::ReleasableMutexLock lock(&mu_);
if (callback_cq_ != nullptr) {
callback_cq_->Shutdown();
}
if (started_ && !shutdown_) {
lock.unlock();
lock.Unlock();
Shutdown();
} else if (!started_) {
// Shutdown the completion queues
@ -1051,7 +1051,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
}
void Server::ShutdownInternal(gpr_timespec deadline) {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
if (shutdown_) {
return;
}
@ -1102,9 +1102,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter.
{
std::unique_lock<std::mutex> cblock(callback_reqs_mu_);
callback_reqs_done_cv_.wait(
cblock, [this] { return callback_reqs_outstanding_ == 0; });
grpc::internal::MutexLock cblock(&callback_reqs_mu_);
callback_reqs_done_cv_.WaitUntil(
&callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; });
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
@ -1114,13 +1114,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
shutdown_cv_.Broadcast();
}
void Server::Wait() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
while (started_ && !shutdown_notified_) {
shutdown_cv_.wait(lock);
shutdown_cv_.Wait(&mu_);
}
}
@ -1322,7 +1322,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered
std::lock_guard<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) {
auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{

@ -33,6 +33,7 @@
#include <grpcpp/support/time.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/call.h"
namespace grpc {
@ -96,7 +97,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
void SetCancelCallback(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
if (finalized_ && (cancelled_ != 0)) {
callback();
@ -107,7 +108,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
void ClearCancelCallback() {
std::lock_guard<std::mutex> g(mu_);
grpc_core::MutexLock g(&mu_);
cancel_callback_ = nullptr;
}
@ -144,7 +145,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
private:
bool CheckCancelledNoPluck() {
std::lock_guard<std::mutex> g(mu_);
grpc_core::MutexLock lock(&mu_);
return finalized_ ? (cancelled_ != 0) : false;
}
@ -154,7 +155,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
void* tag_;
void* core_cq_tag_;
grpc_core::RefCount refs_;
std::mutex mu_;
grpc_core::Mutex mu_;
bool finalized_;
int cancelled_; // This is an int (not bool) because it is passed to core
std::function<void()> cancel_callback_;
@ -186,7 +187,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
bool ret = false;
std::unique_lock<std::mutex> lock(mu_);
grpc_core::ReleasableMutexLock lock(&mu_);
if (done_intercepting_) {
/* We are done intercepting. */
if (has_tag_) {
@ -216,12 +217,11 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
// Release the lock since we are going to be calling a callback and
// interceptors now
lock.unlock();
lock.Unlock();
if (call_cancel && reactor_ != nullptr) {
reactor_->OnCancel();
}
/* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);

@ -62,7 +62,7 @@ ThreadManager::ThreadManager(const char* name,
ThreadManager::~ThreadManager() {
{
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(num_threads_ == 0);
}
@ -72,38 +72,38 @@ ThreadManager::~ThreadManager() {
}
void ThreadManager::Wait() {
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
while (num_threads_ != 0) {
shutdown_cv_.wait(lock);
shutdown_cv_.Wait(&mu_);
}
}
void ThreadManager::Shutdown() {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
}
bool ThreadManager::IsShutdown() {
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
return shutdown_;
}
int ThreadManager::GetMaxActiveThreadsSoFar() {
std::lock_guard<std::mutex> list_lock(list_mu_);
grpc_core::MutexLock list_lock(&list_mu_);
return max_active_threads_sofar_;
}
void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{
std::lock_guard<std::mutex> list_lock(list_mu_);
grpc_core::MutexLock list_lock(&list_mu_);
completed_threads_.push_back(thd);
}
{
std::lock_guard<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
num_threads_--;
if (num_threads_ == 0) {
shutdown_cv_.notify_one();
shutdown_cv_.Signal();
}
}
@ -116,7 +116,7 @@ void ThreadManager::CleanupCompletedThreads() {
{
// swap out the completed threads list: allows other threads to clean up
// more quickly
std::unique_lock<std::mutex> lock(list_mu_);
grpc_core::MutexLock lock(&list_mu_);
completed_threads.swap(completed_threads_);
}
for (auto thd : completed_threads) delete thd;
@ -132,7 +132,7 @@ void ThreadManager::Initialize() {
}
{
std::unique_lock<std::mutex> lock(mu_);
grpc_core::MutexLock lock(&mu_);
num_pollers_ = min_pollers_;
num_threads_ = min_pollers_;
max_active_threads_sofar_ = min_pollers_;
@ -149,7 +149,7 @@ void ThreadManager::MainWorkLoop() {
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);
std::unique_lock<std::mutex> lock(mu_);
grpc_core::ReleasableMutexLock lock(&mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
bool done = false;
@ -176,30 +176,30 @@ void ThreadManager::MainWorkLoop() {
max_active_threads_sofar_ = num_threads_;
}
// Drop lock before spawning thread to avoid contention
lock.unlock();
lock.Unlock();
new WorkerThread(this);
} else if (num_pollers_ > 0) {
// There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would
// like to have (min_pollers_)
lock.unlock();
lock.Unlock();
} else {
// There are no pollers to spare and we couldn't allocate
// a new thread, so resources are exhausted!
lock.unlock();
lock.Unlock();
resource_exhausted = true;
}
} else {
// There are a sufficient number of pollers available so we can do
// the work and continue polling with our existing poller threads
lock.unlock();
lock.Unlock();
}
// Lock is always released at this point - do the application work
// or return resource exhausted if there is new work but we couldn't
// get a thread in which to do it.
DoWork(tag, ok, !resource_exhausted);
// Take the lock again to check post conditions
lock.lock();
lock.Lock();
// If we're shutdown, we should finish at this point.
if (shutdown_) done = true;
break;
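Note (not part of the diff): ThreadManager also swaps std::condition_variable for grpc_core::CondVar, whose Wait() takes the Mutex* directly and whose Signal()/Broadcast() correspond to notify_one()/notify_all(). The following stand-ins (again, not the real gprpp/sync.h implementation) illustrate the contract that Wait() and MarkAsCompleted() above depend on: Wait() releases the mutex while blocked and returns with it held again.

// Illustrative stand-ins only; the real classes are in src/core/lib/gprpp/sync.h.
#include <condition_variable>
#include <mutex>

class Mutex {
 public:
  void Lock() { mu_.lock(); }
  void Unlock() { mu_.unlock(); }
  std::mutex* get() { return &mu_; }
 private:
  std::mutex mu_;
};

class CondVar {
 public:
  void Signal() { cv_.notify_one(); }
  void Broadcast() { cv_.notify_all(); }
  // Called with `mu` already held, like shutdown_cv_.Wait(&mu_) above.
  void Wait(Mutex* mu) {
    std::unique_lock<std::mutex> lock(*mu->get(), std::adopt_lock);
    cv_.wait(lock);
    lock.release();  // Leave `mu` held on return, as the callers expect.
  }
 private:
  std::condition_variable cv_;
};

// Usage mirroring ThreadManager::Wait():
//   MutexLock lock(&mu_);
//   while (num_threads_ != 0) shutdown_cv_.Wait(&mu_);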

@ -26,6 +26,7 @@
#include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/resource_quota.h"
@ -140,10 +141,10 @@ class ThreadManager {
// Protects shutdown_, num_pollers_, num_threads_ and
// max_active_threads_sofar_
std::mutex mu_;
grpc_core::Mutex mu_;
bool shutdown_;
std::condition_variable shutdown_cv_;
grpc_core::CondVar shutdown_cv_;
// The resource user object to use when requesting quota to create threads
//
@ -169,7 +170,7 @@ class ThreadManager {
// ever set so far
int max_active_threads_sofar_;
std::mutex list_mu_;
grpc_core::Mutex list_mu_;
std::list<WorkerThread*> completed_threads_;
};

@ -31,6 +31,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
@ -168,24 +169,24 @@ class ClientChannelStressTest {
explicit ServerThread(const grpc::string& type,
const grpc::string& server_host, T* service)
: type_(type), service_(service) {
std::mutex mu;
grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Start from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu);
grpc::internal::MutexLock lock(&mu);
port_ = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
std::condition_variable cond;
grpc::internal::CondVar cond;
thread_.reset(new std::thread(
std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
cond.wait(lock);
cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
}
void Start(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
void Start(const grpc::string& server_host, grpc::internal::Mutex* mu,
grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu);
grpc::internal::MutexLock lock(mu);
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
@ -193,7 +194,7 @@ class ClientChannelStressTest {
InsecureServerCredentials());
builder.RegisterService(service_);
server_ = builder.BuildAndStart();
cond->notify_one();
cond->Signal();
}
void Shutdown() {

@ -33,6 +33,7 @@
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
@ -98,7 +99,7 @@ class MyTestServiceImpl : public TestServiceImpl {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
++request_count_;
}
AddClient(context->peer());
@ -106,29 +107,29 @@ class MyTestServiceImpl : public TestServiceImpl {
}
int request_count() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return request_count_;
}
void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
request_count_ = 0;
}
std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
return clients_;
}
private:
void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client);
}
std::mutex mu_;
grpc::internal::Mutex mu_;
int request_count_;
std::mutex clients_mu_;
grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_;
};
@ -293,18 +294,18 @@ class ClientLbEnd2endTest : public ::testing::Test {
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
started_ = true;
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
grpc::internal::Mutex mu;
grpc::internal::MutexLock lock(&mu);
grpc::internal::CondVar cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
cond.WaitUntil(&mu, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
grpc::internal::CondVar* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
@ -313,9 +314,9 @@ class ClientLbEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), std::move(creds));
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu);
grpc::internal::MutexLock lock(mu);
server_ready_ = true;
cond->notify_one();
cond->Signal();
}
void Shutdown() {
@ -1374,7 +1375,7 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
void TearDown() override { ClientLbEnd2endTest::TearDown(); }
int trailers_intercepted() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return trailers_intercepted_;
}
@ -1382,11 +1383,11 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
static void ReportTrailerIntercepted(void* arg) {
ClientLbInterceptTrailingMetadataTest* self =
static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
std::unique_lock<std::mutex> lock(self->mu_);
grpc::internal::MutexLock lock(&self->mu_);
self->trailers_intercepted_++;
}
std::mutex mu_;
grpc::internal::Mutex mu_;
int trailers_intercepted_ = 0;
};
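Note (not part of the diff): the test fixtures above all use the same startup handshake, holding the mutex before spawning the server thread so the Signal() from Serve() cannot fire before the corresponding Wait()/WaitUntil(). A self-contained sketch of that handshake, using std::mutex and std::condition_variable as stand-ins for the grpc::internal types (WaitUntil and StartupHandshakeExample are illustrative names, not part of the change):

#include <condition_variable>
#include <mutex>
#include <thread>

// Stand-in for the CondVar::WaitUntil(mu, predicate) call used above.
// `mu` is assumed to be held by the caller, as in the test fixtures.
template <class Predicate>
void WaitUntil(std::condition_variable* cv, std::mutex* mu, Predicate pred) {
  std::unique_lock<std::mutex> lock(*mu, std::adopt_lock);
  cv->wait(lock, pred);
  lock.release();  // Keep the mutex held on return, as the callers expect.
}

// Hypothetical handshake mirroring ServerData::Start()/Serve().
void StartupHandshakeExample() {
  std::mutex mu;
  std::condition_variable cond;
  bool server_ready = false;

  mu.lock();  // Held before the thread starts, so the signal cannot be missed.
  std::thread server([&] {
    // ... BuildAndStart() the server here ...
    std::lock_guard<std::mutex> lock(mu);
    server_ready = true;
    cond.notify_one();  // Signal() in the grpc::internal API.
  });
  WaitUntil(&cond, &mu, [&] { return server_ready; });
  mu.unlock();
  server.join();
}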

@ -30,6 +30,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
@ -85,32 +86,32 @@ template <typename ServiceType>
class CountedService : public ServiceType {
public:
size_t request_count() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return request_count_;
}
size_t response_count() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return response_count_;
}
void IncreaseResponseCount() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
++response_count_;
}
void IncreaseRequestCount() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
++request_count_;
}
void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
request_count_ = 0;
response_count_ = 0;
}
protected:
std::mutex mu_;
grpc::internal::Mutex mu_;
private:
size_t request_count_ = 0;
@ -148,18 +149,18 @@ class BackendServiceImpl : public BackendService {
void Shutdown() {}
std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
return clients_;
}
private:
void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client);
}
std::mutex mu_;
std::mutex clients_mu_;
grpc::internal::Mutex mu_;
grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_;
};
@ -210,7 +211,7 @@ class BalancerServiceImpl : public BalancerService {
Status BalanceLoad(ServerContext* context, Stream* stream) override {
gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
if (serverlist_done_) goto done;
}
{
@ -237,7 +238,7 @@ class BalancerServiceImpl : public BalancerService {
}
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
@ -245,8 +246,8 @@ class BalancerServiceImpl : public BalancerService {
response_and_delay.second);
}
{
std::unique_lock<std::mutex> lock(mu_);
serverlist_cond_.wait(lock, [this] { return serverlist_done_; });
grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; });
}
if (client_load_reporting_interval_seconds_ > 0) {
@ -257,7 +258,7 @@ class BalancerServiceImpl : public BalancerService {
GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
client_stats_.num_calls_started +=
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
@ -274,7 +275,7 @@ class BalancerServiceImpl : public BalancerService {
drop_token_count.num_calls();
}
load_report_ready_ = true;
load_report_cond_.notify_one();
load_report_cond_.Signal();
}
}
}
@ -284,12 +285,12 @@ class BalancerServiceImpl : public BalancerService {
}
void add_response(const LoadBalanceResponse& response, int send_after_ms) {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
void Start() {
std::lock_guard<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
serverlist_done_ = false;
load_report_ready_ = false;
responses_and_delays_.clear();
@ -326,17 +327,17 @@ class BalancerServiceImpl : public BalancerService {
}
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
load_report_cond_.wait(lock, [this] { return load_report_ready_; });
grpc::internal::MutexLock lock(&mu_);
load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; });
load_report_ready_ = false;
return client_stats_;
}
void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
if (!serverlist_done_) {
serverlist_done_ = true;
serverlist_cond_.notify_all();
serverlist_cond_.Broadcast();
}
}
@ -355,10 +356,10 @@ class BalancerServiceImpl : public BalancerService {
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
std::condition_variable load_report_cond_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar load_report_cond_;
bool load_report_ready_ = false;
std::condition_variable serverlist_cond_;
grpc::internal::CondVar serverlist_cond_;
bool serverlist_done_ = false;
ClientStats client_stats_;
};
@ -624,22 +625,22 @@ class GrpclbEnd2endTest : public ::testing::Test {
GPR_ASSERT(!running_);
running_ = true;
service_.Start();
std::mutex mu;
grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Serve from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
grpc::internal::MutexLock lock(&mu);
grpc::internal::CondVar cond;
thread_.reset(new std::thread(
std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)));
cond.wait(lock);
cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
}
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu);
grpc::internal::MutexLock lock(mu);
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
@ -648,7 +649,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), creds);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
cond->notify_one();
cond->Signal();
}
void Shutdown() {

@ -25,6 +25,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
@ -188,7 +189,7 @@ class CommonStressTestAsyncServer : public BaseClass {
}
void TearDown() override {
{
std::unique_lock<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
this->TearDownStart();
shutting_down_ = true;
cq_->Shutdown();
@ -229,7 +230,7 @@ class CommonStressTestAsyncServer : public BaseClass {
}
}
void RefreshContext(int i) {
std::unique_lock<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
if (!shutting_down_) {
contexts_[i].state = Context::READY;
contexts_[i].srv_ctx.reset(new ServerContext);
@ -253,7 +254,7 @@ class CommonStressTestAsyncServer : public BaseClass {
::grpc::testing::EchoTestService::AsyncService service_;
std::unique_ptr<ServerCompletionQueue> cq_;
bool shutting_down_;
std::mutex mu_;
grpc::internal::Mutex mu_;
std::vector<std::thread> server_threads_;
};
@ -341,9 +342,9 @@ class AsyncClientEnd2endTest : public ::testing::Test {
}
void Wait() {
std::unique_lock<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
while (rpcs_outstanding_ != 0) {
cv_.wait(l);
cv_.Wait(&mu_);
}
cq_.Shutdown();
@ -366,7 +367,7 @@ class AsyncClientEnd2endTest : public ::testing::Test {
call->response_reader->Finish(&call->response, &call->status,
(void*)call);
std::unique_lock<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
rpcs_outstanding_++;
}
}
@ -384,20 +385,20 @@ class AsyncClientEnd2endTest : public ::testing::Test {
bool notify;
{
std::unique_lock<std::mutex> l(mu_);
grpc::internal::MutexLock l(&mu_);
rpcs_outstanding_--;
notify = (rpcs_outstanding_ == 0);
}
if (notify) {
cv_.notify_all();
cv_.Signal();
}
}
}
Common common_;
CompletionQueue cq_;
std::mutex mu_;
std::condition_variable cv_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar cv_;
int rpcs_outstanding_;
};

@ -84,32 +84,32 @@ template <typename ServiceType>
class CountedService : public ServiceType {
public:
size_t request_count() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return request_count_;
}
size_t response_count() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
return response_count_;
}
void IncreaseResponseCount() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
++response_count_;
}
void IncreaseRequestCount() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
++request_count_;
}
void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
request_count_ = 0;
response_count_ = 0;
}
protected:
std::mutex mu_;
grpc::internal::Mutex mu_;
private:
size_t request_count_ = 0;
@ -145,18 +145,18 @@ class BackendServiceImpl : public BackendService {
void Shutdown() {}
std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
return clients_;
}
private:
void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_);
grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client);
}
std::mutex mu_;
std::mutex clients_mu_;
grpc::internal::Mutex mu_;
grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_;
};
@ -208,7 +208,7 @@ class BalancerServiceImpl : public BalancerService {
// TODO(juanlishen): Clean up the scoping.
gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
if (serverlist_done_) goto done;
}
{
@ -234,7 +234,7 @@ class BalancerServiceImpl : public BalancerService {
}
{
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
responses_and_delays = responses_and_delays_;
}
for (const auto& response_and_delay : responses_and_delays) {
@ -242,8 +242,8 @@ class BalancerServiceImpl : public BalancerService {
response_and_delay.second);
}
{
std::unique_lock<std::mutex> lock(mu_);
serverlist_cond_.wait(lock, [this] { return serverlist_done_; });
grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; });
}
if (client_load_reporting_interval_seconds_ > 0) {
@ -254,7 +254,7 @@ class BalancerServiceImpl : public BalancerService {
GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
client_stats_.num_calls_started +=
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
@ -271,7 +271,7 @@ class BalancerServiceImpl : public BalancerService {
drop_token_count.num_calls();
}
load_report_ready_ = true;
load_report_cond_.notify_one();
load_report_cond_.Signal();
}
}
}
@ -281,12 +281,12 @@ class BalancerServiceImpl : public BalancerService {
}
void add_response(const LoadBalanceResponse& response, int send_after_ms) {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
}
void Shutdown() {
std::unique_lock<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
NotifyDoneWithServerlistsLocked();
responses_and_delays_.clear();
client_stats_.Reset();
@ -318,21 +318,21 @@ class BalancerServiceImpl : public BalancerService {
}
const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_);
load_report_cond_.wait(lock, [this] { return load_report_ready_; });
grpc::internal::MutexLock lock(&mu_);
load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; });
load_report_ready_ = false;
return client_stats_;
}
void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_);
grpc::internal::MutexLock lock(&mu_);
NotifyDoneWithServerlistsLocked();
}
void NotifyDoneWithServerlistsLocked() {
if (!serverlist_done_) {
serverlist_done_ = true;
serverlist_cond_.notify_all();
serverlist_cond_.Broadcast();
}
}
@ -351,10 +351,10 @@ class BalancerServiceImpl : public BalancerService {
const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_;
std::condition_variable load_report_cond_;
grpc::internal::Mutex mu_;
grpc::internal::CondVar load_report_cond_;
bool load_report_ready_ = false;
std::condition_variable serverlist_cond_;
grpc::internal::CondVar serverlist_cond_;
bool serverlist_done_ = false;
ClientStats client_stats_;
};
@ -637,22 +637,22 @@ class XdsEnd2endTest : public ::testing::Test {
gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
GPR_ASSERT(!running_);
running_ = true;
std::mutex mu;
grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Serve from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
grpc::internal::MutexLock lock(&mu);
grpc::internal::CondVar cond;
thread_.reset(new std::thread(
std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)));
cond.wait(lock);
cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
}
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu);
grpc::internal::MutexLock lock(mu);
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
@ -661,7 +661,7 @@ class XdsEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), creds);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
cond->notify_one();
cond->Signal();
}
void Shutdown() {

@ -987,6 +987,7 @@ include/grpcpp/impl/codegen/status.h \
include/grpcpp/impl/codegen/status_code_enum.h \
include/grpcpp/impl/codegen/string_ref.h \
include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync.h \
include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/grpc_library.h \

@ -989,6 +989,7 @@ include/grpcpp/impl/codegen/status.h \
include/grpcpp/impl/codegen/status_code_enum.h \
include/grpcpp/impl/codegen/string_ref.h \
include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync.h \
include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/grpc_library.h \
@ -1085,12 +1086,12 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/sync.h \
src/core/lib/gprpp/thd.h \
src/core/lib/http/format_request.h \
src/core/lib/http/httpcli.h \

@ -1166,12 +1166,12 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/sync.h \
src/core/lib/gprpp/thd.h \
src/core/lib/gprpp/thd_posix.cc \
src/core/lib/gprpp/thd_windows.cc \

@ -8033,8 +8033,8 @@
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h"
],
@ -8081,8 +8081,8 @@
"src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h"
],
@ -9860,6 +9860,7 @@
},
{
"deps": [
"grpc++_internal_hdrs_only",
"grpc_codegen"
],
"headers": [
@ -10056,6 +10057,7 @@
"gpr",
"gpr_base_headers",
"grpc++_codegen_base",
"grpc++_internal_hdrs_only",
"grpc_base_headers",
"grpc_transport_inproc_headers",
"health_proto",
@ -10350,6 +10352,20 @@
"third_party": false,
"type": "filegroup"
},
{
"deps": [],
"headers": [
"include/grpcpp/impl/codegen/sync.h"
],
"is_filegroup": true,
"language": "c++",
"name": "grpc++_internal_hdrs_only",
"src": [
"include/grpcpp/impl/codegen/sync.h"
],
"third_party": false,
"type": "filegroup"
},
{
"deps": [],
"headers": [
