use CallTracer API in client channel code (#26714)

* Add internal API to help trace retries and collect metrics

* Add headers

* Reviewer comments

* Revert changes for grpc_metadata_batch

* Regenerate projects

* support call tracer in client channel code

* Reviewer comments

* Reviewer comments

* Reviewer comments

* Reviewer comments

* add RecordOnDoneSendInitialMetadata() hook

* Fix sanity

* make on_call_destruction_complete always invoked via LoadBalancedCall

* fix handling of status, stats, latency, and peer name

* fix build & clang-format

Co-authored-by: Yash Tibrewal <yashkt@google.com>
Author: Mark D. Roth (committed via GitHub)
Commit: 011fdefe36, parent: b08b28bbfb
Files changed (lines changed per file):
  238  src/core/ext/filters/client_channel/client_channel.cc
   63  src/core/ext/filters/client_channel/client_channel.h
    8  src/core/ext/filters/client_channel/retry_filter.cc
   21  src/core/lib/channel/call_tracer.h
    2  src/core/lib/channel/channel_stack.h
    3  src/core/lib/channel/context.h
   13  src/cpp/ext/filters/census/open_census_call_tracer.h
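How the pieces below fit together, as a hedged sketch: a tracing filter stores a channel-level CallTracer in the call context, and the client channel's LoadBalancedCall asks it for a per-attempt CallAttemptTracer, forwarding batch events (send/recv metadata, messages, cancellation, final status/latency) to it. Only the consumer side exists in this commit; the install step and the helper names below are illustrative assumptions, not part of the change.

```cpp
// Illustrative sketch only -- not part of this commit. The context slot
// (GRPC_CONTEXT_CALL_TRACER), the .value field, and StartNewAttempt() are
// taken from the diff below; InstallCallTracer/LookUpAttemptTracer are
// hypothetical helper names.
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/context.h"

// Producer side (assumed): some filter publishes a CallTracer for this call.
void InstallCallTracer(grpc_call_context_element* context,
                       grpc_core::CallTracer* tracer) {
  context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
}

// Consumer side (mirrors GetCallAttemptTracer() in client_channel.cc): each
// LoadBalancedCall asks the channel-level tracer for a per-attempt tracer;
// a null result means tracing is disabled for this call.
grpc_core::CallTracer::CallAttemptTracer* LookUpAttemptTracer(
    grpc_call_context_element* context, bool is_transparent_retry) {
  auto* tracer = static_cast<grpc_core::CallTracer*>(
      context[GRPC_CONTEXT_CALL_TRACER].value);
  if (tracer == nullptr) return nullptr;
  return tracer->StartNewAttempt(is_transparent_retry);
}
```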

@ -199,7 +199,7 @@ class ClientChannel::CallData {
grpc_polling_entity* pollent_ = nullptr;
grpc_closure pick_closure_;
grpc_closure resolution_done_closure_;
// Accessed while holding ClientChannel::resolution_mu_.
bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
@ -319,13 +319,11 @@ class DynamicTerminationFilter::CallData {
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
auto* calld = static_cast<CallData*>(elem->call_data);
RefCountedPtr<SubchannelCall> subchannel_call;
if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
subchannel_call = calld->lb_call_->subchannel_call();
}
RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call =
std::move(calld->lb_call_);
calld->~CallData();
if (GPR_LIKELY(subchannel_call != nullptr)) {
subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
if (GPR_LIKELY(lb_call != nullptr)) {
lb_call->set_on_call_destruction_complete(then_schedule_closure);
} else {
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
@ -343,16 +341,16 @@ class DynamicTerminationFilter::CallData {
auto* calld = static_cast<CallData*>(elem->call_data);
auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
ClientChannel* client_channel = chand->chand_;
grpc_call_element_args args = {
calld->owning_call_, nullptr,
calld->call_context_, calld->path_,
calld->call_start_time_, calld->deadline_,
calld->arena_, calld->call_combiner_};
grpc_call_element_args args = {calld->owning_call_, nullptr,
calld->call_context_, calld->path_,
/*start_time=*/0, calld->deadline_,
calld->arena_, calld->call_combiner_};
auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
calld->lb_call_ = client_channel->CreateLoadBalancedCall(
args, pollent, nullptr,
service_config_call_data->call_dispatch_controller());
service_config_call_data->call_dispatch_controller(),
/*is_transparent_retry=*/false);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
@ -363,7 +361,6 @@ class DynamicTerminationFilter::CallData {
private:
explicit CallData(const grpc_call_element_args& args)
: path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
@ -373,7 +370,6 @@ class DynamicTerminationFilter::CallData {
~CallData() { grpc_slice_unref_internal(path_); }
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
@ -1174,10 +1170,11 @@ RefCountedPtr<ClientChannel::LoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller) {
return args.arena->New<LoadBalancedCall>(this, args, pollent,
on_call_destruction_complete,
call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) {
return args.arena->New<LoadBalancedCall>(
this, args, pollent, on_call_destruction_complete,
call_dispatch_controller, is_transparent_retry);
}
namespace {
@ -2316,8 +2313,9 @@ void ClientChannel::CallData::
void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem,
grpc_error_handle error) {
GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
// TODO(roth): Does this callback need to hold a ref to the call stack?
GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr);
ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error);
}
void ClientChannel::CallData::ResolutionDone(void* arg,
@ -2553,16 +2551,28 @@ class ClientChannel::LoadBalancedCall::LbCallState
// LoadBalancedCall
//
namespace {
CallTracer::CallAttemptTracer* GetCallAttemptTracer(
grpc_call_context_element* context, bool is_transparent_retry) {
auto* call_tracer =
static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) return nullptr;
return call_tracer->StartNewAttempt(is_transparent_retry);
}
} // namespace
ClientChannel::LoadBalancedCall::LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller)
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
? "LoadBalancedCall"
: nullptr),
chand_(chand),
path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
@ -2570,7 +2580,9 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
call_context_(args.context),
pollent_(pollent),
on_call_destruction_complete_(on_call_destruction_complete),
call_dispatch_controller_(call_dispatch_controller) {}
call_dispatch_controller_(call_dispatch_controller),
call_attempt_tracer_(
GetCallAttemptTracer(args.context, is_transparent_retry)) {}
ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
grpc_slice_unref_internal(path_);
@ -2584,7 +2596,17 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
GPR_ASSERT(pending_batches_[i] == nullptr);
}
if (on_call_destruction_complete_ != nullptr) {
// Compute latency and report it to the tracer.
if (call_attempt_tracer_ != nullptr) {
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
call_attempt_tracer_->RecordEnd(latency);
}
// Arrange to invoke on_call_destruction_complete_ once we know that
// the call stack has been destroyed.
if (GPR_LIKELY(subchannel_call_ != nullptr)) {
subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
} else {
ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
GRPC_ERROR_NONE);
}
@ -2705,9 +2727,65 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
// Intercept recv_trailing_metadata_ready for LB callback.
// Handle call tracing.
if (call_attempt_tracer_ != nullptr) {
// Record send ops in tracer.
if (batch->cancel_stream) {
call_attempt_tracer_->RecordCancel(
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
}
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata,
batch->payload->send_initial_metadata.send_initial_metadata_flags);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
*batch->payload->send_message.send_message);
}
if (batch->send_trailing_metadata) {
call_attempt_tracer_->RecordSendTrailingMetadata(
batch->payload->send_trailing_metadata.send_trailing_metadata);
}
// Intercept recv ops.
if (batch->recv_initial_metadata) {
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
recv_initial_metadata_flags_ =
batch->payload->recv_initial_metadata.recv_flags;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
this, nullptr);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_ready_;
}
if (batch->recv_message) {
recv_message_ = batch->payload->recv_message.recv_message;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr);
batch->payload->recv_message.recv_message_ready = &recv_message_ready_;
}
}
// Intercept recv_trailing_metadata even if there is no call tracer,
// since we may need to notify the LB policy about trailing metadata.
if (batch->recv_trailing_metadata) {
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
transport_stream_stats_ =
batch->payload->recv_trailing_metadata.collect_stats;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, nullptr);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
}
// If we've previously been cancelled, immediately fail any new batches.
if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
@ -2784,38 +2862,79 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
}
void ClientChannel::LoadBalancedCall::
RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
grpc_error_handle error) {
void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
// Set error if call did not succeed.
grpc_error_handle error_for_lb = GRPC_ERROR_NONE;
self->call_attempt_tracer_->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, *self->recv_initial_metadata_flags_);
Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
// Check if we have a tracer or an LB callback to invoke.
if (self->call_attempt_tracer_ != nullptr ||
self->lb_recv_trailing_metadata_ready_ != nullptr) {
// Get the call's status.
absl::Status status;
if (error != GRPC_ERROR_NONE) {
error_for_lb = error;
// Get status from error.
grpc_status_code code;
grpc_slice message = grpc_empty_slice();
grpc_error_get_status(error, self->deadline_, &code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
status = absl::Status(static_cast<absl::StatusCode>(code),
StringViewFromSlice(message));
} else {
// Get status from headers.
const auto& fields = self->recv_trailing_metadata_->idx.named;
GPR_ASSERT(fields.grpc_status != nullptr);
grpc_status_code status =
grpc_status_code code =
grpc_get_status_code_from_metadata(fields.grpc_status->md);
std::string msg;
if (status != GRPC_STATUS_OK) {
error_for_lb = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
GRPC_ERROR_INT_GRPC_STATUS, status);
if (code != GRPC_STATUS_OK) {
absl::string_view message;
if (fields.grpc_message != nullptr) {
error_for_lb = grpc_error_set_str(
error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
message = StringViewFromSlice(GRPC_MDVALUE(fields.grpc_message->md));
}
status = absl::Status(static_cast<absl::StatusCode>(code), message);
}
}
// Invoke callback to LB policy.
Metadata trailing_metadata(self, self->recv_trailing_metadata_);
LbCallState lb_call_state(self);
self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
&lb_call_state);
if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
// If we have a tracer, notify it.
if (self->call_attempt_tracer_ != nullptr) {
self->call_attempt_tracer_->RecordReceivedTrailingMetadata(
status, self->recv_trailing_metadata_,
*self->transport_stream_stats_);
}
// If the LB policy requested a callback for trailing metadata, invoke
// the callback.
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
grpc_error_handle error_for_lb = absl_status_to_grpc_error(status);
Metadata trailing_metadata(self, self->recv_trailing_metadata_);
LbCallState lb_call_state(self);
self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
&lb_call_state);
GRPC_ERROR_UNREF(error_for_lb);
}
}
// Chain to original callback.
if (self->failure_error_ != GRPC_ERROR_NONE) {
@ -2828,23 +2947,9 @@ void ClientChannel::LoadBalancedCall::
error);
}
void ClientChannel::LoadBalancedCall::
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch) {
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
}
void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
SubchannelCall::Args call_args = {
std::move(connected_subchannel_), pollent_, path_, call_start_time_,
std::move(connected_subchannel_), pollent_, path_, /*start_time=*/0,
deadline_, arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
@ -2856,10 +2961,6 @@ void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
"chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
}
if (on_call_destruction_complete_ != nullptr) {
subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
on_call_destruction_complete_ = nullptr;
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
PendingBatchesFail(error, YieldCallCombiner);
} else {
@ -2940,6 +3041,7 @@ void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
}
void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) {
// TODO(roth): Does this callback need to hold a ref to LoadBalancedCall?
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}
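A note on the latency handling above ("fix handling of status, stats, latency, and peer name"): the LB call now samples the cycle counter at construction (lb_call_start_time_) and reports the elapsed time to the attempt tracer in its destructor, instead of relying on grpc_call_element_args::start_time, which this commit passes as 0 in the subchannel stack. A minimal, hedged sketch of that computation; ReportAttemptLatency is an illustrative name and the include path for the cycle-counter helpers is assumed.

```cpp
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gpr/time_precise.h"  // gpr cycle-counter helpers; path assumed

// Condensed from ~LoadBalancedCall() above: convert the cycle-counter delta
// into a gpr_timespec and hand it to the attempt tracer as the final latency.
void ReportAttemptLatency(grpc_core::CallTracer::CallAttemptTracer* tracer,
                          gpr_cycle_counter lb_call_start_time) {
  if (tracer == nullptr) return;  // tracing disabled for this call
  gpr_timespec latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time);
  tracer->RecordEnd(latency);  // RecordEnd must be the last call on the tracer
}
```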

@ -39,6 +39,7 @@
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -135,7 +136,8 @@ class ClientChannel {
RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
private:
class CallData;
@ -370,25 +372,30 @@ class ClientChannel {
// ClientChannel::LoadBalancedCall
//
// This object is ref-counted, but it cannot inherit from RefCounted<>,
// because it is allocated on the arena and can't free its memory when
// its refcount goes to zero. So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
// TODO(roth): Consider whether this actually needs to be RefCounted<>.
// Ideally, it should be single-owner, or at least InternallyRefCounted<>.
class ClientChannel::LoadBalancedCall
: public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> {
public:
// If on_call_destruction_complete is non-null, then it will be
// invoked once the LoadBalancedCall is completely destroyed.
// If it is null, then the caller is responsible for checking whether
// the LB call has a subchannel call and ensuring that the
// on_call_destruction_complete closure passed down from the surface
// is not invoked until after the subchannel call stack is destroyed.
// If it is null, then the caller is responsible for calling
// set_on_call_destruction_complete() before the LoadBalancedCall is
// destroyed.
LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
~LoadBalancedCall() override;
// Callers must call this before unreffing if they did not set the
// closure via the ctor.
void set_on_call_destruction_complete(
grpc_closure* on_call_destruction_complete) {
on_call_destruction_complete_ = on_call_destruction_complete;
}
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Invoked by channel for queued LB picks when the picker is updated.
@ -402,10 +409,6 @@ class ClientChannel::LoadBalancedCall
// will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error);
RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_;
}
private:
class LbQueuedCallCanceller;
class Metadata;
@ -440,10 +443,10 @@ class ClientChannel::LoadBalancedCall
// Resumes all pending batches on subchannel_call_.
void PendingBatchesResume();
static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
void* arg, grpc_error_handle error);
void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch);
static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error);
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
static void RecvMessageReady(void* arg, grpc_error_handle error);
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
@ -461,7 +464,6 @@ class ClientChannel::LoadBalancedCall
// that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead.
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
@ -471,6 +473,10 @@ class ClientChannel::LoadBalancedCall
grpc_closure* on_call_destruction_complete_;
ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
// Set when we get a cancel_stream op.
grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
@ -495,8 +501,25 @@ class ClientChannel::LoadBalancedCall
RefCountedPtr<SubchannelCall> subchannel_call_;
// For intercepting recv_trailing_metadata_ready for the LB policy.
// For intercepting send_initial_metadata on_complete.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;
// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t* recv_initial_metadata_flags_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
// For intercepting recv_message_ready.
OrphanablePtr<ByteStream>* recv_message_ = nullptr;
grpc_closure recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
// For intercepting recv_trailing_metadata_ready.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_transport_stream_stats* transport_stream_stats_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
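The destruction contract introduced in this header, as a hedged usage sketch (condensed from DynamicTerminationFilter::CallData::Destroy earlier in the diff; ReleaseLbCall is an illustrative name): a caller that constructed the LB call with a null on_call_destruction_complete must hand the closure over via set_on_call_destruction_complete() before dropping its last ref, and run the closure itself only when no LB call was ever created.

```cpp
#include "src/core/ext/filters/client_channel/client_channel.h"

// Illustrative only: release an LB call while honoring the new contract.
void ReleaseLbCall(
    grpc_core::RefCountedPtr<grpc_core::ClientChannel::LoadBalancedCall>
        lb_call,
    grpc_closure* then_schedule_closure) {
  if (GPR_LIKELY(lb_call != nullptr)) {
    // The LB call runs the closure once its call stack is fully destroyed.
    lb_call->set_on_call_destruction_complete(then_schedule_closure);
  } else {
    // No LB call was created; nothing to wait for.
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
                            GRPC_ERROR_NONE);
  }
}  // lb_call's ref drops here; destruction completes asynchronously
```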

@ -512,7 +512,6 @@ class RetryFilter::CallData {
BackOff retry_backoff_;
grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
@ -2044,7 +2043,6 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
.set_max_backoff(
retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())),
path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline),
arena_(args.arena),
owning_call_(args.call_stack),
@ -2178,14 +2176,16 @@ RefCountedPtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller) {
grpc_call_element_args args = {owning_call_, nullptr, call_context_,
path_, call_start_time_, deadline_,
path_, /*start_time=*/0, deadline_,
arena_, call_combiner_};
return chand_->client_channel_->CreateLoadBalancedCall(
args, pollent_,
// This callback holds a ref to the CallStackDestructionBarrier
// object until the LB call is destroyed.
call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
call_dispatch_controller);
call_dispatch_controller,
// TODO(roth): Change this when we support transparent retries.
/*is_transparent_retry=*/false);
}
void RetryFilter::CallData::CreateCallAttempt() {

@ -45,26 +45,27 @@ class CallTracer {
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0;
virtual void RecordOnDoneSendInitialMetadata() = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
// The `RecordReceivedxx()` methods should only be invoked when the
// metadata/message was successfully received, i.e., without any error.
// TODO(yashkt): We are using gpr_atm here instead of absl::string_view
// since that's what the transport API uses, and performing an atomic load
// is unnecessary if the census tracer does not need it at present. Fix this
// when the transport API changes.
virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t flags,
gpr_atm* peer_string) = 0;
grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0;
virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0;
virtual void RecordReceivedTrailingMetadata(
grpc_metadata_batch* recv_trailing_metadata) = 0;
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats& transport_stream_stats) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
// Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object.
virtual void RecordEnd(const grpc_call_final_info& final_info) = 0;
virtual void RecordEnd(const gpr_timespec& latency) = 0;
};
virtual ~CallTracer() {}
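For reference, a minimal no-op tracer written against the interface as it appears in this hunk. Assumptions, flagged because they are not shown verbatim here: CallAttemptTracer is a nested class of CallTracer (implied by the CallTracer::CallAttemptTracer references in client_channel.cc), StartNewAttempt(bool) is a CallTracer virtual (implied by GetCallAttemptTracer()), and an attempt tracer may destroy itself in RecordEnd() per the comment above. The NoOp* names are illustrative only.

```cpp
#include "src/core/lib/channel/call_tracer.h"

class NoOpCallAttemptTracer
    : public grpc_core::CallTracer::CallAttemptTracer {
 public:
  void RecordSendInitialMetadata(grpc_metadata_batch* /*md*/,
                                 uint32_t /*flags*/) override {}
  void RecordOnDoneSendInitialMetadata(gpr_atm* /*peer_string*/) override {}
  void RecordSendTrailingMetadata(grpc_metadata_batch* /*md*/) override {}
  void RecordSendMessage(const grpc_core::ByteStream& /*msg*/) override {}
  void RecordReceivedInitialMetadata(grpc_metadata_batch* /*md*/,
                                     uint32_t /*flags*/) override {}
  void RecordReceivedMessage(const grpc_core::ByteStream& /*msg*/) override {}
  void RecordReceivedTrailingMetadata(
      absl::Status /*status*/, grpc_metadata_batch* /*md*/,
      const grpc_transport_stream_stats& /*stats*/) override {}
  void RecordCancel(grpc_error_handle /*cancel_error*/) override {}
  void RecordEnd(const gpr_timespec& /*latency*/) override {
    delete this;  // allowed: RecordEnd is documented as the last call
  }
};

class NoOpCallTracer : public grpc_core::CallTracer {
 public:
  CallAttemptTracer* StartNewAttempt(bool /*is_transparent_retry*/) override {
    return new NoOpCallAttemptTracer();  // heap-allocated; freed in RecordEnd
  }
  // Any other CallTracer virtuals not visible in this hunk are omitted.
};
```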

@ -78,7 +78,7 @@ struct grpc_call_element_args {
const void* server_transport_data;
grpc_call_context_element* context;
const grpc_slice& path;
gpr_cycle_counter start_time;
gpr_cycle_counter start_time; // Note: not populated in subchannel stack.
grpc_millis deadline;
grpc_core::Arena* arena;
grpc_core::CallCombiner* call_combiner;

@ -32,6 +32,9 @@ typedef enum {
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
/// Value is a CallTracer object.
GRPC_CONTEXT_CALL_TRACER,
/// Reserved for traffic_class_context.
GRPC_CONTEXT_TRAFFIC,

@ -33,20 +33,23 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
void RecordSendInitialMetadata(
grpc_metadata_batch* /* send_initial_metadata */,
uint32_t /* flags */) override {}
void RecordOnDoneSendInitialMetadata() override {}
void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /* send_trailing_metadata */) override {}
void RecordSendMessage(
const grpc_core::ByteStream& /* send_message */) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */,
gpr_atm* /* peer_string */) override {}
grpc_metadata_batch* /* recv_initial_metadata */,
uint32_t /* flags */) override {}
void RecordReceivedMessage(
const grpc_core::ByteStream& /* recv_message */) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /* recv_trailing_metadata */) override {}
absl::Status /* status */,
grpc_metadata_batch* /* recv_trailing_metadata */,
const grpc_transport_stream_stats& /* transport_stream_stats */)
override {}
void RecordCancel(grpc_error_handle /* cancel_error */) override {}
void RecordEnd(const grpc_call_final_info& /* final_info */) override {}
void RecordEnd(const gpr_timespec& /* latency */) override {}
CensusContext* context() { return &context_; }
