Open census filter stats api (#26739)

* Use new stats API in open census filter

* Fix time and latency calculation

* Fix parent census context

* Add tests

* Reviewer comments

* Reviewer comments

* Reviewer comments

* Reviewer comments

* Fix error unref

* Add a context object for the overall call

* Remove TODO

* Reviewer comments
pull/26848/head^2
Yash Tibrewal 4 years ago committed by GitHub
parent eb4b65ae5a
commit a3d264e8fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      src/core/ext/filters/client_channel/client_channel.cc
  2. 1
      src/core/ext/filters/client_channel/client_channel.h
  3. 267
      src/cpp/ext/filters/census/client_filter.cc
  4. 65
      src/cpp/ext/filters/census/client_filter.h
  5. 10
      src/cpp/ext/filters/census/context.cc
  6. 6
      src/cpp/ext/filters/census/context.h
  7. 67
      src/cpp/ext/filters/census/open_census_call_tracer.h
  8. 101
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc

@ -2750,8 +2750,6 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
if (batch->recv_initial_metadata) {
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
recv_initial_metadata_flags_ =
batch->payload->recv_initial_metadata.recv_flags;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
@ -2863,8 +2861,11 @@ void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete(
void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, *self->recv_initial_metadata_flags_);
if (error == GRPC_ERROR_NONE) {
// recv_initial_metadata_flags is not populated for clients
self->call_attempt_tracer_->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, 0 /* recv_initial_metadata_flags */);
}
Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(error));
}
@ -2872,7 +2873,9 @@ void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void ClientChannel::LoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
if (*self->recv_message_ != nullptr) {
self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
}
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
GRPC_ERROR_REF(error));
}

@ -507,7 +507,6 @@ class ClientChannel::LoadBalancedCall
// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t* recv_initial_metadata_flags_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;

