support call tracer in client channel code

pull/26714/head
Mark D. Roth 4 years ago
parent fdf0192004
commit 8cc1534c89
  1. 125
      src/core/ext/filters/client_channel/client_channel.cc
  2. 30
      src/core/ext/filters/client_channel/client_channel.h
  3. 4
      src/core/ext/filters/client_channel/retry_filter.cc
  4. 6
      src/core/lib/channel/call_tracer.h
  5. 3
      src/core/lib/channel/context.h
  6. 6
      src/cpp/ext/filters/census/open_census_call_tracer.h

@ -352,7 +352,8 @@ class DynamicTerminationFilter::CallData {
calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
calld->lb_call_ = client_channel->CreateLoadBalancedCall(
args, pollent, nullptr,
service_config_call_data->call_dispatch_controller());
service_config_call_data->call_dispatch_controller(),
/*is_transparent_retry=*/false);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
@ -1174,10 +1175,11 @@ RefCountedPtr<ClientChannel::LoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller) {
return args.arena->New<LoadBalancedCall>(this, args, pollent,
on_call_destruction_complete,
call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) {
return args.arena->New<LoadBalancedCall>(
this, args, pollent, on_call_destruction_complete,
call_dispatch_controller, is_transparent_retry);
}
namespace {
@ -2553,10 +2555,23 @@ class ClientChannel::LoadBalancedCall::LbCallState
// LoadBalancedCall
//
namespace {
CallTracer::CallAttemptTracer* GetCallAttemptTracer(
grpc_call_context_element* context, bool is_transparent_retry) {
auto* call_tracer =
static_cast<CallTracer*>(context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) return nullptr;
return call_tracer->RecordNewAttempt(is_transparent_retry);
}
} // namespace
ClientChannel::LoadBalancedCall::LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller)
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry)
: RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
? "LoadBalancedCall"
: nullptr),
@ -2570,7 +2585,9 @@ ClientChannel::LoadBalancedCall::LoadBalancedCall(
call_context_(args.context),
pollent_(pollent),
on_call_destruction_complete_(on_call_destruction_complete),
call_dispatch_controller_(call_dispatch_controller) {}
call_dispatch_controller_(call_dispatch_controller),
call_attempt_tracer_(
GetCallAttemptTracer(args.context, is_transparent_retry)) {}
ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
grpc_slice_unref_internal(path_);
@ -2705,9 +2722,54 @@ void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
// Intercept recv_trailing_metadata_ready for LB callback.
// Record send ops in tracer.
if (call_attempt_tracer_ != nullptr) {
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata,
batch->payload->send_initial_metadata.send_initial_metadata_flags);
}
if (batch->send_message) {
call_attempt_tracer_->RecordSendMessage(
batch->payload->send_message.send_message.get());
}
if (batch->send_trailing_metadata) {
call_attempt_tracer_->RecordSendTrailingMetadata(
batch->payload->send_trailing_metadata.send_trailing_metadata);
}
// Intercept recv ops.
if (batch->recv_initial_metadata) {
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
recv_initial_metadata_flags_ =
batch->payload->recv_initial_metadata.recv_flags;
peer_string_ = batch->payload->recv_initial_metadata.peer_string;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
this, nullptr);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_ready_;
}
if (batch->recv_message) {
recv_message_ = batch->payload->recv_message.recv_message;
original_recv_message_ready_ =
batch->payload->recv_message.recv_message_ready;
GRPC_CLOSURE_INIT(&recv_message_ready_, RecvMessageReady, this, nullptr);
batch->payload->recv_message.recv_message_ready = &recv_message_ready_;
}
}
// Intercept recv_trailing_metadata even if there is no call tracer,
// since we may need to notify the LB policy about trailing metadata.
if (batch->recv_trailing_metadata) {
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
this, nullptr);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
}
// If we've previously been cancelled, immediately fail any new batches.
if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
@ -2784,10 +2846,35 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
}
void ClientChannel::LoadBalancedCall::
RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
grpc_error_handle error) {
void ClientChannel::LoadBalancedCall::RecvInitialMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedInitialMetadata(
self->recv_initial_metadata_, *self->recv_initial_metadata_flags_,
self->peer_string_);
Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvMessageReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
self->call_attempt_tracer_->RecordReceivedMessage(
self->recv_message_->get());
Closure::Run(DEBUG_LOCATION, self->original_recv_message_ready_,
GRPC_ERROR_REF(error));
}
void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
void* arg, grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
// Notify the call tracer about trailing metadata.
if (self->call_attempt_tracer_ != nullptr) {
self->call_attempt_tracer_->RecordReceivedTrailingMetadata(
self->recv_trailing_metadata_);
}
// If the LB policy requested a callback for trailing metadata, invoke
// the callback.
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
// Set error if call did not succeed.
grpc_error_handle error_for_lb = GRPC_ERROR_NONE;
@ -2828,20 +2915,6 @@ void ClientChannel::LoadBalancedCall::
error);
}
void ClientChannel::LoadBalancedCall::
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch) {
recv_trailing_metadata_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
original_recv_trailing_metadata_ready_ =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
grpc_schedule_on_exec_ctx);
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&recv_trailing_metadata_ready_;
}
void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
SubchannelCall::Args call_args = {
std::move(connected_subchannel_), pollent_, path_, call_start_time_,

@ -39,6 +39,7 @@
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
@ -135,7 +136,8 @@ class ClientChannel {
RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
private:
class CallData;
@ -386,7 +388,8 @@ class ClientChannel::LoadBalancedCall
LoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller);
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
~LoadBalancedCall() override;
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
@ -440,10 +443,9 @@ class ClientChannel::LoadBalancedCall
// Resumes all pending batches on subchannel_call_.
void PendingBatchesResume();
static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
void* arg, grpc_error_handle error);
void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
grpc_transport_stream_op_batch* batch);
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
static void RecvMessageReady(void* arg, grpc_error_handle error);
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
void CreateSubchannelCall();
// Invoked when a pick is completed, on both success or failure.
@ -471,6 +473,8 @@ class ClientChannel::LoadBalancedCall
grpc_closure* on_call_destruction_complete_;
ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
// Set when we get a cancel_stream op.
grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
@ -495,7 +499,19 @@ class ClientChannel::LoadBalancedCall
RefCountedPtr<SubchannelCall> subchannel_call_;
// For intercepting recv_trailing_metadata_ready for the LB policy.
// For intercepting recv_initial_metadata_ready.
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t* recv_initial_metadata_flags_ = nullptr;
gpr_atm* peer_string_ = nullptr;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
// For intercepting recv_message_ready.
OrphanablePtr<ByteStream>* recv_message_ = nullptr;
grpc_closure recv_message_ready_;
grpc_closure* original_recv_message_ready_ = nullptr;
// For intercepting recv_trailing_metadata_ready.
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

