Merge remote-tracking branch 'yashkt/StatsApi' into stats_api

pull/26714/head
Mark D. Roth 4 years ago
commit 8b22fa5425
  1. 16
      src/core/ext/filters/client_channel/client_channel.cc
  2. 26
      src/core/lib/channel/call_tracer.h
  3. 16
      src/cpp/ext/filters/census/open_census_call_tracer.h

@ -2562,7 +2562,7 @@ CallTracer::CallAttemptTracer* GetCallAttemptTracer(
auto* call_tracer =
static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) return nullptr;
return call_tracer->RecordNewAttempt(is_transparent_retry);
return call_tracer->StartNewAttempt(is_transparent_retry);
}
} // namespace
@ -2722,8 +2722,13 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
// Record send ops in tracer.
// Handle call tracing.
if (call_attempt_tracer_ != nullptr) {
// Record send ops in tracer.
if (batch->cancel_stream) {
call_attempt_tracer_->RecordCancel(
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
}
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata,
@ -2731,7 +2736,7 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
batch->payload->send_message.send_message.get());
*batch->payload->send_message.send_message);
}
if (batch->send_trailing_metadata) {
call_attempt_tracer_->RecordSendTrailingMetadata(
@ -2859,8 +2864,7 @@ void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void ClientChannel::LoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedMessage(
self->recv_message_->get());
self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
GRPC_ERROR_REF(error));
}
@ -2869,7 +2873,7 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
// Notify the call tracer about trailing metadata.
if (self->call_attempt_tracer_ != nullptr) {
if (self->call_attempt_tracer_ != nullptr && error == GRPC_ERROR_NONE) {
self->call_attempt_tracer_->RecordReceivedTrailingMetadata(
self->recv_trailing_metadata_);
}

@ -43,20 +43,23 @@ class CallTracer {
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0;
virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0;
virtual void RecordOnDoneSendInitialMetadata() = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream* send_message) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
// The `RecordReceivedxx()` methods should only be invoked when the
// metadata/message was successfully received, i.e., without any error.
// TODO(yashkt): We are using gpr_atm here instead of absl::string_view
// since that's what the transport API uses, and performing an atomic load
// is unnecessary if the census tracer does not need it at present. Fix this
// when the transport API changes.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t flags,
gpr_atm* peer_string) = 0;
virtual void RecordReceivedMessage(const ByteStream* recv_message) = 0;
virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0;
virtual void RecordReceivedTrailingMetadata(
grpc_metadata_batch* recv_trailing_metadata) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
// Records annotations if supported by the attached tracer library, no-op
// otherwise.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.
virtual void RecordEnd(const grpc_call_final_info& final_info) = 0;
@ -66,11 +69,12 @@ class CallTracer {
// Records a new attempt for the associated call. \a transparent denotes
// whether the attempt is being made as a transparent retry or as a
// non-transparent retry/heding attempt.
virtual CallAttemptTracer* RecordNewAttempt(bool is_transparent_retry) = 0;
// Records annotations if supported by the attached tracer library, no-op
// otherwise.
virtual void RecordAnnotation(absl::string_view annotation) = 0;
// non-transparent retry/heding attempt. (There will be at least one attempt
// even if the call is not being retried.) The `CallTracer` object retains
// ownership to the newly created `CallAttemptTracer` object. RecordEnd()
// serves as an indication that the call stack is done with all API calls, and
// the tracer library is free to destroy it after that.
virtual CallAttemptTracer* StartNewAttempt(bool is_transparent_retry) = 0;
};
} // namespace grpc_core

@ -28,27 +28,22 @@ namespace grpc {
class OpenCensusCallTracer : public grpc_core::CallTracer {
public:
class OpenCensusCallAttemptTracer
: public grpc_core::CallTracer::CallAttemptTracer {
class OpenCensusCallAttemptTracer : public CallAttemptTracer {
public:
~OpenCensusCallAttemptTracer() override {}
void RecordSendInitialMetadata(
grpc_metadata_batch* /* send_initial_metadata */,
uint32_t /* flags */) override {}
void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {}
void RecordOnDoneSendInitialMetadata() override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /* send_trailing_metadata */) override {}
void RecordSendMessage(
const grpc_core::ByteStream* /* send_message */) override {}
void RecordSendMessage(const ByteStream& /* send_message */) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */,
gpr_atm* /* peer_string */) override {}
void RecordReceivedMessage(
const grpc_core::ByteStream* /* recv_message */) override {}
void RecordReceivedMessage(const ByteStream& /* recv_message */) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /* recv_trailing_metadata */) override {}
void RecordCancel(grpc_error_handle /* cancel_error */) override {}
void RecordAnnotation(absl::string_view /* annotation */) override {}
void RecordEnd(const grpc_call_final_info& /* final_info */) override {}
CensusContext* context() { return &context_; }
@ -57,11 +52,10 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
CensusContext context_;
};
OpenCensusCallAttemptTracer* RecordNewAttempt(
OpenCensusCallAttemptTracer* StartNewAttempt(
bool /* is_transparent_retry */) override {
return nullptr;
}
void RecordAnnotation(absl::string_view /* annotation */) override {}
CensusContext* context() { return &context_; }

Loading…
Cancel
Save