Avoid using seq_cst atomic operations in grpcpp when unnecessary.

These cases are almost all in the callback API.

Also use atomic<intptr_t> insteda of atomic_int for consistency with
gpr_atm and grpc_core::Atomic.
pull/19365/head
Soheil Hassas Yeganeh 6 years ago
parent e6d9b7b19e
commit 85f08100d4
  1. 51
      include/grpcpp/impl/codegen/client_callback.h
  2. 42
      include/grpcpp/impl/codegen/server_callback.h
  3. 10
      include/grpcpp/impl/codegen/server_interceptor.h
  4. 2
      include/grpcpp/server_impl.h

@ -19,6 +19,7 @@
#ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
#define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
#include <atomic>
#include <functional>
#include <grpcpp/impl/codegen/call.h>
@ -419,7 +420,8 @@ class ClientCallbackReaderWriterImpl
static void operator delete(void*, void*) { assert(0); }
void MaybeFinish() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
Status s = std::move(finish_status_);
auto* reactor = reactor_;
auto* call = call_.call();
@ -489,7 +491,7 @@ class ClientCallbackReaderWriterImpl
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&read_ops_);
} else {
@ -510,7 +512,7 @@ class ClientCallbackReaderWriterImpl
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&write_ops_);
} else {
@ -531,7 +533,7 @@ class ClientCallbackReaderWriterImpl
},
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&writes_done_ops_);
} else {
@ -539,8 +541,10 @@ class ClientCallbackReaderWriterImpl
}
}
virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
virtual void RemoveHold() override { MaybeFinish(); }
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
private:
friend class ClientCallbackReaderWriterFactory<Request, Response>;
@ -581,7 +585,7 @@ class ClientCallbackReaderWriterImpl
bool read_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
};
@ -619,7 +623,8 @@ class ClientCallbackReaderImpl
static void operator delete(void*, void*) { assert(0); }
void MaybeFinish() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
Status s = std::move(finish_status_);
auto* reactor = reactor_;
auto* call = call_.call();
@ -669,7 +674,7 @@ class ClientCallbackReaderImpl
void Read(Response* msg) override {
read_ops_.RecvMessage(msg);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&read_ops_);
} else {
@ -677,8 +682,10 @@ class ClientCallbackReaderImpl
}
}
virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
virtual void RemoveHold() override { MaybeFinish(); }
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
private:
friend class ClientCallbackReaderFactory<Response>;
@ -712,7 +719,7 @@ class ClientCallbackReaderImpl
bool read_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
};
@ -750,7 +757,8 @@ class ClientCallbackWriterImpl
static void operator delete(void*, void*) { assert(0); }
void MaybeFinish() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
Status s = std::move(finish_status_);
auto* reactor = reactor_;
auto* call = call_.call();
@ -819,7 +827,7 @@ class ClientCallbackWriterImpl
}
// TODO(vjpai): don't assert
GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&write_ops_);
} else {
@ -840,7 +848,7 @@ class ClientCallbackWriterImpl
},
&writes_done_ops_);
writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (started_) {
call_.PerformOps(&writes_done_ops_);
} else {
@ -848,8 +856,10 @@ class ClientCallbackWriterImpl
}
}
virtual void AddHold(int holds) override { callbacks_outstanding_ += holds; }
virtual void RemoveHold() override { MaybeFinish(); }
void AddHold(int holds) override {
callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
}
void RemoveHold() override { MaybeFinish(); }
private:
friend class ClientCallbackWriterFactory<Request>;
@ -889,7 +899,7 @@ class ClientCallbackWriterImpl
bool writes_done_ops_at_start_{false};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
};
@ -951,7 +961,8 @@ class ClientCallbackUnaryImpl final
}
void MaybeFinish() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
Status s = std::move(finish_status_);
auto* reactor = reactor_;
auto* call = call_.call();
@ -991,7 +1002,7 @@ class ClientCallbackUnaryImpl final
Status finish_status_;
// This call will have 2 callbacks: start and finish
std::atomic_int callbacks_outstanding_{2};
std::atomic<intptr_t> callbacks_outstanding_{2};
bool started_{false};
};

