From 96a26fdb0b3b882a946f8a5c1d4d3fa9514eb781 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 29 Mar 2019 14:58:58 -0700 Subject: [PATCH] Use mutext to protect drop call counts in grpclb client stats. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 4 ++-- .../lb_policy/grpclb/grpclb_client_stats.cc | 8 ++++++-- .../lb_policy/grpclb/grpclb_client_stats.h | 20 +++++++++---------- .../lb_policy/grpclb/load_balancer_api.cc | 4 ++-- .../lb_policy/grpclb/load_balancer_api.h | 2 +- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5bf15aa8f7f..aebd2fd3faa 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -558,7 +558,7 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) { // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. if (client_stats_ != nullptr) { - client_stats_->AddCallDroppedLocked(drop_token); + client_stats_->AddCallDropped(drop_token); } return PICK_COMPLETE; } @@ -917,7 +917,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() { // Construct message payload. GPR_ASSERT(send_message_payload_ == nullptr); grpc_grpclb_request* request = - grpc_grpclb_load_report_request_create_locked(client_stats_.get()); + grpc_grpclb_load_report_request_create(client_stats_.get()); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. if (LoadReportCountersAreZero(request)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc index 1c7ed871d74..84b9c41a734 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc @@ -25,6 +25,8 @@ #include #include +#include "src/core/lib/gprpp/mutex_lock.h" + namespace grpc_core { void GrpcLbClientStats::AddCallStarted() { @@ -43,11 +45,12 @@ void GrpcLbClientStats::AddCallFinished( } } -void GrpcLbClientStats::AddCallDroppedLocked(const char* token) { +void GrpcLbClientStats::AddCallDropped(const char* token) { // Increment num_calls_started and num_calls_finished. gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); // Record the drop. + MutexLock lock(&drop_count_mu_); if (drop_token_counts_ == nullptr) { drop_token_counts_.reset(New()); } @@ -69,7 +72,7 @@ void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) { } // namespace -void GrpcLbClientStats::GetLocked( +void GrpcLbClientStats::Get( int64_t* num_calls_started, int64_t* num_calls_finished, int64_t* num_calls_finished_with_client_failed_to_send, int64_t* num_calls_finished_known_received, @@ -80,6 +83,7 @@ void GrpcLbClientStats::GetLocked( &num_calls_finished_with_client_failed_to_send_); AtomicGetAndResetCounter(num_calls_finished_known_received, &num_calls_finished_known_received_); + MutexLock lock(&drop_count_mu_); *drop_token_counts = std::move(drop_token_counts_); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h index cb261ee16c7..fdebdf55c17 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -41,20 +41,19 @@ class GrpcLbClientStats : public RefCounted { typedef InlinedVector DroppedCallCounts; - GrpcLbClientStats() {} + GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); } + ~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); } void AddCallStarted(); void AddCallFinished(bool finished_with_client_failed_to_send, bool finished_known_received); - // This method is not thread-safe; caller must synchronize. - void AddCallDroppedLocked(const char* token); + void AddCallDropped(const char* token); - // This method is not thread-safe; caller must synchronize. - void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished, - int64_t* num_calls_finished_with_client_failed_to_send, - int64_t* num_calls_finished_known_received, - UniquePtr* drop_token_counts); + void Get(int64_t* num_calls_started, int64_t* num_calls_finished, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received, + UniquePtr* drop_token_counts); // A destruction function to use as the user_data key when attaching // client stats to a grpc_mdelem. @@ -63,13 +62,12 @@ class GrpcLbClientStats : public RefCounted { } private: - // This field must only be accessed via *_locked() methods. - UniquePtr drop_token_counts_; - // These fields may be accessed from multiple threads at a time. gpr_atm num_calls_started_ = 0; gpr_atm num_calls_finished_ = 0; gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; gpr_atm num_calls_finished_known_received_ = 0; + gpr_mu drop_count_mu_; // Guards drop_token_counts_. + UniquePtr drop_token_counts_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc index 594c8cf6e94..b51db110395 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc @@ -107,7 +107,7 @@ static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field, return true; } -grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( +grpc_grpclb_request* grpc_grpclb_load_report_request_create( grpc_core::GrpcLbClientStats* client_stats) { grpc_grpclb_request* req = static_cast( gpr_zalloc(sizeof(grpc_grpclb_request))); @@ -122,7 +122,7 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; grpc_core::UniquePtr drop_counts; - client_stats->GetLocked( + client_stats->Get( &req->client_stats.num_calls_started, &req->client_stats.num_calls_finished, &req->client_stats.num_calls_finished_with_client_failed_to_send, diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 3c1d41a01b1..8005f6fe301 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -43,7 +43,7 @@ typedef struct { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name); -grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( +grpc_grpclb_request* grpc_grpclb_load_report_request_create( grpc_core::GrpcLbClientStats* client_stats); /** Protocol Buffers v3-encode \a request */