@ -2179,7 +2179,9 @@ RetryFilter::CallData::CreateLoadBalancedCall(
// This callback holds a ref to the CallStackDestructionBarrier
// object until the LB call is destroyed.
call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this),
call_dispatch_controller);
call_dispatch_controller,
// TODO(roth): Change this when we support transparent retries.
/*is_transparent_retry=*/false);
}
void RetryFilter::CallData::CreateCallAttempt() {

@ -46,11 +46,11 @@ class CallTracer {
virtual void RecordOnDoneSendInitialMetadata(gpr_atm* peer_string) = 0;
virtual void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) = 0;
virtual void RecordSendMessage(const ByteStream& send_message) = 0;
virtual void RecordSendMessage(const ByteStream* send_message) = 0;
virtual void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata, uint32_t* flags,
grpc_metadata_batch* recv_initial_metadata, uint32_t flags,
gpr_atm* peer_string) = 0;
virtual void RecordReceivedMessage(const ByteStream& recv_message) = 0;
virtual void RecordReceivedMessage(const ByteStream* recv_message) = 0;
virtual void RecordReceivedTrailingMetadata(
grpc_metadata_batch* recv_trailing_metadata) = 0;
virtual void RecordCancel(grpc_error_handle cancel_error) = 0;

@ -32,6 +32,9 @@ typedef enum {
/// Value is a \a census_context.
GRPC_CONTEXT_TRACING,
/// Value is a CallTracer object.
GRPC_CONTEXT_CALL_TRACER,
/// Reserved for traffic_class_context.
GRPC_CONTEXT_TRAFFIC,

@ -39,12 +39,12 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
void RecordSendTrailingMetadata(
grpc_metadata_batch* /* send_trailing_metadata */) override {}
void RecordSendMessage(
const grpc_core::ByteStream& /* send_message */) override {}
const grpc_core::ByteStream* /* send_message */) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /* recv_initial_metadata */, uint32_t* /* flags */,
grpc_metadata_batch* /* recv_initial_metadata */, uint32_t /* flags */,
gpr_atm* /* peer_string */) override {}
void RecordReceivedMessage(
const grpc_core::ByteStream& /* recv_message */) override {}
const grpc_core::ByteStream* /* recv_message */) override {}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /* recv_trailing_metadata */) override {}
void RecordCancel(grpc_error_handle /* cancel_error */) override {}

Loading…
Cancel
Save