C++-ify grpclb client stats.

reviewable/pr15436/r1
Mark D. Roth 7 years ago
parent 784e43507c
commit 290d35e1d2
  1. 20
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
  2. 37
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  3. 133
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.cc
  4. 59
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  5. 30
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
  6. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
  7. 1
      src/core/lib/gprpp/memory.h

@ -35,9 +35,10 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
static void destroy_channel_elem(grpc_channel_element* elem) {}
namespace {
struct call_data {
// Stats object to update.
grpc_grpclb_client_stats* client_stats;
grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
// State for intercepting send_initial_metadata.
grpc_closure on_complete_for_send;
grpc_closure* original_on_complete_for_send;
@ -47,6 +48,7 @@ struct call_data {
grpc_closure* original_recv_initial_metadata_ready;
bool recv_initial_metadata_succeeded;
};
} // namespace
static void on_complete_for_send(void* arg, grpc_error* error) {
@ -72,11 +74,11 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Get stats object from context and take a ref.
GPR_ASSERT(args->context != nullptr);
if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
calld->client_stats =
grpc_grpclb_client_stats_ref(static_cast<grpc_grpclb_client_stats*>(
args->context[GRPC_GRPCLB_CLIENT_STATS].value));
calld->client_stats = static_cast<grpc_core::GrpcLbClientStats*>(
args->context[GRPC_GRPCLB_CLIENT_STATS].value)
->Ref();
// Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats);
calld->client_stats->AddCallStarted();
}
return GRPC_ERROR_NONE;
}
@ -88,12 +90,12 @@ static void destroy_call_elem(grpc_call_element* elem,
if (calld->client_stats != nullptr) {
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
calld->client_stats->AddCallFinished(
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
calld->recv_initial_metadata_succeeded /* known_received */);
// All done, so unref the stats object.
grpc_grpclb_client_stats_unref(calld->client_stats);
// TODO(roth): Eliminate this once filter stack is converted to C++.
calld->client_stats.reset();
}
}