@ -68,13 +68,13 @@ class ServerReactor {
// remain unmet.
void MaybeCallOnCancel() {
if (on_cancel_conditions_remaining_.fetch_sub(
1, std::memory_order_acq_rel) == 1) {
if (GPR_UNLIKELY(on_cancel_conditions_remaining_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
OnCancel();
}
}
std::atomic_int on_cancel_conditions_remaining_{2};
std::atomic<intptr_t> on_cancel_conditions_remaining_{2};
};
template <class Request, class Response>
@ -568,7 +568,7 @@ class CallbackUnaryHandler : public MethodHandler {
void SendInitialMetadata(std::function<void(bool)> f) override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
// TODO(vjpai): Consider taking f as a move-capture if we adopt C++14
// and if performance of this operation matters
meta_tag_.Set(call_.call(),
@ -618,7 +618,8 @@ class CallbackUnaryHandler : public MethodHandler {
ResponseType* response() { return allocator_state_->response(); }
void MaybeDone() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
allocator_state_->Release();
@ -640,7 +641,7 @@ class CallbackUnaryHandler : public MethodHandler {
experimental::MessageHolder<RequestType, ResponseType>* const
allocator_state_;
std::function<void()> call_requester_;
std::atomic_int callbacks_outstanding_{
std::atomic<intptr_t> callbacks_outstanding_{
2}; // reserve for Finish and CompletionOp
};
};
@ -712,7 +713,7 @@ class CallbackClientStreamingHandler : public MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnSendInitialMetadataDone(ok);
@ -730,7 +731,7 @@ class CallbackClientStreamingHandler : public MethodHandler {
}
void Read(RequestType* req) override {
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
read_ops_.RecvMessage(req);
call_.PerformOps(&read_ops_);
}
@ -761,7 +762,8 @@ class CallbackClientStreamingHandler : public MethodHandler {
ResponseType* response() { return &resp_; }
void MaybeDone() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
reactor_->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
@ -785,7 +787,7 @@ class CallbackClientStreamingHandler : public MethodHandler {
ResponseType resp_;
std::function<void()> call_requester_;
experimental::ServerReadReactor<RequestType, ResponseType>* reactor_;
std::atomic_int callbacks_outstanding_{
std::atomic<intptr_t> callbacks_outstanding_{
3}; // reserve for OnStarted, Finish, and CompletionOp
};
};
@ -867,7 +869,7 @@ class CallbackServerStreamingHandler : public MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnSendInitialMetadataDone(ok);
@ -885,7 +887,7 @@ class CallbackServerStreamingHandler : public MethodHandler {
}
void Write(const ResponseType* resp, WriteOptions options) override {
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (options.is_last_message()) {
options.set_buffer_hint();
}
@ -939,7 +941,8 @@ class CallbackServerStreamingHandler : public MethodHandler {
const RequestType* request() { return req_; }
void MaybeDone() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
reactor_->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
@ -963,7 +966,7 @@ class CallbackServerStreamingHandler : public MethodHandler {
const RequestType* req_;
std::function<void()> call_requester_;
experimental::ServerWriteReactor<RequestType, ResponseType>* reactor_;
std::atomic_int callbacks_outstanding_{
std::atomic<intptr_t> callbacks_outstanding_{
3}; // reserve for OnStarted, Finish, and CompletionOp
};
};
@ -1031,7 +1034,7 @@ class CallbackBidiHandler : public MethodHandler {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
meta_tag_.Set(call_.call(),
[this](bool ok) {
reactor_->OnSendInitialMetadataDone(ok);
@ -1049,7 +1052,7 @@ class CallbackBidiHandler : public MethodHandler {
}
void Write(const ResponseType* resp, WriteOptions options) override {
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
if (options.is_last_message()) {
options.set_buffer_hint();
}
@ -1077,7 +1080,7 @@ class CallbackBidiHandler : public MethodHandler {
}
void Read(RequestType* req) override {
callbacks_outstanding_++;
callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
read_ops_.RecvMessage(req);
call_.PerformOps(&read_ops_);
}
@ -1112,7 +1115,8 @@ class CallbackBidiHandler : public MethodHandler {
~ServerCallbackReaderWriterImpl() {}
void MaybeDone() {
if (--callbacks_outstanding_ == 0) {
if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1, std::memory_order_acq_rel) == 1)) {
reactor_->OnDone();
grpc_call* call = call_.call();
auto call_requester = std::move(call_requester_);
@ -1137,7 +1141,7 @@ class CallbackBidiHandler : public MethodHandler {
Call call_;
std::function<void()> call_requester_;
experimental::ServerBidiReactor<RequestType, ResponseType>* reactor_;
std::atomic_int callbacks_outstanding_{
std::atomic<intptr_t> callbacks_outstanding_{
3}; // reserve for OnStarted, Finish, and CompletionOp
};
};

@ -98,9 +98,7 @@ class ServerRpcInfo {
ServerRpcInfo(grpc_impl::ServerContext* ctx, const char* method,
internal::RpcMethod::RpcType type)
: ctx_(ctx), method_(method), type_(static_cast<Type>(type)) {
ref_.store(1);
}
: ctx_(ctx), method_(method), type_(static_cast<Type>(type)) {}
// Runs interceptor at pos \a pos.
void RunInterceptor(
@ -122,9 +120,9 @@ class ServerRpcInfo {
}
}
void Ref() { ref_++; }
void Ref() { ref_.fetch_add(1, std::memory_order_relaxed); }
void Unref() {
if (--ref_ == 0) {
if (GPR_UNLIKELY(ref_.fetch_sub(1, std::memory_order_acq_rel) == 1)) {
delete this;
}
}
@ -132,7 +130,7 @@ class ServerRpcInfo {
grpc_impl::ServerContext* ctx_ = nullptr;
const char* method_ = nullptr;
const Type type_;
std::atomic_int ref_;
std::atomic<intptr_t> ref_{1};
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
friend class internal::InterceptorBatchMethodsImpl;

@ -342,7 +342,7 @@ class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
// during decreasing load, so it is less performance-critical.
grpc::internal::Mutex callback_reqs_mu_;
grpc::internal::CondVar callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::atomic<intptr_t> callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;

Loading…
Cancel
Save