Revert "Revert "Introduce C++ wrappers for gpr_mu and gpr_cv.""

This reverts commit d09c9f8e20.
pull/18774/head
Soheil Hassas Yeganeh 6 years ago
parent 0de83950e6
commit 2b9448a71c
  1. 14
      BUILD
  2. 5
      BUILD.gn
  3. 5
      CMakeLists.txt
  4. 5
      Makefile
  5. 8
      build.yaml
  6. 7
      gRPC-C++.podspec
  7. 4
      gRPC-Core.podspec
  8. 2
      grpc.gemspec
  9. 3
      include/grpcpp/channel.h
  10. 3
      include/grpcpp/impl/codegen/client_context.h
  11. 138
      include/grpcpp/impl/codegen/sync.h
  12. 8
      include/grpcpp/server.h
  13. 8
      include/grpcpp/server_impl.h
  14. 2
      package.xml
  15. 2
      src/core/ext/filters/client_channel/client_channel.cc
  16. 4
      src/core/ext/filters/client_channel/health/health_check_client.cc
  17. 3
      src/core/ext/filters/client_channel/health/health_check_client.h
  18. 2
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  19. 1
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  20. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
  21. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  22. 6
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  23. 6
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  24. 19
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  25. 2
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  26. 58
      src/core/ext/filters/client_channel/subchannel.cc
  27. 3
      src/core/ext/filters/client_channel/subchannel.h
  28. 2
      src/core/lib/channel/channelz_registry.cc
  29. 2
      src/core/lib/channel/handshaker.h
  30. 42
      src/core/lib/gprpp/mutex_lock.h
  31. 126
      src/core/lib/gprpp/sync.h
  32. 2
      src/core/lib/iomgr/ev_epollex_linux.cc
  33. 2
      src/core/lib/surface/init.cc
  34. 2
      src/core/tsi/ssl/session_cache/ssl_session_cache.cc
  35. 2
      src/cpp/client/channel_cc.cc
  36. 5
      src/cpp/client/client_context.cc
  37. 23
      src/cpp/server/dynamic_thread_pool.cc
  38. 7
      src/cpp/server/dynamic_thread_pool.h
  39. 28
      src/cpp/server/health/default_health_check_service.cc
  40. 7
      src/cpp/server/health/default_health_check_service.h
  41. 18
      src/cpp/server/load_reporter/load_reporter.cc
  42. 5
      src/cpp/server/load_reporter/load_reporter.h
  43. 24
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  44. 3
      src/cpp/server/load_reporter/load_reporter_async_service_impl.h
  45. 24
      src/cpp/server/server_cc.cc
  46. 17
      src/cpp/server/server_context.cc
  47. 34
      src/cpp/thread_manager/thread_manager.cc
  48. 7
      src/cpp/thread_manager/thread_manager.h
  49. 17
      test/cpp/client/client_channel_stress_test.cc
  50. 37
      test/cpp/end2end/client_lb_end2end_test.cc
  51. 67
      test/cpp/end2end/grpclb_end2end_test.cc
  52. 21
      test/cpp/end2end/thread_stress_test.cc
  53. 66
      test/cpp/end2end/xds_end2end_test.cc
  54. 1
      tools/doxygen/Doxyfile.c++
  55. 3
      tools/doxygen/Doxyfile.c++.internal
  56. 2
      tools/doxygen/Doxyfile.core.internal
  57. 20
      tools/run_tests/generated/sources_and_headers.json

14
BUILD

@ -525,6 +525,17 @@ grpc_cc_library(
], ],
) )
grpc_cc_library(
name = "grpc++_internal_hdrs_only",
hdrs = [
"include/grpcpp/impl/codegen/sync.h",
],
language = "c++",
deps = [
"gpr_codegen",
],
)
grpc_cc_library( grpc_cc_library(
name = "gpr_base", name = "gpr_base",
srcs = [ srcs = [
@ -590,8 +601,8 @@ grpc_cc_library(
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h", "src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h", "src/core/lib/profiling/timers.h",
], ],
@ -2147,6 +2158,7 @@ grpc_cc_library(
"include/grpcpp/impl/codegen/time.h", "include/grpcpp/impl/codegen/time.h",
], ],
deps = [ deps = [
"grpc++_internal_hdrs_only",
"grpc_codegen", "grpc_codegen",
], ],
) )

@ -186,8 +186,8 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h", "src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/gprpp/thd_posix.cc", "src/core/lib/gprpp/thd_posix.cc",
"src/core/lib/gprpp/thd_windows.cc", "src/core/lib/gprpp/thd_windows.cc",
@ -1066,6 +1066,7 @@ config("grpc_config") {
"include/grpcpp/impl/codegen/status_code_enum.h", "include/grpcpp/impl/codegen/status_code_enum.h",
"include/grpcpp/impl/codegen/string_ref.h", "include/grpcpp/impl/codegen/string_ref.h",
"include/grpcpp/impl/codegen/stub_options.h", "include/grpcpp/impl/codegen/stub_options.h",
"include/grpcpp/impl/codegen/sync.h",
"include/grpcpp/impl/codegen/sync_stream.h", "include/grpcpp/impl/codegen/sync_stream.h",
"include/grpcpp/impl/codegen/time.h", "include/grpcpp/impl/codegen/time.h",
"include/grpcpp/impl/grpc_library.h", "include/grpcpp/impl/grpc_library.h",
@ -1161,12 +1162,12 @@ config("grpc_config") {
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h", "src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/optional.h", "src/core/lib/gprpp/optional.h",
"src/core/lib/gprpp/orphanable.h", "src/core/lib/gprpp/orphanable.h",
"src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/ref_counted.h", "src/core/lib/gprpp/ref_counted.h",
"src/core/lib/gprpp/ref_counted_ptr.h", "src/core/lib/gprpp/ref_counted_ptr.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/http/format_request.h", "src/core/lib/http/format_request.h",
"src/core/lib/http/httpcli.h", "src/core/lib/http/httpcli.h",

@ -3187,6 +3187,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -3790,6 +3791,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
include/grpc/census.h include/grpc/census.h
) )
string(REPLACE "include/" "" _path ${_hdr}) string(REPLACE "include/" "" _path ${_hdr})
@ -4244,6 +4246,7 @@ foreach(_hdr
include/grpc/impl/codegen/sync_generic.h include/grpc/impl/codegen/sync_generic.h
include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_posix.h
include/grpc/impl/codegen/sync_windows.h include/grpc/impl/codegen/sync_windows.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -4440,6 +4443,7 @@ foreach(_hdr
include/grpc/impl/codegen/sync_generic.h include/grpc/impl/codegen/sync_generic.h
include/grpc/impl/codegen/sync_posix.h include/grpc/impl/codegen/sync_posix.h
include/grpc/impl/codegen/sync_windows.h include/grpc/impl/codegen/sync_windows.h
include/grpcpp/impl/codegen/sync.h
include/grpc++/impl/codegen/proto_utils.h include/grpc++/impl/codegen/proto_utils.h
include/grpcpp/impl/codegen/proto_buffer_reader.h include/grpcpp/impl/codegen/proto_buffer_reader.h
include/grpcpp/impl/codegen/proto_buffer_writer.h include/grpcpp/impl/codegen/proto_buffer_writer.h
@ -4766,6 +4770,7 @@ foreach(_hdr
include/grpcpp/impl/codegen/stub_options.h include/grpcpp/impl/codegen/stub_options.h
include/grpcpp/impl/codegen/sync_stream.h include/grpcpp/impl/codegen/sync_stream.h
include/grpcpp/impl/codegen/time.h include/grpcpp/impl/codegen/time.h
include/grpcpp/impl/codegen/sync.h
) )
string(REPLACE "include/" "" _path ${_hdr}) string(REPLACE "include/" "" _path ${_hdr})
get_filename_component(_path ${_path} PATH) get_filename_component(_path ${_path} PATH)

@ -5523,6 +5523,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -6134,6 +6135,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc/census.h \ include/grpc/census.h \
LIBGRPC++_CRONET_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_CRONET_SRC)))) LIBGRPC++_CRONET_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_CRONET_SRC))))
@ -6560,6 +6562,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc/impl/codegen/sync_generic.h \ include/grpc/impl/codegen/sync_generic.h \
include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_posix.h \
include/grpc/impl/codegen/sync_windows.h \ include/grpc/impl/codegen/sync_windows.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -6727,6 +6730,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc/impl/codegen/sync_generic.h \ include/grpc/impl/codegen/sync_generic.h \
include/grpc/impl/codegen/sync_posix.h \ include/grpc/impl/codegen/sync_posix.h \
include/grpc/impl/codegen/sync_windows.h \ include/grpc/impl/codegen/sync_windows.h \
include/grpcpp/impl/codegen/sync.h \
include/grpc++/impl/codegen/proto_utils.h \ include/grpc++/impl/codegen/proto_utils.h \
include/grpcpp/impl/codegen/proto_buffer_reader.h \ include/grpcpp/impl/codegen/proto_buffer_reader.h \
include/grpcpp/impl/codegen/proto_buffer_writer.h \ include/grpcpp/impl/codegen/proto_buffer_writer.h \
@ -7059,6 +7063,7 @@ PUBLIC_HEADERS_CXX += \
include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/codegen/sync.h \
LIBGRPC++_UNSECURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_UNSECURE_SRC)))) LIBGRPC++_UNSECURE_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBGRPC++_UNSECURE_SRC))))

@ -196,8 +196,8 @@ filegroups:
- src/core/lib/gprpp/manual_constructor.h - src/core/lib/gprpp/manual_constructor.h
- src/core/lib/gprpp/map.h - src/core/lib/gprpp/map.h
- src/core/lib/gprpp/memory.h - src/core/lib/gprpp/memory.h
- src/core/lib/gprpp/mutex_lock.h
- src/core/lib/gprpp/pair.h - src/core/lib/gprpp/pair.h
- src/core/lib/gprpp/sync.h
- src/core/lib/gprpp/thd.h - src/core/lib/gprpp/thd.h
- src/core/lib/profiling/timers.h - src/core/lib/profiling/timers.h
uses: uses:
@ -1278,6 +1278,7 @@ filegroups:
- include/grpcpp/impl/codegen/time.h - include/grpcpp/impl/codegen/time.h
uses: uses:
- grpc_codegen - grpc_codegen
- grpc++_internal_hdrs_only
- name: grpc++_codegen_base_src - name: grpc++_codegen_base_src
language: c++ language: c++
src: src:
@ -1452,6 +1453,7 @@ filegroups:
- grpc_base_headers - grpc_base_headers
- grpc_transport_inproc_headers - grpc_transport_inproc_headers
- grpc++_codegen_base - grpc++_codegen_base
- grpc++_internal_hdrs_only
- nanopb_headers - nanopb_headers
- health_proto - health_proto
- name: grpc++_config_proto - name: grpc++_config_proto
@ -1459,6 +1461,10 @@ filegroups:
public_headers: public_headers:
- include/grpc++/impl/codegen/config_protobuf.h - include/grpc++/impl/codegen/config_protobuf.h
- include/grpcpp/impl/codegen/config_protobuf.h - include/grpcpp/impl/codegen/config_protobuf.h
- name: grpc++_internal_hdrs_only
language: c++
public_headers:
- include/grpcpp/impl/codegen/sync.h
- name: grpc++_reflection_proto - name: grpc++_reflection_proto
language: c++ language: c++
src: src:

@ -183,7 +183,8 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/codegen/string_ref.h', 'include/grpcpp/impl/codegen/string_ref.h',
'include/grpcpp/impl/codegen/stub_options.h', 'include/grpcpp/impl/codegen/stub_options.h',
'include/grpcpp/impl/codegen/sync_stream.h', 'include/grpcpp/impl/codegen/sync_stream.h',
'include/grpcpp/impl/codegen/time.h' 'include/grpcpp/impl/codegen/time.h',
'include/grpcpp/impl/codegen/sync.h'
end end
s.subspec 'Implementation' do |ss| s.subspec 'Implementation' do |ss|
@ -266,8 +267,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h', 'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',
@ -584,8 +585,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h', 'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/lib/avl/avl.h', 'src/core/lib/avl/avl.h',

