Revert "use CallTracer API in client channel code (#26714)" (#26772)

This reverts commit 011fdefe36.
pull/26786/head
Mark D. Roth 3 years ago committed by GitHub
parent a989e0bfb5
commit 98f57af91c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 226
      src/core/ext/filters/client_channel/client_channel.cc
  2. 63
      src/core/ext/filters/client_channel/client_channel.h
  3. 8
      src/core/ext/filters/client_channel/retry_filter.cc
  4. 21
      src/core/lib/channel/call_tracer.h
  5. 2
      src/core/lib/channel/channel_stack.h
  6. 3
      src/core/lib/channel/context.h
  7. 13
      src/cpp/ext/filters/census/open_census_call_tracer.h

@ -199,7 +199,7 @@ class ClientChannel::CallData {
grpc_polling_entity* pollent_ = nullptr; grpc_polling_entity* pollent_ = nullptr;
grpc_closure resolution_done_closure_; grpc_closure pick_closure_;
// Accessed while holding ClientChannel::resolution_mu_. // Accessed while holding ClientChannel::resolution_mu_.
bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
@ -319,11 +319,13 @@ class DynamicTerminationFilter::CallData {
const grpc_call_final_info* /*final_info*/, const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) { grpc_closure* then_schedule_closure) {
auto* calld = static_cast<CallData*>(elem->call_data); auto* calld = static_cast<CallData*>(elem->call_data);
RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call = RefCountedPtr<SubchannelCall> subchannel_call;
std::move(calld->lb_call_); if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
subchannel_call = calld->lb_call_->subchannel_call();
}
calld->~CallData(); calld->~CallData();
if (GPR_LIKELY(lb_call != nullptr)) { if (GPR_LIKELY(subchannel_call != nullptr)) {
lb_call->set_on_call_destruction_complete(then_schedule_closure); subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
} else { } else {
// TODO(yashkt) : This can potentially be a Closure::Run // TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE); ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
@ -341,16 +343,16 @@ class DynamicTerminationFilter::CallData {
auto* calld = static_cast<CallData*>(elem->call_data); auto* calld = static_cast<CallData*>(elem->call_data);
auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data); auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
ClientChannel* client_channel = chand->chand_; ClientChannel* client_channel = chand->chand_;
grpc_call_element_args args = {calld->owning_call_, nullptr, grpc_call_element_args args = {
calld->owning_call_, nullptr,
calld->call_context_, calld->path_, calld->call_context_, calld->path_,
/*start_time=*/0, calld->deadline_, calld->call_start_time_, calld->deadline_,
calld->arena_, calld->call_combiner_}; calld->arena_, calld->call_combiner_};
auto* service_config_call_data = static_cast<ServiceConfigCallData*>( auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
calld->lb_call_ = client_channel->CreateLoadBalancedCall( calld->lb_call_ = client_channel->CreateLoadBalancedCall(
args, pollent, nullptr, args, pollent, nullptr,
service_config_call_data->call_dispatch_controller(), service_config_call_data->call_dispatch_controller());
/*is_transparent_retry=*/false);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand, "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
@ -361,6 +363,7 @@ class DynamicTerminationFilter::CallData {
private: private:
explicit CallData(const grpc_call_element_args& args) explicit CallData(const grpc_call_element_args& args)
: path_(grpc_slice_ref_internal(args.path)), : path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline), deadline_(args.deadline),
arena_(args.arena), arena_(args.arena),
owning_call_(args.call_stack), owning_call_(args.call_stack),
@ -370,6 +373,7 @@ class DynamicTerminationFilter::CallData {
~CallData() { grpc_slice_unref_internal(path_); } ~CallData() { grpc_slice_unref_internal(path_); }
grpc_slice path_; // Request path. grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_; grpc_millis deadline_;
Arena* arena_; Arena* arena_;
grpc_call_stack* owning_call_; grpc_call_stack* owning_call_;
@ -1165,11 +1169,10 @@ RefCountedPtr<ClientChannel::LoadBalancedCall>
ClientChannel::CreateLoadBalancedCall( ClientChannel::CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent, const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller) {
bool is_transparent_retry) { return args.arena->New<LoadBalancedCall>(this, args, pollent,
return args.arena->New<LoadBalancedCall>( on_call_destruction_complete,
this, args, pollent, on_call_destruction_complete, call_dispatch_controller);
call_dispatch_controller, is_transparent_retry);
} }
namespace { namespace {
@ -2310,9 +2313,8 @@ void ClientChannel::CallData::
void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem, void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem,
grpc_error_handle error) { grpc_error_handle error) {
// TODO(roth): Does this callback need to hold a ref to the call stack? GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
GRPC_CLOSURE_INIT(&resolution_done_closure_, ResolutionDone, elem, nullptr); ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, &resolution_done_closure_, error);
} }
void ClientChannel::CallData::ResolutionDone(void* arg, void ClientChannel::CallData::ResolutionDone(void* arg,
@ -2548,28 +2550,16 @@ class ClientChannel::LoadBalancedCall::LbCallState
// LoadBalancedCall // LoadBalancedCall
// //
namespace {
CallTracer::CallAttemptTracer* GetCallAttemptTracer(
grpc_call_context_element* context, bool is_transparent_retry) {
auto* call_tracer =
static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) return nullptr;
return call_tracer->StartNewAttempt(is_transparent_retry);
}
} // namespace
ClientChannel::LoadBalancedCall::LoadBalancedCall( ClientChannel::LoadBalancedCall::LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args, ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller)
bool is_transparent_retry)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
? "LoadBalancedCall" ? "LoadBalancedCall"
: nullptr), : nullptr),
chand_(chand), chand_(chand),
path_(grpc_slice_ref_internal(args.path)), path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline), deadline_(args.deadline),
arena_(args.arena), arena_(args.arena),
owning_call_(args.call_stack), owning_call_(args.call_stack),
@ -2577,9 +2567,7 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
call_context_(args.context), call_context_(args.context),
pollent_(pollent), pollent_(pollent),
on_call_destruction_complete_(on_call_destruction_complete), on_call_destruction_complete_(on_call_destruction_complete),
call_dispatch_controller_(call_dispatch_controller), call_dispatch_controller_(call_dispatch_controller) {}
call_attempt_tracer_(
GetCallAttemptTracer(args.context, is_transparent_retry)) {}
ClientChannel::LoadBalancedCall::~LoadBalancedCall() { ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
grpc_slice_unref_internal(path_); grpc_slice_unref_internal(path_);
@ -2593,17 +2581,7 @@ ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) { for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
GPR_ASSERT(pending_batches_[i] == nullptr); GPR_ASSERT(pending_batches_[i] == nullptr);
} }
// Compute latency and report it to the tracer. if (on_call_destruction_complete_ != nullptr) {
if (call_attempt_tracer_ != nullptr) {
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
call_attempt_tracer_->RecordEnd(latency);
}
// Arrange to invoke on_call_destruction_complete_ once we know that
// the call stack has been destroyed.
if (GPR_LIKELY(subchannel_call_ != nullptr)) {
subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
} else {
ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_, ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
@ -2724,65 +2702,9 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch( void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) { grpc_transport_stream_op_batch* batch) {
// Handle call tracing. // Intercept recv_trailing_metadata_ready for LB callback.
if (call_attempt_tracer_ != nullptr) {
// Record send ops in tracer.
if (batch->cancel_stream) {
call_attempt_tracer_->RecordCancel(
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
}
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata,
batch->payload->send_initial_metadata.send_initial_metadata_flags);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
SendInitialMetadataOnComplete, this, nullptr);
batch->on_complete = &send_initial_metadata_on_complete_;
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
*batch->payload->send_message.send_message);
}
if (batch->send_trailing_metadata) {
call_attempt_tracer_->RecordSendTrailingMetadata(
batch->payload->send_trailing_metadata.send_trailing_metadata);
}
// Intercept recv ops.
if (batch->recv_initial_metadata) {
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
recv_initial_metadata_flags_ =
batch->payload->recv_initial_metadata.recv_flags;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
this, nullptr);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_ready_;
}
if (batch->recv_message) {
recv_message_ = batch->payload->recv_message.recv_message;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr);
batch->payload->recv_message.recv_message_ready = &recv_message_ready_;
}
}
// Intercept recv_trailing_metadata even if there is no call tracer,
// since we may need to notify the LB policy about trailing metadata.
if (batch->recv_trailing_metadata) { if (batch->recv_trailing_metadata) {
recv_trailing_metadata_ = InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
transport_stream_stats_ =
batch->payload->recv_trailing_metadata.collect_stats;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, nullptr);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
} }
// If we've previously been cancelled, immediately fail any new batches. // If we've previously been cancelled, immediately fail any new batches.
if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) { if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
@ -2859,79 +2781,38 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
} }
} }
void ClientChannel::LoadBalancedCall::SendInitialMetadataOnComplete( void ClientChannel::LoadBalancedCall::
void* arg, grpc_error_handle error) { RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
auto* self = static_cast<LoadBalancedCall*>(arg); grpc_error_handle error) {
self->call_attempt_tracer_->RecordOnDoneSendInitialMetadata(
self->peer_string_);
Closure::Run(DEBUG_LOCATION,
self->original_send_initial_metadata_on_complete_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, *self->recv_initial_metadata_flags_);
Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedMessage(**self->recv_message_);
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg); auto* self = static_cast<LoadBalancedCall*>(arg);
// Check if we have a tracer or an LB callback to invoke. if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
if (self->call_attempt_tracer_ != nullptr || // Set error if call did not succeed.
self->lb_recv_trailing_metadata_ready_ != nullptr) { grpc_error_handle error_for_lb = GRPC_ERROR_NONE;
// Get the call's status.
absl::Status status;
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
// Get status from error. error_for_lb = error;
grpc_status_code code;
grpc_slice message = grpc_empty_slice();
grpc_error_get_status(error, self->deadline_, &code, &message,
/*http_error=*/nullptr, /*error_string=*/nullptr);
status = absl::Status(static_cast<absl::StatusCode>(code),
StringViewFromSlice(message));
} else { } else {
// Get status from headers.
const auto& fields = self->recv_trailing_metadata_->idx.named; const auto& fields = self->recv_trailing_metadata_->idx.named;
GPR_ASSERT(fields.grpc_status != nullptr); GPR_ASSERT(fields.grpc_status != nullptr);
grpc_status_code code = grpc_status_code status =
grpc_get_status_code_from_metadata(fields.grpc_status->md); grpc_get_status_code_from_metadata(fields.grpc_status->md);
if (code != GRPC_STATUS_OK) { std::string msg;
absl::string_view message; if (status != GRPC_STATUS_OK) {
error_for_lb = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
GRPC_ERROR_INT_GRPC_STATUS, status);
if (fields.grpc_message != nullptr) { if (fields.grpc_message != nullptr) {
message = StringViewFromSlice(GRPC_MDVALUE(fields.grpc_message->md)); error_for_lb = grpc_error_set_str(
} error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
status = absl::Status(static_cast<absl::StatusCode>(code), message); grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
} }
} }
// If we have a tracer, notify it.
if (self->call_attempt_tracer_ != nullptr) {
self->call_attempt_tracer_->RecordReceivedTrailingMetadata(
status, self->recv_trailing_metadata_,
*self->transport_stream_stats_);
} }
// If the LB policy requested a callback for trailing metadata, invoke // Invoke callback to LB policy.
// the callback.
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
grpc_error_handle error_for_lb = absl_status_to_grpc_error(status);
Metadata trailing_metadata(self, self->recv_trailing_metadata_); Metadata trailing_metadata(self, self->recv_trailing_metadata_);
LbCallState lb_call_state(self); LbCallState lb_call_state(self);
self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata, self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
&lb_call_state); &lb_call_state);
GRPC_ERROR_UNREF(error_for_lb); if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
}
} }
// Chain to original callback. // Chain to original callback.
if (self->failure_error_ != GRPC_ERROR_NONE) { if (self->failure_error_ != GRPC_ERROR_NONE) {
@ -2944,9 +2825,23 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
error); error);
} }
void ClientChannel::LoadBalancedCall::
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch) {
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
}
void ClientChannel::LoadBalancedCall::CreateSubchannelCall() { void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
SubchannelCall::Args call_args = { SubchannelCall::Args call_args = {
std::move(connected_subchannel_), pollent_, path_, /*start_time=*/0, std::move(connected_subchannel_), pollent_, path_, call_start_time_,
deadline_, arena_, deadline_, arena_,
// TODO(roth): When we implement hedging support, we will probably // TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call. // need to use a separate call context for each subchannel call.
@ -2958,6 +2853,10 @@ void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
"chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_, "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
this, subchannel_call_.get(), grpc_error_std_string(error).c_str()); this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
} }
if (on_call_destruction_complete_ != nullptr) {
subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
on_call_destruction_complete_ = nullptr;
}
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
PendingBatchesFail(error, YieldCallCombiner); PendingBatchesFail(error, YieldCallCombiner);
} else { } else {
@ -3038,7 +2937,6 @@ void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
} }
void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) { void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) {
// TODO(roth): Does this callback need to hold a ref to LoadBalancedCall?
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
} }