@ -159,9 +159,8 @@ class GrpcLb : public LoadBalancingPolicy {
// The LB token associated with the pick. This is set via user_data in
// the pick.
grpc_mdelem lb_token;
// Stats for client-side load reporting. Note that this holds a
// reference, which must be either passed on via context or unreffed.
grpc_grpclb_client_stats* client_stats = nullptr;
// Stats for client-side load reporting.
RefCountedPtr<GrpcLbClientStats> client_stats;
// Next pending pick.
PendingPick* next = nullptr;
};
@ -186,7 +185,8 @@ class GrpcLb : public LoadBalancingPolicy {
void StartQuery();
grpc_grpclb_client_stats* client_stats() const { return client_stats_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
bool seen_initial_response() const { return seen_initial_response_; }
private:
@ -237,7 +237,7 @@ class GrpcLb : public LoadBalancingPolicy {
// The stats for client-side load reporting associated with this LB call.
// Created after the first serverlist is received.
grpc_grpclb_client_stats* client_stats_ = nullptr;
RefCountedPtr<GrpcLbClientStats> client_stats_;
grpc_millis client_stats_report_interval_ = 0;
grpc_timer client_load_report_timer_;
bool client_load_report_timer_callback_pending_ = false;
@ -548,9 +548,6 @@ GrpcLb::BalancerCallState::~BalancerCallState() {
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
grpc_slice_unref_internal(lb_call_status_details_);
if (client_stats_ != nullptr) {
grpc_grpclb_client_stats_unref(client_stats_);
}
}
void GrpcLb::BalancerCallState::Orphan() {
@ -673,22 +670,22 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
grpc_grpclb_request* request) {
grpc_grpclb_dropped_call_counts* drop_entries =
static_cast<grpc_grpclb_dropped_call_counts*>(
GrpcLbClientStats::DroppedCallCounts* drop_entries =
static_cast<GrpcLbClientStats::DroppedCallCounts*>(
request->client_stats.calls_finished_with_drop.arg);
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
request->client_stats.num_calls_finished_known_received == 0 &&
(drop_entries == nullptr || drop_entries->num_entries == 0);
(drop_entries == nullptr || drop_entries->size() == 0);
}
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create_locked(client_stats_);
grpc_grpclb_load_report_request_create_locked(client_stats_.get());
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (LoadReportCountersAreZero(request)) {
@ -814,7 +811,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = grpc_grpclb_client_stats_create();
lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
@ -1516,7 +1513,7 @@ grpc_error* AddLbTokenToInitialMetadata(
// Destroy function used when embedding client stats in call context.
void DestroyClientStats(void* arg) {
grpc_grpclb_client_stats_unref(static_cast<grpc_grpclb_client_stats*>(arg));
static_cast<GrpcLbClientStats*>(arg)->Unref();
}
void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
@ -1537,14 +1534,12 @@ void GrpcLb::PendingPickSetMetadataAndContext(PendingPick* pp) {
// Pass on client stats via context. Passes ownership of the reference.
if (pp->client_stats != nullptr) {
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
pp->client_stats;
pp->client_stats.release();
pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
DestroyClientStats;
}
} else {
if (pp->client_stats != nullptr) {
grpc_grpclb_client_stats_unref(pp->client_stats);
}
pp->client_stats.reset();
}
}
@ -1610,8 +1605,8 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
grpc_grpclb_client_stats_add_call_dropped_locked(
server->load_balance_token, lb_calld_->client_stats());
lb_calld_->client_stats()->AddCallDroppedLocked(
server->load_balance_token);
}
if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
@ -1624,7 +1619,7 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
}
// Set client_stats and user_data.
if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) {
pp->client_stats = grpc_grpclb_client_stats_ref(lb_calld_->client_stats());
pp->client_stats = lb_calld_->client_stats()->Ref();
}
GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token;

@ -22,131 +22,66 @@
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include "src/core/lib/channel/channel_args.h"
namespace grpc_core {
#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats"
struct grpc_grpclb_client_stats {
gpr_refcount refs;
// This field must only be accessed via *_locked() methods.
grpc_grpclb_dropped_call_counts* drop_token_counts;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() {
grpc_grpclb_client_stats* client_stats =
static_cast<grpc_grpclb_client_stats*>(gpr_zalloc(sizeof(*client_stats)));
gpr_ref_init(&client_stats->refs, 1);
return client_stats;
}
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats) {
gpr_ref(&client_stats->refs);
return client_stats;
}
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
void GrpcLbClientStats::AddCallStarted() {
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
}
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
void GrpcLbClientStats::AddCallFinished(
bool finished_with_client_failed_to_send, bool finished_known_received) {
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_,
(gpr_atm)1);
}
if (finished_known_received) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received,
(gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1);
}
}
void grpc_grpclb_client_stats_add_call_dropped_locked(
char* token, grpc_grpclb_client_stats* client_stats) {
void GrpcLbClientStats::AddCallDroppedLocked(char* token) {
// Increment num_calls_started and num_calls_finished.
gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1);
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1);
// Record the drop.
if (client_stats->drop_token_counts == nullptr) {
client_stats->drop_token_counts =
static_cast<grpc_grpclb_dropped_call_counts*>(
gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts)));
if (drop_token_counts_ == nullptr) {
drop_token_counts_.reset(New<DroppedCallCounts>());
}
grpc_grpclb_dropped_call_counts* drop_token_counts =
client_stats->drop_token_counts;
for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
++drop_token_counts->token_counts[i].count;
for (size_t i = 0; i < drop_token_counts_->size(); ++i) {
if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) {
++(*drop_token_counts_)[i].count;
return;
}
}
// Not found, so add a new entry. We double the size of the array each time.
size_t new_num_entries = 2;
while (new_num_entries < drop_token_counts->num_entries + 1) {
new_num_entries *= 2;
}
drop_token_counts->token_counts = static_cast<grpc_grpclb_drop_token_count*>(
gpr_realloc(drop_token_counts->token_counts,
new_num_entries * sizeof(grpc_grpclb_drop_token_count)));
grpc_grpclb_drop_token_count* new_entry =
&drop_token_counts->token_counts[drop_token_counts->num_entries++];
new_entry->token = gpr_strdup(token);
new_entry->count = 1;
// Not found, so add a new entry.
drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1);
}
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
namespace {
void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) {
*value = static_cast<int64_t>(gpr_atm_acq_load(counter));
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
} // namespace
void GrpcLbClientStats::GetLocked(
int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
*drop_token_counts = client_stats->drop_token_counts;
client_stats->drop_token_counts = nullptr;
UniquePtr<DroppedCallCounts>* drop_token_counts) {
AtomicGetAndResetCounter(num_calls_started, &num_calls_started_);
AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_);
AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send,
&num_calls_finished_with_client_failed_to_send_);
AtomicGetAndResetCounter(num_calls_finished_known_received,
&num_calls_finished_known_received_);
*drop_token_counts = std::move(drop_token_counts_);
}
void grpc_grpclb_dropped_call_counts_destroy(
grpc_grpclb_dropped_call_counts* drop_entries) {
if (drop_entries != nullptr) {
for (size_t i = 0; i < drop_entries->num_entries; ++i) {
gpr_free(drop_entries->token_counts[i].token);
}
gpr_free(drop_entries->token_counts);
gpr_free(drop_entries);
}
}
} // namespace grpc_core