@ -210,8 +210,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h', 'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/lib/gpr/alloc.cc', 'src/core/lib/gpr/alloc.cc',
@ -891,8 +891,8 @@ Pod::Spec.new do |s|
'src/core/lib/gprpp/manual_constructor.h', 'src/core/lib/gprpp/manual_constructor.h',
'src/core/lib/gprpp/map.h', 'src/core/lib/gprpp/map.h',
'src/core/lib/gprpp/memory.h', 'src/core/lib/gprpp/memory.h',
'src/core/lib/gprpp/mutex_lock.h',
'src/core/lib/gprpp/pair.h', 'src/core/lib/gprpp/pair.h',
'src/core/lib/gprpp/sync.h',
'src/core/lib/gprpp/thd.h', 'src/core/lib/gprpp/thd.h',
'src/core/lib/profiling/timers.h', 'src/core/lib/profiling/timers.h',
'src/core/ext/transport/chttp2/transport/bin_decoder.h', 'src/core/ext/transport/chttp2/transport/bin_decoder.h',

@ -104,8 +104,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gprpp/manual_constructor.h ) s.files += %w( src/core/lib/gprpp/manual_constructor.h )
s.files += %w( src/core/lib/gprpp/map.h ) s.files += %w( src/core/lib/gprpp/map.h )
s.files += %w( src/core/lib/gprpp/memory.h ) s.files += %w( src/core/lib/gprpp/memory.h )
s.files += %w( src/core/lib/gprpp/mutex_lock.h )
s.files += %w( src/core/lib/gprpp/pair.h ) s.files += %w( src/core/lib/gprpp/pair.h )
s.files += %w( src/core/lib/gprpp/sync.h )
s.files += %w( src/core/lib/gprpp/thd.h ) s.files += %w( src/core/lib/gprpp/thd.h )
s.files += %w( src/core/lib/profiling/timers.h ) s.files += %w( src/core/lib/profiling/timers.h )
s.files += %w( src/core/lib/gpr/alloc.cc ) s.files += %w( src/core/lib/gpr/alloc.cc )

@ -28,6 +28,7 @@
#include <grpcpp/impl/codegen/client_interceptor.h> #include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/grpc_library.h> #include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/sync.h>
struct grpc_channel; struct grpc_channel;
@ -97,7 +98,7 @@ class Channel final : public ChannelInterface,
grpc_channel* const c_channel_; // owned grpc_channel* const c_channel_; // owned
// mu_ protects callback_cq_ (the per-channel callbackable completion queue) // mu_ protects callback_cq_ (the per-channel callbackable completion queue)
std::mutex mu_; grpc::internal::Mutex mu_;
// callback_cq_ references the callbackable completion queue associated // callback_cq_ references the callbackable completion queue associated
// with this channel (if any). It is set on the first call to CallbackCQ(). // with this channel (if any). It is set on the first call to CallbackCQ().

@ -51,6 +51,7 @@
#include <grpcpp/impl/codegen/slice.h> #include <grpcpp/impl/codegen/slice.h>
#include <grpcpp/impl/codegen/status.h> #include <grpcpp/impl/codegen/status.h>
#include <grpcpp/impl/codegen/string_ref.h> #include <grpcpp/impl/codegen/string_ref.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/time.h> #include <grpcpp/impl/codegen/time.h>
struct census_context; struct census_context;
@ -457,7 +458,7 @@ class ClientContext {
bool idempotent_; bool idempotent_;
bool cacheable_; bool cacheable_;
std::shared_ptr<Channel> channel_; std::shared_ptr<Channel> channel_;
std::mutex mu_; grpc::internal::Mutex mu_;
grpc_call* call_; grpc_call* call_;
bool call_canceled_; bool call_canceled_;
gpr_timespec deadline_; gpr_timespec deadline_;

@ -0,0 +1,138 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_IMPL_CODEGEN_SYNC_H
#define GRPCPP_IMPL_CODEGEN_SYNC_H
#include <grpc/impl/codegen/log.h>
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/impl/codegen/sync.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
// src/core/lib/gprpp/sync.h too.
//
// Whenever possible, prefer "src/core/lib/gprpp/sync.h" over this file,
// since in core we do not rely on g_core_codegen_interface and hence do not
// pay the costs of virtual function calls.
namespace grpc {
namespace internal {
class Mutex {
public:
Mutex() { g_core_codegen_interface->gpr_mu_init(&mu_); }
~Mutex() { g_core_codegen_interface->gpr_mu_destroy(&mu_); }
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
private:
gpr_mu mu_;
};
// MutexLock is a std::
class MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit MutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~MutexLock() { g_core_codegen_interface->gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) {
g_core_codegen_interface->gpr_mu_lock(mu_);
}
~ReleasableMutexLock() {
if (!released_) g_core_codegen_interface->gpr_mu_unlock(mu_);
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
g_core_codegen_interface->gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
g_core_codegen_interface->gpr_mu_unlock(mu_);
}
private:
gpr_mu* const mu_;
bool released_ = false;
};
class CondVar {
public:
CondVar() { g_core_codegen_interface->gpr_cv_init(&cv_); }
~CondVar() { g_core_codegen_interface->gpr_cv_destroy(&cv_); }
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
void Signal() { g_core_codegen_interface->gpr_cv_signal(&cv_); }
void Broadcast() { g_core_codegen_interface->gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) {
return Wait(mu,
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return g_core_codegen_interface->gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME));
}
}
private:
gpr_cv cv_;
};
} // namespace internal
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_SYNC_H

@ -297,12 +297,12 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
experimental_registration_type experimental_registration_{this}; experimental_registration_type experimental_registration_{this};
// Server status // Server status
std::mutex mu_; grpc::internal::Mutex mu_;
bool started_; bool started_;
bool shutdown_; bool shutdown_;
bool shutdown_notified_; // Was notify called on the shutdown_cv_ bool shutdown_notified_; // Was notify called on the shutdown_cv_
std::condition_variable shutdown_cv_; grpc::internal::CondVar shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ . // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@ -311,8 +311,8 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
// during periods of increasing load; the decrement happens only when memory // during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version) // is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical. // during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_; grpc::internal::Mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_; grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0}; std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_; std::shared_ptr<GlobalCallbacks> global_callbacks_;

@ -304,12 +304,12 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
experimental_registration_type experimental_registration_{this}; experimental_registration_type experimental_registration_{this};
// Server status // Server status
std::mutex mu_; grpc::internal::Mutex mu_;
bool started_; bool started_;
bool shutdown_; bool shutdown_;
bool shutdown_notified_; // Was notify called on the shutdown_cv_ bool shutdown_notified_; // Was notify called on the shutdown_cv_
std::condition_variable shutdown_cv_; grpc::internal::CondVar shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ . // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
@ -318,8 +318,8 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
// during periods of increasing load; the decrement happens only when memory // during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version) // is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical. // during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_; grpc::internal::Mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_; grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0}; std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_; std::shared_ptr<GlobalCallbacks> global_callbacks_;

@ -109,8 +109,8 @@
<file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/manual_constructor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/map.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/map.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/memory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/mutex_lock.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/pair.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/pair.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/sync.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/gprpp/thd.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/profiling/timers.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/profiling/timers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/gpr/alloc.cc" role="src" />

@ -51,7 +51,7 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"

@ -27,7 +27,7 @@
#include "pb_encode.h" #include "pb_encode.h"
#include "src/core/ext/filters/client_channel/health/health.pb.h" #include "src/core/ext/filters/client_channel/health/health.pb.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h" #include "src/core/lib/transport/status_metadata.h"
@ -69,7 +69,6 @@ HealthCheckClient::HealthCheckClient(
} }
GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this, GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
gpr_mu_init(&mu_);
StartCall(); StartCall();
} }
@ -78,7 +77,6 @@ HealthCheckClient::~HealthCheckClient() {
gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this); gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
} }
GRPC_ERROR_UNREF(error_); GRPC_ERROR_UNREF(error_);
gpr_mu_destroy(&mu_);
} }
void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state, void HealthCheckClient::NotifyOnHealthChange(grpc_connectivity_state* state,

@ -31,6 +31,7 @@
#include "src/core/lib/gprpp/atomic.h" #include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
@ -157,7 +158,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_pollset_set* interested_parties_; // Do not own. grpc_pollset_set* interested_parties_; // Do not own.
RefCountedPtr<channelz::SubchannelNode> channelz_node_; RefCountedPtr<channelz::SubchannelNode> channelz_node_;
gpr_mu mu_; Mutex mu_;
grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING;
grpc_error* error_ = GRPC_ERROR_NONE; grpc_error* error_ = GRPC_ERROR_NONE;
grpc_connectivity_state* notify_state_ = nullptr; grpc_connectivity_state* notify_state_ = nullptr;

@ -33,7 +33,7 @@
#include "src/core/lib/channel/handshaker_registry.h" #include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/http/format_request.h" #include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h" #include "src/core/lib/http/parser.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"

@ -88,7 +88,6 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"

@ -25,7 +25,7 @@
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
namespace grpc_core { namespace grpc_core {

@ -26,6 +26,7 @@
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core { namespace grpc_core {
@ -41,9 +42,6 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts; typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); }
~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); }
void AddCallStarted(); void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send, void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received); bool finished_known_received);
@ -66,7 +64,7 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
gpr_atm num_calls_finished_ = 0; gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0; gpr_atm num_calls_finished_known_received_ = 0;
gpr_mu drop_count_mu_; // Guards drop_token_counts_. Mutex drop_count_mu_; // Guards drop_token_counts_.
UniquePtr<DroppedCallCounts> drop_token_counts_; UniquePtr<DroppedCallCounts> drop_token_counts_;
}; };

@ -27,7 +27,7 @@
#include "src/core/ext/filters/client_channel/server_address.h" #include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
@ -154,13 +154,12 @@ class PickFirst : public LoadBalancingPolicy {
/// Lock and data used to capture snapshots of this channels child /// Lock and data used to capture snapshots of this channels child
/// channels and subchannels. This data is consumed by channelz. /// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_; Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_; channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_; channelz::ChildRefsList child_channels_;
}; };
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&child_refs_mu_);
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p created.", this); gpr_log(GPR_INFO, "Pick First %p created.", this);
} }
@ -170,7 +169,6 @@ PickFirst::~PickFirst() {
if (grpc_lb_pick_first_trace.enabled()) { if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Destroying Pick First %p", this); gpr_log(GPR_INFO, "Destroying Pick First %p", this);
} }
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
} }

@ -36,8 +36,8 @@
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
@ -188,7 +188,7 @@ class RoundRobin : public LoadBalancingPolicy {
bool shutdown_ = false; bool shutdown_ = false;
/// Lock and data used to capture snapshots of this channel's child /// Lock and data used to capture snapshots of this channel's child
/// channels and subchannels. This data is consumed by channelz. /// channels and subchannels. This data is consumed by channelz.
gpr_mu child_refs_mu_; Mutex child_refs_mu_;
channelz::ChildRefsList child_subchannels_; channelz::ChildRefsList child_subchannels_;
channelz::ChildRefsList child_channels_; channelz::ChildRefsList child_channels_;
}; };
@ -240,7 +240,6 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
// //
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
gpr_mu_init(&child_refs_mu_);
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Created", this); gpr_log(GPR_INFO, "[RR %p] Created", this);
} }
@ -250,7 +249,6 @@ RoundRobin::~RoundRobin() {
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
} }
gpr_mu_destroy(&child_refs_mu_);
GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(subchannel_list_ == nullptr);
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
} }