@ -39,7 +39,6 @@
#include "src/core/ext/filters/client_channel/service_config.h" #include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
@ -136,8 +135,7 @@ class ClientChannel {
RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall( RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent, const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller);
bool is_transparent_retry);
private: private:
class CallData; class CallData;
@ -371,30 +369,25 @@ class ClientChannel {
// ClientChannel::LoadBalancedCall // ClientChannel::LoadBalancedCall
// //
// TODO(roth): Consider whether this actually needs to be RefCounted<>. // This object is ref-counted, but it cannot inherit from RefCounted<>,
// Ideally, it should be single-owner, or at least InternallyRefCounted<>. // because it is allocated on the arena and can't free its memory when
// its refcount goes to zero. So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
class ClientChannel::LoadBalancedCall class ClientChannel::LoadBalancedCall
: public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> { : public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> {
public: public:
// If on_call_destruction_complete is non-null, then it will be // If on_call_destruction_complete is non-null, then it will be
// invoked once the LoadBalancedCall is completely destroyed. // invoked once the LoadBalancedCall is completely destroyed.
// If it is null, then the caller is responsible for calling // If it is null, then the caller is responsible for checking whether
// set_on_call_destruction_complete() before the LoadBalancedCall is // the LB call has a subchannel call and ensuring that the
// destroyed. // on_call_destruction_complete closure passed down from the surface
// is not invoked until after the subchannel call stack is destroyed.
LoadBalancedCall( LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args, ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller);
bool is_transparent_retry);
~LoadBalancedCall() override; ~LoadBalancedCall() override;
// Callers must call this before unreffing if they did not set the
// closure via the ctor.
void set_on_call_destruction_complete(
grpc_closure* on_call_destruction_complete) {
on_call_destruction_complete_ = on_call_destruction_complete;
}
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Invoked by channel for queued LB picks when the picker is updated. // Invoked by channel for queued LB picks when the picker is updated.
@ -408,6 +401,10 @@ class ClientChannel::LoadBalancedCall
// will not run until after this method returns. // will not run until after this method returns.
void AsyncPickDone(grpc_error_handle error); void AsyncPickDone(grpc_error_handle error);
RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_;
}
private: private:
class LbQueuedCallCanceller; class LbQueuedCallCanceller;
class Metadata; class Metadata;
@ -442,10 +439,10 @@ class ClientChannel::LoadBalancedCall
// Resumes all pending batches on subchannel_call_. // Resumes all pending batches on subchannel_call_.
void PendingBatchesResume(); void PendingBatchesResume();
static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error); static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); void* arg, grpc_error_handle error);
static void RecvMessageReady(void* arg, grpc_error_handle error); void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); grpc_transport_stream_op_batch* batch);
void CreateSubchannelCall(); void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure. // Invoked when a pick is completed, on both success or failure.
@ -463,6 +460,7 @@ class ClientChannel::LoadBalancedCall
// that uses any one of them, we should store them in the call // that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead. // context. This will save per-call memory overhead.
grpc_slice path_; // Request path. grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_; grpc_millis deadline_;
Arena* arena_; Arena* arena_;
grpc_call_stack* owning_call_; grpc_call_stack* owning_call_;
@ -472,10 +470,6 @@ class ClientChannel::LoadBalancedCall
grpc_closure* on_call_destruction_complete_; grpc_closure* on_call_destruction_complete_;
ConfigSelector::CallDispatchController* call_dispatch_controller_; ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
// Set when we get a cancel_stream op. // Set when we get a cancel_stream op.
grpc_error_handle cancel_error_ = GRPC_ERROR_NONE; grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
@ -500,25 +494,8 @@ class ClientChannel::LoadBalancedCall
RefCountedPtr<SubchannelCall> subchannel_call_; RefCountedPtr<SubchannelCall> subchannel_call_;
// For intercepting send_initial_metadata on_complete. // For intercepting recv_trailing_metadata_ready for the LB policy.
gpr_atm* peer_string_ = nullptr;
grpc_closure send_initial_metadata_on_complete_;
grpc_closure* original_send_initial_metadata_on_complete_ = nullptr;
// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t* recv_initial_metadata_flags_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
// For intercepting recv_message_ready.
OrphanablePtr<ByteStream>* recv_message_ = nullptr;
grpc_closure recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
// For intercepting recv_trailing_metadata_ready.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr; grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_transport_stream_stats* transport_stream_stats_ = nullptr;
grpc_closure recv_trailing_metadata_ready_; grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

