Merge pull request #19636 from markdroth/connected_subchannel_ref

Avoid unnecessary ref of connected subchannel when creating subchannel call.
pull/19539/head
Mark D. Roth 5 years ago committed by GitHub
commit db1b726024
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/filters/client_channel/client_channel.cc
  2. 6
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 67
      src/core/ext/filters/client_channel/subchannel.cc
  4. 31
      src/core/ext/filters/client_channel/subchannel.h

@ -2164,9 +2164,8 @@ void CallData::DoRetry(grpc_call_element* elem,
GPR_ASSERT(method_params_ != nullptr);
const auto* retry_policy = method_params_->retry_policy();
GPR_ASSERT(retry_policy != nullptr);
// Reset subchannel call and connected subchannel.
// Reset subchannel call.
subchannel_call_.reset();
connected_subchannel_.reset();
// Compute backoff delay.
grpc_millis next_attempt_time;
if (server_pushback_ms >= 0) {
@ -3284,13 +3283,14 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
const size_t parent_data_size =
enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
const ConnectedSubchannel::CallArgs call_args = {
pollent_, path_, call_start_time_, deadline_, arena_,
SubchannelCall::Args call_args = {
std::move(connected_subchannel_), pollent_, path_, call_start_time_,
deadline_, arena_,
// TODO(roth): When we implement hedging support, we will probably
// need to use a separate call context for each subchannel call.
call_context_, call_combiner_, parent_data_size};
grpc_error* error = GRPC_ERROR_NONE;
subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error);
subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, this, subchannel_call_.get(), grpc_error_string(error));

@ -310,7 +310,8 @@ void HealthCheckClient::CallState::Orphan() {
}
void HealthCheckClient::CallState::StartCall() {
ConnectedSubchannel::CallArgs args = {
SubchannelCall::Args args = {
health_check_client_->connected_subchannel_,
&pollent_,
GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
gpr_now(GPR_CLOCK_MONOTONIC), // start_time
@ -321,8 +322,7 @@ void HealthCheckClient::CallState::StartCall() {
0, // parent_data_size
};
grpc_error* error = GRPC_ERROR_NONE;
call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error)
.release();
call_ = SubchannelCall::Create(std::move(args), &error).release();
// Register after-destruction callback.
GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
this, grpc_schedule_on_exec_ctx);

@ -117,14 +117,37 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
elem->filter->start_transport_op(elem, op);
}
RefCountedPtr<SubchannelCall> ConnectedSubchannel::CreateCall(
const CallArgs& args, grpc_error** error) {
size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
size_t parent_data_size) const {
size_t allocation_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall));
if (parent_data_size > 0) {
allocation_size +=
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
parent_data_size;
} else {
allocation_size += channel_stack_->call_stack_size;
}
return allocation_size;
}
//
// SubchannelCall
//
RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
grpc_error** error) {
const size_t allocation_size =
GetInitialCallSizeEstimate(args.parent_data_size);
RefCountedPtr<SubchannelCall> call(
new (args.arena->Alloc(allocation_size))
SubchannelCall(Ref(DEBUG_LOCATION, "subchannel_call"), args));
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call.get());
args.connected_subchannel->GetInitialCallSizeEstimate(
args.parent_data_size);
return RefCountedPtr<SubchannelCall>(new (args.arena->Alloc(
allocation_size)) SubchannelCall(std::move(args), error));
}
SubchannelCall::SubchannelCall(Args args, grpc_error** error)
: connected_subchannel_(std::move(args.connected_subchannel)),
deadline_(args.deadline) {
grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */
@ -135,38 +158,20 @@ RefCountedPtr<SubchannelCall> ConnectedSubchannel::CreateCall(
args.arena, /* arena */
args.call_combiner /* call_combiner */
};
*error = grpc_call_stack_init(channel_stack_, 1, SubchannelCall::Destroy,
call.get(), &call_args);
*error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
SubchannelCall::Destroy, this, &call_args);
if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
const char* error_string = grpc_error_string(*error);
gpr_log(GPR_ERROR, "error: %s", error_string);
return call;
return;
}
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
if (channelz_subchannel_ != nullptr) {
channelz_subchannel_->RecordCallStarted();
auto* channelz_node = connected_subchannel_->channelz_subchannel();
if (channelz_node != nullptr) {
channelz_node->RecordCallStarted();
}
return call;
}
size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
size_t parent_data_size) const {
size_t allocation_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall));
if (parent_data_size > 0) {
allocation_size +=
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
parent_data_size;
} else {
allocation_size += channel_stack_->call_stack_size;
}
return allocation_size;
}
//
// SubchannelCall
//
void SubchannelCall::StartTransportStreamOpBatch(
grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("subchannel_call_process_op", 0);

@ -72,17 +72,6 @@ class SubchannelCall;
class ConnectedSubchannel : public ConnectedSubchannelInterface {
public:
struct CallArgs {
grpc_polling_entity* pollent;
grpc_slice path;
gpr_timespec start_time;
grpc_millis deadline;
Arena* arena;
grpc_call_context_element* context;
CallCombiner* call_combiner;
size_t parent_data_size;
};
ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
@ -92,8 +81,6 @@ class ConnectedSubchannel : public ConnectedSubchannelInterface {
grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
grpc_error** error);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
const grpc_channel_args* args() const override { return args_; }
@ -114,10 +101,18 @@ class ConnectedSubchannel : public ConnectedSubchannelInterface {
// Implements the interface of RefCounted<>.
class SubchannelCall {
public:
SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
const ConnectedSubchannel::CallArgs& args)
: connected_subchannel_(std::move(connected_subchannel)),
deadline_(args.deadline) {}
struct Args {
RefCountedPtr<ConnectedSubchannel> connected_subchannel;
grpc_polling_entity* pollent;
grpc_slice path;
gpr_timespec start_time;
grpc_millis deadline;
Arena* arena;
grpc_call_context_element* context;
CallCombiner* call_combiner;
size_t parent_data_size;
};
static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
// Continues processing a transport stream op batch.
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
@ -150,6 +145,8 @@ class SubchannelCall {
template <typename T>
friend class RefCountedPtr;
SubchannelCall(Args args, grpc_error** error);
// If channelz is enabled, intercepts recv_trailing so that we may check the
// status and associate it to a subchannel.
void MaybeInterceptRecvTrailingMetadata(

Loading…
Cancel
Save