@ -89,9 +89,9 @@
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h" #include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
@ -278,10 +278,8 @@ class XdsLb : public LoadBalancingPolicy {
class LocalityEntry : public InternallyRefCounted<LocalityEntry> { class LocalityEntry : public InternallyRefCounted<LocalityEntry> {
public: public:
explicit LocalityEntry(RefCountedPtr<XdsLb> parent) explicit LocalityEntry(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) { : parent_(std::move(parent)) {}
gpr_mu_init(&child_policy_mu_); ~LocalityEntry() = default;
}
~LocalityEntry() { gpr_mu_destroy(&child_policy_mu_); }
void UpdateLocked(xds_grpclb_serverlist* serverlist, void UpdateLocked(xds_grpclb_serverlist* serverlist,
LoadBalancingPolicy::Config* child_policy_config, LoadBalancingPolicy::Config* child_policy_config,
@ -323,13 +321,10 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
// Lock held when modifying the value of child_policy_ or // Lock held when modifying the value of child_policy_ or
// pending_child_policy_. // pending_child_policy_.
gpr_mu child_policy_mu_; Mutex child_policy_mu_;
RefCountedPtr<XdsLb> parent_; RefCountedPtr<XdsLb> parent_;
}; };
LocalityMap() { gpr_mu_init(&child_refs_mu_); }
~LocalityMap() { gpr_mu_destroy(&child_refs_mu_); }
void UpdateLocked(const LocalityList& locality_list, void UpdateLocked(const LocalityList& locality_list,
LoadBalancingPolicy::Config* child_policy_config, LoadBalancingPolicy::Config* child_policy_config,
const grpc_channel_args* args, XdsLb* parent); const grpc_channel_args* args, XdsLb* parent);
@ -343,7 +338,7 @@ class XdsLb : public LoadBalancingPolicy {
Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_; Map<UniquePtr<char>, OrphanablePtr<LocalityEntry>, StringLess> map_;
// Lock held while filling child refs for all localities // Lock held while filling child refs for all localities
// inside the map // inside the map
gpr_mu child_refs_mu_; Mutex child_refs_mu_;
}; };
struct LocalityServerlistEntry { struct LocalityServerlistEntry {
@ -397,7 +392,7 @@ class XdsLb : public LoadBalancingPolicy {
// Mutex to protect the channel to the LB server. This is used when // Mutex to protect the channel to the LB server. This is used when
// processing a channelz request. // processing a channelz request.
// TODO(juanlishen): Replace this with atomic. // TODO(juanlishen): Replace this with atomic.
gpr_mu lb_chand_mu_; Mutex lb_chand_mu_;
// Timeout in milliseconds for the LB call. 0 means no deadline. // Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0; int lb_call_timeout_ms_ = 0;
@ -1090,7 +1085,6 @@ XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)), : LoadBalancingPolicy(std::move(args)),
locality_map_(), locality_map_(),
locality_serverlist_() { locality_serverlist_() {
gpr_mu_init(&lb_chand_mu_);
// Record server name. // Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg); const char* server_uri = grpc_channel_arg_get_string(arg);
@ -1114,7 +1108,6 @@ XdsLb::XdsLb(Args args)
} }
XdsLb::~XdsLb() { XdsLb::~XdsLb() {
gpr_mu_destroy(&lb_chand_mu_);
gpr_free((void*)server_name_); gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_); grpc_channel_args_destroy(args_);
locality_serverlist_.clear(); locality_serverlist_.clear();

@ -48,7 +48,7 @@
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"

@ -42,8 +42,8 @@
#include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/profiling/timers.h" #include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
@ -454,13 +454,14 @@ struct Subchannel::ExternalStateWatcher {
grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_, grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_,
w->pollset_set); w->pollset_set);
} }
gpr_mu_lock(&w->subchannel->mu_); {
if (w->subchannel->external_state_watcher_list_ == w) { MutexLock lock(&w->subchannel->mu_);
w->subchannel->external_state_watcher_list_ = w->next; if (w->subchannel->external_state_watcher_list_ == w) {
w->subchannel->external_state_watcher_list_ = w->next;
}
if (w->next != nullptr) w->next->prev = w->prev;
if (w->prev != nullptr) w->prev->next = w->next;
} }
if (w->next != nullptr) w->next->prev = w->prev;
if (w->prev != nullptr) w->prev->next = w->next;
gpr_mu_unlock(&w->subchannel->mu_);
GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done"); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done");
Delete(w); Delete(w);
GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
@ -582,7 +583,6 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
"subchannel"); "subchannel");
grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE, grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE,
"subchannel"); "subchannel");
gpr_mu_init(&mu_);
// Check whether we should enable health checking. // Check whether we should enable health checking.
const char* service_config_json = grpc_channel_arg_get_string( const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG)); grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG));
@ -629,7 +629,6 @@ Subchannel::~Subchannel() {
grpc_connector_unref(connector_); grpc_connector_unref(connector_);
grpc_pollset_set_destroy(pollset_set_); grpc_pollset_set_destroy(pollset_set_);
Delete(key_); Delete(key_);
gpr_mu_destroy(&mu_);
} }
Subchannel* Subchannel::Create(grpc_connector* connector, Subchannel* Subchannel::Create(grpc_connector* connector,
@ -903,7 +902,9 @@ void Subchannel::MaybeStartConnectingLocked() {
void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
Subchannel* c = static_cast<Subchannel*>(arg); Subchannel* c = static_cast<Subchannel*>(arg);
gpr_mu_lock(&c->mu_); // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use
// MutexLock instead of ReleasableMutexLock, here.
ReleasableMutexLock lock(&c->mu_);
c->have_retry_alarm_ = false; c->have_retry_alarm_ = false;
if (c->disconnected_) { if (c->disconnected_) {
error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
@ -917,9 +918,9 @@ void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
c->ContinueConnectingLocked(); c->ContinueConnectingLocked();
gpr_mu_unlock(&c->mu_); lock.Unlock();
} else { } else {
gpr_mu_unlock(&c->mu_); lock.Unlock();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -944,24 +945,25 @@ void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
auto* c = static_cast<Subchannel*>(arg); auto* c = static_cast<Subchannel*>(arg);
grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
gpr_mu_lock(&c->mu_); {
c->connecting_ = false; MutexLock lock(&c->mu_);
if (c->connecting_result_.transport != nullptr && c->connecting_ = false;
c->PublishTransportLocked()) { if (c->connecting_result_.transport != nullptr &&
// Do nothing, transport was published. c->PublishTransportLocked()) {
} else if (c->disconnected_) { // Do nothing, transport was published.
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } else if (c->disconnected_) {
} else { GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error)); } else {
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connect_failed"); "connect_failed");
grpc_connectivity_state_set(&c->state_and_health_tracker_, c->MaybeStartConnectingLocked();
GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
"connect_failed"); }
c->MaybeStartConnectingLocked();
GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
} }
gpr_mu_unlock(&c->mu_);
GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
grpc_channel_args_destroy(delete_channel_args); grpc_channel_args_destroy(delete_channel_args);
} }

@ -29,6 +29,7 @@
#include "src/core/lib/gpr/arena.h" #include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
@ -263,7 +264,7 @@ class Subchannel {
// pollset_set tracking who's interested in a connection being setup. // pollset_set tracking who's interested in a connection being setup.
grpc_pollset_set* pollset_set_; grpc_pollset_set* pollset_set_;
// Protects the other members. // Protects the other members.
gpr_mu mu_; Mutex mu_;
// Refcount // Refcount
// - lower INTERNAL_REF_BITS bits are for internal references: // - lower INTERNAL_REF_BITS bits are for internal references:
// these do not keep the subchannel open. // these do not keep the subchannel open.

@ -23,7 +23,7 @@
#include "src/core/lib/channel/channelz_registry.h" #include "src/core/lib/channel/channelz_registry.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>

@ -27,8 +27,8 @@
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"

@ -1,42 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
#define GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
namespace grpc_core {
class MutexLock {
public:
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu); }
~MutexLock() { gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_MUTEX_LOCK_H */

@ -0,0 +1,126 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_SYNC_H
#define GRPC_CORE_LIB_GPRPP_SYNC_H
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/impl/codegen/log.h>
#include <grpc/impl/codegen/sync.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
// The core library is not accessible in C++ codegen headers, and vice versa.
// Thus, we need to have duplicate headers with similar functionality.
// Make sure any change to this file is also reflected in
// include/grpcpp/impl/codegen/sync.h.
//
// Whenever possible, prefer using this file over <grpcpp/impl/codegen/sync.h>
// since this file doesn't rely on g_core_codegen_interface and hence does not
// pay the costs of virtual function calls.
namespace grpc_core {
class Mutex {
public:
Mutex() { gpr_mu_init(&mu_); }
~Mutex() { gpr_mu_destroy(&mu_); }
Mutex(const Mutex&) = delete;
Mutex& operator=(const Mutex&) = delete;
gpr_mu* get() { return &mu_; }
const gpr_mu* get() const { return &mu_; }
private:
gpr_mu mu_;
};
// MutexLock is a std::
class MutexLock {
public:
explicit MutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit MutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~MutexLock() { gpr_mu_unlock(mu_); }
MutexLock(const MutexLock&) = delete;
MutexLock& operator=(const MutexLock&) = delete;
private:
gpr_mu* const mu_;
};
class ReleasableMutexLock {
public:
explicit ReleasableMutexLock(Mutex* mu) : mu_(mu->get()) { gpr_mu_lock(mu_); }
explicit ReleasableMutexLock(gpr_mu* mu) : mu_(mu) { gpr_mu_lock(mu_); }
~ReleasableMutexLock() {
if (!released_) gpr_mu_unlock(mu_);
}
ReleasableMutexLock(const ReleasableMutexLock&) = delete;
ReleasableMutexLock& operator=(const ReleasableMutexLock&) = delete;
void Lock() {
GPR_DEBUG_ASSERT(released_);
gpr_mu_lock(mu_);
released_ = false;
}
void Unlock() {
GPR_DEBUG_ASSERT(!released_);
released_ = true;
gpr_mu_unlock(mu_);
}
private:
gpr_mu* const mu_;
bool released_ = false;
};
class CondVar {
public:
CondVar() { gpr_cv_init(&cv_); }
~CondVar() { gpr_cv_destroy(&cv_); }
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
void Signal() { gpr_cv_signal(&cv_); }
void Broadcast() { gpr_cv_broadcast(&cv_); }
int Wait(Mutex* mu) { return Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME)); }
int Wait(Mutex* mu, const gpr_timespec& deadline) {
return gpr_cv_wait(&cv_, mu->get(), deadline);
}
template <typename Predicate>
void WaitUntil(Mutex* mu, Predicate pred) {
while (!pred()) {
Wait(mu, gpr_inf_future(GPR_CLOCK_REALTIME));
}
}
private:
gpr_cv cv_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_SYNC_H */

@ -47,7 +47,7 @@
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/inlined_vector.h" #include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/block_annotate.h" #include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h" #include "src/core/lib/iomgr/is_epollexclusive_available.h"

@ -33,7 +33,7 @@
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/http/parser.h" #include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"

@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h" #include "src/core/tsi/ssl/session_cache/ssl_session.h"
#include "src/core/tsi/ssl/session_cache/ssl_session_cache.h" #include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"

@ -232,7 +232,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
CompletionQueue* Channel::CallbackCQ() { CompletionQueue* Channel::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ // TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-channel CQ registered // if there is no explicit per-channel CQ registered
std::lock_guard<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) { if (callback_cq_ == nullptr) {
auto* shutdown_callback = new ShutdownCallback; auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{

@ -25,6 +25,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpcpp/impl/codegen/interceptor_common.h> #include <grpcpp/impl/codegen/interceptor_common.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/impl/grpc_library.h> #include <grpcpp/impl/grpc_library.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
@ -84,7 +85,7 @@ void ClientContext::AddMetadata(const grpc::string& meta_key,
void ClientContext::set_call(grpc_call* call, void ClientContext::set_call(grpc_call* call,
const std::shared_ptr<Channel>& channel) { const std::shared_ptr<Channel>& channel) {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
GPR_ASSERT(call_ == nullptr); GPR_ASSERT(call_ == nullptr);
call_ = call; call_ = call;
channel_ = channel; channel_ = channel;
@ -114,7 +115,7 @@ void ClientContext::set_compression_algorithm(
} }
void ClientContext::TryCancel() { void ClientContext::TryCancel() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
if (call_) { if (call_) {
SendCancelToInterceptors(); SendCancelToInterceptors();
grpc_call_cancel(call_, nullptr); grpc_call_cancel(call_, nullptr);

@ -21,6 +21,7 @@
#include <mutex> #include <mutex>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpcpp/impl/codegen/sync.h>
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
@ -40,27 +41,27 @@ DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
void DynamicThreadPool::DynamicThread::ThreadFunc() { void DynamicThreadPool::DynamicThread::ThreadFunc() {
pool_->ThreadFunc(); pool_->ThreadFunc();
// Now that we have killed ourselves, we should reduce the thread count // Now that we have killed ourselves, we should reduce the thread count
std::unique_lock<std::mutex> lock(pool_->mu_); grpc_core::MutexLock lock(&pool_->mu_);
pool_->nthreads_--; pool_->nthreads_--;
// Move ourselves to dead list // Move ourselves to dead list
pool_->dead_threads_.push_back(this); pool_->dead_threads_.push_back(this);
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
pool_->shutdown_cv_.notify_one(); pool_->shutdown_cv_.Signal();
} }
} }
void DynamicThreadPool::ThreadFunc() { void DynamicThreadPool::ThreadFunc() {
for (;;) { for (;;) {
// Wait until work is available or we are shutting down. // Wait until work is available or we are shutting down.
std::unique_lock<std::mutex> lock(mu_); grpc_core::ReleasableMutexLock lock(&mu_);
if (!shutdown_ && callbacks_.empty()) { if (!shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread // If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) { if (threads_waiting_ >= reserve_threads_) {
break; break;
} }
threads_waiting_++; threads_waiting_++;
cv_.wait(lock); cv_.Wait(&mu_);
threads_waiting_--; threads_waiting_--;
} }
// Drain callbacks before considering shutdown to ensure all work // Drain callbacks before considering shutdown to ensure all work
@ -68,7 +69,7 @@ void DynamicThreadPool::ThreadFunc() {
if (!callbacks_.empty()) { if (!callbacks_.empty()) {
auto cb = callbacks_.front(); auto cb = callbacks_.front();
callbacks_.pop(); callbacks_.pop();
lock.unlock(); lock.Unlock();
cb(); cb();
} else if (shutdown_) { } else if (shutdown_) {
break; break;
@ -82,7 +83,7 @@ DynamicThreadPool::DynamicThreadPool(int reserve_threads)
nthreads_(0), nthreads_(0),
threads_waiting_(0) { threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) { for (int i = 0; i < reserve_threads_; i++) {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
nthreads_++; nthreads_++;
new DynamicThread(this); new DynamicThread(this);
} }
@ -95,17 +96,17 @@ void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
} }
DynamicThreadPool::~DynamicThreadPool() { DynamicThreadPool::~DynamicThreadPool() {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
shutdown_ = true; shutdown_ = true;
cv_.notify_all(); cv_.Broadcast();
while (nthreads_ != 0) { while (nthreads_ != 0) {
shutdown_cv_.wait(lock); shutdown_cv_.Wait(&mu_);
} }
ReapThreads(&dead_threads_); ReapThreads(&dead_threads_);
} }
void DynamicThreadPool::Add(const std::function<void()>& callback) { void DynamicThreadPool::Add(const std::function<void()>& callback) {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list // Add works to the callbacks list
callbacks_.push(callback); callbacks_.push(callback);
// Increase pool size or notify as needed // Increase pool size or notify as needed
@ -114,7 +115,7 @@ void DynamicThreadPool::Add(const std::function<void()>& callback) {
nthreads_++; nthreads_++;
new DynamicThread(this); new DynamicThread(this);
} else { } else {
cv_.notify_one(); cv_.Signal();
} }
// Also use this chance to harvest dead threads // Also use this chance to harvest dead threads
if (!dead_threads_.empty()) { if (!dead_threads_.empty()) {

@ -27,6 +27,7 @@
#include <grpcpp/support/config.h> #include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/thread_pool_interface.h" #include "src/cpp/server/thread_pool_interface.h"
@ -50,9 +51,9 @@ class DynamicThreadPool final : public ThreadPoolInterface {
grpc_core::Thread thd_; grpc_core::Thread thd_;
void ThreadFunc(); void ThreadFunc();
}; };
std::mutex mu_; grpc_core::Mutex mu_;
std::condition_variable cv_; grpc_core::CondVar cv_;
std::condition_variable shutdown_cv_; grpc_core::CondVar shutdown_cv_;
bool shutdown_; bool shutdown_;
std::queue<std::function<void()>> callbacks_; std::queue<std::function<void()>> callbacks_;
int reserve_threads_; int reserve_threads_;

@ -41,7 +41,7 @@ DefaultHealthCheckService::DefaultHealthCheckService() {
void DefaultHealthCheckService::SetServingStatus( void DefaultHealthCheckService::SetServingStatus(
const grpc::string& service_name, bool serving) { const grpc::string& service_name, bool serving) {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) { if (shutdown_) {
// Set to NOT_SERVING in case service_name is not in the map. // Set to NOT_SERVING in case service_name is not in the map.
serving = false; serving = false;
@ -51,7 +51,7 @@ void DefaultHealthCheckService::SetServingStatus(
void DefaultHealthCheckService::SetServingStatus(bool serving) { void DefaultHealthCheckService::SetServingStatus(bool serving) {
const ServingStatus status = serving ? SERVING : NOT_SERVING; const ServingStatus status = serving ? SERVING : NOT_SERVING;
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) { if (shutdown_) {
return; return;
} }
@ -62,7 +62,7 @@ void DefaultHealthCheckService::SetServingStatus(bool serving) {
} }
void DefaultHealthCheckService::Shutdown() { void DefaultHealthCheckService::Shutdown() {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
if (shutdown_) { if (shutdown_) {
return; return;
} }
@ -76,7 +76,7 @@ void DefaultHealthCheckService::Shutdown() {
DefaultHealthCheckService::ServingStatus DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus( DefaultHealthCheckService::GetServingStatus(
const grpc::string& service_name) const { const grpc::string& service_name) const {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name); auto it = services_map_.find(service_name);
if (it == services_map_.end()) { if (it == services_map_.end()) {
return NOT_FOUND; return NOT_FOUND;
@ -88,7 +88,7 @@ DefaultHealthCheckService::GetServingStatus(
void DefaultHealthCheckService::RegisterCallHandler( void DefaultHealthCheckService::RegisterCallHandler(
const grpc::string& service_name, const grpc::string& service_name,
std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) { std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
ServiceData& service_data = services_map_[service_name]; ServiceData& service_data = services_map_[service_name];
service_data.AddCallHandler(handler /* copies ref */); service_data.AddCallHandler(handler /* copies ref */);
HealthCheckServiceImpl::CallHandler* h = handler.get(); HealthCheckServiceImpl::CallHandler* h = handler.get();
@ -98,7 +98,7 @@ void DefaultHealthCheckService::RegisterCallHandler(
void DefaultHealthCheckService::UnregisterCallHandler( void DefaultHealthCheckService::UnregisterCallHandler(
const grpc::string& service_name, const grpc::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) { const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
auto it = services_map_.find(service_name); auto it = services_map_.find(service_name);
if (it == services_map_.end()) return; if (it == services_map_.end()) return;
ServiceData& service_data = it->second; ServiceData& service_data = it->second;
@ -166,7 +166,7 @@ DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
// We will reach here after the server starts shutting down. // We will reach here after the server starts shutting down.
shutdown_ = true; shutdown_ = true;
{ {
std::unique_lock<std::mutex> lock(cq_shutdown_mu_); grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown(); cq_->Shutdown();
} }
thread_->Join(); thread_->Join();
@ -266,7 +266,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
std::make_shared<CheckCallHandler>(cq, database, service); std::make_shared<CheckCallHandler>(cq, database, service);
CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get()); CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
{ {
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return; if (service->shutdown_) return;
// Request a Check() call. // Request a Check() call.
handler->next_ = handler->next_ =
@ -311,7 +311,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
} }
// Send response. // Send response.
{ {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) { if (!service_->shutdown_) {
next_ = next_ =
CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this, CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
@ -347,7 +347,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
std::make_shared<WatchCallHandler>(cq, database, service); std::make_shared<WatchCallHandler>(cq, database, service);
WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get()); WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
{ {
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return; if (service->shutdown_) return;
// Request AsyncNotifyWhenDone(). // Request AsyncNotifyWhenDone().
handler->on_done_notified_ = handler->on_done_notified_ =
@ -402,7 +402,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) { SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
std::unique_lock<std::mutex> lock(send_mu_); grpc_core::MutexLock lock(&send_mu_);
// If there's already a send in flight, cache the new status, and // If there's already a send in flight, cache the new status, and
// we'll start a new send for it when the one in flight completes. // we'll start a new send for it when the one in flight completes.
if (send_in_flight_) { if (send_in_flight_) {
@ -420,7 +420,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
ByteBuffer response; ByteBuffer response;
bool success = service_->EncodeResponse(status, &response); bool success = service_->EncodeResponse(status, &response);
// Grab shutdown lock and send response. // Grab shutdown lock and send response.
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
SendFinishLocked(std::move(self), Status::CANCELLED); SendFinishLocked(std::move(self), Status::CANCELLED);
return; return;
@ -442,7 +442,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::move(self), Status::CANCELLED); SendFinish(std::move(self), Status::CANCELLED);
return; return;
} }
std::unique_lock<std::mutex> lock(send_mu_); grpc_core::MutexLock lock(&send_mu_);
send_in_flight_ = false; send_in_flight_ = false;
// If we got a new status since we started the last send, start a // If we got a new status since we started the last send, start a
// new send for it. // new send for it.
@ -456,7 +456,7 @@ void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler:: void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self, const Status& status) { SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
if (finish_called_) return; if (finish_called_) return;
std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_); grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) return; if (service_->shutdown_) return;
SendFinishLocked(std::move(self), status); SendFinishLocked(std::move(self), status);
} }

@ -31,6 +31,7 @@
#include <grpcpp/impl/codegen/service_type.h> #include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h> #include <grpcpp/support/byte_buffer.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
namespace grpc { namespace grpc {
@ -197,7 +198,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
GenericServerAsyncWriter stream_; GenericServerAsyncWriter stream_;
ServerContext ctx_; ServerContext ctx_;
std::mutex send_mu_; grpc_core::Mutex send_mu_;
bool send_in_flight_ = false; // Guarded by mu_. bool send_in_flight_ = false; // Guarded by mu_.
ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_.
@ -226,7 +227,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
// To synchronize the operations related to shutdown state of cq_, so that // To synchronize the operations related to shutdown state of cq_, so that
// we don't enqueue new tags into cq_ after it is already shut down. // we don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_; grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false}; std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_; std::unique_ptr<::grpc_core::Thread> thread_;
}; };
@ -273,7 +274,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
const grpc::string& service_name, const grpc::string& service_name,
const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
mutable std::mutex mu_; mutable grpc_core::Mutex mu_;
bool shutdown_ = false; // Guarded by mu_. bool shutdown_ = false; // Guarded by mu_.
std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_. std::map<grpc::string, ServiceData> services_map_; // Guarded by mu_.
std::unique_ptr<HealthCheckServiceImpl> impl_; std::unique_ptr<HealthCheckServiceImpl> impl_;

@ -239,7 +239,7 @@ grpc::string LoadReporter::GenerateLbId() {
::grpc::lb::v1::LoadBalancingFeedback ::grpc::lb::v1::LoadBalancingFeedback
LoadReporter::GenerateLoadBalancingFeedback() { LoadReporter::GenerateLoadBalancingFeedback() {
std::unique_lock<std::mutex> lock(feedback_mu_); grpc_core::ReleasableMutexLock lock(&feedback_mu_);
auto now = std::chrono::system_clock::now(); auto now = std::chrono::system_clock::now();
// Discard records outside the window until there is only one record // Discard records outside the window until there is only one record
// outside the window, which is used as the base for difference. // outside the window, which is used as the base for difference.
@ -277,7 +277,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
double cpu_limit = newest->cpu_limit - oldest->cpu_limit; double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
std::chrono::duration<double> duration_seconds = std::chrono::duration<double> duration_seconds =
newest->end_time - oldest->end_time; newest->end_time - oldest->end_time;
lock.unlock(); lock.Unlock();
::grpc::lb::v1::LoadBalancingFeedback feedback; ::grpc::lb::v1::LoadBalancingFeedback feedback;
feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit)); feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
feedback.set_calls_per_second( feedback.set_calls_per_second(
@ -290,7 +290,7 @@ LoadReporter::GenerateLoadBalancingFeedback() {
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
LoadReporter::GenerateLoads(const grpc::string& hostname, LoadReporter::GenerateLoads(const grpc::string& hostname,
const grpc::string& lb_id) { const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id); auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
GPR_ASSERT(assigned_stores != nullptr); GPR_ASSERT(assigned_stores != nullptr);
GPR_ASSERT(!assigned_stores->empty()); GPR_ASSERT(!assigned_stores->empty());
@ -371,7 +371,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
// This will make the load balancing feedback generation a no-op. // This will make the load balancing feedback generation a no-op.
cpu_stats = {0, 0}; cpu_stats = {0, 0};
} }
std::unique_lock<std::mutex> lock(feedback_mu_); grpc_core::MutexLock lock(&feedback_mu_);
feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors, feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
cpu_stats.first, cpu_stats.second); cpu_stats.first, cpu_stats.second);
} }
@ -379,7 +379,7 @@ void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
void LoadReporter::ReportStreamCreated(const grpc::string& hostname, void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
const grpc::string& lb_id, const grpc::string& lb_id,
const grpc::string& load_key) { const grpc::string& load_key) {
std::lock_guard<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
load_data_store_.ReportStreamCreated(hostname, lb_id, load_key); load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).", "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
@ -388,7 +388,7 @@ void LoadReporter::ReportStreamCreated(const grpc::string& hostname,
void LoadReporter::ReportStreamClosed(const grpc::string& hostname, void LoadReporter::ReportStreamClosed(const grpc::string& hostname,
const grpc::string& lb_id) { const grpc::string& lb_id) {
std::lock_guard<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
load_data_store_.ReportStreamClosed(hostname, lb_id); load_data_store_.ReportStreamClosed(hostname, lb_id);
gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this, gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
hostname.c_str(), lb_id.c_str()); hostname.c_str(), lb_id.c_str());
@ -407,7 +407,7 @@ void LoadReporter::ProcessViewDataCallStart(
LoadRecordKey key(client_ip_and_token, user_id); LoadRecordKey key(client_ip_and_token, user_id);
LoadRecordValue value = LoadRecordValue(start_count); LoadRecordValue value = LoadRecordValue(start_count);
{ {
std::unique_lock<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value); load_data_store_.MergeRow(host, key, value);
} }
} }
@ -459,7 +459,7 @@ void LoadReporter::ProcessViewDataCallEnd(
LoadRecordValue value = LoadRecordValue( LoadRecordValue value = LoadRecordValue(
0, ok_count, error_count, bytes_sent, bytes_received, latency_ms); 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
{ {
std::unique_lock<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value); load_data_store_.MergeRow(host, key, value);
} }
} }
@ -486,7 +486,7 @@ void LoadReporter::ProcessViewDataOtherCallMetrics(
LoadRecordValue value = LoadRecordValue( LoadRecordValue value = LoadRecordValue(
metric_name, static_cast<uint64_t>(num_calls), total_metric_value); metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
{ {
std::unique_lock<std::mutex> lock(store_mu_); grpc_core::MutexLock lock(&store_mu_);
load_data_store_.MergeRow(host, key, value); load_data_store_.MergeRow(host, key, value);
} }
} }

@ -29,6 +29,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h> #include <grpcpp/impl/codegen/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/server/load_reporter/load_data_store.h" #include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
@ -212,11 +213,11 @@ class LoadReporter {
std::atomic<int64_t> next_lb_id_{0}; std::atomic<int64_t> next_lb_id_{0};
const std::chrono::seconds feedback_sample_window_seconds_; const std::chrono::seconds feedback_sample_window_seconds_;
std::mutex feedback_mu_; grpc_core::Mutex feedback_mu_;
std::deque<LoadBalancingFeedbackRecord> feedback_records_; std::deque<LoadBalancingFeedbackRecord> feedback_records_;
// TODO(juanlishen): Lock in finer grain. Locking the whole store may be // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
// too expensive. // too expensive.
std::mutex store_mu_; grpc_core::Mutex store_mu_;
LoadDataStore load_data_store_; LoadDataStore load_data_store_;
std::unique_ptr<CensusViewProvider> census_view_provider_; std::unique_ptr<CensusViewProvider> census_view_provider_;
std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;

@ -48,7 +48,7 @@ LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() {
// We will reach here after the server starts shutting down. // We will reach here after the server starts shutting down.
shutdown_ = true; shutdown_ = true;
{ {
std::unique_lock<std::mutex> lock(cq_shutdown_mu_); grpc_core::MutexLock lock(&cq_shutdown_mu_);
cq_->Shutdown(); cq_->Shutdown();
} }
if (next_fetch_and_sample_alarm_ != nullptr) if (next_fetch_and_sample_alarm_ != nullptr)
@ -62,7 +62,7 @@ void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() {
gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000, gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000,
GPR_TIMESPAN)); GPR_TIMESPAN));
{ {
std::unique_lock<std::mutex> lock(cq_shutdown_mu_); grpc_core::MutexLock lock(&cq_shutdown_mu_);
if (shutdown_) return; if (shutdown_) return;
// TODO(juanlishen): Improve the Alarm implementation to reuse a single // TODO(juanlishen): Improve the Alarm implementation to reuse a single
// instance for multiple events. // instance for multiple events.
@ -119,7 +119,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart(
std::make_shared<ReportLoadHandler>(cq, service, load_reporter); std::make_shared<ReportLoadHandler>(cq, service, load_reporter);
ReportLoadHandler* p = handler.get(); ReportLoadHandler* p = handler.get();
{ {
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
if (service->shutdown_) return; if (service->shutdown_) return;
p->on_done_notified_ = p->on_done_notified_ =
CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p, CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p,
@ -164,9 +164,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered(
// instance will deallocate itself when it's done. // instance will deallocate itself when it's done.
CreateAndStart(cq_, service_, load_reporter_); CreateAndStart(cq_, service_, load_reporter_);
{ {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
lock.release()->unlock(); lock.Unlock();
Shutdown(std::move(self), "OnRequestDelivered"); Shutdown(std::move(self), "OnRequestDelivered");
return; return;
} }
@ -222,9 +222,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
SendReport(self, true /* ok */); SendReport(self, true /* ok */);
// Expect this read to fail. // Expect this read to fail.
{ {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
lock.release()->unlock(); lock.Unlock();
Shutdown(std::move(self), "OnReadDone"); Shutdown(std::move(self), "OnReadDone");
return; return;
} }
@ -254,9 +254,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN)); gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN));
{ {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
lock.release()->unlock(); lock.Unlock();
Shutdown(std::move(self), "ScheduleNextReport"); Shutdown(std::move(self), "ScheduleNextReport");
return; return;
} }
@ -294,9 +294,9 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport(
call_status_ = INITIAL_RESPONSE_SENT; call_status_ = INITIAL_RESPONSE_SENT;
} }
{ {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::ReleasableMutexLock lock(&service_->cq_shutdown_mu_);
if (service_->shutdown_) { if (service_->shutdown_) {
lock.release()->unlock(); lock.Unlock();
Shutdown(std::move(self), "SendReport"); Shutdown(std::move(self), "SendReport");
return; return;
} }
@ -342,7 +342,7 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown(
// OnRequestDelivered() may be called after OnDoneNotified(), so we need to // OnRequestDelivered() may be called after OnDoneNotified(), so we need to
// try to Finish() every time we are in Shutdown(). // try to Finish() every time we are in Shutdown().
if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) { if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) {
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
if (!service_->shutdown_) { if (!service_->shutdown_) {
on_finish_done_ = on_finish_done_ =
CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this, CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this,

@ -25,6 +25,7 @@
#include <grpcpp/alarm.h> #include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "src/cpp/server/load_reporter/load_reporter.h" #include "src/cpp/server/load_reporter/load_reporter.h"
@ -181,7 +182,7 @@ class LoadReporterAsyncServiceImpl
std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<ServerCompletionQueue> cq_;
// To synchronize the operations related to shutdown state of cq_, so that we // To synchronize the operations related to shutdown state of cq_, so that we
// don't enqueue new tags into cq_ after it is already shut down. // don't enqueue new tags into cq_ after it is already shut down.
std::mutex cq_shutdown_mu_; grpc_core::Mutex cq_shutdown_mu_;
std::atomic_bool shutdown_{false}; std::atomic_bool shutdown_{false};
std::unique_ptr<::grpc_core::Thread> thread_; std::unique_ptr<::grpc_core::Thread> thread_;
std::unique_ptr<LoadReporter> load_reporter_; std::unique_ptr<LoadReporter> load_reporter_;

@ -388,9 +388,9 @@ class Server::CallbackRequest final : public Server::CallbackRequestBase {
// The counter of outstanding requests must be decremented // The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown. // under a lock in case it causes the server shutdown.
std::lock_guard<std::mutex> l(server_->callback_reqs_mu_); grpc::internal::MutexLock l(&server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) { if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.notify_one(); server_->callback_reqs_done_cv_.Signal();
} }
} }
@ -814,12 +814,12 @@ Server::Server(
Server::~Server() { Server::~Server() {
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::ReleasableMutexLock lock(&mu_);
if (callback_cq_ != nullptr) { if (callback_cq_ != nullptr) {
callback_cq_->Shutdown(); callback_cq_->Shutdown();
} }
if (started_ && !shutdown_) { if (started_ && !shutdown_) {
lock.unlock(); lock.Unlock();
Shutdown(); Shutdown();
} else if (!started_) { } else if (!started_) {
// Shutdown the completion queues // Shutdown the completion queues
@ -1051,7 +1051,7 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
} }
void Server::ShutdownInternal(gpr_timespec deadline) { void Server::ShutdownInternal(gpr_timespec deadline) {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
if (shutdown_) { if (shutdown_) {
return; return;
} }
@ -1102,9 +1102,9 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// will report a failure, indicating a shutdown and again we won't end // will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter. // up incrementing the counter.
{ {
std::unique_lock<std::mutex> cblock(callback_reqs_mu_); grpc::internal::MutexLock cblock(&callback_reqs_mu_);
callback_reqs_done_cv_.wait( callback_reqs_done_cv_.WaitUntil(
cblock, [this] { return callback_reqs_outstanding_ == 0; }); &callback_reqs_mu_, [this] { return callback_reqs_outstanding_ == 0; });
} }
// Drain the shutdown queue (if the previous call to AsyncNext() timed out // Drain the shutdown queue (if the previous call to AsyncNext() timed out
@ -1114,13 +1114,13 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
} }
shutdown_notified_ = true; shutdown_notified_ = true;
shutdown_cv_.notify_all(); shutdown_cv_.Broadcast();
} }
void Server::Wait() { void Server::Wait() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
while (started_ && !shutdown_notified_) { while (started_ && !shutdown_notified_) {
shutdown_cv_.wait(lock); shutdown_cv_.Wait(&mu_);
} }
} }
@ -1322,7 +1322,7 @@ class ShutdownCallback : public grpc_experimental_completion_queue_functor {
CompletionQueue* Server::CallbackCQ() { CompletionQueue* Server::CallbackCQ() {
// TODO(vjpai): Consider using a single global CQ for the default CQ // TODO(vjpai): Consider using a single global CQ for the default CQ
// if there is no explicit per-server CQ registered // if there is no explicit per-server CQ registered
std::lock_guard<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
if (callback_cq_ == nullptr) { if (callback_cq_ == nullptr) {
auto* shutdown_callback = new ShutdownCallback; auto* shutdown_callback = new ShutdownCallback;
callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{ callback_cq_ = new CompletionQueue(grpc_completion_queue_attributes{

@ -33,6 +33,7 @@
#include <grpcpp/support/time.h> #include <grpcpp/support/time.h>
#include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/call.h"
namespace grpc { namespace grpc {
@ -96,7 +97,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
} }
void SetCancelCallback(std::function<void()> callback) { void SetCancelCallback(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
if (finalized_ && (cancelled_ != 0)) { if (finalized_ && (cancelled_ != 0)) {
callback(); callback();
@ -107,7 +108,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
} }
void ClearCancelCallback() { void ClearCancelCallback() {
std::lock_guard<std::mutex> g(mu_); grpc_core::MutexLock g(&mu_);
cancel_callback_ = nullptr; cancel_callback_ = nullptr;
} }
@ -144,7 +145,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
private: private:
bool CheckCancelledNoPluck() { bool CheckCancelledNoPluck() {
std::lock_guard<std::mutex> g(mu_); grpc_core::MutexLock lock(&mu_);
return finalized_ ? (cancelled_ != 0) : false; return finalized_ ? (cancelled_ != 0) : false;
} }
@ -154,7 +155,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
void* tag_; void* tag_;
void* core_cq_tag_; void* core_cq_tag_;
grpc_core::RefCount refs_; grpc_core::RefCount refs_;
std::mutex mu_; grpc_core::Mutex mu_;
bool finalized_; bool finalized_;
int cancelled_; // This is an int (not bool) because it is passed to core int cancelled_; // This is an int (not bool) because it is passed to core
std::function<void()> cancel_callback_; std::function<void()> cancel_callback_;
@ -186,7 +187,7 @@ void ServerContext::CompletionOp::FillOps(internal::Call* call) {
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
bool ret = false; bool ret = false;
std::unique_lock<std::mutex> lock(mu_); grpc_core::ReleasableMutexLock lock(&mu_);
if (done_intercepting_) { if (done_intercepting_) {
/* We are done intercepting. */ /* We are done intercepting. */
if (has_tag_) { if (has_tag_) {
@ -218,14 +219,12 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
cancel_callback_(); cancel_callback_();
} }
// Release the lock since we are going to be calling a callback and // Release the lock since we may call a callback and interceptors now.
// interceptors now lock.Unlock();
lock.unlock();
if (call_cancel && reactor_ != nullptr) { if (call_cancel && reactor_ != nullptr) {
reactor_->MaybeCallOnCancel(); reactor_->MaybeCallOnCancel();
} }
/* Add interception point and run through interceptors */ /* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint( interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE); experimental::InterceptionHookPoints::POST_RECV_CLOSE);

@ -62,7 +62,7 @@ ThreadManager::ThreadManager(const char* name,
ThreadManager::~ThreadManager() { ThreadManager::~ThreadManager() {
{ {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(num_threads_ == 0); GPR_ASSERT(num_threads_ == 0);
} }
@ -72,38 +72,38 @@ ThreadManager::~ThreadManager() {
} }
void ThreadManager::Wait() { void ThreadManager::Wait() {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
while (num_threads_ != 0) { while (num_threads_ != 0) {
shutdown_cv_.wait(lock); shutdown_cv_.Wait(&mu_);
} }
} }
void ThreadManager::Shutdown() { void ThreadManager::Shutdown() {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
shutdown_ = true; shutdown_ = true;
} }
bool ThreadManager::IsShutdown() { bool ThreadManager::IsShutdown() {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
return shutdown_; return shutdown_;
} }
int ThreadManager::GetMaxActiveThreadsSoFar() { int ThreadManager::GetMaxActiveThreadsSoFar() {
std::lock_guard<std::mutex> list_lock(list_mu_); grpc_core::MutexLock list_lock(&list_mu_);
return max_active_threads_sofar_; return max_active_threads_sofar_;
} }
void ThreadManager::MarkAsCompleted(WorkerThread* thd) { void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
{ {
std::lock_guard<std::mutex> list_lock(list_mu_); grpc_core::MutexLock list_lock(&list_mu_);
completed_threads_.push_back(thd); completed_threads_.push_back(thd);
} }
{ {
std::lock_guard<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
num_threads_--; num_threads_--;
if (num_threads_ == 0) { if (num_threads_ == 0) {
shutdown_cv_.notify_one(); shutdown_cv_.Signal();
} }
} }
@ -116,7 +116,7 @@ void ThreadManager::CleanupCompletedThreads() {
{ {
// swap out the completed threads list: allows other threads to clean up // swap out the completed threads list: allows other threads to clean up
// more quickly // more quickly
std::unique_lock<std::mutex> lock(list_mu_); grpc_core::MutexLock lock(&list_mu_);
completed_threads.swap(completed_threads_); completed_threads.swap(completed_threads_);
} }
for (auto thd : completed_threads) delete thd; for (auto thd : completed_threads) delete thd;
@ -132,7 +132,7 @@ void ThreadManager::Initialize() {
} }
{ {
std::unique_lock<std::mutex> lock(mu_); grpc_core::MutexLock lock(&mu_);
num_pollers_ = min_pollers_; num_pollers_ = min_pollers_;
num_threads_ = min_pollers_; num_threads_ = min_pollers_;
max_active_threads_sofar_ = min_pollers_; max_active_threads_sofar_ = min_pollers_;
@ -149,7 +149,7 @@ void ThreadManager::MainWorkLoop() {
bool ok; bool ok;
WorkStatus work_status = PollForWork(&tag, &ok); WorkStatus work_status = PollForWork(&tag, &ok);
std::unique_lock<std::mutex> lock(mu_); grpc_core::ReleasableMutexLock lock(&mu_);
// Reduce the number of pollers by 1 and check what happened with the poll // Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--; num_pollers_--;
bool done = false; bool done = false;
@ -176,30 +176,30 @@ void ThreadManager::MainWorkLoop() {
max_active_threads_sofar_ = num_threads_; max_active_threads_sofar_ = num_threads_;
} }
// Drop lock before spawning thread to avoid contention // Drop lock before spawning thread to avoid contention
lock.unlock(); lock.Unlock();
new WorkerThread(this); new WorkerThread(this);
} else if (num_pollers_ > 0) { } else if (num_pollers_ > 0) {
// There is still at least some thread polling, so we can go on // There is still at least some thread polling, so we can go on
// even though we are below the number of pollers that we would // even though we are below the number of pollers that we would
// like to have (min_pollers_) // like to have (min_pollers_)
lock.unlock(); lock.Unlock();
} else { } else {
// There are no pollers to spare and we couldn't allocate // There are no pollers to spare and we couldn't allocate
// a new thread, so resources are exhausted! // a new thread, so resources are exhausted!
lock.unlock(); lock.Unlock();
resource_exhausted = true; resource_exhausted = true;
} }
} else { } else {
// There are a sufficient number of pollers available so we can do // There are a sufficient number of pollers available so we can do
// the work and continue polling with our existing poller threads // the work and continue polling with our existing poller threads
lock.unlock(); lock.Unlock();
} }
// Lock is always released at this point - do the application work // Lock is always released at this point - do the application work
// or return resource exhausted if there is new work but we couldn't // or return resource exhausted if there is new work but we couldn't
// get a thread in which to do it. // get a thread in which to do it.
DoWork(tag, ok, !resource_exhausted); DoWork(tag, ok, !resource_exhausted);
// Take the lock again to check post conditions // Take the lock again to check post conditions
lock.lock(); lock.Lock();
// If we're shutdown, we should finish at this point. // If we're shutdown, we should finish at this point.
if (shutdown_) done = true; if (shutdown_) done = true;
break; break;

@ -26,6 +26,7 @@
#include <grpcpp/support/config.h> #include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/resource_quota.h"
@ -140,10 +141,10 @@ class ThreadManager {
// Protects shutdown_, num_pollers_, num_threads_ and // Protects shutdown_, num_pollers_, num_threads_ and
// max_active_threads_sofar_ // max_active_threads_sofar_
std::mutex mu_; grpc_core::Mutex mu_;
bool shutdown_; bool shutdown_;
std::condition_variable shutdown_cv_; grpc_core::CondVar shutdown_cv_;
// The resource user object to use when requesting quota to create threads // The resource user object to use when requesting quota to create threads
// //
@ -169,7 +170,7 @@ class ThreadManager {
// ever set so far // ever set so far
int max_active_threads_sofar_; int max_active_threads_sofar_;
std::mutex list_mu_; grpc_core::Mutex list_mu_;
std::list<WorkerThread*> completed_threads_; std::list<WorkerThread*> completed_threads_;
}; };

@ -31,6 +31,7 @@
#include <grpcpp/channel.h> #include <grpcpp/channel.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
@ -168,24 +169,24 @@ class ClientChannelStressTest {
explicit ServerThread(const grpc::string& type, explicit ServerThread(const grpc::string& type,
const grpc::string& server_host, T* service) const grpc::string& server_host, T* service)
: type_(type), service_(service) { : type_(type), service_(service) {
std::mutex mu; grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Start from firing before the wait below is hit. // by ServerThread::Start from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu); grpc::internal::MutexLock lock(&mu);
port_ = grpc_pick_unused_port_or_die(); port_ = grpc_pick_unused_port_or_die();
gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
std::condition_variable cond; grpc::internal::CondVar cond;
thread_.reset(new std::thread( thread_.reset(new std::thread(
std::bind(&ServerThread::Start, this, server_host, &mu, &cond))); std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
cond.wait(lock); cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
} }
void Start(const grpc::string& server_host, std::mutex* mu, void Start(const grpc::string& server_host, grpc::internal::Mutex* mu,
std::condition_variable* cond) { grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed. // below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu); grpc::internal::MutexLock lock(mu);
std::ostringstream server_address; std::ostringstream server_address;
server_address << server_host << ":" << port_; server_address << server_host << ":" << port_;
ServerBuilder builder; ServerBuilder builder;
@ -193,7 +194,7 @@ class ClientChannelStressTest {
InsecureServerCredentials()); InsecureServerCredentials());
builder.RegisterService(service_); builder.RegisterService(service_);
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
cond->notify_one(); cond->Signal();
} }
void Shutdown() { void Shutdown() {

@ -33,6 +33,7 @@
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/health_check_service_interface.h> #include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
@ -98,7 +99,7 @@ class MyTestServiceImpl : public TestServiceImpl {
Status Echo(ServerContext* context, const EchoRequest* request, Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override { EchoResponse* response) override {
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
++request_count_; ++request_count_;
} }
AddClient(context->peer()); AddClient(context->peer());
@ -106,29 +107,29 @@ class MyTestServiceImpl : public TestServiceImpl {
} }
int request_count() { int request_count() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return request_count_; return request_count_;
} }
void ResetCounters() { void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
request_count_ = 0; request_count_ = 0;
} }
std::set<grpc::string> clients() { std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
return clients_; return clients_;
} }
private: private:
void AddClient(const grpc::string& client) { void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client); clients_.insert(client);
} }
std::mutex mu_; grpc::internal::Mutex mu_;
int request_count_; int request_count_;
std::mutex clients_mu_; grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_; std::set<grpc::string> clients_;
}; };
@ -293,18 +294,18 @@ class ClientLbEnd2endTest : public ::testing::Test {
void Start(const grpc::string& server_host) { void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_); gpr_log(GPR_INFO, "starting server on port %d", port_);
started_ = true; started_ = true;
std::mutex mu; grpc::internal::Mutex mu;
std::unique_lock<std::mutex> lock(mu); grpc::internal::MutexLock lock(&mu);
std::condition_variable cond; grpc::internal::CondVar cond;
thread_.reset(new std::thread( thread_.reset(new std::thread(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; }); cond.WaitUntil(&mu, [this] { return server_ready_; });
server_ready_ = false; server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete"); gpr_log(GPR_INFO, "server startup complete");
} }
void Serve(const grpc::string& server_host, std::mutex* mu, void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
std::condition_variable* cond) { grpc::internal::CondVar* cond) {
std::ostringstream server_address; std::ostringstream server_address;
server_address << server_host << ":" << port_; server_address << server_host << ":" << port_;
ServerBuilder builder; ServerBuilder builder;
@ -313,9 +314,9 @@ class ClientLbEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), std::move(creds)); builder.AddListeningPort(server_address.str(), std::move(creds));
builder.RegisterService(&service_); builder.RegisterService(&service_);
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
std::lock_guard<std::mutex> lock(*mu); grpc::internal::MutexLock lock(mu);
server_ready_ = true; server_ready_ = true;
cond->notify_one(); cond->Signal();
} }
void Shutdown() { void Shutdown() {
@ -1374,7 +1375,7 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
void TearDown() override { ClientLbEnd2endTest::TearDown(); } void TearDown() override { ClientLbEnd2endTest::TearDown(); }
int trailers_intercepted() { int trailers_intercepted() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return trailers_intercepted_; return trailers_intercepted_;
} }
@ -1382,11 +1383,11 @@ class ClientLbInterceptTrailingMetadataTest : public ClientLbEnd2endTest {
static void ReportTrailerIntercepted(void* arg) { static void ReportTrailerIntercepted(void* arg) {
ClientLbInterceptTrailingMetadataTest* self = ClientLbInterceptTrailingMetadataTest* self =
static_cast<ClientLbInterceptTrailingMetadataTest*>(arg); static_cast<ClientLbInterceptTrailingMetadataTest*>(arg);
std::unique_lock<std::mutex> lock(self->mu_); grpc::internal::MutexLock lock(&self->mu_);
self->trailers_intercepted_++; self->trailers_intercepted_++;
} }
std::mutex mu_; grpc::internal::Mutex mu_;
int trailers_intercepted_ = 0; int trailers_intercepted_ = 0;
}; };

@ -30,6 +30,7 @@
#include <grpcpp/channel.h> #include <grpcpp/channel.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
@ -85,32 +86,32 @@ template <typename ServiceType>
class CountedService : public ServiceType { class CountedService : public ServiceType {
public: public:
size_t request_count() { size_t request_count() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return request_count_; return request_count_;
} }
size_t response_count() { size_t response_count() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return response_count_; return response_count_;
} }
void IncreaseResponseCount() { void IncreaseResponseCount() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
++response_count_; ++response_count_;
} }
void IncreaseRequestCount() { void IncreaseRequestCount() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
++request_count_; ++request_count_;
} }
void ResetCounters() { void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
request_count_ = 0; request_count_ = 0;
response_count_ = 0; response_count_ = 0;
} }
protected: protected:
std::mutex mu_; grpc::internal::Mutex mu_;
private: private:
size_t request_count_ = 0; size_t request_count_ = 0;
@ -148,18 +149,18 @@ class BackendServiceImpl : public BackendService {
void Shutdown() {} void Shutdown() {}
std::set<grpc::string> clients() { std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
return clients_; return clients_;
} }
private: private:
void AddClient(const grpc::string& client) { void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client); clients_.insert(client);
} }
std::mutex mu_; grpc::internal::Mutex mu_;
std::mutex clients_mu_; grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_; std::set<grpc::string> clients_;
}; };
@ -210,7 +211,7 @@ class BalancerServiceImpl : public BalancerService {
Status BalanceLoad(ServerContext* context, Stream* stream) override { Status BalanceLoad(ServerContext* context, Stream* stream) override {
gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
if (serverlist_done_) goto done; if (serverlist_done_) goto done;
} }
{ {
@ -237,7 +238,7 @@ class BalancerServiceImpl : public BalancerService {
} }
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
responses_and_delays = responses_and_delays_; responses_and_delays = responses_and_delays_;
} }
for (const auto& response_and_delay : responses_and_delays) { for (const auto& response_and_delay : responses_and_delays) {
@ -245,8 +246,8 @@ class BalancerServiceImpl : public BalancerService {
response_and_delay.second); response_and_delay.second);
} }
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; });
} }
if (client_load_reporting_interval_seconds_ > 0) { if (client_load_reporting_interval_seconds_ > 0) {
@ -257,7 +258,7 @@ class BalancerServiceImpl : public BalancerService {
GPR_ASSERT(request.has_client_stats()); GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed. // below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
client_stats_.num_calls_started += client_stats_.num_calls_started +=
request.client_stats().num_calls_started(); request.client_stats().num_calls_started();
client_stats_.num_calls_finished += client_stats_.num_calls_finished +=
@ -274,7 +275,7 @@ class BalancerServiceImpl : public BalancerService {
drop_token_count.num_calls(); drop_token_count.num_calls();
} }
load_report_ready_ = true; load_report_ready_ = true;
load_report_cond_.notify_one(); load_report_cond_.Signal();
} }
} }
} }
@ -284,12 +285,12 @@ class BalancerServiceImpl : public BalancerService {
} }
void add_response(const LoadBalanceResponse& response, int send_after_ms) { void add_response(const LoadBalanceResponse& response, int send_after_ms) {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
} }
void Start() { void Start() {
std::lock_guard<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
serverlist_done_ = false; serverlist_done_ = false;
load_report_ready_ = false; load_report_ready_ = false;
responses_and_delays_.clear(); responses_and_delays_.clear();
@ -326,17 +327,17 @@ class BalancerServiceImpl : public BalancerService {
} }
const ClientStats& WaitForLoadReport() { const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
load_report_cond_.wait(lock, [this] { return load_report_ready_; }); load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; });
load_report_ready_ = false; load_report_ready_ = false;
return client_stats_; return client_stats_;
} }
void NotifyDoneWithServerlists() { void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
if (!serverlist_done_) { if (!serverlist_done_) {
serverlist_done_ = true; serverlist_done_ = true;
serverlist_cond_.notify_all(); serverlist_cond_.Broadcast();
} }
} }
@ -355,10 +356,10 @@ class BalancerServiceImpl : public BalancerService {
const int client_load_reporting_interval_seconds_; const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_; std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_; grpc::internal::Mutex mu_;
std::condition_variable load_report_cond_; grpc::internal::CondVar load_report_cond_;
bool load_report_ready_ = false; bool load_report_ready_ = false;
std::condition_variable serverlist_cond_; grpc::internal::CondVar serverlist_cond_;
bool serverlist_done_ = false; bool serverlist_done_ = false;
ClientStats client_stats_; ClientStats client_stats_;
}; };
@ -624,22 +625,22 @@ class GrpclbEnd2endTest : public ::testing::Test {
GPR_ASSERT(!running_); GPR_ASSERT(!running_);
running_ = true; running_ = true;
service_.Start(); service_.Start();
std::mutex mu; grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Serve from firing before the wait below is hit. // by ServerThread::Serve from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu); grpc::internal::MutexLock lock(&mu);
std::condition_variable cond; grpc::internal::CondVar cond;
thread_.reset(new std::thread( thread_.reset(new std::thread(
std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)));
cond.wait(lock); cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
} }
void Serve(const grpc::string& server_host, std::mutex* mu, void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
std::condition_variable* cond) { grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed. // below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu); grpc::internal::MutexLock lock(mu);
std::ostringstream server_address; std::ostringstream server_address;
server_address << server_host << ":" << port_; server_address << server_host << ":" << port_;
ServerBuilder builder; ServerBuilder builder;
@ -648,7 +649,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), creds); builder.AddListeningPort(server_address.str(), creds);
builder.RegisterService(&service_); builder.RegisterService(&service_);
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
cond->notify_one(); cond->Signal();
} }
void Shutdown() { void Shutdown() {

@ -25,6 +25,7 @@
#include <grpcpp/channel.h> #include <grpcpp/channel.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h> #include <grpcpp/create_channel.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/resource_quota.h> #include <grpcpp/resource_quota.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
@ -188,7 +189,7 @@ class CommonStressTestAsyncServer : public BaseClass {
} }
void TearDown() override { void TearDown() override {
{ {
std::unique_lock<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
this->TearDownStart(); this->TearDownStart();
shutting_down_ = true; shutting_down_ = true;
cq_->Shutdown(); cq_->Shutdown();
@ -229,7 +230,7 @@ class CommonStressTestAsyncServer : public BaseClass {
} }
} }
void RefreshContext(int i) { void RefreshContext(int i) {
std::unique_lock<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
if (!shutting_down_) { if (!shutting_down_) {
contexts_[i].state = Context::READY; contexts_[i].state = Context::READY;
contexts_[i].srv_ctx.reset(new ServerContext); contexts_[i].srv_ctx.reset(new ServerContext);
@ -253,7 +254,7 @@ class CommonStressTestAsyncServer : public BaseClass {
::grpc::testing::EchoTestService::AsyncService service_; ::grpc::testing::EchoTestService::AsyncService service_;
std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<ServerCompletionQueue> cq_;
bool shutting_down_; bool shutting_down_;
std::mutex mu_; grpc::internal::Mutex mu_;
std::vector<std::thread> server_threads_; std::vector<std::thread> server_threads_;
}; };
@ -341,9 +342,9 @@ class AsyncClientEnd2endTest : public ::testing::Test {
} }
void Wait() { void Wait() {
std::unique_lock<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
while (rpcs_outstanding_ != 0) { while (rpcs_outstanding_ != 0) {
cv_.wait(l); cv_.Wait(&mu_);
} }
cq_.Shutdown(); cq_.Shutdown();
@ -366,7 +367,7 @@ class AsyncClientEnd2endTest : public ::testing::Test {
call->response_reader->Finish(&call->response, &call->status, call->response_reader->Finish(&call->response, &call->status,
(void*)call); (void*)call);
std::unique_lock<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
rpcs_outstanding_++; rpcs_outstanding_++;
} }
} }
@ -384,20 +385,20 @@ class AsyncClientEnd2endTest : public ::testing::Test {
bool notify; bool notify;
{ {
std::unique_lock<std::mutex> l(mu_); grpc::internal::MutexLock l(&mu_);
rpcs_outstanding_--; rpcs_outstanding_--;
notify = (rpcs_outstanding_ == 0); notify = (rpcs_outstanding_ == 0);
} }
if (notify) { if (notify) {
cv_.notify_all(); cv_.Signal();
} }
} }
} }
Common common_; Common common_;
CompletionQueue cq_; CompletionQueue cq_;
std::mutex mu_; grpc::internal::Mutex mu_;
std::condition_variable cv_; grpc::internal::CondVar cv_;
int rpcs_outstanding_; int rpcs_outstanding_;
}; };

@ -84,32 +84,32 @@ template <typename ServiceType>
class CountedService : public ServiceType { class CountedService : public ServiceType {
public: public:
size_t request_count() { size_t request_count() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return request_count_; return request_count_;
} }
size_t response_count() { size_t response_count() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
return response_count_; return response_count_;
} }
void IncreaseResponseCount() { void IncreaseResponseCount() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
++response_count_; ++response_count_;
} }
void IncreaseRequestCount() { void IncreaseRequestCount() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
++request_count_; ++request_count_;
} }
void ResetCounters() { void ResetCounters() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
request_count_ = 0; request_count_ = 0;
response_count_ = 0; response_count_ = 0;
} }
protected: protected:
std::mutex mu_; grpc::internal::Mutex mu_;
private: private:
size_t request_count_ = 0; size_t request_count_ = 0;
@ -145,18 +145,18 @@ class BackendServiceImpl : public BackendService {
void Shutdown() {} void Shutdown() {}
std::set<grpc::string> clients() { std::set<grpc::string> clients() {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
return clients_; return clients_;
} }
private: private:
void AddClient(const grpc::string& client) { void AddClient(const grpc::string& client) {
std::unique_lock<std::mutex> lock(clients_mu_); grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client); clients_.insert(client);
} }
std::mutex mu_; grpc::internal::Mutex mu_;
std::mutex clients_mu_; grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_; std::set<grpc::string> clients_;
}; };
@ -208,7 +208,7 @@ class BalancerServiceImpl : public BalancerService {
// TODO(juanlishen): Clean up the scoping. // TODO(juanlishen): Clean up the scoping.
gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this); gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
if (serverlist_done_) goto done; if (serverlist_done_) goto done;
} }
{ {
@ -234,7 +234,7 @@ class BalancerServiceImpl : public BalancerService {
} }
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
responses_and_delays = responses_and_delays_; responses_and_delays = responses_and_delays_;
} }
for (const auto& response_and_delay : responses_and_delays) { for (const auto& response_and_delay : responses_and_delays) {
@ -242,8 +242,8 @@ class BalancerServiceImpl : public BalancerService {
response_and_delay.second); response_and_delay.second);
} }
{ {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
serverlist_cond_.wait(lock, [this] { return serverlist_done_; }); serverlist_cond_.WaitUntil(&mu_, [this] { return serverlist_done_; });
} }
if (client_load_reporting_interval_seconds_ > 0) { if (client_load_reporting_interval_seconds_ > 0) {
@ -254,7 +254,7 @@ class BalancerServiceImpl : public BalancerService {
GPR_ASSERT(request.has_client_stats()); GPR_ASSERT(request.has_client_stats());
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed. // below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
client_stats_.num_calls_started += client_stats_.num_calls_started +=
request.client_stats().num_calls_started(); request.client_stats().num_calls_started();
client_stats_.num_calls_finished += client_stats_.num_calls_finished +=
@ -271,7 +271,7 @@ class BalancerServiceImpl : public BalancerService {
drop_token_count.num_calls(); drop_token_count.num_calls();
} }
load_report_ready_ = true; load_report_ready_ = true;
load_report_cond_.notify_one(); load_report_cond_.Signal();
} }
} }
} }
@ -281,12 +281,12 @@ class BalancerServiceImpl : public BalancerService {
} }
void add_response(const LoadBalanceResponse& response, int send_after_ms) { void add_response(const LoadBalanceResponse& response, int send_after_ms) {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
responses_and_delays_.push_back(std::make_pair(response, send_after_ms)); responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
} }
void Shutdown() { void Shutdown() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
NotifyDoneWithServerlistsLocked(); NotifyDoneWithServerlistsLocked();
responses_and_delays_.clear(); responses_and_delays_.clear();
client_stats_.Reset(); client_stats_.Reset();
@ -318,21 +318,21 @@ class BalancerServiceImpl : public BalancerService {
} }
const ClientStats& WaitForLoadReport() { const ClientStats& WaitForLoadReport() {
std::unique_lock<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
load_report_cond_.wait(lock, [this] { return load_report_ready_; }); load_report_cond_.WaitUntil(&mu_, [this] { return load_report_ready_; });
load_report_ready_ = false; load_report_ready_ = false;
return client_stats_; return client_stats_;
} }
void NotifyDoneWithServerlists() { void NotifyDoneWithServerlists() {
std::lock_guard<std::mutex> lock(mu_); grpc::internal::MutexLock lock(&mu_);
NotifyDoneWithServerlistsLocked(); NotifyDoneWithServerlistsLocked();
} }
void NotifyDoneWithServerlistsLocked() { void NotifyDoneWithServerlistsLocked() {
if (!serverlist_done_) { if (!serverlist_done_) {
serverlist_done_ = true; serverlist_done_ = true;
serverlist_cond_.notify_all(); serverlist_cond_.Broadcast();
} }
} }
@ -351,10 +351,10 @@ class BalancerServiceImpl : public BalancerService {
const int client_load_reporting_interval_seconds_; const int client_load_reporting_interval_seconds_;
std::vector<ResponseDelayPair> responses_and_delays_; std::vector<ResponseDelayPair> responses_and_delays_;
std::mutex mu_; grpc::internal::Mutex mu_;
std::condition_variable load_report_cond_; grpc::internal::CondVar load_report_cond_;
bool load_report_ready_ = false; bool load_report_ready_ = false;
std::condition_variable serverlist_cond_; grpc::internal::CondVar serverlist_cond_;
bool serverlist_done_ = false; bool serverlist_done_ = false;
ClientStats client_stats_; ClientStats client_stats_;
}; };
@ -637,22 +637,22 @@ class XdsEnd2endTest : public ::testing::Test {
gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_); gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
GPR_ASSERT(!running_); GPR_ASSERT(!running_);
running_ = true; running_ = true;
std::mutex mu; grpc::internal::Mutex mu;
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// by ServerThread::Serve from firing before the wait below is hit. // by ServerThread::Serve from firing before the wait below is hit.
std::unique_lock<std::mutex> lock(mu); grpc::internal::MutexLock lock(&mu);
std::condition_variable cond; grpc::internal::CondVar cond;
thread_.reset(new std::thread( thread_.reset(new std::thread(
std::bind(&ServerThread::Serve, this, server_host, &mu, &cond))); std::bind(&ServerThread::Serve, this, server_host, &mu, &cond)));
cond.wait(lock); cond.Wait(&mu);
gpr_log(GPR_INFO, "%s server startup complete", type_.c_str()); gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
} }
void Serve(const grpc::string& server_host, std::mutex* mu, void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
std::condition_variable* cond) { grpc::internal::CondVar* cond) {
// We need to acquire the lock here in order to prevent the notify_one // We need to acquire the lock here in order to prevent the notify_one
// below from firing before its corresponding wait is executed. // below from firing before its corresponding wait is executed.
std::lock_guard<std::mutex> lock(*mu); grpc::internal::MutexLock lock(mu);
std::ostringstream server_address; std::ostringstream server_address;
server_address << server_host << ":" << port_; server_address << server_host << ":" << port_;
ServerBuilder builder; ServerBuilder builder;
@ -661,7 +661,7 @@ class XdsEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address.str(), creds); builder.AddListeningPort(server_address.str(), creds);
builder.RegisterService(&service_); builder.RegisterService(&service_);
server_ = builder.BuildAndStart(); server_ = builder.BuildAndStart();
cond->notify_one(); cond->Signal();
} }
void Shutdown() { void Shutdown() {

@ -987,6 +987,7 @@ include/grpcpp/impl/codegen/status.h \
include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/status_code_enum.h \
include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/string_ref.h \
include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync.h \
include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/grpc_library.h \ include/grpcpp/impl/grpc_library.h \

@ -989,6 +989,7 @@ include/grpcpp/impl/codegen/status.h \
include/grpcpp/impl/codegen/status_code_enum.h \ include/grpcpp/impl/codegen/status_code_enum.h \
include/grpcpp/impl/codegen/string_ref.h \ include/grpcpp/impl/codegen/string_ref.h \
include/grpcpp/impl/codegen/stub_options.h \ include/grpcpp/impl/codegen/stub_options.h \
include/grpcpp/impl/codegen/sync.h \
include/grpcpp/impl/codegen/sync_stream.h \ include/grpcpp/impl/codegen/sync_stream.h \
include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/grpc_library.h \ include/grpcpp/impl/grpc_library.h \
@ -1086,12 +1087,12 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \ src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \ src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \ src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/sync.h \
src/core/lib/gprpp/thd.h \ src/core/lib/gprpp/thd.h \
src/core/lib/http/format_request.h \ src/core/lib/http/format_request.h \
src/core/lib/http/httpcli.h \ src/core/lib/http/httpcli.h \

@ -1168,12 +1168,12 @@ src/core/lib/gprpp/inlined_vector.h \
src/core/lib/gprpp/manual_constructor.h \ src/core/lib/gprpp/manual_constructor.h \
src/core/lib/gprpp/map.h \ src/core/lib/gprpp/map.h \
src/core/lib/gprpp/memory.h \ src/core/lib/gprpp/memory.h \
src/core/lib/gprpp/mutex_lock.h \
src/core/lib/gprpp/optional.h \ src/core/lib/gprpp/optional.h \
src/core/lib/gprpp/orphanable.h \ src/core/lib/gprpp/orphanable.h \
src/core/lib/gprpp/pair.h \ src/core/lib/gprpp/pair.h \
src/core/lib/gprpp/ref_counted.h \ src/core/lib/gprpp/ref_counted.h \
src/core/lib/gprpp/ref_counted_ptr.h \ src/core/lib/gprpp/ref_counted_ptr.h \
src/core/lib/gprpp/sync.h \
src/core/lib/gprpp/thd.h \ src/core/lib/gprpp/thd.h \
src/core/lib/gprpp/thd_posix.cc \ src/core/lib/gprpp/thd_posix.cc \
src/core/lib/gprpp/thd_windows.cc \ src/core/lib/gprpp/thd_windows.cc \

@ -8050,8 +8050,8 @@
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h", "src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h" "src/core/lib/profiling/timers.h"
], ],
@ -8098,8 +8098,8 @@
"src/core/lib/gprpp/manual_constructor.h", "src/core/lib/gprpp/manual_constructor.h",
"src/core/lib/gprpp/map.h", "src/core/lib/gprpp/map.h",
"src/core/lib/gprpp/memory.h", "src/core/lib/gprpp/memory.h",
"src/core/lib/gprpp/mutex_lock.h",
"src/core/lib/gprpp/pair.h", "src/core/lib/gprpp/pair.h",
"src/core/lib/gprpp/sync.h",
"src/core/lib/gprpp/thd.h", "src/core/lib/gprpp/thd.h",
"src/core/lib/profiling/timers.h" "src/core/lib/profiling/timers.h"
], ],
@ -9880,6 +9880,7 @@
}, },
{ {
"deps": [ "deps": [
"grpc++_internal_hdrs_only",
"grpc_codegen" "grpc_codegen"
], ],
"headers": [ "headers": [
@ -10076,6 +10077,7 @@
"gpr", "gpr",
"gpr_base_headers", "gpr_base_headers",
"grpc++_codegen_base", "grpc++_codegen_base",
"grpc++_internal_hdrs_only",
"grpc_base_headers", "grpc_base_headers",
"grpc_transport_inproc_headers", "grpc_transport_inproc_headers",
"health_proto", "health_proto",
@ -10370,6 +10372,20 @@
"third_party": false, "third_party": false,
"type": "filegroup" "type": "filegroup"
}, },
{
"deps": [],
"headers": [
"include/grpcpp/impl/codegen/sync.h"
],
"is_filegroup": true,
"language": "c++",
"name": "grpc++_internal_hdrs_only",
"src": [
"include/grpcpp/impl/codegen/sync.h"
],
"third_party": false,
"type": "filegroup"
},
{ {
"deps": [], "deps": [],
"headers": [ "headers": [

Loading…
Cancel
Save