@ -512,6 +512,7 @@ class RetryFilter::CallData {
BackOff retry_backoff_; BackOff retry_backoff_;
grpc_slice path_; // Request path. grpc_slice path_; // Request path.
gpr_cycle_counter call_start_time_;
grpc_millis deadline_; grpc_millis deadline_;
Arena* arena_; Arena* arena_;
grpc_call_stack* owning_call_; grpc_call_stack* owning_call_;
@ -2043,6 +2044,7 @@ RetryFilter::CallData::CallData(RetryFilter* chand,
.set_max_backoff( .set_max_backoff(
retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())), retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())),
path_(grpc_slice_ref_internal(args.path)), path_(grpc_slice_ref_internal(args.path)),
call_start_time_(args.start_time),
deadline_(args.deadline), deadline_(args.deadline),
arena_(args.arena), arena_(args.arena),
owning_call_(args.call_stack), owning_call_(args.call_stack),
@ -2176,16 +2178,14 @@ RefCountedPtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall( RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller) { ConfigSelector::CallDispatchController* call_dispatch_controller) {
grpc_call_element_args args = {owning_call_, nullptr, call_context_, grpc_call_element_args args = {owning_call_, nullptr, call_context_,
path_, /*start_time=*/0, deadline_, path_, call_start_time_, deadline_,
arena_, call_combiner_}; arena_, call_combiner_};
return chand_->client_channel_->CreateLoadBalancedCall( return chand_->client_channel_->CreateLoadBalancedCall(
args, pollent_, args, pollent_,
// This callback holds a ref to the CallStackDestructionBarrier // This callback holds a ref to the CallStackDestructionBarrier
// object until the LB call is destroyed. // object until the LB call is destroyed.
call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this), call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
call_dispatch_controller, call_dispatch_controller);
// TODO(roth): Change this when we support transparent retries.
/*is_transparent_retry=*/false);
} }
void RetryFilter::CallData::CreateCallAttempt() { void RetryFilter::CallData::CreateCallAttempt() {

@ -45,27 +45,26 @@ class CallTracer {
// arguments. // arguments.
virtual void RecordSendInitialMetadata( virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0; grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0;
virtual void RecordOnDoneSendInitialMetadata() = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
// The `RecordReceivedxx()` methods should only be invoked when the
// metadata/message was successfully received, i.e., without any error.
// TODO(yashkt): We are using gpr_atm here instead of absl::string_view // TODO(yashkt): We are using gpr_atm here instead of absl::string_view
// since that's what the transport API uses, and performing an atomic load // since that's what the transport API uses, and performing an atomic load
// is unnecessary if the census tracer does not need it at present. Fix this // is unnecessary if the census tracer does not need it at present. Fix this
// when the transport API changes. // when the transport API changes.
virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
// The `RecordReceivedInitialMetadata()` and `RecordReceivedMessage()`
// methods should only be invoked when the metadata/message was
// successfully received, i.e., without any error.
virtual void RecordReceivedInitialMetadata( virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t flags) = 0; grpc_metadata_batch* recv_initial_metadata, uint32_t flags,
gpr_atm* peer_string) = 0;
virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0; virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0;
virtual void RecordReceivedTrailingMetadata( virtual void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata, grpc_metadata_batch* recv_trailing_metadata) = 0;
const grpc_transport_stream_stats& transport_stream_stats) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0; virtual void RecordCancel(grpc_error_handle cancel_error) = 0;
// Should be the last API call to the object. Once invoked, the tracer // Should be the last API call to the object. Once invoked, the tracer
// library is free to destroy the object. // library is free to destroy the object.
virtual void RecordEnd(const gpr_timespec& latency) = 0; virtual void RecordEnd(const grpc_call_final_info& final_info) = 0;
}; };
virtual ~CallTracer() {} virtual ~CallTracer() {}