@ -27,6 +27,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/context_util.h"
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include "src/core/lib/surface/call.h"
@ -35,8 +36,65 @@
namespace grpc {
constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
constexpr uint32_t CensusClientCallData::kMaxTagsLen;
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTraceContextLen;
constexpr uint32_t
OpenCensusCallTracer::OpenCensusCallAttemptTracer::kMaxTagsLen;
grpc_error_handle CensusClientCallData::Init(
grpc_call_element* /* elem */, const grpc_call_element_args* args) {
auto tracer = args->arena->New<OpenCensusCallTracer>(args);
GPR_DEBUG_ASSERT(args->context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
args->context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
args->context[GRPC_CONTEXT_CALL_TRACER].destroy = [](void* tracer) {
(static_cast<OpenCensusCallTracer*>(tracer))->~OpenCensusCallTracer();
};
return GRPC_ERROR_NONE;
}
//
// OpenCensusCallTracer::OpenCensusCallAttemptTracer
//
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
uint32_t /* flags */) {
GenerateClientContextFromParentWithTags(parent_->qualified_method_, &context_,
parent_->context_);
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
kMaxTraceContextLen);
if (tracing_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
send_initial_metadata, &tracing_bin_,
grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_TRACE_BIN,
grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
GRPC_BATCH_GRPC_TRACE_BIN));
}
grpc_slice tags = grpc_empty_slice();
// TODO(unknown): Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
send_initial_metadata, &stats_bin_,
grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
GRPC_BATCH_GRPC_TAGS_BIN));
}
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordSendMessage(
const grpc_core::ByteStream& /* send_message */) {
++sent_message_count_;
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordReceivedMessage(
const grpc_core::ByteStream& /* recv_message */) {
++recv_message_count_;
}
namespace {
@ -53,135 +111,114 @@ void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
} // namespace
void CensusClientCallData::OnDoneRecvTrailingMetadataCb(
void* user_data, grpc_error_handle error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusClientCallData* calld =
reinterpret_cast<CensusClientCallData*>(elem->call_data);
GPR_ASSERT(calld != nullptr);
if (error == GRPC_ERROR_NONE) {
GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
FilterTrailingMetadata(calld->recv_trailing_metadata_,
&calld->elapsed_time_);
}
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->initial_on_done_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats& transport_stream_stats) {
FilterTrailingMetadata(recv_trailing_metadata, &elapsed_time_);
const uint64_t request_size = transport_stream_stats.outgoing.data_bytes;
const uint64_t response_size = transport_stream_stats.incoming.data_bytes;
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
status_code_ = status.code();
std::string final_status = absl::StatusCodeToString(status_code_);
tags.emplace_back(ClientStatusTagKey(), final_status);
::opencensus::stats::Record(
{{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
{RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
{RpcClientServerLatency(),
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}},
tags);
}
void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
grpc_error_handle error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusClientCallData* calld =
reinterpret_cast<CensusClientCallData*>(elem->call_data);
CensusChannelData* channeld =
reinterpret_cast<CensusChannelData*>(elem->channel_data);
GPR_ASSERT(calld != nullptr);
GPR_ASSERT(channeld != nullptr);
// Stream messages are no longer valid after receiving trailing metadata.
if ((*calld->recv_message_) != nullptr) {
calld->recv_message_count_++;
}
grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
GRPC_ERROR_REF(error));
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordCancel(
grpc_error_handle cancel_error) {
status_code_ = absl::StatusCode::kCancelled;
GRPC_ERROR_UNREF(cancel_error);
}
void CensusClientCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
if (op->send_initial_metadata() != nullptr) {
census_context* ctxt = op->get_census_context();
GenerateClientContext(
qualified_method_, tracer_.context(),
(ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
size_t tracing_len = TraceContextSerialize(
tracer_.context()->Context(), tracing_buf_, kMaxTraceContextLen);
if (tracing_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
op->send_initial_metadata()->batch(), &tracing_bin_,
grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_TRACE_BIN,
grpc_core::UnmanagedMemorySlice(tracing_buf_, tracing_len)),
GRPC_BATCH_GRPC_TRACE_BIN));
}
grpc_slice tags = grpc_empty_slice();
// TODO(unknown): Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
op->send_initial_metadata()->batch(), &stats_bin_,
grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags),
GRPC_BATCH_GRPC_TAGS_BIN));
}
}
if (op->send_message() != nullptr) {
++sent_message_count_;
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::RecordEnd(
const gpr_timespec& /* latency */) {
double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(parent_->method_));
tags.emplace_back(ClientStatusTagKey(), StatusCodeToString(status_code_));
::opencensus::stats::Record(
{{RpcClientRoundtripLatency(), latency_ms},
{RpcClientSentMessagesPerRpc(), sent_message_count_},
{RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
tags);
if (status_code_ != absl::StatusCode::kOk) {
context_.Span().SetStatus(opencensus::trace::StatusCode(status_code_),
StatusCodeToString(status_code_));
}
if (op->recv_message() != nullptr) {
recv_message_ = op->op()->payload->recv_message.recv_message;
initial_on_done_recv_message_ =
op->op()->payload->recv_message.recv_message_ready;
op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
context_.EndSpan();
grpc_core::MutexLock lock(&parent_->mu_);
if (--parent_->num_active_rpcs_ == 0) {
parent_->time_at_last_attempt_end_ = absl::Now();
}
if (op->recv_trailing_metadata() != nullptr) {
recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
initial_on_done_recv_trailing_metadata_ =
op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
op->op()->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&on_done_recv_trailing_metadata_;
if (arena_allocated_) {
this->~OpenCensusCallAttemptTracer();
} else {
delete this;
}
// Call next op.
grpc_call_next_op(elem, op->op());
}
grpc_error_handle CensusClientCallData::Init(
grpc_call_element* elem, const grpc_call_element_args* args) {
path_ = grpc_slice_ref_internal(args->path);
start_time_ = absl::Now();
method_ = GetMethod(&path_);
qualified_method_ = absl::StrCat("Sent.", method_);
GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
OnDoneRecvTrailingMetadataCb, elem,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
//
// OpenCensusCallTracer
//
OpenCensusCallTracer::OpenCensusCallTracer(const grpc_call_element_args* args)
: path_(grpc_slice_ref_internal(args->path)),
method_(GetMethod(&path_)),
qualified_method_(absl::StrCat("Sent.", method_)),
arena_(args->arena) {
auto* parent_context = reinterpret_cast<CensusContext*>(
args->context[GRPC_CONTEXT_TRACING].value);
GenerateClientContext(qualified_method_, &context_,
(parent_context == nullptr) ? nullptr : parent_context);
}
void CensusClientCallData::Destroy(grpc_call_element* /*elem*/,
const grpc_call_final_info* final_info,
grpc_closure* /*then_call_closure*/) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
OpenCensusCallTracer::~OpenCensusCallTracer() {
std::vector<std::pair<opencensus::tags::TagKey, std::string>> tags =
tracer_.context()->tags().tags();
std::string method = absl::StrCat(method_);
tags.emplace_back(ClientMethodTagKey(), method);
std::string final_status =
absl::StrCat(StatusCodeToString(final_info->final_status));
tags.emplace_back(ClientStatusTagKey(), final_status);
context_.tags().tags();
tags.emplace_back(ClientMethodTagKey(), std::string(method_));
::opencensus::stats::Record(
{{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
{RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
{RpcClientRoundtripLatency(), latency_ms},
{RpcClientServerLatency(),
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
{RpcClientSentMessagesPerRpc(), sent_message_count_},
{RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
{{RpcClientRetriesPerCall(), retries_ - 1}, // exclude first attempt
{RpcClientTransparentRetriesPerCall(), transparent_retries_},
{RpcClientRetryDelayPerCall(), ToDoubleMilliseconds(retry_delay_)}},
tags);
grpc_slice_unref_internal(path_);
if (final_info->final_status != GRPC_STATUS_OK) {
// TODO(unknown): Map grpc_status_code to trace::StatusCode.
tracer_.context()->Span().SetStatus(
opencensus::trace::StatusCode::UNKNOWN,
StatusCodeToString(final_info->final_status));
}
OpenCensusCallTracer::OpenCensusCallAttemptTracer*
OpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
// We allocate the first attempt on the arena and all subsequent attempts on
// the heap, so that in the common case we don't require a heap allocation,
// nor do we unnecessarily grow the arena.
bool is_first_attempt = true;
{
grpc_core::MutexLock lock(&mu_);
if (transparent_retries_ != 0 || retries_ != 0) {
is_first_attempt = false;
if (num_active_rpcs_ == 0) {
retry_delay_ += absl::Now() - time_at_last_attempt_end_;
}
}
if (is_transparent_retry) {
++transparent_retries_;
} else {
++retries_;
}
++num_active_rpcs_;
}
if (is_first_attempt) {
return arena_->New<OpenCensusCallAttemptTracer>(this,
true /* arena_allocated */);
}
tracer_.context()->EndSpan();
return new OpenCensusCallAttemptTracer(this, false /* arena_allocated */);
}
} // namespace grpc

@ -34,71 +34,8 @@ namespace grpc {
// a call at a time.
class CensusClientCallData : public CallData {
public:
// Maximum size of trace context is sent on the wire.
static constexpr uint32_t kMaxTraceContextLen = 64;
// Maximum size of tags that are sent on the wire.
static constexpr uint32_t kMaxTagsLen = 2048;
CensusClientCallData()
: recv_trailing_metadata_(nullptr),
initial_on_done_recv_trailing_metadata_(nullptr),
initial_on_done_recv_message_(nullptr),
elapsed_time_(0),
recv_message_(nullptr),
recv_message_count_(0),
sent_message_count_(0) {
memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&path_, 0, sizeof(grpc_slice));
memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure));
memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
}
grpc_error_handle Init(grpc_call_element* elem,
grpc_error_handle Init(grpc_call_element* /* elem */,
const grpc_call_element_args* args) override;
void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
static void OnDoneRecvTrailingMetadataCb(void* user_data,
grpc_error_handle error);
static void OnDoneSendInitialMetadataCb(void* user_data,
grpc_error_handle error);
static void OnDoneRecvMessageCb(void* user_data, grpc_error_handle error);
private:
OpenCensusCallTracer tracer_;
// Metadata elements for tracing and census stats data.
grpc_linked_mdelem stats_bin_;
grpc_linked_mdelem tracing_bin_;
// Client method.
absl::string_view method_;
std::string qualified_method_;
grpc_slice path_;
// The recv trailing metadata callbacks.
grpc_metadata_batch* recv_trailing_metadata_;
grpc_closure* initial_on_done_recv_trailing_metadata_;
grpc_closure on_done_recv_trailing_metadata_;
// recv message
grpc_closure* initial_on_done_recv_message_;
grpc_closure on_done_recv_message_;
// Start time (for measuring latency).
absl::Time start_time_;
// Server elapsed time in nanoseconds.
uint64_t elapsed_time_;
// The received message--may be null.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_;
// Number of messages in this RPC.
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference when adding trace context
// metatdata to outgoing message.
char tracing_buf_[kMaxTraceContextLen];
};
} // namespace grpc

@ -67,6 +67,16 @@ void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
new (ctxt) CensusContext(method, tags);
}
void GenerateClientContextFromParentWithTags(absl::string_view method,
CensusContext* ctxt,
const CensusContext& parent_ctxt) {
// Destruct the current CensusContext to free the Span memory before
// overwriting it below.
ctxt->~CensusContext();
GPR_DEBUG_ASSERT(parent_ctxt.Context().IsValid());
new (ctxt) CensusContext(method, &parent_ctxt.Span(), parent_ctxt.tags());
}
size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,
char* tracing_buf, size_t tracing_buf_size) {
if (tracing_buf_size <

@ -106,6 +106,12 @@ void GenerateServerContext(absl::string_view tracing, absl::string_view method,
void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
CensusContext* parent_ctx);
// Creates a new client context with \a parent_ctxt as the parent and propagates
// the tags as well.
void GenerateClientContextFromParentWithTags(absl::string_view method,
CensusContext* ctxt,
const CensusContext& parent_ctxt);
// Returns the incoming data size from the grpc call final info.
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info);

@ -30,42 +30,83 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
public:
class OpenCensusCallAttemptTracer : public CallAttemptTracer {
public:
explicit OpenCensusCallAttemptTracer(OpenCensusCallTracer* parent,
bool arena_allocated)
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
}
void RecordSendInitialMetadata(
grpc_metadata_batch* /* send_initial_metadata */,
uint32_t /* flags */) override {}
uint32_t /* flags */) override;
void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /* send_trailing_metadata */) override {}
void RecordSendMessage(
const grpc_core::ByteStream& /* send_message */) override {}
const grpc_core::ByteStream& /* send_message */) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /* recv_initial_metadata */,
uint32_t /* flags */) override {}
void RecordReceivedMessage(
const grpc_core::ByteStream& /* recv_message */) override {}
const grpc_core::ByteStream& /* recv_message */) override;
void RecordReceivedTrailingMetadata(
absl::Status /* status */,
grpc_metadata_batch* /* recv_trailing_metadata */,
absl::Status /* status */, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats& /* transport_stream_stats */)
override {}
void RecordCancel(grpc_error_handle /* cancel_error */) override {}
void RecordEnd(const gpr_timespec& /* latency */) override {}
override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /* latency */) override;
CensusContext* context() { return &context_; }
private:
// Maximum size of trace context is sent on the wire.
static constexpr uint32_t kMaxTraceContextLen = 64;
// Maximum size of tags that are sent on the wire.
static constexpr uint32_t kMaxTagsLen = 2048;
OpenCensusCallTracer* parent_;
const bool arena_allocated_;
CensusContext context_;
// Metadata elements for tracing and census stats data.
grpc_linked_mdelem stats_bin_;
grpc_linked_mdelem tracing_bin_;
// Start time (for measuring latency).
absl::Time start_time_;
// Server elapsed time in nanoseconds.
uint64_t elapsed_time_ = 0;
// Number of messages in this RPC.
uint64_t recv_message_count_ = 0;
uint64_t sent_message_count_ = 0;
// End status code
absl::StatusCode status_code_;
// Buffer needed for grpc_slice to reference when adding trace context
// metatdata to outgoing message.
char tracing_buf_[kMaxTraceContextLen];
};
OpenCensusCallAttemptTracer* StartNewAttempt(
bool /* is_transparent_retry */) override {
return nullptr;
}
explicit OpenCensusCallTracer(const grpc_call_element_args* args);
~OpenCensusCallTracer() override;
CensusContext* context() { return &context_; }
OpenCensusCallAttemptTracer* StartNewAttempt(
bool is_transparent_retry) override;
private:
// Client method.
grpc_slice path_;
absl::string_view method_;
std::string qualified_method_;
CensusContext context_;
grpc_core::Arena* arena_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Transparent retries per call
uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Retry delay
absl::Duration retry_delay_ ABSL_GUARDED_BY(&mu_);
absl::Time time_at_last_attempt_end_ ABSL_GUARDED_BY(&mu_);
uint32_t num_active_rpcs_ ABSL_GUARDED_BY(&mu_) = 0;
};
}; // namespace grpc