@ -21,47 +21,52 @@
#include <grpc/support/port_platform.h>
#include <stdbool.h>
#include <grpc/support/atm.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/ref_counted.h"
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
namespace grpc_core {
typedef struct {
char* token;
class GrpcLbClientStats : public RefCounted<GrpcLbClientStats> {
public:
struct DropTokenCount {
UniquePtr<char> token;
int64_t count;
} grpc_grpclb_drop_token_count;
typedef struct {
grpc_grpclb_drop_token_count* token_counts;
size_t num_entries;
} grpc_grpclb_dropped_call_counts;
DropTokenCount(UniquePtr<char> token, int64_t count)
: token(std::move(token)), count(count) {}
};
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts;
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
GrpcLbClientStats() {}
void AddCallStarted();
void AddCallFinished(bool finished_with_client_failed_to_send,
bool finished_known_received);
// This method is not thread-safe; caller must synchronize.
void grpc_grpclb_client_stats_add_call_dropped_locked(
char* token, grpc_grpclb_client_stats* client_stats);
void AddCallDroppedLocked(char* token);
// This method is not thread-safe; caller must synchronize.
void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received,
grpc_grpclb_dropped_call_counts** drop_token_counts);
UniquePtr<DroppedCallCounts>* drop_token_counts);
private:
// This field must only be accessed via *_locked() methods.
UniquePtr<DroppedCallCounts> drop_token_counts_;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started_ = 0;
gpr_atm num_calls_finished_ = 0;
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0;
gpr_atm num_calls_finished_known_received_ = 0;
};
void grpc_grpclb_dropped_call_counts_destroy(
grpc_grpclb_dropped_call_counts* drop_entries);
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/

@ -89,16 +89,16 @@ static bool encode_string(pb_ostream_t* stream, const pb_field_t* field,
static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
void* const* arg) {
grpc_grpclb_dropped_call_counts* drop_entries =
static_cast<grpc_grpclb_dropped_call_counts*>(*arg);
grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(*arg);
if (drop_entries == nullptr) return true;
for (size_t i = 0; i < drop_entries->num_entries; ++i) {
for (size_t i = 0; i < drop_entries->size(); ++i) {
if (!pb_encode_tag_for_field(stream, field)) return false;
grpc_lb_v1_ClientStatsPerToken drop_message;
drop_message.load_balance_token.funcs.encode = encode_string;
drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
drop_message.load_balance_token.arg = (*drop_entries)[i].token.get();
drop_message.has_num_calls = true;
drop_message.num_calls = drop_entries->token_counts[i].count;
drop_message.num_calls = (*drop_entries)[i].count;
if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
&drop_message)) {
return false;
@ -108,7 +108,7 @@ static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field,
}
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats* client_stats) {
grpc_core::GrpcLbClientStats* client_stats) {
grpc_grpclb_request* req = static_cast<grpc_grpclb_request*>(
gpr_zalloc(sizeof(grpc_grpclb_request)));
req->has_client_stats = true;
@ -120,13 +120,15 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
grpc_core::UniquePtr<grpc_core::GrpcLbClientStats::DroppedCallCounts>
drop_counts;
client_stats->GetLocked(
&req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
&req->client_stats.num_calls_finished_known_received,
reinterpret_cast<grpc_grpclb_dropped_call_counts**>(
&req->client_stats.calls_finished_with_drop.arg));
&req->client_stats.num_calls_finished_known_received, &drop_counts);
// Will be deleted in grpc_grpclb_request_destroy().
req->client_stats.calls_finished_with_drop.arg = drop_counts.release();
return req;
}
@ -149,10 +151,10 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request) {
void grpc_grpclb_request_destroy(grpc_grpclb_request* request) {
if (request->has_client_stats) {
grpc_grpclb_dropped_call_counts* drop_entries =
static_cast<grpc_grpclb_dropped_call_counts*>(
grpc_core::GrpcLbClientStats::DroppedCallCounts* drop_entries =
static_cast<grpc_core::GrpcLbClientStats::DroppedCallCounts*>(
request->client_stats.calls_finished_with_drop.arg);
grpc_grpclb_dropped_call_counts_destroy(drop_entries);
grpc_core::Delete(drop_entries);
}
gpr_free(request);
}

@ -42,7 +42,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name);
grpc_grpclb_request* grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats* client_stats);
grpc_core::GrpcLbClientStats* client_stats);
/** Protocol Buffers v3-encode \a request */
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request);

@ -55,6 +55,7 @@ inline T* New(Args&&... args) {
// Alternative to delete, since we cannot use it (for fear of libstdc++)
template <typename T>
inline void Delete(T* p) {
if (p == nullptr) return;
p->~T();
if (alignof(T) > kAlignmentForDefaultAllocationInBytes) {
gpr_free_aligned(p);

Loading…
Cancel
Save