Use mutext to protect drop call counts in grpclb client stats.

pull/18580/head
Mark D. Roth 6 years ago
parent 0b5ec20ce8
commit 96a26fdb0b
  1. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
  3. 14
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  4. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
  5. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h

@ -558,7 +558,7 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
// subchannel call (and therefore no client_load_reporting filter) // subchannel call (and therefore no client_load_reporting filter)
// for dropped calls. // for dropped calls.
if (client_stats_ != nullptr) { if (client_stats_ != nullptr) {
client_stats_->AddCallDroppedLocked(drop_token); client_stats_->AddCallDropped(drop_token);
} }
return PICK_COMPLETE; return PICK_COMPLETE;
} }
@ -917,7 +917,7 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload. // Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr); GPR_ASSERT(send_message_payload_ == nullptr);
grpc_grpclb_request* request = grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create_locked(client_stats_.get()); grpc_grpclb_load_report_request_create(client_stats_.get());
// Skip client load report if the counters were all zero in the last // Skip client load report if the counters were all zero in the last
// report and they are still zero in this one. // report and they are still zero in this one.
if (LoadReportCountersAreZero(request)) { if (LoadReportCountersAreZero(request)) {

@ -25,6 +25,8 @@
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/mutex_lock.h"
namespace grpc_core { namespace grpc_core {
void GrpcLbClientStats::AddCallStarted() { void GrpcLbClientStats::AddCallStarted() {
@ -43,11 +45,12 @@ void GrpcLbClientStats::AddCallFinished(
} }
} }
void GrpcLbClientStats::AddCallDroppedLocked(const char* token) { void GrpcLbClientStats::AddCallDropped(const char* token) {
// Increment num_calls_started and num_calls_finished. // Increment num_calls_started and num_calls_finished.
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
// Record the drop. // Record the drop.
MutexLock lock(&drop_count_mu_);
if (drop_token_counts_ == nullptr) { if (drop_token_counts_ == nullptr) {
drop_token_counts_.reset(New<DroppedCallCounts>()); drop_token_counts_.reset(New<DroppedCallCounts>());
} }
@ -69,7 +72,7 @@ void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
} // namespace } // namespace
void GrpcLbClientStats::GetLocked( void GrpcLbClientStats::Get(
int64_t* num_calls_started, int64_t* num_calls_finished, int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send, int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received, int64_t* num_calls_finished_known_received,
@ -80,6 +83,7 @@ void GrpcLbClientStats::GetLocked(
&num_calls_finished_with_client_failed_to_send_); &num_calls_finished_with_client_failed_to_send_);
AtomicGetAndResetCounter(num_calls_finished_known_received, AtomicGetAndResetCounter(num_calls_finished_known_received,
&num_calls_finished_known_received_); &num_calls_finished_known_received_);
MutexLock lock(&drop_count_mu_);
*drop_token_counts = std::move(drop_token_counts_); *drop_token_counts = std::move(drop_token_counts_);
} }

@ -41,17 +41,16 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts; typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
GrpcLbClientStats() {} GrpcLbClientStats() { gpr_mu_init(&drop_count_mu_); }
~GrpcLbClientStats() { gpr_mu_destroy(&drop_count_mu_); }
void AddCallStarted(); void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send, void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received); bool finished_known_received);
// This method is not thread-safe; caller must synchronize. void AddCallDropped(const char* token);
void AddCallDroppedLocked(const char* token);
// This method is not thread-safe; caller must synchronize. void Get(int64_t* num_calls_started, int64_t* num_calls_finished,
void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send, int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received, int64_t* num_calls_finished_known_received,
UniquePtr<DroppedCallCounts>* drop_token_counts); UniquePtr<DroppedCallCounts>* drop_token_counts);
@ -63,13 +62,12 @@ class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
} }
private: private:
// This field must only be accessed via *_locked() methods.
UniquePtr<DroppedCallCounts> drop_token_counts_;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started_ = 0; gpr_atm num_calls_started_ = 0;
gpr_atm num_calls_finished_ = 0; gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0; gpr_atm num_calls_finished_known_received_ = 0;
gpr_mu drop_count_mu_; // Guards drop_token_counts_.
UniquePtr<DroppedCallCounts> drop_token_counts_;
}; };
} // namespace grpc_core } // namespace grpc_core

@ -107,7 +107,7 @@ static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
return true; return true;
} }
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( grpc_grpclb_request* grpc_grpclb_load_report_request_create(
grpc_core::GrpcLbClientStats* client_stats) { grpc_core::GrpcLbClientStats* client_stats) {
grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>( grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
gpr_zalloc(sizeof(grpc_grpclb_request))); gpr_zalloc(sizeof(grpc_grpclb_request)));
@ -122,7 +122,7 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts> grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
drop_counts; drop_counts;
client_stats->GetLocked( client_stats->Get(
&req->client_stats.num_calls_started, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished, &req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_client_failed_to_send, &req->client_stats.num_calls_finished_with_client_failed_to_send,

@ -43,7 +43,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */ /** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name); grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name);
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked( grpc_grpclb_request* grpc_grpclb_load_report_request_create(
grpc_core::GrpcLbClientStats* client_stats); grpc_core::GrpcLbClientStats* client_stats);
/** Protocol Buffers v3-encode \a request */ /** Protocol Buffers v3-encode \a request */

Loading…
Cancel
Save