@ -88,6 +88,10 @@ class StatsPluginEnd2EndTest : public ::testing::Test {
server_address_, ::grpc::InsecureChannelCredentials()));
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
}
void TearDown() override {
server_->Shutdown();
server_thread_.join();
@ -414,6 +418,103 @@ TEST_F(StatsPluginEnd2EndTest, RequestReceivedMessagesPerRpc) {
}
}
TEST_F(StatsPluginEnd2EndTest, TestRetryStatsWithoutAdditionalRetries) {
View client_retries_cumulative_view(ClientRetriesCumulative());
View client_transparent_retries_cumulative_view(
ClientTransparentRetriesCumulative());
View client_retry_delay_per_call_view(ClientRetryDelayPerCallCumulative());
EchoRequest request;
request.set_message("foo");
EchoResponse response;
const int count = 5;
for (int i = 0; i < count; ++i) {
{
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(
client_retries_cumulative_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_), ::testing::Eq(0))));
EXPECT_THAT(
client_transparent_retries_cumulative_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_), ::testing::Eq(0))));
EXPECT_THAT(
client_retry_delay_per_call_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::Property(&Distribution::mean, ::testing::Eq(0)))));
}
}
TEST_F(StatsPluginEnd2EndTest, TestRetryStatsWithAdditionalRetries) {
View client_retries_cumulative_view(ClientRetriesCumulative());
View client_transparent_retries_cumulative_view(
ClientTransparentRetriesCumulative());
View client_retry_delay_per_call_view(ClientRetryDelayPerCallCumulative());
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_RETRIES, 1);
args.SetString(GRPC_ARG_SERVICE_CONFIG,
"{\n"
" \"methodConfig\": [ {\n"
" \"name\": [\n"
" { \"service\": \"grpc.testing.EchoTestService\" }\n"
" ],\n"
" \"retryPolicy\": {\n"
" \"maxAttempts\": 3,\n"
" \"initialBackoff\": \"0.1s\",\n"
" \"maxBackoff\": \"120s\",\n"
" \"backoffMultiplier\": 1,\n"
" \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
" }\n"
" } ]\n"
"}");
auto channel =
CreateCustomChannel(server_address_, InsecureChannelCredentials(), args);
ResetStub(channel);
EchoRequest request;
request.mutable_param()->mutable_expected_error()->set_code(
StatusCode::ABORTED);
request.set_message("foo");
EchoResponse response;
const int count = 5;
for (int i = 0; i < count; ++i) {
{
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
EXPECT_EQ(status.error_code(), StatusCode::ABORTED);
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(client_retries_cumulative_view.GetData().int_data(),
::testing::UnorderedElementsAre(
::testing::Pair(::testing::ElementsAre(client_method_name_),
::testing::Eq((i + 1) * 2))));
EXPECT_THAT(
client_transparent_retries_cumulative_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_), ::testing::Eq(0))));
auto data = client_retry_delay_per_call_view.GetData().distribution_data();
for (const auto& entry : data) {
gpr_log(GPR_ERROR, "Mean Retry Delay %s: %lf ms", entry.first[0].c_str(),
entry.second.mean());
}
// We expect the retry delay to be around 100ms.
EXPECT_THAT(
client_retry_delay_per_call_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::Property(
&Distribution::mean,
::testing::AllOf(::testing::Ge(50), ::testing::Le(300))))));
}
}
} // namespace
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save