|
|
|
@ -1623,74 +1623,34 @@ CallInitiator ClientChannel::CreateLoadBalancedCall( |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata), |
|
|
|
|
initiator = call.initiator, |
|
|
|
|
handler = std::move(call.handler)]() mutable { |
|
|
|
|
|
|
|
|
|
const bool wait_for_ready = |
|
|
|
|
client_initial_metadata->GetOrCreatePointer(WaitForReady())->value; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Map( |
|
|
|
|
// Wait for the LB picker.
|
|
|
|
|
Loop([last_picker = |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(), |
|
|
|
|
client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata)]() mutable { |
|
|
|
|
std::move(client_initial_metadata), |
|
|
|
|
initiator, handler]() mutable { |
|
|
|
|
return Map( |
|
|
|
|
picker_.Next(last_picker), |
|
|
|
|
[last_picker = &last_picker, |
|
|
|
|
client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata)]( |
|
|
|
|
[&last_picker, &client_initial_metadata, &initiator]( |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
|
|
|
|
picker) mutable { |
|
|
|
|
// FIXME: if work_serializer_dispatch experiment not enabled, need to
|
|
|
|
|
// hop into WorkSerializer to unref previous picker
|
|
|
|
|
last_picker = std::move(picker); |
|
|
|
|
// Returns 3 possible things:
|
|
|
|
|
// - Continue to queue the pick
|
|
|
|
|
// - non-OK status to fail the pick
|
|
|
|
|
// - {connected subchannel, LB call tracker}
|
|
|
|
|
return PickSubchannel(*last_picker, |
|
|
|
|
client_initial_metadata); |
|
|
|
|
// - a connected subchannel to complete the pick
|
|
|
|
|
return PickSubchannel( |
|
|
|
|
*last_picker, client_initial_metadata, initiator); |
|
|
|
|
}); |
|
|
|
|
}), |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
picker_.Next() |
|
|
|
|
resolver_data_for_calls_.NextWhen( |
|
|
|
|
[wait_for_ready]( |
|
|
|
|
const absl::StatusOr<ResolverDataForCalls> result) { |
|
|
|
|
// If the resolver reports an error but the call is
|
|
|
|
|
// wait_for_ready, keep waiting for the next result
|
|
|
|
|
// instead of failing the call.
|
|
|
|
|
if (!result.ok()) return !wait_for_ready; |
|
|
|
|
// Not an error. Make sure we actually have a result.
|
|
|
|
|
return *result != nullptr; |
|
|
|
|
}), |
|
|
|
|
// Handle resolver result.
|
|
|
|
|
[self, initiator = std::move(initiator), |
|
|
|
|
handler = std::move(handler), |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata)]( |
|
|
|
|
ResolverDataForCalls resolver_data) mutable { |
|
|
|
|
// Apply service config to call.
|
|
|
|
|
absl::Status status = ApplyServiceConfigToCall( |
|
|
|
|
self.get(), *resolver_data.config_selector, |
|
|
|
|
client_initial_metadata); |
|
|
|
|
if (!status.ok()) return status; |
|
|
|
|
// Now inject initial metadata into the call.
|
|
|
|
|
// FIXME: how do I chain this such that it doesn't need another call to
|
|
|
|
|
// SpawnGuarded()?
|
|
|
|
|
initiator.SpawnGuarded( |
|
|
|
|
"send_initial_metadata", |
|
|
|
|
[initiator, client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata)]() mutable { |
|
|
|
|
return initiator.PushClientInitialMetadata( |
|
|
|
|
std::move(client_initial_metadata)); |
|
|
|
|
}); |
|
|
|
|
// Finish constructing the call with the right filter
|
|
|
|
|
// stack and destination.
|
|
|
|
|
handler.SetStack(std::move(resolver_data.filter_stack)); |
|
|
|
|
self->call_destination_->StartCall(std::move(handler)); |
|
|
|
|
return absl::OkStatus(); |
|
|
|
|
}); |
|
|
|
|
// Create call stack on the connected subchannel.
|
|
|
|
|
[handler = std::move(handler)]( |
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { |
|
|
|
|
handler.SetStack(connected_subchannel->GetStack()); |
|
|
|
|
connected_subchannel->call_destination_->StartCall( |
|
|
|
|
std::move(handler)); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
// Return the initiator.
|
|
|
|
|
return call.initiator; |
|
|
|
@ -1906,9 +1866,10 @@ void ClientChannel::LoadBalancedCall::RecordLatency() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
LoopCtl<absl::StatusOr<PickSubchannelResult>> ClientChannel::PickSubchannel( |
|
|
|
|
LoadBalancingPolicy::SubchannelPicker& picker, |
|
|
|
|
ClientMetadataHandle& client_initial_metadata) { |
|
|
|
|
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> |
|
|
|
|
ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, |
|
|
|
|
ClientMetadataHandle& client_initial_metadata, |
|
|
|
|
CallInitiator& call_initiator) { |
|
|
|
|
// Perform LB pick.
|
|
|
|
|
LoadBalancingPolicy::PickArgs pick_args; |
|
|
|
|
Slice* path = client_initial_metadata->get_pointer(HttpPathMetadata()); |
|
|
|
@ -1918,12 +1879,12 @@ LoopCtl<absl::StatusOr<PickSubchannelResult>> ClientChannel::PickSubchannel( |
|
|
|
|
pick_args.call_state = &lb_call_state; |
|
|
|
|
LbMetadata initial_metadata(client_initial_metadata.get()); |
|
|
|
|
pick_args.initial_metadata = &initial_metadata; |
|
|
|
|
auto result = picker->Pick(pick_args); |
|
|
|
|
auto result = picker.Pick(pick_args); |
|
|
|
|
// Handle result.
|
|
|
|
|
return HandlePickResult<bool>( |
|
|
|
|
&result, |
|
|
|
|
// CompletePick
|
|
|
|
|
[this](LoadBalancingPolicy::PickResult::Complete* complete_pick) { |
|
|
|
|
[&](LoadBalancingPolicy::PickResult::Complete* complete_pick) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"client_channel=%p lb_call=%p: LB pick succeeded: subchannel=%p", |
|
|
|
@ -1934,13 +1895,12 @@ LoopCtl<absl::StatusOr<PickSubchannelResult>> ClientChannel::PickSubchannel( |
|
|
|
|
// holding the data plane mutex.
|
|
|
|
|
SubchannelWrapper* subchannel = |
|
|
|
|
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get()); |
|
|
|
|
PickSubchannelResult result; |
|
|
|
|
result.connected_subchannel = subchannel->connected_subchannel(); |
|
|
|
|
auto connected_subchannel = subchannel->connected_subchannel(); |
|
|
|
|
// If the subchannel has no connected subchannel (e.g., if the
|
|
|
|
|
// subchannel has moved out of state READY but the LB policy hasn't
|
|
|
|
|
// yet seen that change and given us a new picker), then just
|
|
|
|
|
// queue the pick. We'll try again as soon as we get a new picker.
|
|
|
|
|
if (result.connected_subchannel == nullptr) { |
|
|
|
|
if (connected_subchannel == nullptr) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"client_channel=%p lb_call=%p: subchannel returned by LB picker " |
|
|
|
@ -1949,10 +1909,25 @@ LoopCtl<absl::StatusOr<PickSubchannelResult>> ClientChannel::PickSubchannel( |
|
|
|
|
} |
|
|
|
|
return Continue{}; |
|
|
|
|
} |
|
|
|
|
result.call_tracker = |
|
|
|
|
std::move(complete_pick->subchannel_call_tracker); |
|
|
|
|
if (result.call_tracker_ != nullptr) result.call_tracker_->Start(); |
|
|
|
|
return result; |
|
|
|
|
// If the LB policy returned a call tracker, inform it that the
|
|
|
|
|
// call is starting and add it to context, so that we can notify
|
|
|
|
|
// it when the call finishes.
|
|
|
|
|
if (complete_pick->subchannel_call_tracker != nullptr) { |
|
|
|
|
complete_pick->subchannel_call_tracker->Start(); |
|
|
|
|
call_initiator.SetContext( |
|
|
|
|
std::move(complete_pick->subchannel_call_tracker)); |
|
|
|
|
} |
|
|
|
|
// Now that we're done with client initial metadata, push it
|
|
|
|
|
// into the call initiator.
|
|
|
|
|
call_initiator.SpawnGuarded( |
|
|
|
|
"send_initial_metadata", |
|
|
|
|
[call_initiator, client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata)]() mutable { |
|
|
|
|
return call_initiator.PushClientInitialMetadata( |
|
|
|
|
std::move(client_initial_metadata)); |
|
|
|
|
}); |
|
|
|
|
// Return the connected subchannel.
|
|
|
|
|
return connected_subchannel; |
|
|
|
|
}, |
|
|
|
|
// QueuePick
|
|
|
|
|
[this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) { |
|
|
|
|