@ -78,7 +78,7 @@ struct grpc_call_element_args {
const void* server_transport_data; const void* server_transport_data;
grpc_call_context_element* context; grpc_call_context_element* context;
const grpc_slice& path; const grpc_slice& path;
gpr_cycle_counter start_time; // Note: not populated in subchannel stack. gpr_cycle_counter start_time;
grpc_millis deadline; grpc_millis deadline;
grpc_core::Arena* arena; grpc_core::Arena* arena;
grpc_core::CallCombiner* call_combiner; grpc_core::CallCombiner* call_combiner;

@ -32,9 +32,6 @@ typedef enum {
/// Value is a \a census_context. /// Value is a \a census_context.
GRPC_CONTEXT_TRACING, GRPC_CONTEXT_TRACING,
/// Value is a CallTracer object.
GRPC_CONTEXT_CALL_TRACER,
/// Reserved for traffic_class_context. /// Reserved for traffic_class_context.
GRPC_CONTEXT_TRAFFIC, GRPC_CONTEXT_TRAFFIC,

@ -33,23 +33,20 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
void RecordSendInitialMetadata( void RecordSendInitialMetadata(
grpc_metadata_batch* /* send_initial_metadata */, grpc_metadata_batch* /* send_initial_metadata */,
uint32_t /* flags */) override {} uint32_t /* flags */) override {}
void RecordOnDoneSendInitialMetadata(gpr_atm* /* peer_string */) override {} void RecordOnDoneSendInitialMetadata() override {}
void RecordSendTrailingMetadata( void RecordSendTrailingMetadata(
grpc_metadata_batch* /* send_trailing_metadata */) override {} grpc_metadata_batch* /* send_trailing_metadata */) override {}
void RecordSendMessage( void RecordSendMessage(
const grpc_core::ByteStream& /* send_message */) override {} const grpc_core::ByteStream& /* send_message */) override {}
void RecordReceivedInitialMetadata( void RecordReceivedInitialMetadata(
grpc_metadata_batch* /* recv_initial_metadata */, grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */,
uint32_t /* flags */) override {} gpr_atm* /* peer_string */) override {}
void RecordReceivedMessage( void RecordReceivedMessage(
const grpc_core::ByteStream& /* recv_message */) override {} const grpc_core::ByteStream& /* recv_message */) override {}
void RecordReceivedTrailingMetadata( void RecordReceivedTrailingMetadata(
absl::Status /* status */, grpc_metadata_batch* /* recv_trailing_metadata */) override {}
grpc_metadata_batch* /* recv_trailing_metadata */,
const grpc_transport_stream_stats& /* transport_stream_stats */)
override {}
void RecordCancel(grpc_error_handle /* cancel_error */) override {} void RecordCancel(grpc_error_handle /* cancel_error */) override {}
void RecordEnd(const gpr_timespec& /* latency */) override {} void RecordEnd(const grpc_call_final_info& /* final_info */) override {}
CensusContext* context() { return &context_; } CensusContext* context() { return &context_; }

Loading…
Cancel
Save