Eliminate grpc_core::Atomic (#27025)

* Eliminate grpc_core::Atomic

* Automated change: Fix sanity tests

* Automated change: Fix sanity tests

* initialize things

* fix include order

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Co-authored-by: Mark D. Roth <roth@google.com>
Branch: pull/27069/head
Author: Craig Tiller (committed by GitHub)
Parent: 40ab21d9ec
Commit: 59da7bc42a
Changed files (changed line count in parentheses):

  1. BUILD (23)
  2. build_autogenerated.yaml (5)
  3. doc/core/moving-to-c++.md (2)
  4. gRPC-C++.podspec (4)
  5. gRPC-Core.podspec (4)
  6. grpc.gemspec (2)
  7. package.xml (2)
  8. src/core/ext/filters/client_channel/client_channel.cc (15)
  9. src/core/ext/filters/client_channel/client_channel.h (6)
  10. src/core/ext/filters/client_channel/client_channel_channelz.cc (4)
  11. src/core/ext/filters/client_channel/client_channel_channelz.h (2)
  12. src/core/ext/filters/client_channel/health/health_check_client.cc (9)
  13. src/core/ext/filters/client_channel/health/health_check_client.h (7)
  14. src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc (13)
  15. src/core/ext/filters/client_channel/resolver/google_c2p/google_c2p_resolver.cc (7)
  16. src/core/ext/filters/client_idle/client_idle_filter.cc (66)
  17. src/core/ext/filters/fault_injection/fault_injection_filter.cc (14)
  18. src/core/ext/filters/load_reporting/server_load_reporting_filter.cc (6)
  19. src/core/ext/xds/xds_client_stats.cc (31)
  20. src/core/ext/xds/xds_client_stats.h (12)
  21. src/core/lib/channel/channelz.cc (76)
  22. src/core/lib/channel/channelz.h (54)
  23. src/core/lib/config/core_configuration.cc (2)
  24. src/core/lib/gprpp/arena.cc (2)
  25. src/core/lib/gprpp/arena.h (6)
  26. src/core/lib/gprpp/atomic.h (104)
  27. src/core/lib/gprpp/atomic_utils.h (47)
  28. src/core/lib/gprpp/dual_ref_counted.h (43)
  29. src/core/lib/gprpp/fork.cc (26)
  30. src/core/lib/gprpp/fork.h (8)
  31. src/core/lib/gprpp/mpscq.cc (14)
  32. src/core/lib/gprpp/mpscq.h (11)
  33. src/core/lib/gprpp/ref_counted.h (28)
  34. src/core/lib/iomgr/executor/mpmcqueue.cc (19)
  35. src/core/lib/iomgr/executor/mpmcqueue.h (7)
  36. src/core/lib/iomgr/executor/threadpool.cc (4)
  37. src/core/lib/iomgr/executor/threadpool.h (3)
  38. src/core/lib/iomgr/tcp_posix.cc (16)
  39. src/core/lib/iomgr/work_serializer.cc (8)
  40. src/core/lib/iomgr/work_serializer.h (2)
  41. src/core/lib/surface/call.cc (6)
  42. src/core/lib/surface/completion_queue.cc (85)
  43. src/core/lib/surface/lame_client.cc (6)
  44. src/core/lib/surface/server.cc (26)
  45. src/core/lib/surface/server.h (16)
  46. src/core/lib/transport/connectivity_state.cc (13)
  47. src/core/lib/transport/connectivity_state.h (4)
  48. src/core/lib/transport/metadata.cc (20)
  49. src/core/lib/transport/metadata.h (24)
  50. test/core/end2end/tests/retry_lb_fail.cc (9)
  51. test/core/iomgr/threadpool_test.cc (6)
  52. test/core/surface/concurrent_connectivity_test.cc (2)
  53. tools/doxygen/Doxyfile.c++.internal (2)
  54. tools/doxygen/Doxyfile.core.internal (2)

BUILD (23)

@ -329,6 +329,13 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "atomic_utils",
language = "c++",
public_hdrs = ["src/core/lib/gprpp/atomic_utils.h"],
deps = ["gpr_platform"],
)
grpc_cc_library(
name = "grpc_unsecure",
srcs = [
@ -670,7 +677,6 @@ grpc_cc_library(
"src/core/lib/gpr/tmpfile.h",
"src/core/lib/gpr/useful.h",
"src/core/lib/gprpp/arena.h",
"src/core/lib/gprpp/atomic.h",
"src/core/lib/gprpp/examine_stack.h",
"src/core/lib/gprpp/fork.h",
"src/core/lib/gprpp/global_config.h",
@ -780,17 +786,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "atomic",
language = "c++",
public_hdrs = [
"src/core/lib/gprpp/atomic.h",
],
deps = [
"gpr",
],
)
grpc_cc_library(
name = "config",
srcs = [
@ -1006,7 +1001,7 @@ grpc_cc_library(
language = "c++",
public_hdrs = ["src/core/lib/gprpp/ref_counted.h"],
deps = [
"atomic",
"atomic_utils",
"debug_location",
"gpr_base",
"grpc_trace",
@ -1019,7 +1014,6 @@ grpc_cc_library(
language = "c++",
public_hdrs = ["src/core/lib/gprpp/dual_ref_counted.h"],
deps = [
"atomic",
"debug_location",
"gpr_base",
"grpc_trace",
@ -1400,7 +1394,6 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:alt_grpc_base_legacy"],
deps = [
"atomic",
"gpr_base",
"grpc_base_c",
],

@ -311,7 +311,6 @@ libs:
- src/core/lib/gpr/tmpfile.h
- src/core/lib/gpr/useful.h
- src/core/lib/gprpp/arena.h
- src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/construct_destruct.h
- src/core/lib/gprpp/debug_location.h
- src/core/lib/gprpp/examine_stack.h
@ -738,7 +737,7 @@ libs:
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/endpoint_config_internal.h
- src/core/lib/event_engine/sockaddr.h
- src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/match.h
@ -1785,7 +1784,7 @@ libs:
- src/core/lib/debug/trace.h
- src/core/lib/event_engine/endpoint_config_internal.h
- src/core/lib/event_engine/sockaddr.h
- src/core/lib/gprpp/atomic.h
- src/core/lib/gprpp/atomic_utils.h
- src/core/lib/gprpp/bitset.h
- src/core/lib/gprpp/dual_ref_counted.h
- src/core/lib/gprpp/match.h

@ -44,8 +44,6 @@ C++ compatible with
- `<ratio>`
- `<system_error>`
- `<filesystem>`
- `grpc_core::Atomic` is prefered over `std::atomic` in gRPC library because it provides
additional debugging information.
## Roadmap

gRPC-C++.podspec (generated, 4)

@ -544,7 +544,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/tmpfile.h',
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/debug_location.h',
@ -1209,7 +1209,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/tmpfile.h',
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/debug_location.h',

gRPC-Core.podspec (generated, 4)

@ -916,7 +916,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/wrap_memcpy.cc',
'src/core/lib/gprpp/arena.cc',
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/debug_location.h',
@ -1797,7 +1797,7 @@ Pod::Spec.new do |s|
'src/core/lib/gpr/tmpfile.h',
'src/core/lib/gpr/useful.h',
'src/core/lib/gprpp/arena.h',
'src/core/lib/gprpp/atomic.h',
'src/core/lib/gprpp/atomic_utils.h',
'src/core/lib/gprpp/bitset.h',
'src/core/lib/gprpp/construct_destruct.h',
'src/core/lib/gprpp/debug_location.h',

grpc.gemspec (generated, 2)

@ -829,7 +829,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/gpr/wrap_memcpy.cc )
s.files += %w( src/core/lib/gprpp/arena.cc )
s.files += %w( src/core/lib/gprpp/arena.h )
s.files += %w( src/core/lib/gprpp/atomic.h )
s.files += %w( src/core/lib/gprpp/atomic_utils.h )
s.files += %w( src/core/lib/gprpp/bitset.h )
s.files += %w( src/core/lib/gprpp/construct_destruct.h )
s.files += %w( src/core/lib/gprpp/debug_location.h )

package.xml (generated, 2)

@ -809,7 +809,7 @@
<file baseinstalldir="/" name="src/core/lib/gpr/wrap_memcpy.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/arena.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/arena.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/atomic.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/atomic_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/bitset.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/construct_destruct.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/gprpp/debug_location.h" role="src" />

@ -811,8 +811,8 @@ void ClientChannel::ExternalConnectivityWatcher::
void ClientChannel::ExternalConnectivityWatcher::Notify(
grpc_connectivity_state state, const absl::Status& /* status */) {
bool done = false;
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
MemoryOrder::RELAXED)) {
if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
std::memory_order_relaxed)) {
return; // Already done.
}
// Remove external watcher.
@ -835,8 +835,8 @@ void ClientChannel::ExternalConnectivityWatcher::Notify(
void ClientChannel::ExternalConnectivityWatcher::Cancel() {
bool done = false;
if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
MemoryOrder::RELAXED)) {
if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
std::memory_order_relaxed)) {
return; // Already done.
}
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
@ -1160,7 +1160,7 @@ ClientChannel::~ClientChannel() {
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
GRPC_ERROR_UNREF(disconnect_error_.load(std::memory_order_relaxed));
}
OrphanablePtr<ClientChannel::LoadBalancedCall>
@ -1803,9 +1803,10 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
GRPC_ERROR_UNREF(op->disconnect_with_error);
} else {
// Disconnect.
GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
GPR_ASSERT(disconnect_error_.load(std::memory_order_relaxed) ==
GRPC_ERROR_NONE);
disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
disconnect_error_.store(op->disconnect_with_error,
std::memory_order_release);
UpdateStateAndPickerLocked(
GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(

@ -183,7 +183,7 @@ class ClientChannel {
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
Atomic<bool> done_{false};
std::atomic<bool> done_{false};
};
struct ResolverQueuedCall {
@ -209,7 +209,7 @@ class ClientChannel {
// Note: Does NOT return a new ref.
grpc_error_handle disconnect_error() const {
return disconnect_error_.Load(MemoryOrder::ACQUIRE);
return disconnect_error_.load(std::memory_order_acquire);
}
// Note: All methods with "Locked" suffix must be invoked from within
@ -348,7 +348,7 @@ class ClientChannel {
// Fields accessed from both data plane mutex and control plane
// work_serializer.
//
Atomic<grpc_error_handle> disconnect_error_;
std::atomic<grpc_error_handle> disconnect_error_{GRPC_ERROR_NONE};
//
// Fields guarded by a mutex, since they need to be accessed

@ -39,7 +39,7 @@ SubchannelNode::SubchannelNode(std::string target_address,
SubchannelNode::~SubchannelNode() {}
void SubchannelNode::UpdateConnectivityState(grpc_connectivity_state state) {
connectivity_state_.Store(state, MemoryOrder::RELAXED);
connectivity_state_.store(state, std::memory_order_relaxed);
}
void SubchannelNode::SetChildSocket(RefCountedPtr<SocketNode> socket) {
@ -50,7 +50,7 @@ void SubchannelNode::SetChildSocket(RefCountedPtr<SocketNode> socket) {
Json SubchannelNode::RenderJson() {
// Create and fill the data child.
grpc_connectivity_state state =
connectivity_state_.Load(MemoryOrder::RELAXED);
connectivity_state_.load(std::memory_order_relaxed);
Json::Object data = {
{"state",
Json::Object{

@ -61,7 +61,7 @@ class SubchannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
Atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
std::atomic<grpc_connectivity_state> connectivity_state_{GRPC_CHANNEL_IDLE};
Mutex socket_mu_;
RefCountedPtr<SocketNode> child_socket_ ABSL_GUARDED_BY(socket_mu_);
std::string target_;

@ -425,8 +425,9 @@ void HealthCheckClient::CallState::StartCancel(void* arg,
void HealthCheckClient::CallState::Cancel() {
bool expected = false;
if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE)) {
if (cancelled_.compare_exchange_strong(expected, true,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
call_->Ref(DEBUG_LOCATION, "cancel").release();
GRPC_CALL_COMBINER_START(
&call_combiner_,
@ -471,7 +472,7 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(
state, error == GRPC_ERROR_NONE && !healthy
? "backend unhealthy"
: grpc_error_std_string(error).c_str());
seen_response_.Store(true, MemoryOrder::RELEASE);
seen_response_.store(true, std::memory_order_release);
grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
// Start another recv_message batch.
// This re-uses the ref we're holding.
@ -598,7 +599,7 @@ void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
health_check_client_->call_state_.reset();
if (retry) {
GPR_ASSERT(!health_check_client_->shutting_down_);
if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
if (seen_response_.load(std::memory_order_acquire)) {
// If the call fails after we've gotten a successful response, reset
// the backoff and restart the call immediately.
health_check_client_->retry_backoff_.Reset();

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <grpc/grpc.h>
#include <grpc/support/sync.h>
@ -28,7 +30,6 @@
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -126,10 +127,10 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
OrphanablePtr<ByteStream> recv_message_;
grpc_closure recv_message_ready_;
grpc_slice_buffer recv_message_buffer_;
Atomic<bool> seen_response_{false};
std::atomic<bool> seen_response_{false};
// True if the cancel_stream batch has been started.
Atomic<bool> cancelled_{false};
std::atomic<bool> cancelled_{false};
// recv_trailing_metadata
grpc_metadata_batch recv_trailing_metadata_;

@ -16,6 +16,8 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include "absl/strings/string_view.h"
#include <grpc/grpc.h>
@ -31,7 +33,6 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
@ -57,13 +58,15 @@ class CircuitBreakerCallCounterMap {
explicit CallCounter(Key key) : key_(std::move(key)) {}
~CallCounter() override;
uint32_t Load() { return concurrent_requests_.Load(MemoryOrder::SEQ_CST); }
uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
void Decrement() { concurrent_requests_.FetchSub(1); }
uint32_t Load() {
return concurrent_requests_.load(std::memory_order_seq_cst);
}
uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
void Decrement() { concurrent_requests_.fetch_sub(1); }
private:
Key key_;
Atomic<uint32_t> concurrent_requests_{0};
std::atomic<uint32_t> concurrent_requests_{0};
};
RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,

@ -64,7 +64,7 @@ class GoogleCloud2ProdResolver : public Resolver {
grpc_httpcli_context context_;
grpc_httpcli_response response_;
grpc_closure on_done_;
Atomic<bool> on_done_called_{false};
std::atomic<bool> on_done_called_{false};
};
// A metadata server query to get the zone.
@ -154,8 +154,9 @@ void GoogleCloud2ProdResolver::MetadataQuery::OnHttpRequestDone(
void GoogleCloud2ProdResolver::MetadataQuery::MaybeCallOnDone(
grpc_error_handle error) {
bool expected = false;
if (!on_done_called_.CompareExchangeStrong(
&expected, true, MemoryOrder::RELAXED, MemoryOrder::RELAXED)) {
if (!on_done_called_.compare_exchange_strong(expected, true,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
// We've already called OnDone(), so just clean up.
GRPC_ERROR_UNREF(error);
Unref();

@ -20,9 +20,10 @@
#include <limits.h>
#include <atomic>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/http2_errors.h"
@ -159,8 +160,8 @@ class ChannelData {
// Member data used to track the state of channel.
grpc_millis last_idle_time_;
Atomic<intptr_t> call_count_{0};
Atomic<ChannelState> state_{IDLE};
std::atomic<intptr_t> call_count_{0};
std::atomic<ChannelState> state_{IDLE};
// Idle timer and its callback closure.
grpc_timer idle_timer_;
@ -201,20 +202,21 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
}
void ChannelData::IncreaseCallCount() {
const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
const intptr_t previous_value =
call_count_.fetch_add(1, std::memory_order_relaxed);
GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
previous_value + 1);
if (previous_value == 0) {
// This call is the one that makes the channel busy.
// Loop here to make sure the previous decrease operation has finished.
ChannelState state = state_.Load(MemoryOrder::RELAXED);
ChannelState state = state_.load(std::memory_order_relaxed);
while (true) {
switch (state) {
// Timer has not been set. Switch to CALLS_ACTIVE.
case IDLE:
// In this case, no other threads will modify the state, so we can
// just store the value.
state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
state_.store(CALLS_ACTIVE, std::memory_order_relaxed);
return;
// Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
case TIMER_PENDING:
@ -222,17 +224,17 @@ void ChannelData::IncreaseCallCount() {
// At this point, the state may have been switched to IDLE by the
// idle timer callback. Therefore, use CAS operation to change the
// state atomically.
// Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
// been properly set in DecreaseCallCount().
if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
MemoryOrder::ACQUIRE,
MemoryOrder::RELAXED)) {
// Use std::memory_order_acquire on success to ensure last_idle_time_
// has been properly set in DecreaseCallCount().
if (state_.compare_exchange_weak(state, TIMER_PENDING_CALLS_ACTIVE,
std::memory_order_acquire,
std::memory_order_relaxed)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
state = state_.load(std::memory_order_relaxed);
break;
}
}
@ -240,16 +242,17 @@ void ChannelData::IncreaseCallCount() {
}
void ChannelData::DecreaseCallCount() {
const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
const intptr_t previous_value =
call_count_.fetch_sub(1, std::memory_order_relaxed);
GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
previous_value - 1);
if (previous_value == 1) {
// This call is the one that makes the channel idle.
// last_idle_time_ does not need to be Atomic<> because busy-loops in
// last_idle_time_ does not need to be std::atomic<> because busy-loops in
// IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
// prevent multiple threads from simultaneously accessing this variable.
last_idle_time_ = ExecCtx::Get()->Now();
ChannelState state = state_.Load(MemoryOrder::RELAXED);
ChannelState state = state_.load(std::memory_order_relaxed);
while (true) {
switch (state) {
// Timer has not been set. Set the timer and switch to TIMER_PENDING
@ -257,7 +260,7 @@ void ChannelData::DecreaseCallCount() {
// Release store here to make other threads see the updated value of
// last_idle_time_.
StartIdleTimer();
state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
state_.store(TIMER_PENDING, std::memory_order_release);
return;
// Timer has been set. Switch to
// TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
@ -267,15 +270,15 @@ void ChannelData::DecreaseCallCount() {
// state atomically.
// Release store here to make the idle timer callback see the updated
// value of last_idle_time_ to properly reset the idle timer.
if (state_.CompareExchangeWeak(
&state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
if (state_.compare_exchange_weak(
state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
std::memory_order_release, std::memory_order_relaxed)) {
return;
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = state_.Load(MemoryOrder::RELAXED);
state = state_.load(std::memory_order_relaxed);
break;
}
}
@ -313,38 +316,41 @@ void ChannelData::IdleTimerCallback(void* arg, grpc_error_handle error) {
return;
}
bool finished = false;
ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
ChannelState state = chand->state_.load(std::memory_order_relaxed);
while (!finished) {
switch (state) {
case TIMER_PENDING:
// Change the state to PROCESSING to block IncreaseCallCout() until the
// EnterIdle() operation finishes, preventing mistakenly entering IDLE
// when active RPC exists.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
finished = chand->state_.compare_exchange_weak(
state, PROCESSING, std::memory_order_acquire,
std::memory_order_relaxed);
if (finished) {
chand->EnterIdle();
chand->state_.Store(IDLE, MemoryOrder::RELAXED);
chand->state_.store(IDLE, std::memory_order_relaxed);
}
break;
case TIMER_PENDING_CALLS_ACTIVE:
finished = chand->state_.CompareExchangeWeak(
&state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
finished = chand->state_.compare_exchange_weak(
state, CALLS_ACTIVE, std::memory_order_relaxed,
std::memory_order_relaxed);
break;
case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
// Change the state to PROCESSING to block IncreaseCallCount() until the
// StartIdleTimer() operation finishes, preventing mistakenly restarting
// the timer after grpc_timer_cancel() when shutdown.
finished = chand->state_.CompareExchangeWeak(
&state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
finished = chand->state_.compare_exchange_weak(
state, PROCESSING, std::memory_order_acquire,
std::memory_order_relaxed);
if (finished) {
chand->StartIdleTimer();
chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
chand->state_.store(TIMER_PENDING, std::memory_order_relaxed);
}
break;
default:
// The state has not been switched to desired value yet, try again.
state = chand->state_.Load(MemoryOrder::RELAXED);
state = chand->state_.load(std::memory_order_relaxed);
break;
}
}

@ -18,6 +18,8 @@
#include "src/core/ext/filters/fault_injection/fault_injection_filter.h"
#include <atomic>
#include "absl/strings/numbers.h"
#include <grpc/support/alloc.h>
@ -28,7 +30,6 @@
#include "src/core/ext/filters/fault_injection/service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/timer.h"
@ -40,9 +41,9 @@ TraceFlag grpc_fault_injection_filter_trace(false, "fault_injection_filter");
namespace {
Atomic<uint32_t> g_active_faults{0};
std::atomic<uint32_t> g_active_faults{0};
static_assert(
std::is_trivially_destructible<Atomic<uint32_t>>::value,
std::is_trivially_destructible<std::atomic<uint32_t>>::value,
"the active fault counter needs to have a trivially destructible type");
inline int GetLinkedMetadatumValueInt(grpc_linked_mdelem* md) {
@ -140,7 +141,7 @@ class CallData {
// Finishes the fault injection, should only be called once.
void FaultInjectionFinished() {
g_active_faults.FetchSub(1, MemoryOrder::RELAXED);
g_active_faults.fetch_sub(1, std::memory_order_relaxed);
}
// This is a callback that will be invoked after the delay timer is up.
@ -400,10 +401,11 @@ void CallData::DecideWhetherToInjectFaults(
}
bool CallData::HaveActiveFaultsQuota(bool increment) {
if (g_active_faults.Load(MemoryOrder::ACQUIRE) >= fi_policy_->max_faults) {
if (g_active_faults.load(std::memory_order_acquire) >=
fi_policy_->max_faults) {
return false;
}
if (increment) g_active_faults.FetchAdd(1, MemoryOrder::RELAXED);
if (increment) g_active_faults.fetch_add(1, std::memory_order_relaxed);
return true;
}

@ -339,8 +339,8 @@ bool MaybeAddServerLoadReportingFilter(const grpc_channel_args& args) {
// time if we build with the filter target.
struct ServerLoadReportingFilterStaticRegistrar {
ServerLoadReportingFilterStaticRegistrar() {
static grpc_core::Atomic<bool> registered{false};
if (registered.Load(grpc_core::MemoryOrder::ACQUIRE)) return;
static std::atomic<bool> registered{false};
if (registered.load(std::memory_order_acquire)) return;
RegisterChannelFilter<ServerLoadReportingChannelData,
ServerLoadReportingCallData>(
"server_load_reporting", GRPC_SERVER_CHANNEL, INT_MAX,
@ -353,7 +353,7 @@ struct ServerLoadReportingFilterStaticRegistrar {
::grpc::load_reporter::MeasureEndBytesReceived();
::grpc::load_reporter::MeasureEndLatencyMs();
::grpc::load_reporter::MeasureOtherCallMetric();
registered.Store(true, grpc_core::MemoryOrder::RELEASE);
registered.store(true, std::memory_order_release);
}
} server_load_reporting_filter_static_registrar;

@ -31,8 +31,8 @@ namespace grpc_core {
namespace {
uint64_t GetAndResetCounter(Atomic<uint64_t>* from) {
return from->Exchange(0, MemoryOrder::RELAXED);
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
return from->exchange(0, std::memory_order_relaxed);
}
} // namespace
@ -82,7 +82,7 @@ XdsClusterDropStats::Snapshot XdsClusterDropStats::GetSnapshotAndReset() {
}
void XdsClusterDropStats::AddUncategorizedDrops() {
uncategorized_drops_.FetchAdd(1);
uncategorized_drops_.fetch_add(1);
}
void XdsClusterDropStats::AddCallDropped(const std::string& category) {
@ -132,28 +132,29 @@ XdsClusterLocalityStats::~XdsClusterLocalityStats() {
XdsClusterLocalityStats::Snapshot
XdsClusterLocalityStats::GetSnapshotAndReset() {
Snapshot snapshot = {GetAndResetCounter(&total_successful_requests_),
// Don't reset total_requests_in_progress because it's
// not related to a single reporting interval.
total_requests_in_progress_.Load(MemoryOrder::RELAXED),
GetAndResetCounter(&total_error_requests_),
GetAndResetCounter(&total_issued_requests_),
{}};
Snapshot snapshot = {
GetAndResetCounter(&total_successful_requests_),
// Don't reset total_requests_in_progress because it's
// not related to a single reporting interval.
total_requests_in_progress_.load(std::memory_order_relaxed),
GetAndResetCounter(&total_error_requests_),
GetAndResetCounter(&total_issued_requests_),
{}};
MutexLock lock(&backend_metrics_mu_);
snapshot.backend_metrics = std::move(backend_metrics_);
return snapshot;
}
void XdsClusterLocalityStats::AddCallStarted() {
total_issued_requests_.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(1, MemoryOrder::RELAXED);
total_issued_requests_.fetch_add(1, std::memory_order_relaxed);
total_requests_in_progress_.fetch_add(1, std::memory_order_relaxed);
}
void XdsClusterLocalityStats::AddCallFinished(bool fail) {
Atomic<uint64_t>& to_increment =
std::atomic<uint64_t>& to_increment =
fail ? total_error_requests_ : total_successful_requests_;
to_increment.FetchAdd(1, MemoryOrder::RELAXED);
total_requests_in_progress_.FetchAdd(-1, MemoryOrder::ACQ_REL);
to_increment.fetch_add(1, std::memory_order_relaxed);
total_requests_in_progress_.fetch_add(-1, std::memory_order_acq_rel);
}
} // namespace grpc_core

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <map>
#include <string>
@ -29,7 +30,6 @@
#include "absl/strings/string_view.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
@ -144,7 +144,7 @@ class XdsClusterDropStats : public RefCounted<XdsClusterDropStats> {
absl::string_view lrs_server_name_;
absl::string_view cluster_name_;
absl::string_view eds_service_name_;
Atomic<uint64_t> uncategorized_drops_{0};
std::atomic<uint64_t> uncategorized_drops_{0};
// Protects categorized_drops_. A mutex is necessary because the length of
// dropped_requests can be accessed by both the picker (from data plane
// mutex) and the load reporting thread (from the control plane combiner).
@ -221,10 +221,10 @@ class XdsClusterLocalityStats : public RefCounted<XdsClusterLocalityStats> {
absl::string_view eds_service_name_;
RefCountedPtr<XdsLocalityName> name_;
Atomic<uint64_t> total_successful_requests_{0};
Atomic<uint64_t> total_requests_in_progress_{0};
Atomic<uint64_t> total_error_requests_{0};
Atomic<uint64_t> total_issued_requests_{0};
std::atomic<uint64_t> total_successful_requests_{0};
std::atomic<uint64_t> total_requests_in_progress_{0};
std::atomic<uint64_t> total_error_requests_{0};
std::atomic<uint64_t> total_issued_requests_{0};
// Protects backend_metrics_. A mutex is necessary because the length of
// backend_metrics_ can be accessed by both the callback intercepting the

@ -24,6 +24,8 @@
#include <stdlib.h>
#include <string.h>
#include <atomic>
#include "absl/strings/escaping.h"
#include "absl/strings/strip.h"
@ -37,7 +39,6 @@
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/error.h"
@ -86,34 +87,34 @@ CallCountingHelper::CallCountingHelper() {
void CallCountingHelper::RecordCallStarted() {
AtomicCounterData& data =
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()];
data.calls_started.FetchAdd(1, MemoryOrder::RELAXED);
data.last_call_started_cycle.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
data.calls_started.fetch_add(1, std::memory_order_relaxed);
data.last_call_started_cycle.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
void CallCountingHelper::RecordCallFailed() {
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_failed.FetchAdd(1, MemoryOrder::RELAXED);
.calls_failed.fetch_add(1, std::memory_order_relaxed);
}
void CallCountingHelper::RecordCallSucceeded() {
per_cpu_counter_data_storage_[ExecCtx::Get()->starting_cpu()]
.calls_succeeded.FetchAdd(1, MemoryOrder::RELAXED);
.calls_succeeded.fetch_add(1, std::memory_order_relaxed);
}
void CallCountingHelper::CollectData(CounterData* out) {
for (size_t core = 0; core < num_cores_; ++core) {
AtomicCounterData& data = per_cpu_counter_data_storage_[core];
out->calls_started += data.calls_started.Load(MemoryOrder::RELAXED);
out->calls_started += data.calls_started.load(std::memory_order_relaxed);
out->calls_succeeded +=
per_cpu_counter_data_storage_[core].calls_succeeded.Load(
MemoryOrder::RELAXED);
out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.Load(
MemoryOrder::RELAXED);
per_cpu_counter_data_storage_[core].calls_succeeded.load(
std::memory_order_relaxed);
out->calls_failed += per_cpu_counter_data_storage_[core].calls_failed.load(
std::memory_order_relaxed);
const gpr_cycle_counter last_call =
per_cpu_counter_data_storage_[core].last_call_started_cycle.Load(
MemoryOrder::RELAXED);
per_cpu_counter_data_storage_[core].last_call_started_cycle.load(
std::memory_order_relaxed);
if (last_call > out->last_call_started_cycle) {
out->last_call_started_cycle = last_call;
}
@ -173,7 +174,7 @@ Json ChannelNode::RenderJson() {
};
// Connectivity state.
// If low-order bit is on, then the field is set.
int state_field = connectivity_state_.Load(MemoryOrder::RELAXED);
int state_field = connectivity_state_.load(std::memory_order_relaxed);
if ((state_field & 1) != 0) {
grpc_connectivity_state state =
static_cast<grpc_connectivity_state>(state_field >> 1);
@ -227,7 +228,7 @@ void ChannelNode::PopulateChildRefs(Json::Object* json) {
void ChannelNode::SetConnectivityState(grpc_connectivity_state state) {
// Store with low-order bit set to indicate that the field is set.
int state_field = (state << 1) + 1;
connectivity_state_.Store(state_field, MemoryOrder::RELAXED);
connectivity_state_.store(state_field, std::memory_order_relaxed);
}
void ChannelNode::AddChildChannel(intptr_t child_uuid) {
@ -474,37 +475,38 @@ SocketNode::SocketNode(std::string local, std::string remote, std::string name,
security_(std::move(security)) {}
void SocketNode::RecordStreamStartedFromLocal() {
streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
last_local_stream_created_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
streams_started_.fetch_add(1, std::memory_order_relaxed);
last_local_stream_created_cycle_.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
void SocketNode::RecordStreamStartedFromRemote() {
streams_started_.FetchAdd(1, MemoryOrder::RELAXED);
last_remote_stream_created_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
streams_started_.fetch_add(1, std::memory_order_relaxed);
last_remote_stream_created_cycle_.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
void SocketNode::RecordMessagesSent(uint32_t num_sent) {
messages_sent_.FetchAdd(num_sent, MemoryOrder::RELAXED);
last_message_sent_cycle_.Store(gpr_get_cycle_counter(), MemoryOrder::RELAXED);
messages_sent_.fetch_add(num_sent, std::memory_order_relaxed);
last_message_sent_cycle_.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
void SocketNode::RecordMessageReceived() {
messages_received_.FetchAdd(1, MemoryOrder::RELAXED);
last_message_received_cycle_.Store(gpr_get_cycle_counter(),
MemoryOrder::RELAXED);
messages_received_.fetch_add(1, std::memory_order_relaxed);
last_message_received_cycle_.store(gpr_get_cycle_counter(),
std::memory_order_relaxed);
}
Json SocketNode::RenderJson() {
// Create and fill the data child.
Json::Object data;
gpr_timespec ts;
int64_t streams_started = streams_started_.Load(MemoryOrder::RELAXED);
int64_t streams_started = streams_started_.load(std::memory_order_relaxed);
if (streams_started != 0) {
data["streamsStarted"] = std::to_string(streams_started);
gpr_cycle_counter last_local_stream_created_cycle =
last_local_stream_created_cycle_.Load(MemoryOrder::RELAXED);
last_local_stream_created_cycle_.load(std::memory_order_relaxed);
if (last_local_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_local_stream_created_cycle),
@ -512,7 +514,7 @@ Json SocketNode::RenderJson() {
data["lastLocalStreamCreatedTimestamp"] = gpr_format_timespec(ts);
}
gpr_cycle_counter last_remote_stream_created_cycle =
last_remote_stream_created_cycle_.Load(MemoryOrder::RELAXED);
last_remote_stream_created_cycle_.load(std::memory_order_relaxed);
if (last_remote_stream_created_cycle != 0) {
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(last_remote_stream_created_cycle),
@ -520,33 +522,35 @@ Json SocketNode::RenderJson() {
data["lastRemoteStreamCreatedTimestamp"] = gpr_format_timespec(ts);
}
}
int64_t streams_succeeded = streams_succeeded_.Load(MemoryOrder::RELAXED);
int64_t streams_succeeded =
streams_succeeded_.load(std::memory_order_relaxed);
if (streams_succeeded != 0) {
data["streamsSucceeded"] = std::to_string(streams_succeeded);
}
int64_t streams_failed = streams_failed_.Load(MemoryOrder::RELAXED);
int64_t streams_failed = streams_failed_.load(std::memory_order_relaxed);
if (streams_failed != 0) {
data["streamsFailed"] = std::to_string(streams_failed);
}
int64_t messages_sent = messages_sent_.Load(MemoryOrder::RELAXED);
int64_t messages_sent = messages_sent_.load(std::memory_order_relaxed);
if (messages_sent != 0) {
data["messagesSent"] = std::to_string(messages_sent);
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
last_message_sent_cycle_.Load(MemoryOrder::RELAXED)),
last_message_sent_cycle_.load(std::memory_order_relaxed)),
GPR_CLOCK_REALTIME);
data["lastMessageSentTimestamp"] = gpr_format_timespec(ts);
}
int64_t messages_received = messages_received_.Load(MemoryOrder::RELAXED);
int64_t messages_received =
messages_received_.load(std::memory_order_relaxed);
if (messages_received != 0) {
data["messagesReceived"] = std::to_string(messages_received);
ts = gpr_convert_clock_type(
gpr_cycle_counter_to_time(
last_message_received_cycle_.Load(MemoryOrder::RELAXED)),
last_message_received_cycle_.load(std::memory_order_relaxed)),
GPR_CLOCK_REALTIME);
data["lastMessageReceivedTimestamp"] = gpr_format_timespec(ts);
}
int64_t keepalives_sent = keepalives_sent_.Load(MemoryOrder::RELAXED);
int64_t keepalives_sent = keepalives_sent_.load(std::memory_order_relaxed);
if (keepalives_sent != 0) {
data["keepAlivesSent"] = std::to_string(keepalives_sent);
}

@ -21,17 +21,17 @@
#include <grpc/impl/codegen/port_platform.h>
#include <grpc/grpc.h>
#include <atomic>
#include <set>
#include <string>
#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"
#include <grpc/grpc.h>
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -134,19 +134,19 @@ class CallCountingHelper {
// Define the ctors so that we can use this structure in InlinedVector.
AtomicCounterData() = default;
AtomicCounterData(const AtomicCounterData& that)
: calls_started(that.calls_started.Load(MemoryOrder::RELAXED)),
calls_succeeded(that.calls_succeeded.Load(MemoryOrder::RELAXED)),
calls_failed(that.calls_failed.Load(MemoryOrder::RELAXED)),
: calls_started(that.calls_started.load(std::memory_order_relaxed)),
calls_succeeded(that.calls_succeeded.load(std::memory_order_relaxed)),
calls_failed(that.calls_failed.load(std::memory_order_relaxed)),
last_call_started_cycle(
that.last_call_started_cycle.Load(MemoryOrder::RELAXED)) {}
that.last_call_started_cycle.load(std::memory_order_relaxed)) {}
Atomic<int64_t> calls_started{0};
Atomic<int64_t> calls_succeeded{0};
Atomic<int64_t> calls_failed{0};
Atomic<gpr_cycle_counter> last_call_started_cycle{0};
std::atomic<int64_t> calls_started{0};
std::atomic<int64_t> calls_succeeded{0};
std::atomic<int64_t> calls_failed{0};
std::atomic<gpr_cycle_counter> last_call_started_cycle{0};
// Make sure the size is exactly one cache line.
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(Atomic<intptr_t>) -
sizeof(Atomic<gpr_cycle_counter>)];
uint8_t padding[GPR_CACHELINE_SIZE - 3 * sizeof(std::atomic<intptr_t>) -
sizeof(std::atomic<gpr_cycle_counter>)];
};
// TODO(soheilhy,veblush): Revist this after abseil integration.
// This has a problem when using abseil inlined_vector because it
@ -220,7 +220,7 @@ class ChannelNode : public BaseNode {
// Least significant bit indicates whether the value is set. Remaining
// bits are a grpc_connectivity_state value.
Atomic<int> connectivity_state_{0};
std::atomic<int> connectivity_state_{0};
Mutex child_mu_; // Guards sets below.
std::set<intptr_t> child_channels_;
@ -310,30 +310,30 @@ class SocketNode : public BaseNode {
void RecordStreamStartedFromLocal();
void RecordStreamStartedFromRemote();
void RecordStreamSucceeded() {
streams_succeeded_.FetchAdd(1, MemoryOrder::RELAXED);
streams_succeeded_.fetch_add(1, std::memory_order_relaxed);
}
void RecordStreamFailed() {
streams_failed_.FetchAdd(1, MemoryOrder::RELAXED);
streams_failed_.fetch_add(1, std::memory_order_relaxed);
}
void RecordMessagesSent(uint32_t num_sent);
void RecordMessageReceived();
void RecordKeepaliveSent() {
keepalives_sent_.FetchAdd(1, MemoryOrder::RELAXED);
keepalives_sent_.fetch_add(1, std::memory_order_relaxed);
}
const std::string& remote() { return remote_; }
private:
Atomic<int64_t> streams_started_{0};
Atomic<int64_t> streams_succeeded_{0};
Atomic<int64_t> streams_failed_{0};
Atomic<int64_t> messages_sent_{0};
Atomic<int64_t> messages_received_{0};
Atomic<int64_t> keepalives_sent_{0};
Atomic<gpr_cycle_counter> last_local_stream_created_cycle_{0};
Atomic<gpr_cycle_counter> last_remote_stream_created_cycle_{0};
Atomic<gpr_cycle_counter> last_message_sent_cycle_{0};
Atomic<gpr_cycle_counter> last_message_received_cycle_{0};
std::atomic<int64_t> streams_started_{0};
std::atomic<int64_t> streams_succeeded_{0};
std::atomic<int64_t> streams_failed_{0};
std::atomic<int64_t> messages_sent_{0};
std::atomic<int64_t> messages_received_{0};
std::atomic<int64_t> keepalives_sent_{0};
std::atomic<gpr_cycle_counter> last_local_stream_created_cycle_{0};
std::atomic<gpr_cycle_counter> last_remote_stream_created_cycle_{0};
std::atomic<gpr_cycle_counter> last_message_sent_cycle_{0};
std::atomic<gpr_cycle_counter> last_message_received_cycle_{0};
std::string local_;
std::string remote_;
RefCountedPtr<Security> const security_;

@ -18,7 +18,7 @@
namespace grpc_core {
std::atomic<CoreConfiguration*> CoreConfiguration::config_;
std::atomic<CoreConfiguration*> CoreConfiguration::config_{nullptr};
CoreConfiguration::Builder::Builder() = default;

@ -75,7 +75,7 @@ std::pair<Arena*, void*> Arena::CreateWithAlloc(size_t initial_size,
}
size_t Arena::Destroy() {
size_t size = total_used_.Load(MemoryOrder::RELAXED);
size_t size = total_used_.load(std::memory_order_relaxed);
this->~Arena();
gpr_free_aligned(this);
return size;

@ -27,6 +27,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <new>
#include <utility>
@ -35,7 +36,6 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/atomic.h"
#include <stddef.h>
@ -59,7 +59,7 @@ class Arena {
static constexpr size_t base_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(Arena));
size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(size);
size_t begin = total_used_.FetchAdd(size, MemoryOrder::RELAXED);
size_t begin = total_used_.fetch_add(size, std::memory_order_relaxed);
if (begin + size <= initial_zone_size_) {
return reinterpret_cast<char*>(this) + base_size + begin;
} else {
@ -105,7 +105,7 @@ class Arena {
// Keep track of the total used size. We use this in our call sizing
// hysteresis.
Atomic<size_t> total_used_;
std::atomic<size_t> total_used_{0};
const size_t initial_zone_size_;
gpr_spinlock arena_growth_spinlock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
// If the initial arena allocation wasn't enough, we allocate additional zones

@ -1,104 +0,0 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_H
#define GRPC_CORE_LIB_GPRPP_ATOMIC_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include <grpc/support/atm.h>
namespace grpc_core {
enum class MemoryOrder {
RELAXED = static_cast<int>(std::memory_order_relaxed),
CONSUME = static_cast<int>(std::memory_order_consume),
ACQUIRE = static_cast<int>(std::memory_order_acquire),
RELEASE = static_cast<int>(std::memory_order_release),
ACQ_REL = static_cast<int>(std::memory_order_acq_rel),
SEQ_CST = static_cast<int>(std::memory_order_seq_cst)
};
template <typename T>
class Atomic {
public:
explicit Atomic(T val = T()) : storage_(val) {}
T Load(MemoryOrder order) const {
return storage_.load(static_cast<std::memory_order>(order));
}
void Store(T val, MemoryOrder order) {
storage_.store(val, static_cast<std::memory_order>(order));
}
T Exchange(T desired, MemoryOrder order) {
return storage_.exchange(desired, static_cast<std::memory_order>(order));
}
bool CompareExchangeWeak(T* expected, T desired, MemoryOrder success,
MemoryOrder failure) {
return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_weak(
*expected, desired, static_cast<std::memory_order>(success),
static_cast<std::memory_order>(failure)));
}
bool CompareExchangeStrong(T* expected, T desired, MemoryOrder success,
MemoryOrder failure) {
return GPR_ATM_INC_CAS_THEN(storage_.compare_exchange_strong(
*expected, desired, static_cast<std::memory_order>(success),
static_cast<std::memory_order>(failure)));
}
template <typename Arg>
T FetchAdd(Arg arg, MemoryOrder order = MemoryOrder::SEQ_CST) {
return GPR_ATM_INC_ADD_THEN(storage_.fetch_add(
static_cast<Arg>(arg), static_cast<std::memory_order>(order)));
}
template <typename Arg>
T FetchSub(Arg arg, MemoryOrder order = MemoryOrder::SEQ_CST) {
return GPR_ATM_INC_ADD_THEN(storage_.fetch_sub(
static_cast<Arg>(arg), static_cast<std::memory_order>(order)));
}
// Atomically increment a counter only if the counter value is not zero.
// Returns true if increment took place; false if counter is zero.
bool IncrementIfNonzero() {
T count = storage_.load(std::memory_order_acquire);
do {
// If zero, we are done (without an increment). If not, we must do a CAS
// to maintain the contract: do not increment the counter if it is already
// zero
if (count == 0) {
return false;
}
} while (!CompareExchangeWeak(&count, count + 1, MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
return true;
}
private:
std::atomic<T> storage_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_H */

@ -0,0 +1,47 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_CORE_LIB_GPRPP_ATOMIC_UTILS_H
#define GRPC_CORE_LIB_GPRPP_ATOMIC_UTILS_H
#include <grpc/support/port_platform.h>
#include <atomic>
namespace grpc_core {
// Atomically increment a counter only if the counter value is not zero.
// Returns true if increment took place; false if counter is zero.
template <typename T>
inline bool IncrementIfNonzero(std::atomic<T>* p) {
T count = p->load(std::memory_order_acquire);
do {
// If zero, we are done (without an increment). If not, we must do a CAS
// to maintain the contract: do not increment the counter if it is already
// zero
if (count == 0) {
return false;
}
} while (!p->compare_exchange_weak(
count, count + 1, std::memory_order_acq_rel, std::memory_order_acquire));
return true;
}
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPRPP_ATOMIC_UTILS_H */

@ -27,7 +27,6 @@
#include <cassert>
#include <cinttypes>
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -67,7 +66,7 @@ class DualRefCounted : public Orphanable {
void Unref() {
// Convert strong ref to weak ref.
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(-1, 1), MemoryOrder::ACQ_REL);
refs_.fetch_add(MakeRefPair(-1, 1), std::memory_order_acq_rel);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
#ifndef NDEBUG
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
@ -85,7 +84,7 @@ class DualRefCounted : public Orphanable {
}
void Unref(const DebugLocation& location, const char* reason) {
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(-1, 1), MemoryOrder::ACQ_REL);
refs_.fetch_add(MakeRefPair(-1, 1), std::memory_order_acq_rel);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
#ifndef NDEBUG
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
@ -108,7 +107,7 @@ class DualRefCounted : public Orphanable {
}
RefCountedPtr<Child> RefIfNonZero() GRPC_MUST_USE_RESULT {
uint64_t prev_ref_pair = refs_.Load(MemoryOrder::ACQUIRE);
uint64_t prev_ref_pair = refs_.load(std::memory_order_acquire);
do {
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
#ifndef NDEBUG
@ -119,15 +118,15 @@ class DualRefCounted : public Orphanable {
}
#endif
if (strong_refs == 0) return nullptr;
} while (!refs_.CompareExchangeWeak(
&prev_ref_pair, prev_ref_pair + MakeRefPair(1, 0), MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
} while (!refs_.compare_exchange_weak(
prev_ref_pair, prev_ref_pair + MakeRefPair(1, 0),
std::memory_order_acq_rel, std::memory_order_acquire));
return RefCountedPtr<Child>(static_cast<Child*>(this));
}
RefCountedPtr<Child> RefIfNonZero(const DebugLocation& location,
const char* reason) GRPC_MUST_USE_RESULT {
uint64_t prev_ref_pair = refs_.Load(MemoryOrder::ACQUIRE);
uint64_t prev_ref_pair = refs_.load(std::memory_order_acquire);
do {
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
#ifndef NDEBUG
@ -144,9 +143,9 @@ class DualRefCounted : public Orphanable {
(void)reason;
#endif
if (strong_refs == 0) return nullptr;
} while (!refs_.CompareExchangeWeak(
&prev_ref_pair, prev_ref_pair + MakeRefPair(1, 0), MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE));
} while (!refs_.compare_exchange_weak(
prev_ref_pair, prev_ref_pair + MakeRefPair(1, 0),
std::memory_order_acq_rel, std::memory_order_acquire));
return RefCountedPtr<Child>(static_cast<Child*>(this));
}
@ -169,7 +168,7 @@ class DualRefCounted : public Orphanable {
const char* trace = trace_;
#endif
const uint64_t prev_ref_pair =
refs_.FetchSub(MakeRefPair(0, 1), MemoryOrder::ACQ_REL);
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
#ifndef NDEBUG
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
@ -191,7 +190,7 @@ class DualRefCounted : public Orphanable {
const char* trace = trace_;
#endif
const uint64_t prev_ref_pair =
refs_.FetchSub(MakeRefPair(0, 1), MemoryOrder::ACQ_REL);
refs_.fetch_sub(MakeRefPair(0, 1), std::memory_order_acq_rel);
#ifndef NDEBUG
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
@ -254,7 +253,7 @@ class DualRefCounted : public Orphanable {
void IncrementRefCount() {
#ifndef NDEBUG
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(1, 0), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(1, 0), std::memory_order_relaxed);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
GPR_ASSERT(strong_refs != 0);
@ -263,13 +262,13 @@ class DualRefCounted : public Orphanable {
strong_refs, strong_refs + 1, weak_refs);
}
#else
refs_.FetchAdd(MakeRefPair(1, 0), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(1, 0), std::memory_order_relaxed);
#endif
}
void IncrementRefCount(const DebugLocation& location, const char* reason) {
#ifndef NDEBUG
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(1, 0), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(1, 0), std::memory_order_relaxed);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
GPR_ASSERT(strong_refs != 0);
@ -282,14 +281,14 @@ class DualRefCounted : public Orphanable {
// Use conditionally-important parameters
(void)location;
(void)reason;
refs_.FetchAdd(MakeRefPair(1, 0), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(1, 0), std::memory_order_relaxed);
#endif
}
void IncrementWeakRefCount() {
#ifndef NDEBUG
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(0, 1), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_relaxed);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
if (trace_ != nullptr) {
@ -297,14 +296,14 @@ class DualRefCounted : public Orphanable {
weak_refs, weak_refs + 1, strong_refs);
}
#else
refs_.FetchAdd(MakeRefPair(0, 1), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_relaxed);
#endif
}
void IncrementWeakRefCount(const DebugLocation& location,
const char* reason) {
#ifndef NDEBUG
const uint64_t prev_ref_pair =
refs_.FetchAdd(MakeRefPair(0, 1), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_relaxed);
const uint32_t strong_refs = GetStrongRefs(prev_ref_pair);
const uint32_t weak_refs = GetWeakRefs(prev_ref_pair);
if (trace_ != nullptr) {
@ -316,14 +315,14 @@ class DualRefCounted : public Orphanable {
// Use conditionally-important parameters
(void)location;
(void)reason;
refs_.FetchAdd(MakeRefPair(0, 1), MemoryOrder::RELAXED);
refs_.fetch_add(MakeRefPair(0, 1), std::memory_order_relaxed);
#endif
}
#ifndef NDEBUG
const char* trace_;
#endif
Atomic<uint64_t> refs_;
std::atomic<uint64_t> refs_{0};
};
} // namespace grpc_core

@ -168,28 +168,30 @@ class ThreadState {
void Fork::GlobalInit() {
if (!override_enabled_) {
support_enabled_.Store(GPR_GLOBAL_CONFIG_GET(grpc_enable_fork_support),
MemoryOrder::RELAXED);
support_enabled_.store(GPR_GLOBAL_CONFIG_GET(grpc_enable_fork_support),
std::memory_order_relaxed);
}
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
exec_ctx_state_ = new internal::ExecCtxState();
thread_state_ = new internal::ThreadState();
}
}
void Fork::GlobalShutdown() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
delete exec_ctx_state_;
delete thread_state_;
}
}
bool Fork::Enabled() { return support_enabled_.Load(MemoryOrder::RELAXED); }
bool Fork::Enabled() {
return support_enabled_.load(std::memory_order_relaxed);
}
// Testing Only
void Fork::Enable(bool enable) {
override_enabled_ = true;
support_enabled_.Store(enable, MemoryOrder::RELAXED);
support_enabled_.store(enable, std::memory_order_relaxed);
}
void Fork::DoIncExecCtxCount() { exec_ctx_state_->IncExecCtxCount(); }
@ -205,38 +207,38 @@ Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
}
bool Fork::BlockExecCtx() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
return exec_ctx_state_->BlockExecCtx();
}
return false;
}
void Fork::AllowExecCtx() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
exec_ctx_state_->AllowExecCtx();
}
}
void Fork::IncThreadCount() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
thread_state_->IncThreadCount();
}
}
void Fork::DecThreadCount() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
thread_state_->DecThreadCount();
}
}
void Fork::AwaitThreads() {
if (support_enabled_.Load(MemoryOrder::RELAXED)) {
if (support_enabled_.load(std::memory_order_relaxed)) {
thread_state_->AwaitThreads();
}
}
internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
internal::ThreadState* Fork::thread_state_ = nullptr;
Atomic<bool> Fork::support_enabled_(false);
std::atomic<bool> Fork::support_enabled_(false);
bool Fork::override_enabled_ = false;
Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
} // namespace grpc_core

@ -21,7 +21,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/atomic.h"
#include <atomic>
/*
* NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
@ -48,14 +48,14 @@ class Fork {
// Increment the count of active ExecCtxs.
// Will block until a pending fork is complete if one is in progress.
static void IncExecCtxCount() {
if (GPR_UNLIKELY(support_enabled_.Load(MemoryOrder::RELAXED))) {
if (GPR_UNLIKELY(support_enabled_.load(std::memory_order_relaxed))) {
DoIncExecCtxCount();
}
}
// Decrement the count of active ExecCtxs
static void DecExecCtxCount() {
if (GPR_UNLIKELY(support_enabled_.Load(MemoryOrder::RELAXED))) {
if (GPR_UNLIKELY(support_enabled_.load(std::memory_order_relaxed))) {
DoDecExecCtxCount();
}
}
@ -93,7 +93,7 @@ class Fork {
static internal::ExecCtxState* exec_ctx_state_;
static internal::ThreadState* thread_state_;
static grpc_core::Atomic<bool> support_enabled_;
static std::atomic<bool> support_enabled_;
static bool override_enabled_;
static child_postfork_func reset_child_polling_engine_;
};

@ -27,9 +27,9 @@ namespace grpc_core {
//
bool MultiProducerSingleConsumerQueue::Push(Node* node) {
node->next.Store(nullptr, MemoryOrder::RELAXED);
Node* prev = head_.Exchange(node, MemoryOrder::ACQ_REL);
prev->next.Store(node, MemoryOrder::RELEASE);
node->next.store(nullptr, std::memory_order_relaxed);
Node* prev = head_.exchange(node, std::memory_order_acq_rel);
prev->next.store(node, std::memory_order_release);
return prev == &stub_;
}
@ -42,7 +42,7 @@ MultiProducerSingleConsumerQueue::Pop() {
MultiProducerSingleConsumerQueue::Node*
MultiProducerSingleConsumerQueue::PopAndCheckEnd(bool* empty) {
Node* tail = tail_;
Node* next = tail_->next.Load(MemoryOrder::ACQUIRE);
Node* next = tail_->next.load(std::memory_order_acquire);
if (tail == &stub_) {
// indicates the list is actually (ephemerally) empty
if (next == nullptr) {
@ -51,21 +51,21 @@ MultiProducerSingleConsumerQueue::PopAndCheckEnd(bool* empty) {
}
tail_ = next;
tail = next;
next = tail->next.Load(MemoryOrder::ACQUIRE);
next = tail->next.load(std::memory_order_acquire);
}
if (next != nullptr) {
*empty = false;
tail_ = next;
return tail;
}
Node* head = head_.Load(MemoryOrder::ACQUIRE);
Node* head = head_.load(std::memory_order_acquire);
if (tail != head) {
*empty = false;
// indicates a retry is in order: we're still adding
return nullptr;
}
Push(&stub_);
next = tail->next.Load(MemoryOrder::ACQUIRE);
next = tail->next.load(std::memory_order_acquire);
if (next != nullptr) {
*empty = false;
tail_ = next;

@ -21,11 +21,12 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h"
#include <atomic>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
// Multiple-producer single-consumer lock free queue, based upon the
@ -35,12 +36,12 @@ class MultiProducerSingleConsumerQueue {
public:
// List node. Application node types can inherit from this.
struct Node {
Atomic<Node*> next;
std::atomic<Node*> next{nullptr};
};
MultiProducerSingleConsumerQueue() : head_{&stub_}, tail_(&stub_) {}
~MultiProducerSingleConsumerQueue() {
GPR_ASSERT(head_.Load(MemoryOrder::RELAXED) == &stub_);
GPR_ASSERT(head_.load(std::memory_order_relaxed) == &stub_);
GPR_ASSERT(tail_ == &stub_);
}
@ -61,7 +62,7 @@ class MultiProducerSingleConsumerQueue {
// make sure head & tail don't share a cacheline
union {
char padding_[GPR_CACHELINE_SIZE];
Atomic<Node*> head_;
std::atomic<Node*> head_{nullptr};
};
Node* tail_;
Node stub_;

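For readers who want the mpscq change in one place, here is a self-contained sketch of the same Vyukov-style intrusive MPSC queue after the conversion (assumptions: the GPR asserts and the cacheline-padding union are dropped, and Pop folds in PopAndCheckEnd's logic without the empty flag):

#include <atomic>

class MpscQueue {
 public:
  struct Node {
    std::atomic<Node*> next{nullptr};
  };

  MpscQueue() : head_(&stub_), tail_(&stub_) {}

  // Wait-free for producers. Returns true if the queue was previously empty.
  bool Push(Node* node) {
    node->next.store(nullptr, std::memory_order_relaxed);
    Node* prev = head_.exchange(node, std::memory_order_acq_rel);
    prev->next.store(node, std::memory_order_release);
    return prev == &stub_;
  }

  // Single consumer only. May return nullptr even when the queue is not
  // empty, if a producer has exchanged head_ but not yet linked its node.
  Node* Pop() {
    Node* tail = tail_;
    Node* next = tail->next.load(std::memory_order_acquire);
    if (tail == &stub_) {
      if (next == nullptr) return nullptr;  // genuinely empty
      tail_ = next;
      tail = next;
      next = tail->next.load(std::memory_order_acquire);
    }
    if (next != nullptr) {
      tail_ = next;
      return tail;
    }
    Node* head = head_.load(std::memory_order_acquire);
    if (tail != head) return nullptr;  // a push is in flight; retry later
    Push(&stub_);
    next = tail->next.load(std::memory_order_acquire);
    if (next != nullptr) {
      tail_ = next;
      return tail;
    }
    return nullptr;
  }

 private:
  std::atomic<Node*> head_;
  Node* tail_;
  Node stub_;
};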
@ -29,7 +29,7 @@
#include <cassert>
#include <cinttypes>
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -70,18 +70,18 @@ class RefCount {
// Increases the ref-count by `n`.
void Ref(Value n = 1) {
#ifndef NDEBUG
const Value prior = value_.FetchAdd(n, MemoryOrder::RELAXED);
const Value prior = value_.fetch_add(n, std::memory_order_relaxed);
if (trace_ != nullptr) {
gpr_log(GPR_INFO, "%s:%p ref %" PRIdPTR " -> %" PRIdPTR, trace_, this,
prior, prior + n);
}
#else
value_.FetchAdd(n, MemoryOrder::RELAXED);
value_.fetch_add(n, std::memory_order_relaxed);
#endif
}
void Ref(const DebugLocation& location, const char* reason, Value n = 1) {
#ifndef NDEBUG
const Value prior = value_.FetchAdd(n, MemoryOrder::RELAXED);
const Value prior = value_.fetch_add(n, std::memory_order_relaxed);
if (trace_ != nullptr) {
gpr_log(GPR_INFO, "%s:%p %s:%d ref %" PRIdPTR " -> %" PRIdPTR " %s",
trace_, this, location.file(), location.line(), prior, prior + n,
@ -91,26 +91,26 @@ class RefCount {
// Use conditionally-important parameters
(void)location;
(void)reason;
value_.FetchAdd(n, MemoryOrder::RELAXED);
value_.fetch_add(n, std::memory_order_relaxed);
#endif
}
// Similar to Ref() with an assert on the ref-count being non-zero.
void RefNonZero() {
#ifndef NDEBUG
const Value prior = value_.FetchAdd(1, MemoryOrder::RELAXED);
const Value prior = value_.fetch_add(1, std::memory_order_relaxed);
if (trace_ != nullptr) {
gpr_log(GPR_INFO, "%s:%p ref %" PRIdPTR " -> %" PRIdPTR, trace_, this,
prior, prior + 1);
}
assert(prior > 0);
#else
value_.FetchAdd(1, MemoryOrder::RELAXED);
value_.fetch_add(1, std::memory_order_relaxed);
#endif
}
void RefNonZero(const DebugLocation& location, const char* reason) {
#ifndef NDEBUG
const Value prior = value_.FetchAdd(1, MemoryOrder::RELAXED);
const Value prior = value_.fetch_add(1, std::memory_order_relaxed);
if (trace_ != nullptr) {
gpr_log(GPR_INFO, "%s:%p %s:%d ref %" PRIdPTR " -> %" PRIdPTR " %s",
trace_, this, location.file(), location.line(), prior, prior + 1,
@ -133,7 +133,7 @@ class RefCount {
trace_, this, prior, prior + 1);
}
#endif
return value_.IncrementIfNonzero();
return IncrementIfNonzero(&value_);
}
bool RefIfNonZero(const DebugLocation& location, const char* reason) {
#ifndef NDEBUG
@ -148,7 +148,7 @@ class RefCount {
// Avoid unused-parameter warnings for debug-only parameters
(void)location;
(void)reason;
return value_.IncrementIfNonzero();
return IncrementIfNonzero(&value_);
}
// Decrements the ref-count and returns true if the ref-count reaches 0.
@ -159,7 +159,7 @@ class RefCount {
// safely access it, since another thread might free us in the interim.
auto* trace = trace_;
#endif
const Value prior = value_.FetchSub(1, MemoryOrder::ACQ_REL);
const Value prior = value_.fetch_sub(1, std::memory_order_acq_rel);
#ifndef NDEBUG
if (trace != nullptr) {
gpr_log(GPR_INFO, "%s:%p unref %" PRIdPTR " -> %" PRIdPTR, trace, this,
@ -176,7 +176,7 @@ class RefCount {
// safely access it, since another thread might free us in the interim.
auto* trace = trace_;
#endif
const Value prior = value_.FetchSub(1, MemoryOrder::ACQ_REL);
const Value prior = value_.fetch_sub(1, std::memory_order_acq_rel);
#ifndef NDEBUG
if (trace != nullptr) {
gpr_log(GPR_INFO, "%s:%p %s:%d unref %" PRIdPTR " -> %" PRIdPTR " %s",
@ -193,12 +193,12 @@ class RefCount {
}
private:
Value get() const { return value_.Load(MemoryOrder::RELAXED); }
Value get() const { return value_.load(std::memory_order_relaxed); }
#ifndef NDEBUG
const char* trace_;
#endif
Atomic<Value> value_;
std::atomic<Value> value_{0};
};
// PolymorphicRefCount enforces polymorphic destruction of RefCounted.

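The RefIfNonZero() paths above now call a free function instead of the old Atomic<T>::IncrementIfNonzero() member. Its body lives in the new atomic_utils.h, which this hunk does not show; a plausible shape, offered only as an assumption, is a CAS loop that refuses to resurrect a count that has already reached zero:

#include <atomic>

template <typename T>
bool IncrementIfNonzero(std::atomic<T>* counter) {
  T count = counter->load(std::memory_order_acquire);
  do {
    // A zero count means the object is already being destroyed; refuse to
    // take a new reference.
    if (count == 0) return false;
  } while (!counter->compare_exchange_weak(
      count, count + 1, std::memory_order_acq_rel, std::memory_order_acquire));
  return true;
}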
@ -29,7 +29,8 @@ inline void* InfLenFIFOQueue::PopFront() {
// mutex. This function will assume that there is at least one element in the
// queue (i.e. queue_head_->content is valid).
void* result = queue_head_->content;
count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);
count_.store(count_.load(std::memory_order_relaxed) - 1,
std::memory_order_relaxed);
// Updates Stats when trace flag turned on.
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
@ -40,7 +41,7 @@ inline void* InfLenFIFOQueue::PopFront() {
stats_.max_queue_time = gpr_time_max(
gpr_convert_clock_type(stats_.max_queue_time, GPR_TIMESPAN), wait_time);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
if (count_.load(std::memory_order_relaxed) == 0) {
stats_.busy_queue_time =
gpr_time_add(stats_.busy_queue_time,
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), busy_time));
@ -57,7 +58,7 @@ inline void* InfLenFIFOQueue::PopFront() {
queue_head_ = queue_head_->next;
// Signal waiting thread
if (count_.Load(MemoryOrder::RELAXED) > 0) {
if (count_.load(std::memory_order_relaxed) > 0) {
TopWaiter()->cv.Signal();
}
@ -92,7 +93,7 @@ InfLenFIFOQueue::InfLenFIFOQueue() {
}
InfLenFIFOQueue::~InfLenFIFOQueue() {
GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
GPR_ASSERT(count_.load(std::memory_order_relaxed) == 0);
for (size_t i = 0; i < delete_list_count_; ++i) {
gpr_free(delete_list_[i]);
}
@ -102,7 +103,7 @@ InfLenFIFOQueue::~InfLenFIFOQueue() {
void InfLenFIFOQueue::Put(void* elem) {
MutexLock l(&mu_);
int curr_count = count_.Load(MemoryOrder::RELAXED);
int curr_count = count_.load(std::memory_order_relaxed);
if (queue_tail_ == queue_head_ && curr_count != 0) {
// List is full. Expands list to double size by inserting new chunk of nodes
@ -134,7 +135,7 @@ void InfLenFIFOQueue::Put(void* elem) {
queue_tail_->insert_time = current_time;
}
count_.Store(curr_count + 1, MemoryOrder::RELAXED);
count_.store(curr_count + 1, std::memory_order_relaxed);
queue_tail_ = queue_tail_->next;
TopWaiter()->cv.Signal();
@ -143,7 +144,7 @@ void InfLenFIFOQueue::Put(void* elem) {
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
MutexLock l(&mu_);
if (count_.Load(MemoryOrder::RELAXED) == 0) {
if (count_.load(std::memory_order_relaxed) == 0) {
gpr_timespec start_time;
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
@ -154,14 +155,14 @@ void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
PushWaiter(&self);
do {
self.cv.Wait(&mu_);
} while (count_.Load(MemoryOrder::RELAXED) == 0);
} while (count_.load(std::memory_order_relaxed) == 0);
RemoveWaiter(&self);
if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
wait_time != nullptr) {
*wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
}
}
GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
GPR_DEBUG_ASSERT(count_.load(std::memory_order_relaxed) > 0);
return PopFront();
}

@ -21,8 +21,9 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_core {
@ -70,7 +71,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
// Returns number of elements in queue currently.
// There might be concurrently add/remove on queue, so count might change
// quickly.
int count() const override { return count_.Load(MemoryOrder::RELAXED); }
int count() const override { return count_.load(std::memory_order_relaxed); }
struct Node {
Node* next; // Linking
@ -157,7 +158,7 @@ class InfLenFIFOQueue : public MPMCQueueInterface {
Node* queue_head_ = nullptr; // Head of the queue, remove position
Node* queue_tail_ = nullptr; // End of queue, insert position
Atomic<int> count_{0}; // Number of elements in queue
std::atomic<int> count_{0}; // Number of elements in queue
int num_nodes_ = 0; // Number of nodes allocated
Stats stats_; // Stats info

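One pattern worth calling out in the queue above: count_ is only modified while mu_ is held, but it is an atomic so that count() can be read without the lock, and callers treat the value as eventually consistent. A stripped-down sketch of that arrangement (hypothetical class, not the gRPC type):

#include <atomic>
#include <mutex>

class CountedQueue {
 public:
  // All mutations happen under mu_, so the load/store pair need not be a
  // single atomic RMW; relaxed ordering is enough.
  void Put() {
    std::lock_guard<std::mutex> lock(mu_);
    count_.store(count_.load(std::memory_order_relaxed) + 1,
                 std::memory_order_relaxed);
  }

  // Lock-free read; the result is only eventually consistent.
  int count() const { return count_.load(std::memory_order_relaxed); }

 private:
  std::mutex mu_;
  std::atomic<int> count_{0};
};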
@ -72,7 +72,7 @@ size_t ThreadPool::DefaultStackSize() {
void ThreadPool::AssertHasNotBeenShutDown() {
// For debug checking purpose, using RELAXED order is sufficient.
GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED));
GPR_DEBUG_ASSERT(!shut_down_.load(std::memory_order_relaxed));
}
ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
@ -102,7 +102,7 @@ ThreadPool::ThreadPool(int num_threads, const char* thd_name,
ThreadPool::~ThreadPool() {
// For debug checking purpose, using RELAXED order is sufficient.
shut_down_.Store(true, MemoryOrder::RELAXED);
shut_down_.store(true, std::memory_order_relaxed);
for (int i = 0; i < num_threads_; ++i) {
queue_->Put(nullptr);

@ -134,7 +134,8 @@ class ThreadPool : public ThreadPoolInterface {
ThreadPoolWorker** threads_ = nullptr; // Array of worker threads
MPMCQueueInterface* queue_ = nullptr; // Closure queue
Atomic<bool> shut_down_{false}; // Destructor has been called if set to true
std::atomic<bool> shut_down_{
false}; // Destructor has been called if set to true
void SharedThreadPoolConstructor();
// For ThreadPool, default stack size for mobile platform is 1952K. for other

@ -137,12 +137,12 @@ class TcpZerocopySendRecord {
}
// References: 1 reference per sendmsg(), and 1 for the tcp_write().
void Ref() { ref_.FetchAdd(1, MemoryOrder::RELAXED); }
void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }
// Unref: called when we get an error queue notification for a sendmsg(), if a
// sendmsg() failed or when tcp_write() is done.
bool Unref() {
const intptr_t prior = ref_.FetchSub(1, MemoryOrder::ACQ_REL);
const intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
GPR_DEBUG_ASSERT(prior > 0);
if (prior == 1) {
AllSendsComplete();
@ -160,7 +160,7 @@ class TcpZerocopySendRecord {
void AssertEmpty() {
GPR_DEBUG_ASSERT(buf_.count == 0);
GPR_DEBUG_ASSERT(buf_.length == 0);
GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0);
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
}
// When all sendmsg() calls associated with this tcp_write() have been
@ -168,12 +168,12 @@ class TcpZerocopySendRecord {
// for each sendmsg()) and all reference counts have been dropped, drop our
// reference to the underlying data since we no longer need it.
void AllSendsComplete() {
GPR_DEBUG_ASSERT(ref_.Load(MemoryOrder::RELAXED) == 0);
GPR_DEBUG_ASSERT(ref_.load(std::memory_order_relaxed) == 0);
grpc_slice_buffer_reset_and_unref_internal(&buf_);
}
grpc_slice_buffer buf_;
Atomic<intptr_t> ref_;
std::atomic<intptr_t> ref_{0};
OutgoingOffset out_offset_;
};
@ -287,7 +287,7 @@ class TcpZerocopySendCtx {
// Indicate that we are disposing of this zerocopy context. This indicator
// will prevent new zerocopy writes from being issued.
void Shutdown() { shutdown_.Store(true, MemoryOrder::RELEASE); }
void Shutdown() { shutdown_.store(true, std::memory_order_release); }
// Indicates that there are no inflight tcp_write() instances with zerocopy
// enabled.
@ -318,7 +318,7 @@ class TcpZerocopySendCtx {
}
TcpZerocopySendRecord* TryGetSendRecordLocked() {
if (shutdown_.Load(MemoryOrder::ACQUIRE)) {
if (shutdown_.load(std::memory_order_acquire)) {
return nullptr;
}
if (free_send_records_size_ == 0) {
@ -340,7 +340,7 @@ class TcpZerocopySendCtx {
int free_send_records_size_;
Mutex lock_;
uint32_t last_send_ = 0;
Atomic<bool> shutdown_;
std::atomic<bool> shutdown_{false};
bool enabled_ = false;
size_t threshold_bytes_ = kDefaultSendBytesThreshold;
std::unordered_map<uint32_t, TcpZerocopySendRecord*> ctx_lookup_;

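The ordering split in TcpZerocopySendRecord above is the classic reference-count discipline: increments can be relaxed because taking a reference publishes nothing by itself, while the decrement is acq_rel so that whichever thread sees prior == 1 also sees every write the other owners made before releasing. A minimal sketch (simplified, not the gRPC type):

#include <atomic>
#include <cstdint>

class SendRecord {
 public:
  // Taking a reference publishes nothing by itself, so relaxed is enough.
  void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }

  // acq_rel: the thread that sees prior == 1 must also see every write the
  // other owners made before they released their references.
  bool Unref() {
    const std::intptr_t prior = ref_.fetch_sub(1, std::memory_order_acq_rel);
    return prior == 1;  // true: caller dropped the last reference
  }

 private:
  std::atomic<std::intptr_t> ref_{0};
};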
@ -45,7 +45,7 @@ class WorkSerializer::WorkSerializerImpl : public Orphanable {
// An initial size of 1 keeps track of whether the work serializer has been
// orphaned.
Atomic<size_t> size_{1};
std::atomic<size_t> size_{1};
MultiProducerSingleConsumerQueue queue_;
};
@ -55,7 +55,7 @@ void WorkSerializer::WorkSerializerImpl::Run(
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
}
const size_t prev_size = size_.FetchAdd(1);
const size_t prev_size = size_.fetch_add(1);
// The work serializer should not have been orphaned.
GPR_DEBUG_ASSERT(prev_size > 0);
if (prev_size == 1) {
@ -83,7 +83,7 @@ void WorkSerializer::WorkSerializerImpl::Orphan() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this);
}
size_t prev_size = size_.FetchSub(1);
size_t prev_size = size_.fetch_sub(1);
if (prev_size == 1) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, " Destroying");
@ -101,7 +101,7 @@ void WorkSerializer::WorkSerializerImpl::DrainQueue() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) {
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this);
}
size_t prev_size = size_.FetchSub(1);
size_t prev_size = size_.fetch_sub(1);
GPR_DEBUG_ASSERT(prev_size >= 1);
// It is possible that while draining the queue, one of the callbacks ended
// up orphaning the work serializer. In that case, delete the object.

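The size_ counter above starts at 1 so that reaching 0 means exactly "orphaned and no callbacks left", regardless of whether Orphan() or the last drained callback happens first. A reduced sketch of that lifetime protocol (assumption: heavily simplified, default seq_cst ordering as in the code above):

#include <atomic>
#include <cstddef>

class OrphanableWork {
 public:
  // One count per scheduled callback.
  void OnScheduled() { size_.fetch_add(1); }

  // Dropped once per completed callback and once by Orphan(); whichever call
  // takes the count to zero destroys the object.
  void Release() {
    if (size_.fetch_sub(1) == 1) delete this;
  }
  void Orphan() { Release(); }

 private:
  ~OrphanableWork() = default;  // reachable only through Release()
  std::atomic<std::size_t> size_{1};
};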
@ -18,12 +18,12 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <functional>
#include "absl/synchronization/mutex.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"

@ -105,13 +105,13 @@ struct batch_control {
} completion_data;
grpc_closure start_batch;
grpc_closure finish_batch;
grpc_core::Atomic<intptr_t> steps_to_complete;
std::atomic<intptr_t> steps_to_complete{0};
gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
void set_num_steps_to_complete(uintptr_t steps) {
steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
steps_to_complete.store(steps, std::memory_order_release);
}
bool completed_batch_step() {
return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
return steps_to_complete.fetch_sub(1, std::memory_order_acq_rel) == 1;
}
};

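batch_control's counter above pairs a release store (arming the batch with its step count) with acq_rel decrements, so the one op that observes the count hit zero also observes everything set up before the batch started. A standalone sketch (simplified, hypothetical names):

#include <atomic>
#include <cstdint>

struct BatchCounter {
  std::atomic<std::intptr_t> steps{0};

  // Release store: setup writes happen-before the decrement that drains it.
  void Arm(std::intptr_t n) { steps.store(n, std::memory_order_release); }

  // True for exactly one caller: the one finishing the last step.
  bool StepDone() {
    return steps.fetch_sub(1, std::memory_order_acq_rel) == 1;
  }
};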
@ -23,6 +23,7 @@
#include <stdio.h>
#include <string.h>
#include <atomic>
#include <vector>
#include "absl/strings/str_format.h"
@ -38,7 +39,6 @@
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/timer.h"
@ -224,7 +224,7 @@ class CqEventQueue {
/* Note: The counter is not incremented/decremented atomically with push/pop.
* The count is only eventually consistent */
intptr_t num_items() const {
return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
return num_queue_items_.load(std::memory_order_relaxed);
}
bool Push(grpc_cq_completion* c);
@ -239,14 +239,14 @@ class CqEventQueue {
/* A lazy counter of number of items in the queue. This is NOT atomically
incremented/decremented along with push/pop operations and hence is only
eventually consistent */
grpc_core::Atomic<intptr_t> num_queue_items_{0};
std::atomic<intptr_t> num_queue_items_{0};
};
struct cq_next_data {
~cq_next_data() {
GPR_ASSERT(queue.num_items() == 0);
#ifndef NDEBUG
if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
if (pending_events.load(std::memory_order_acquire) != 0) {
gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
}
#endif
@ -257,11 +257,11 @@ struct cq_next_data {
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
grpc_core::Atomic<intptr_t> things_queued_ever{0};
std::atomic<intptr_t> things_queued_ever{0};
/** Number of outstanding events (+1 if not shut down)
Initial count is dropped by grpc_completion_queue_shutdown */
grpc_core::Atomic<intptr_t> pending_events{1};
std::atomic<intptr_t> pending_events{1};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called = false;
@ -277,7 +277,7 @@ struct cq_pluck_data {
GPR_ASSERT(completed_head.next ==
reinterpret_cast<uintptr_t>(&completed_head));
#ifndef NDEBUG
if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
if (pending_events.load(std::memory_order_acquire) != 0) {
gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
}
#endif
@ -289,17 +289,17 @@ struct cq_pluck_data {
/** Number of pending events (+1 if we're not shutdown).
Initial count is dropped by grpc_completion_queue_shutdown. */
grpc_core::Atomic<intptr_t> pending_events{1};
std::atomic<intptr_t> pending_events{1};
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
grpc_core::Atomic<intptr_t> things_queued_ever{0};
std::atomic<intptr_t> things_queued_ever{0};
/** 0 initially. 1 once we completed shutting */
/* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
* (pending_events == 0). So consider removing this in future and use
* pending_events */
grpc_core::Atomic<bool> shutdown{false};
std::atomic<bool> shutdown{false};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called = false;
@ -314,7 +314,7 @@ struct cq_callback_data {
~cq_callback_data() {
#ifndef NDEBUG
if (pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 0) {
if (pending_events.load(std::memory_order_acquire) != 0) {
gpr_log(GPR_ERROR, "Destroying CQ without draining it fully.");
}
#endif
@ -324,7 +324,7 @@ struct cq_callback_data {
/** Number of pending events (+1 if we're not shutdown).
Initial count is dropped by grpc_completion_queue_shutdown. */
grpc_core::Atomic<intptr_t> pending_events{1};
std::atomic<intptr_t> pending_events{1};
/** 0 initially. 1 once we initiated shutdown */
bool shutdown_called = false;
@ -459,7 +459,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
storage->done(storage->done_arg, storage);
ret = 1;
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
@ -476,7 +476,7 @@ int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
bool CqEventQueue::Push(grpc_cq_completion* c) {
queue_.Push(
reinterpret_cast<grpc_core::MultiProducerSingleConsumerQueue::Node*>(c));
return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
return num_queue_items_.fetch_add(1, std::memory_order_relaxed) == 0;
}
grpc_cq_completion* CqEventQueue::Pop() {
@ -497,7 +497,7 @@ grpc_cq_completion* CqEventQueue::Pop() {
}
if (c) {
num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
num_queue_items_.fetch_sub(1, std::memory_order_relaxed);
}
return c;
@ -648,17 +648,17 @@ static void cq_check_tag(grpc_completion_queue* /*cq*/, void* /*tag*/,
static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* /*tag*/) {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}
static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* /*tag*/) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}
static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* /*tag*/) {
cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
return cqd->pending_events.IncrementIfNonzero();
return grpc_core::IncrementIfNonzero(&cqd->pending_events);
}
bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
@ -714,13 +714,13 @@ static void cq_end_op_for_next(
} else {
/* Add the completion to the queue */
bool is_first = cqd->queue.Push(storage);
cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
/* Since we do not hold the cq lock here, it is important to do an 'acquire'
load here (instead of a 'no_barrier' load) to match with the release
store
(done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
(done via pending_events.fetch_sub(1, ACQ_REL)) in cq_shutdown_next
*/
if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
if (cqd->pending_events.load(std::memory_order_acquire) != 1) {
/* Only kick if this is the first item queued */
if (is_first) {
gpr_mu_lock(cq->mu);
@ -734,8 +734,7 @@ static void cq_end_op_for_next(
GRPC_ERROR_UNREF(kick_error);
}
}
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
@ -744,7 +743,7 @@ static void cq_end_op_for_next(
}
} else {
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
cqd->pending_events.store(0, std::memory_order_release);
gpr_mu_lock(cq->mu);
cq_finish_shutdown_next(cq);
gpr_mu_unlock(cq->mu);
@ -792,12 +791,12 @@ static void cq_end_op_for_pluck(
cq_check_tag(cq, tag, false); /* Used in debug builds only */
/* Add to the list of completions */
cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.fetch_add(1, std::memory_order_relaxed);
cqd->completed_tail->next =
reinterpret_cast<uintptr_t>(storage) | (1u & cqd->completed_tail->next);
cqd->completed_tail = storage;
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
cq_finish_shutdown_pluck(cq);
gpr_mu_unlock(cq->mu);
} else {
@ -857,7 +856,7 @@ static void cq_end_op_for_callback(
cq_check_tag(cq, tag, true); /* Used in debug builds only */
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
cq_finish_shutdown_callback(cq);
}
@ -912,12 +911,12 @@ class ExecCtxNext : public grpc_core::ExecCtx {
GPR_ASSERT(a->stolen_completion == nullptr);
intptr_t current_last_seen_things_queued_ever =
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.load(std::memory_order_relaxed);
if (current_last_seen_things_queued_ever !=
a->last_seen_things_queued_ever) {
a->last_seen_things_queued_ever =
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.load(std::memory_order_relaxed);
/* Pop a cq_completion from the queue. Returns NULL if the queue is empty
* might return NULL in some cases even if the queue is not empty; but
@ -976,7 +975,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
cqd->things_queued_ever.load(std::memory_order_relaxed),
cq,
deadline_millis,
nullptr,
@ -1015,7 +1014,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
}
if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
if (cqd->pending_events.load(std::memory_order_acquire) == 0) {
/* Before returning, check if the queue has any items left over (since
MultiProducerSingleConsumerQueue::Pop() can sometimes return NULL
even if the queue is not empty. If so, keep retrying but do not
@ -1065,7 +1064,7 @@ static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
}
if (cqd->queue.num_items() > 0 &&
cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
cqd->pending_events.load(std::memory_order_acquire) > 0) {
gpr_mu_lock(cq->mu);
cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
gpr_mu_unlock(cq->mu);
@ -1089,7 +1088,7 @@ static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);
GPR_ASSERT(cqd->pending_events.load(std::memory_order_relaxed) == 0);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
@ -1111,10 +1110,10 @@ static void cq_shutdown_next(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
/* Doing acq/release FetchSub here to match with
/* Doing acq/release fetch_sub here to match with
* cq_begin_op_for_next and cq_end_op_for_next functions which read/write
* on this counter without necessarily holding a lock on cq */
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
cq_finish_shutdown_next(cq);
}
gpr_mu_unlock(cq->mu);
@ -1164,12 +1163,12 @@ class ExecCtxPluck : public grpc_core::ExecCtx {
GPR_ASSERT(a->stolen_completion == nullptr);
gpr_atm current_last_seen_things_queued_ever =
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.load(std::memory_order_relaxed);
if (current_last_seen_things_queued_ever !=
a->last_seen_things_queued_ever) {
gpr_mu_lock(cq->mu);
a->last_seen_things_queued_ever =
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
cqd->things_queued_ever.load(std::memory_order_relaxed);
grpc_cq_completion* c;
grpc_cq_completion* prev = &cqd->completed_head;
while ((c = reinterpret_cast<grpc_cq_completion*>(
@ -1225,7 +1224,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
gpr_mu_lock(cq->mu);
grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
cq_is_finished_arg is_finished_arg = {
cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
cqd->things_queued_ever.load(std::memory_order_relaxed),
cq,
deadline_millis,
nullptr,
@ -1262,7 +1261,7 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
}
prev = c;
}
if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
if (cqd->shutdown.load(std::memory_order_relaxed)) {
gpr_mu_unlock(cq->mu);
ret.type = GRPC_QUEUE_SHUTDOWN;
ret.success = 0;
@ -1324,8 +1323,8 @@ static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
cqd->shutdown.Store(true, grpc_core::MemoryOrder::RELAXED);
GPR_ASSERT(!cqd->shutdown.load(std::memory_order_relaxed));
cqd->shutdown.store(true, std::memory_order_relaxed);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
}
@ -1349,7 +1348,7 @@ static void cq_shutdown_pluck(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
cq_finish_shutdown_pluck(cq);
}
gpr_mu_unlock(cq->mu);
@ -1392,7 +1391,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
return;
}
cqd->shutdown_called = true;
if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
if (cqd->pending_events.fetch_sub(1, std::memory_order_acq_rel) == 1) {
gpr_mu_unlock(cq->mu);
cq_finish_shutdown_callback(cq);
} else {

@ -18,16 +18,16 @@
#include <grpc/support/port_platform.h>
#include <grpc/grpc.h>
#include <string.h>
#include <atomic>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"

@ -1193,7 +1193,7 @@ Server::CallData::CallData(grpc_call_element* elem,
}
Server::CallData::~CallData() {
GPR_ASSERT(state_.Load(MemoryOrder::RELAXED) != CallState::PENDING);
GPR_ASSERT(state_.load(std::memory_order_relaxed) != CallState::PENDING);
GRPC_ERROR_UNREF(recv_initial_metadata_error_);
if (host_.has_value()) {
grpc_slice_unref_internal(*host_);
@ -1206,26 +1206,26 @@ Server::CallData::~CallData() {
}
void Server::CallData::SetState(CallState state) {
state_.Store(state, MemoryOrder::RELAXED);
state_.store(state, std::memory_order_relaxed);
}
bool Server::CallData::MaybeActivate() {
CallState expected = CallState::PENDING;
return state_.CompareExchangeStrong(&expected, CallState::ACTIVATED,
MemoryOrder::ACQ_REL,
MemoryOrder::RELAXED);
return state_.compare_exchange_strong(expected, CallState::ACTIVATED,
std::memory_order_acq_rel,
std::memory_order_relaxed);
}
void Server::CallData::FailCallCreation() {
CallState expected_not_started = CallState::NOT_STARTED;
CallState expected_pending = CallState::PENDING;
if (state_.CompareExchangeStrong(&expected_not_started, CallState::ZOMBIED,
MemoryOrder::ACQ_REL,
MemoryOrder::ACQUIRE)) {
if (state_.compare_exchange_strong(expected_not_started, CallState::ZOMBIED,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
KillZombie();
} else if (state_.CompareExchangeStrong(&expected_pending, CallState::ZOMBIED,
MemoryOrder::ACQ_REL,
MemoryOrder::RELAXED)) {
} else if (state_.compare_exchange_strong(
expected_pending, CallState::ZOMBIED,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
// Zombied call will be destroyed when it's removed from the pending
// queue... later.
}
@ -1281,7 +1281,7 @@ void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {
RequestMatcherInterface* rm = calld->matcher_;
Server* server = rm->server();
if (error != GRPC_ERROR_NONE || server->ShutdownCalled()) {
calld->state_.Store(CallState::ZOMBIED, MemoryOrder::RELAXED);
calld->state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
calld->KillZombie();
return;
}
@ -1305,7 +1305,7 @@ void Server::CallData::KillZombie() {
void Server::CallData::StartNewRpc(grpc_call_element* elem) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
if (server_->ShutdownCalled()) {
state_.Store(CallState::ZOMBIED, MemoryOrder::RELAXED);
state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
KillZombie();
return;
}

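One mechanical difference worth noting in this file: grpc_core::Atomic::CompareExchangeStrong took a pointer to the expected value, whereas std::atomic::compare_exchange_strong takes it by reference and overwrites it with the observed value on failure. A minimal illustration of the MaybeActivate() transition above (assumption: CallState reduced to a bare enum):

#include <atomic>

enum class CallState { NOT_STARTED, PENDING, ACTIVATED, ZOMBIED };

// Mirrors MaybeActivate() above: `expected` is passed by reference and, if
// the exchange fails, is overwritten with the value actually observed.
bool MaybeActivate(std::atomic<CallState>& state) {
  CallState expected = CallState::PENDING;
  return state.compare_exchange_strong(expected, CallState::ACTIVATED,
                                       std::memory_order_acq_rel,
                                       std::memory_order_relaxed);
}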
@ -19,6 +19,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <list>
#include <vector>
@ -31,7 +32,6 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/transport.h"
@ -288,7 +288,7 @@ class Server : public InternallyRefCounted<Server> {
grpc_call* call_;
Atomic<CallState> state_{CallState::NOT_STARTED};
std::atomic<CallState> state_{CallState::NOT_STARTED};
absl::optional<grpc_slice> path_;
absl::optional<grpc_slice> host_;
@ -366,7 +366,7 @@ class Server : public InternallyRefCounted<Server> {
// Take a shutdown ref for a request (increment by 2) and return if shutdown
// has already been called.
bool ShutdownRefOnRequest() {
int old_value = shutdown_refs_.FetchAdd(2, MemoryOrder::ACQ_REL);
int old_value = shutdown_refs_.fetch_add(2, std::memory_order_acq_rel);
return (old_value & 1) != 0;
}
@ -374,26 +374,26 @@ class Server : public InternallyRefCounted<Server> {
// (for in-flight request) and possibly call MaybeFinishShutdown if
// appropriate.
void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
if (shutdown_refs_.FetchSub(2, MemoryOrder::ACQ_REL) == 2) {
if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) {
MutexLock lock(&mu_global_);
MaybeFinishShutdown();
}
}
void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
if (shutdown_refs_.FetchSub(1, MemoryOrder::ACQ_REL) == 1) {
if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
MaybeFinishShutdown();
}
}
bool ShutdownCalled() const {
return (shutdown_refs_.Load(MemoryOrder::ACQUIRE) & 1) == 0;
return (shutdown_refs_.load(std::memory_order_acquire) & 1) == 0;
}
// Returns whether there are no more shutdown refs, which means that shutdown
// has been called and all accepted requests have been published if using an
// AllocatingRequestMatcher.
bool ShutdownReady() const {
return shutdown_refs_.Load(MemoryOrder::ACQUIRE) == 0;
return shutdown_refs_.load(std::memory_order_acquire) == 0;
}
grpc_channel_args* const channel_args_;
@ -430,7 +430,7 @@ class Server : public InternallyRefCounted<Server> {
// the lowest bit will be 0 (defaults to 1) and the counter will be even. The
// server should not notify on shutdown until the counter is 0 (shutdown is
// called and there are no requests that are accepted but not started).
Atomic<int> shutdown_refs_{1};
std::atomic<int> shutdown_refs_{1};
bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);

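The shutdown_refs_ encoding above packs two things into one atomic: the low bit means "shutdown not yet requested" and every accepted-but-unpublished request adds 2, so the counter reaching 0 means shutdown was called and every such request has been accounted for. A standalone sketch (assumption: extracted from the Server class and simplified):

#include <atomic>

class ShutdownRefs {
 public:
  // Takes a ref for an incoming request; returns true if shutdown has not
  // been requested yet (the low bit was still set).
  bool RefOnRequest() {
    return (refs_.fetch_add(2, std::memory_order_acq_rel) & 1) != 0;
  }

  // Returns true if this was the last outstanding request and shutdown has
  // already been called.
  bool UnrefOnRequest() {
    return refs_.fetch_sub(2, std::memory_order_acq_rel) == 2;
  }

  // Clears the "not yet shut down" bit; returns true if no requests are
  // outstanding.
  bool UnrefOnShutdownCall() {
    return refs_.fetch_sub(1, std::memory_order_acq_rel) == 1;
  }

  bool ShutdownCalled() const {
    return (refs_.load(std::memory_order_acquire) & 1) == 0;
  }

 private:
  std::atomic<int> refs_{1};
};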
@ -101,7 +101,8 @@ void AsyncConnectivityStateWatcherInterface::Notify(
//
ConnectivityStateTracker::~ConnectivityStateTracker() {
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
for (const auto& p : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
@ -121,7 +122,8 @@ void ConnectivityStateTracker::AddWatcher(
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
this, watcher.get());
}
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (initial_state != current_state) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO,
@ -150,14 +152,15 @@ void ConnectivityStateTracker::RemoveWatcher(
void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
const absl::Status& status,
const char* reason) {
grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
grpc_connectivity_state current_state =
state_.load(std::memory_order_relaxed);
if (state == current_state) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
name_, this, ConnectivityStateName(current_state),
ConnectivityStateName(state), reason, status.ToString().c_str());
}
state_.Store(state, MemoryOrder::RELAXED);
state_.store(state, std::memory_order_relaxed);
status_ = status;
for (const auto& p : watchers_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
@ -174,7 +177,7 @@ void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
}
grpc_connectivity_state ConnectivityStateTracker::state() const {
grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
grpc_connectivity_state state = state_.load(std::memory_order_relaxed);
if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
name_, this, ConnectivityStateName(state));

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include <atomic>
#include <map>
#include <memory>
@ -29,7 +30,6 @@
#include <grpc/grpc.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@ -131,7 +131,7 @@ class ConnectivityStateTracker {
private:
const char* name_;
Atomic<grpc_connectivity_state> state_;
std::atomic<grpc_connectivity_state> state_{grpc_connectivity_state()};
absl::Status status_;
// TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
// be a set instead of a map.

@ -145,10 +145,10 @@ AllocatedMetadata::AllocatedMetadata(
AllocatedMetadata::~AllocatedMetadata() {
grpc_slice_unref_internal(key());
grpc_slice_unref_internal(value());
void* user_data = user_data_.data.Load(grpc_core::MemoryOrder::RELAXED);
void* user_data = user_data_.data.load(std::memory_order_relaxed);
if (user_data) {
destroy_user_data_func destroy_user_data =
user_data_.destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED);
user_data_.destroy_user_data.load(std::memory_order_relaxed);
destroy_user_data(user_data);
}
}
@ -189,10 +189,10 @@ InternedMetadata::InternedMetadata(const grpc_slice& key,
InternedMetadata::~InternedMetadata() {
grpc_slice_unref_internal(key());
grpc_slice_unref_internal(value());
void* user_data = user_data_.data.Load(grpc_core::MemoryOrder::RELAXED);
void* user_data = user_data_.data.load(std::memory_order_relaxed);
if (user_data) {
destroy_user_data_func destroy_user_data =
user_data_.destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED);
user_data_.destroy_user_data.load(std::memory_order_relaxed);
destroy_user_data(user_data);
}
}
@ -560,9 +560,9 @@ grpc_mdelem grpc_mdelem_from_grpc_metadata(grpc_metadata* metadata) {
}
static void* get_user_data(UserData* user_data, void (*destroy_func)(void*)) {
if (user_data->destroy_user_data.Load(grpc_core::MemoryOrder::ACQUIRE) ==
if (user_data->destroy_user_data.load(std::memory_order_acquire) ==
destroy_func) {
return user_data->data.Load(grpc_core::MemoryOrder::RELAXED);
return user_data->data.load(std::memory_order_relaxed);
} else {
return nullptr;
}
@ -594,16 +594,16 @@ static void* set_user_data(UserData* ud, void (*destroy_func)(void*),
void* data) {
GPR_ASSERT((data == nullptr) == (destroy_func == nullptr));
grpc_core::ReleasableMutexLock lock(&ud->mu_user_data);
if (ud->destroy_user_data.Load(grpc_core::MemoryOrder::RELAXED)) {
if (ud->destroy_user_data.load(std::memory_order_relaxed)) {
/* user data can only be set once */
lock.Release();
if (destroy_func != nullptr) {
destroy_func(data);
}
return ud->data.Load(grpc_core::MemoryOrder::RELAXED);
return ud->data.load(std::memory_order_relaxed);
}
ud->data.Store(data, grpc_core::MemoryOrder::RELAXED);
ud->destroy_user_data.Store(destroy_func, grpc_core::MemoryOrder::RELEASE);
ud->data.store(data, std::memory_order_relaxed);
ud->destroy_user_data.store(destroy_func, std::memory_order_release);
return data;
}

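The user-data slot above is a set-once publication: the mutex serializes writers, while the release store of destroy_user_data is what get_user_data()'s acquire load synchronizes with, so a reader that sees the matching destructor also sees the data pointer stored before it. A condensed sketch (assumption: std::mutex in place of grpc_core::Mutex, simplified signatures):

#include <atomic>
#include <mutex>

struct UserDataSlot {
  std::mutex mu;
  std::atomic<void (*)(void*)> destroy{nullptr};
  std::atomic<void*> data{nullptr};

  // Set-once: returns the value now held by the slot (the new one, or the
  // pre-existing one if another caller won the race).
  void* Set(void (*destroy_fn)(void*), void* value) {
    std::unique_lock<std::mutex> lock(mu);
    if (destroy.load(std::memory_order_relaxed) != nullptr) {
      // Slot already taken: drop the lock, then dispose of the rejected value.
      lock.unlock();
      if (destroy_fn != nullptr) destroy_fn(value);
      return data.load(std::memory_order_relaxed);
    }
    data.store(value, std::memory_order_relaxed);
    destroy.store(destroy_fn, std::memory_order_release);  // publishes `data`
    return value;
  }

  // Readers match on the destructor: the acquire load pairs with the release
  // store above, so seeing `destroy_fn` implies seeing the data written first.
  void* Get(void (*destroy_fn)(void*)) {
    if (destroy.load(std::memory_order_acquire) == destroy_fn) {
      return data.load(std::memory_order_relaxed);
    }
    return nullptr;
  }
};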
@ -21,14 +21,14 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/log.h>
#include <atomic>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/log.h>
#include <grpc/slice.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_utils.h"
@ -203,8 +203,8 @@ typedef void (*destroy_user_data_func)(void* data);
struct UserData {
Mutex mu_user_data;
grpc_core::Atomic<destroy_user_data_func> destroy_user_data;
grpc_core::Atomic<void*> data;
std::atomic<destroy_user_data_func> destroy_user_data{nullptr};
std::atomic<void*> data{nullptr};
};
class StaticMetadata {
@ -241,7 +241,7 @@ class RefcountedMdBase {
#ifndef NDEBUG
void Ref(const char* file, int line) {
grpc_mdelem_trace_ref(this, key_, value_, RefValue(), file, line);
const intptr_t prior = refcnt_.FetchAdd(1, MemoryOrder::RELAXED);
const intptr_t prior = refcnt_.fetch_add(1, std::memory_order_relaxed);
GPR_ASSERT(prior > 0);
}
bool Unref(const char* file, int line) {
@ -253,10 +253,10 @@ class RefcountedMdBase {
/* we can assume the ref count is >= 1 as the application is calling
this function - meaning that no adjustment to mdtab_free is necessary,
simplifying the logic here to be just an atomic increment */
refcnt_.FetchAdd(1, MemoryOrder::RELAXED);
refcnt_.fetch_add(1, std::memory_order_relaxed);
}
bool Unref() {
const intptr_t prior = refcnt_.FetchSub(1, MemoryOrder::ACQ_REL);
const intptr_t prior = refcnt_.fetch_sub(1, std::memory_order_acq_rel);
GPR_DEBUG_ASSERT(prior > 0);
return prior == 1;
}
@ -266,15 +266,17 @@ class RefcountedMdBase {
void TraceAtStart(const char* tag);
#endif
intptr_t RefValue() { return refcnt_.Load(MemoryOrder::RELAXED); }
bool AllRefsDropped() { return refcnt_.Load(MemoryOrder::ACQUIRE) == 0; }
bool FirstRef() { return refcnt_.FetchAdd(1, MemoryOrder::RELAXED) == 0; }
intptr_t RefValue() { return refcnt_.load(std::memory_order_relaxed); }
bool AllRefsDropped() { return refcnt_.load(std::memory_order_acquire) == 0; }
bool FirstRef() {
return refcnt_.fetch_add(1, std::memory_order_relaxed) == 0;
}
private:
/* must be byte compatible with grpc_mdelem_data */
grpc_slice key_;
grpc_slice value_;
grpc_core::Atomic<intptr_t> refcnt_;
std::atomic<intptr_t> refcnt_{0};
uint32_t hash_ = 0;
};

@ -42,7 +42,7 @@ namespace {
const char* kFailPolicyName = "fail_lb";
Atomic<int> g_num_lb_picks;
std::atomic<int> g_num_lb_picks;
class FailPolicy : public LoadBalancingPolicy {
public:
@ -66,7 +66,7 @@ class FailPolicy : public LoadBalancingPolicy {
explicit FailPicker(absl::Status status) : status_(status) {}
PickResult Pick(PickArgs /*args*/) override {
g_num_lb_picks.FetchAdd(1);
g_num_lb_picks.fetch_add(1);
return PickResult::Fail(status_);
}
@ -179,7 +179,7 @@ static void test_retry_lb_fail(grpc_end2end_test_config config) {
grpc_call_error error;
grpc_slice details;
grpc_core::g_num_lb_picks.Store(0, grpc_core::MemoryOrder::RELAXED);
grpc_core::g_num_lb_picks.store(0, std::memory_order_relaxed);
grpc_arg args[] = {
grpc_channel_arg_integer_create(
@ -259,8 +259,7 @@ static void test_retry_lb_fail(grpc_end2end_test_config config) {
cq_verifier_destroy(cqv);
int num_picks =
grpc_core::g_num_lb_picks.Load(grpc_core::MemoryOrder::RELAXED);
int num_picks = grpc_core::g_num_lb_picks.load(std::memory_order_relaxed);
gpr_log(GPR_INFO, "NUM LB PICKS: %d", num_picks);
GPR_ASSERT(num_picks == 2);

@ -56,13 +56,13 @@ class SimpleFunctorForAdd : public grpc_completion_queue_functor {
~SimpleFunctorForAdd() {}
static void Run(struct grpc_completion_queue_functor* cb, int /*ok*/) {
auto* callback = static_cast<SimpleFunctorForAdd*>(cb);
callback->count_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
callback->count_.fetch_add(1, std::memory_order_relaxed);
}
int count() { return count_.Load(grpc_core::MemoryOrder::RELAXED); }
int count() { return count_.load(std::memory_order_relaxed); }
private:
grpc_core::Atomic<int> count_{0};
std::atomic<int> count_{0};
};
static void test_add(void) {

@ -24,8 +24,8 @@
#include <memory.h>
#include <stdio.h>
#include <atomic>
#include <atomic>
#include <string>
#include "absl/strings/str_cat.h"

@ -1762,7 +1762,7 @@ src/core/lib/gpr/useful.h \
src/core/lib/gpr/wrap_memcpy.cc \
src/core/lib/gprpp/arena.cc \
src/core/lib/gprpp/arena.h \
src/core/lib/gprpp/atomic.h \
src/core/lib/gprpp/atomic_utils.h \
src/core/lib/gprpp/bitset.h \
src/core/lib/gprpp/construct_destruct.h \
src/core/lib/gprpp/debug_location.h \

@ -1601,7 +1601,7 @@ src/core/lib/gpr/wrap_memcpy.cc \
src/core/lib/gprpp/README.md \
src/core/lib/gprpp/arena.cc \
src/core/lib/gprpp/arena.h \
src/core/lib/gprpp/atomic.h \
src/core/lib/gprpp/atomic_utils.h \
src/core/lib/gprpp/bitset.h \
src/core/lib/gprpp/construct_destruct.h \
src/core/lib/gprpp/debug_location.h \
