From f9e6144692bd92cffe5690d5e9dfac92458acc17 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 12 Jul 2019 15:08:36 -0700 Subject: [PATCH] Avoid unnecessary ref of connected subchannel when creating subchannel call. --- .../filters/client_channel/client_channel.cc | 10 +-- .../health/health_check_client.cc | 6 +- .../ext/filters/client_channel/subchannel.cc | 67 ++++++++++--------- .../ext/filters/client_channel/subchannel.h | 31 ++++----- 4 files changed, 58 insertions(+), 56 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 0b612e67a33..257616a124e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -2157,9 +2157,8 @@ void CallData::DoRetry(grpc_call_element* elem, GPR_ASSERT(method_params_ != nullptr); const auto* retry_policy = method_params_->retry_policy(); GPR_ASSERT(retry_policy != nullptr); - // Reset subchannel call and connected subchannel. + // Reset subchannel call. subchannel_call_.reset(); - connected_subchannel_.reset(); // Compute backoff delay. grpc_millis next_attempt_time; if (server_pushback_ms >= 0) { @@ -3277,13 +3276,14 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) { ChannelData* chand = static_cast(elem->channel_data); const size_t parent_data_size = enable_retries_ ? sizeof(SubchannelCallRetryState) : 0; - const ConnectedSubchannel::CallArgs call_args = { - pollent_, path_, call_start_time_, deadline_, arena_, + SubchannelCall::Args call_args = { + std::move(connected_subchannel_), pollent_, path_, call_start_time_, + deadline_, arena_, // TODO(roth): When we implement hedging support, we will probably // need to use a separate call context for each subchannel call. call_context_, call_combiner_, parent_data_size}; grpc_error* error = GRPC_ERROR_NONE; - subchannel_call_ = connected_subchannel_->CreateCall(call_args, &error); + subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, this, subchannel_call_.get(), grpc_error_string(error)); diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index faa2ba5b3b1..a6b910b5dad 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -310,7 +310,8 @@ void HealthCheckClient::CallState::Orphan() { } void HealthCheckClient::CallState::StartCall() { - ConnectedSubchannel::CallArgs args = { + SubchannelCall::Args args = { + health_check_client_->connected_subchannel_, &pollent_, GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH, gpr_now(GPR_CLOCK_MONOTONIC), // start_time @@ -321,8 +322,7 @@ void HealthCheckClient::CallState::StartCall() { 0, // parent_data_size }; grpc_error* error = GRPC_ERROR_NONE; - call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error) - .release(); + call_ = SubchannelCall::Create(std::move(args), &error).release(); // Register after-destruction callback. GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction, this, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index b6ab47fb09d..c3521f46cee 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -117,14 +117,37 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate, elem->filter->start_transport_op(elem, op); } -RefCountedPtr ConnectedSubchannel::CreateCall( - const CallArgs& args, grpc_error** error) { +size_t ConnectedSubchannel::GetInitialCallSizeEstimate( + size_t parent_data_size) const { + size_t allocation_size = + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); + if (parent_data_size > 0) { + allocation_size += + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + + parent_data_size; + } else { + allocation_size += channel_stack_->call_stack_size; + } + return allocation_size; +} + +// +// SubchannelCall +// + +RefCountedPtr SubchannelCall::Create(Args args, + grpc_error** error) { const size_t allocation_size = - GetInitialCallSizeEstimate(args.parent_data_size); - RefCountedPtr call( - new (args.arena->Alloc(allocation_size)) - SubchannelCall(Ref(DEBUG_LOCATION, "subchannel_call"), args)); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call.get()); + args.connected_subchannel->GetInitialCallSizeEstimate( + args.parent_data_size); + return RefCountedPtr(new (args.arena->Alloc( + allocation_size)) SubchannelCall(std::move(args), error)); +} + +SubchannelCall::SubchannelCall(Args args, grpc_error** error) + : connected_subchannel_(std::move(args.connected_subchannel)), + deadline_(args.deadline) { + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this); const grpc_call_element_args call_args = { callstk, /* call_stack */ nullptr, /* server_transport_data */ @@ -135,38 +158,20 @@ RefCountedPtr ConnectedSubchannel::CreateCall( args.arena, /* arena */ args.call_combiner /* call_combiner */ }; - *error = grpc_call_stack_init(channel_stack_, 1, SubchannelCall::Destroy, - call.get(), &call_args); + *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1, + SubchannelCall::Destroy, this, &call_args); if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) { const char* error_string = grpc_error_string(*error); gpr_log(GPR_ERROR, "error: %s", error_string); - return call; + return; } grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); - if (channelz_subchannel_ != nullptr) { - channelz_subchannel_->RecordCallStarted(); + auto* channelz_node = connected_subchannel_->channelz_subchannel(); + if (channelz_node != nullptr) { + channelz_node->RecordCallStarted(); } - return call; } -size_t ConnectedSubchannel::GetInitialCallSizeEstimate( - size_t parent_data_size) const { - size_t allocation_size = - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); - if (parent_data_size > 0) { - allocation_size += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + - parent_data_size; - } else { - allocation_size += channel_stack_->call_stack_size; - } - return allocation_size; -} - -// -// SubchannelCall -// - void SubchannelCall::StartTransportStreamOpBatch( grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("subchannel_call_process_op", 0); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 64d679af6c5..d1c4b7d1073 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -72,17 +72,6 @@ class SubchannelCall; class ConnectedSubchannel : public ConnectedSubchannelInterface { public: - struct CallArgs { - grpc_polling_entity* pollent; - grpc_slice path; - gpr_timespec start_time; - grpc_millis deadline; - Arena* arena; - grpc_call_context_element* context; - CallCombiner* call_combiner; - size_t parent_data_size; - }; - ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, RefCountedPtr channelz_subchannel); @@ -92,8 +81,6 @@ class ConnectedSubchannel : public ConnectedSubchannelInterface { grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); - RefCountedPtr CreateCall(const CallArgs& args, - grpc_error** error); grpc_channel_stack* channel_stack() const { return channel_stack_; } const grpc_channel_args* args() const override { return args_; } @@ -114,10 +101,18 @@ class ConnectedSubchannel : public ConnectedSubchannelInterface { // Implements the interface of RefCounted<>. class SubchannelCall { public: - SubchannelCall(RefCountedPtr connected_subchannel, - const ConnectedSubchannel::CallArgs& args) - : connected_subchannel_(std::move(connected_subchannel)), - deadline_(args.deadline) {} + struct Args { + RefCountedPtr connected_subchannel; + grpc_polling_entity* pollent; + grpc_slice path; + gpr_timespec start_time; + grpc_millis deadline; + Arena* arena; + grpc_call_context_element* context; + CallCombiner* call_combiner; + size_t parent_data_size; + }; + static RefCountedPtr Create(Args args, grpc_error** error); // Continues processing a transport stream op batch. void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); @@ -150,6 +145,8 @@ class SubchannelCall { template friend class RefCountedPtr; + SubchannelCall(Args args, grpc_error** error); + // If channelz is enabled, intercepts recv_trailing so that we may check the // status and associate it to a subchannel. void MaybeInterceptRecvTrailingMetadata(