diff --git a/src/core/client_channel/client_channel.cc b/src/core/client_channel/client_channel.cc index f1566dfdad4..a991b4063f3 100644 --- a/src/core/client_channel/client_channel.cc +++ b/src/core/client_channel/client_channel.cc @@ -1623,74 +1623,34 @@ CallInitiator ClientChannel::CreateLoadBalancedCall( client_initial_metadata = std::move(client_initial_metadata), initiator = call.initiator, handler = std::move(call.handler)]() mutable { - - const bool wait_for_ready = - client_initial_metadata->GetOrCreatePointer(WaitForReady())->value; - - return Map( // Wait for the LB picker. Loop([last_picker = RefCountedPtr(), client_initial_metadata = - std::move(client_initial_metadata)]() mutable { + std::move(client_initial_metadata), + initiator, handler]() mutable { return Map( picker_.Next(last_picker), - [last_picker = &last_picker, - client_initial_metadata = - std::move(client_initial_metadata)]( + [&last_picker, &client_initial_metadata, &initiator]( RefCountedPtr picker) mutable { -// FIXME: if work_serializer_dispatch experiment not enabled, need to -// hop into WorkSerializer to unref previous picker last_picker = std::move(picker); // Returns 3 possible things: // - Continue to queue the pick // - non-OK status to fail the pick - // - {connected subchannel, LB call tracker} - return PickSubchannel(*last_picker, - client_initial_metadata); + // - a connected subchannel to complete the pick + return PickSubchannel( + *last_picker, client_initial_metadata, initiator); }); }), - - - picker_.Next() - resolver_data_for_calls_.NextWhen( - [wait_for_ready]( - const absl::StatusOr result) { - // If the resolver reports an error but the call is - // wait_for_ready, keep waiting for the next result - // instead of failing the call. - if (!result.ok()) return !wait_for_ready; - // Not an error. Make sure we actually have a result. - return *result != nullptr; - }), - // Handle resolver result. - [self, initiator = std::move(initiator), - handler = std::move(handler), - client_initial_metadata = std::move(client_initial_metadata)]( - ResolverDataForCalls resolver_data) mutable { - // Apply service config to call. - absl::Status status = ApplyServiceConfigToCall( - self.get(), *resolver_data.config_selector, - client_initial_metadata); - if (!status.ok()) return status; - // Now inject initial metadata into the call. -// FIXME: how do I chain this such that it doesn't need another call to -// SpawnGuarded()? - initiator.SpawnGuarded( - "send_initial_metadata", - [initiator, client_initial_metadata = - std::move(client_initial_metadata)]() mutable { - return initiator.PushClientInitialMetadata( - std::move(client_initial_metadata)); - }); - // Finish constructing the call with the right filter - // stack and destination. - handler.SetStack(std::move(resolver_data.filter_stack)); - self->call_destination_->StartCall(std::move(handler)); - return absl::OkStatus(); - }); + // Create call stack on the connected subchannel. + [handler = std::move(handler)]( + RefCountedPtr connected_subchannel) { + handler.SetStack(connected_subchannel->GetStack()); + connected_subchannel->call_destination_->StartCall( + std::move(handler)); + }); }); // Return the initiator. return call.initiator; @@ -1906,9 +1866,10 @@ void ClientChannel::LoadBalancedCall::RecordLatency() { } } -LoopCtl> ClientChannel::PickSubchannel( - LoadBalancingPolicy::SubchannelPicker& picker, - ClientMetadataHandle& client_initial_metadata) { +LoopCtl>> +ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, + ClientMetadataHandle& client_initial_metadata, + CallInitiator& call_initiator) { // Perform LB pick. LoadBalancingPolicy::PickArgs pick_args; Slice* path = client_initial_metadata->get_pointer(HttpPathMetadata()); @@ -1918,12 +1879,12 @@ LoopCtl> ClientChannel::PickSubchannel( pick_args.call_state = &lb_call_state; LbMetadata initial_metadata(client_initial_metadata.get()); pick_args.initial_metadata = &initial_metadata; - auto result = picker->Pick(pick_args); + auto result = picker.Pick(pick_args); // Handle result. return HandlePickResult( &result, // CompletePick - [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) { + [&](LoadBalancingPolicy::PickResult::Complete* complete_pick) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "client_channel=%p lb_call=%p: LB pick succeeded: subchannel=%p", @@ -1934,13 +1895,12 @@ LoopCtl> ClientChannel::PickSubchannel( // holding the data plane mutex. SubchannelWrapper* subchannel = static_cast(complete_pick->subchannel.get()); - PickSubchannelResult result; - result.connected_subchannel = subchannel->connected_subchannel(); + auto connected_subchannel = subchannel->connected_subchannel(); // If the subchannel has no connected subchannel (e.g., if the // subchannel has moved out of state READY but the LB policy hasn't // yet seen that change and given us a new picker), then just // queue the pick. We'll try again as soon as we get a new picker. - if (result.connected_subchannel == nullptr) { + if (connected_subchannel == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "client_channel=%p lb_call=%p: subchannel returned by LB picker " @@ -1949,10 +1909,25 @@ LoopCtl> ClientChannel::PickSubchannel( } return Continue{}; } - result.call_tracker = - std::move(complete_pick->subchannel_call_tracker); - if (result.call_tracker_ != nullptr) result.call_tracker_->Start(); - return result; + // If the LB policy returned a call tracker, inform it that the + // call is starting and add it to context, so that we can notify + // it when the call finishes. + if (complete_pick->subchannel_call_tracker != nullptr) { + complete_pick->subchannel_call_tracker->Start(); + call_initiator.SetContext( + std::move(complete_pick->subchannel_call_tracker)); + } + // Now that we're done with client initial metadata, push it + // into the call initiator. + call_initiator.SpawnGuarded( + "send_initial_metadata", + [call_initiator, client_initial_metadata = + std::move(client_initial_metadata)]() mutable { + return call_initiator.PushClientInitialMetadata( + std::move(client_initial_metadata)); + }); + // Return the connected subchannel. + return connected_subchannel; }, // QueuePick [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) { diff --git a/src/core/client_channel/client_channel.h b/src/core/client_channel/client_channel.h index 4e87b2cfa34..706155e3089 100644 --- a/src/core/client_channel/client_channel.h +++ b/src/core/client_channel/client_channel.h @@ -146,16 +146,6 @@ class ClientChannel : public CallFactory { }; #endif - // The result of the load balancing promise. - struct PickSubchannelResult { - RefCountedPtr connected_subchannel_; - std::unique_ptr - call_tracker; - }; - LoopCtl> PickSubchannel( - LoadBalancingPolicy::SubchannelPicker& picker, - ClientMetadataHandle& client_initial_metadata); - void OnResolverResultChangedLocked(Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); void OnResolverErrorLocked(absl::Status status) @@ -201,6 +191,18 @@ class ClientChannel : public CallFactory { const grpc_channel_info* info); #endif + // Does an LB pick for a call. Returns one of the following things: + // - Continue{}, meaning to queue the pick + // - a non-OK status, meaning to fail the call + // - a connected subchannel, meaning that the pick is complete + // When the pick is complete, pushes client_initial_metadata onto + // call_initiator. Also adds the subchannel call tracker (if any) to + // context. + LoopCtl>> PickSubchannel( + LoadBalancingPolicy::SubchannelPicker& picker, + ClientMetadataHandle& client_initial_metadata, + CallInitiator& call_initiator); + // // Fields set at construction and never modified. //