|
|
@ -891,6 +891,7 @@ typedef struct client_channel_call_data { |
|
|
|
grpc_closure pick_cancel_closure; |
|
|
|
grpc_closure pick_cancel_closure; |
|
|
|
|
|
|
|
|
|
|
|
grpc_polling_entity* pollent; |
|
|
|
grpc_polling_entity* pollent; |
|
|
|
|
|
|
|
bool pollent_added_to_interested_parties; |
|
|
|
|
|
|
|
|
|
|
|
// Batches are added to this list when received from above.
|
|
|
|
// Batches are added to this list when received from above.
|
|
|
|
// They are removed when we are done handling the batch (i.e., when
|
|
|
|
// They are removed when we are done handling the batch (i.e., when
|
|
|
@ -949,7 +950,6 @@ static void retry_commit(grpc_call_element* elem, |
|
|
|
static void start_internal_recv_trailing_metadata(grpc_call_element* elem); |
|
|
|
static void start_internal_recv_trailing_metadata(grpc_call_element* elem); |
|
|
|
static void on_complete(void* arg, grpc_error* error); |
|
|
|
static void on_complete(void* arg, grpc_error* error); |
|
|
|
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); |
|
|
|
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); |
|
|
|
static void pick_after_resolver_result_start_locked(grpc_call_element* elem); |
|
|
|
|
|
|
|
static void start_pick_locked(void* arg, grpc_error* ignored); |
|
|
|
static void start_pick_locked(void* arg, grpc_error* ignored); |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
@ -2684,59 +2684,133 @@ static void pick_done(void* arg, grpc_error* error) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void maybe_add_call_to_channel_interested_parties_locked( |
|
|
|
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
if (!calld->pollent_added_to_interested_parties) { |
|
|
|
|
|
|
|
calld->pollent_added_to_interested_parties = true; |
|
|
|
|
|
|
|
grpc_polling_entity_add_to_pollset_set(calld->pollent, |
|
|
|
|
|
|
|
chand->interested_parties); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void maybe_del_call_from_channel_interested_parties_locked( |
|
|
|
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
if (calld->pollent_added_to_interested_parties) { |
|
|
|
|
|
|
|
calld->pollent_added_to_interested_parties = false; |
|
|
|
|
|
|
|
grpc_polling_entity_del_from_pollset_set(calld->pollent, |
|
|
|
|
|
|
|
chand->interested_parties); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Invoked when a pick is completed to leave the client_channel combiner
|
|
|
|
// Invoked when a pick is completed to leave the client_channel combiner
|
|
|
|
// and continue processing in the call combiner.
|
|
|
|
// and continue processing in the call combiner.
|
|
|
|
|
|
|
|
// If needed, removes the call's polling entity from chand->interested_parties.
|
|
|
|
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { |
|
|
|
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
maybe_del_call_from_channel_interested_parties_locked(elem); |
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, |
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
GRPC_CLOSURE_SCHED(&calld->pick_closure, error); |
|
|
|
GRPC_CLOSURE_SCHED(&calld->pick_closure, error); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// A wrapper around pick_done_locked() that is used in cases where
|
|
|
|
namespace grpc_core { |
|
|
|
// either (a) the pick was deferred pending a resolver result or (b) the
|
|
|
|
|
|
|
|
// pick was done asynchronously. Removes the call's polling entity from
|
|
|
|
|
|
|
|
// chand->interested_parties before invoking pick_done_locked().
|
|
|
|
|
|
|
|
static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) { |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
grpc_polling_entity_del_from_pollset_set(calld->pollent, |
|
|
|
|
|
|
|
chand->interested_parties); |
|
|
|
|
|
|
|
pick_done_locked(elem, error); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
// Performs subchannel pick via LB policy.
|
|
|
|
// holding the call combiner.
|
|
|
|
class LbPicker { |
|
|
|
static void pick_callback_cancel_locked(void* arg, grpc_error* error) { |
|
|
|
public: |
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
// Starts a pick on chand->lb_policy.
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
static void StartLocked(grpc_call_element* elem) { |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
// Note: chand->lb_policy may have changed since we started our pick,
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
// in which case we will be cancelling the pick on a policy other than
|
|
|
|
|
|
|
|
// the one we started it on. However, this will just be a no-op.
|
|
|
|
|
|
|
|
if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", |
|
|
|
chand, calld, chand->lb_policy.get()); |
|
|
|
chand, calld, chand->lb_policy.get()); |
|
|
|
} |
|
|
|
} |
|
|
|
chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); |
|
|
|
// If this is a retry, use the send_initial_metadata payload that
|
|
|
|
|
|
|
|
// we've cached; otherwise, use the pending batch. The
|
|
|
|
|
|
|
|
// send_initial_metadata batch will be the first pending batch in the
|
|
|
|
|
|
|
|
// list, as set by get_batch_index() above.
|
|
|
|
|
|
|
|
calld->pick.initial_metadata = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? &calld->send_initial_metadata |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata.send_initial_metadata; |
|
|
|
|
|
|
|
calld->pick.initial_metadata_flags = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? calld->send_initial_metadata_flags |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
|
|
|
calld->pick.on_complete = &calld->pick_closure; |
|
|
|
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); |
|
|
|
|
|
|
|
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); |
|
|
|
|
|
|
|
if (GPR_LIKELY(pick_done)) { |
|
|
|
|
|
|
|
// Pick completed synchronously.
|
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
pick_done_locked(elem, GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
// Pick will be returned asynchronously.
|
|
|
|
|
|
|
|
// Add the polling entity from call_data to the channel_data's
|
|
|
|
|
|
|
|
// interested_parties, so that the I/O of the LB policy can be done
|
|
|
|
|
|
|
|
// under it. It will be removed in pick_done_locked().
|
|
|
|
|
|
|
|
maybe_add_call_to_channel_interested_parties_locked(elem); |
|
|
|
|
|
|
|
// Request notification on call cancellation.
|
|
|
|
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); |
|
|
|
|
|
|
|
grpc_call_combiner_set_notify_on_cancel( |
|
|
|
|
|
|
|
calld->call_combiner, |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, |
|
|
|
|
|
|
|
&LbPicker::CancelLocked, elem, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(chand->combiner))); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
|
|
|
|
private: |
|
|
|
// Unrefs the LB policy and invokes async_pick_done_locked().
|
|
|
|
// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
|
|
|
|
static void pick_callback_done_locked(void* arg, grpc_error* error) { |
|
|
|
// Unrefs the LB policy and invokes pick_done_locked().
|
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
static void DoneLocked(void* arg, grpc_error* error) { |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand, |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
calld); |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
pick_done_locked(elem, GRPC_ERROR_REF(error)); |
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
|
|
|
} |
|
|
|
} |
|
|
|
async_pick_done_locked(elem, GRPC_ERROR_REF(error)); |
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
} |
|
|
|
// holding the call combiner.
|
|
|
|
|
|
|
|
static void CancelLocked(void* arg, grpc_error* error) { |
|
|
|
|
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
// Note: chand->lb_policy may have changed since we started our pick,
|
|
|
|
|
|
|
|
// in which case we will be cancelling the pick on a policy other than
|
|
|
|
|
|
|
|
// the one we started it on. However, this will just be a no-op.
|
|
|
|
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: cancelling pick from LB policy %p", chand, |
|
|
|
|
|
|
|
calld, chand->lb_policy.get()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} // namespace grpc_core
|
|
|
|
|
|
|
|
|
|
|
|
// Applies service config to the call. Must be invoked once we know
|
|
|
|
// Applies service config to the call. Must be invoked once we know
|
|
|
|
// that the resolver has returned results to the channel.
|
|
|
|
// that the resolver has returned results to the channel.
|
|
|
@ -2766,6 +2840,24 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { |
|
|
|
grpc_deadline_state_reset(elem, calld->deadline); |
|
|
|
grpc_deadline_state_reset(elem, calld->deadline); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// If the service config set wait_for_ready and the application
|
|
|
|
|
|
|
|
// did not explicitly set it, use the value from the service config.
|
|
|
|
|
|
|
|
uint32_t* send_initial_metadata_flags = |
|
|
|
|
|
|
|
&calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
|
|
|
if (GPR_UNLIKELY( |
|
|
|
|
|
|
|
calld->method_params->wait_for_ready() != |
|
|
|
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_UNSET && |
|
|
|
|
|
|
|
!(*send_initial_metadata_flags & |
|
|
|
|
|
|
|
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { |
|
|
|
|
|
|
|
if (calld->method_params->wait_for_ready() == |
|
|
|
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { |
|
|
|
|
|
|
|
*send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
*send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
// If no retry policy, disable retries.
|
|
|
|
// If no retry policy, disable retries.
|
|
|
@ -2776,215 +2868,164 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Starts a pick on chand->lb_policy.
|
|
|
|
// Invoked once resolver results are available.
|
|
|
|
// Returns true if pick is completed synchronously.
|
|
|
|
static void process_service_config_and_start_lb_pick_locked( |
|
|
|
static bool pick_callback_start_locked(grpc_call_element* elem) { |
|
|
|
grpc_call_element* elem) { |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, |
|
|
|
|
|
|
|
calld, chand->lb_policy.get()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Only get service config data on the first attempt.
|
|
|
|
// Only get service config data on the first attempt.
|
|
|
|
if (GPR_LIKELY(calld->num_attempts_completed == 0)) { |
|
|
|
if (GPR_LIKELY(calld->num_attempts_completed == 0)) { |
|
|
|
apply_service_config_to_call_locked(elem); |
|
|
|
apply_service_config_to_call_locked(elem); |
|
|
|
} |
|
|
|
} |
|
|
|
// If the application explicitly set wait_for_ready, use that.
|
|
|
|
// Start LB pick.
|
|
|
|
// Otherwise, if the service config specified a value for this
|
|
|
|
grpc_core::LbPicker::StartLocked(elem); |
|
|
|
// method, use that.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// The send_initial_metadata batch will be the first one in the list,
|
|
|
|
|
|
|
|
// as set by get_batch_index() above.
|
|
|
|
|
|
|
|
calld->pick.initial_metadata = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? &calld->send_initial_metadata |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata.send_initial_metadata; |
|
|
|
|
|
|
|
uint32_t send_initial_metadata_flags = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? calld->send_initial_metadata_flags |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
|
|
|
const bool wait_for_ready_set_from_api = |
|
|
|
|
|
|
|
send_initial_metadata_flags & |
|
|
|
|
|
|
|
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; |
|
|
|
|
|
|
|
const bool wait_for_ready_set_from_service_config = |
|
|
|
|
|
|
|
calld->method_params != nullptr && |
|
|
|
|
|
|
|
calld->method_params->wait_for_ready() != |
|
|
|
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_UNSET; |
|
|
|
|
|
|
|
if (GPR_UNLIKELY(!wait_for_ready_set_from_api && |
|
|
|
|
|
|
|
wait_for_ready_set_from_service_config)) { |
|
|
|
|
|
|
|
if (calld->method_params->wait_for_ready() == |
|
|
|
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { |
|
|
|
|
|
|
|
send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
calld->pick.initial_metadata_flags = send_initial_metadata_flags; |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
|
|
|
calld->pick.on_complete = &calld->pick_closure; |
|
|
|
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); |
|
|
|
|
|
|
|
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); |
|
|
|
|
|
|
|
if (GPR_LIKELY(pick_done)) { |
|
|
|
|
|
|
|
// Pick completed synchronously.
|
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); |
|
|
|
|
|
|
|
grpc_call_combiner_set_notify_on_cancel( |
|
|
|
|
|
|
|
calld->call_combiner, |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, |
|
|
|
|
|
|
|
pick_callback_cancel_locked, elem, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(chand->combiner))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return pick_done; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
typedef struct { |
|
|
|
namespace grpc_core { |
|
|
|
grpc_call_element* elem; |
|
|
|
|
|
|
|
bool finished; |
|
|
|
// Handles waiting for a resolver result.
|
|
|
|
grpc_closure closure; |
|
|
|
// Used only for the first call on an idle channel.
|
|
|
|
grpc_closure cancel_closure; |
|
|
|
class ResolverResultWaiter { |
|
|
|
} pick_after_resolver_result_args; |
|
|
|
public: |
|
|
|
|
|
|
|
explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { |
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
// holding the call combiner.
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
static void pick_after_resolver_result_cancel_locked(void* arg, |
|
|
|
|
|
|
|
grpc_error* error) { |
|
|
|
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
|
|
|
static_cast<pick_after_resolver_result_args*>(arg); |
|
|
|
|
|
|
|
if (GPR_LIKELY(args->finished)) { |
|
|
|
|
|
|
|
gpr_free(args); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// If we don't yet have a resolver result, then a closure for
|
|
|
|
|
|
|
|
// pick_after_resolver_result_done_locked() will have been added to
|
|
|
|
|
|
|
|
// chand->waiting_for_resolver_result_closures, and it may not be invoked
|
|
|
|
|
|
|
|
// until after this call has been destroyed. We mark the operation as
|
|
|
|
|
|
|
|
// finished, so that when pick_after_resolver_result_done_locked()
|
|
|
|
|
|
|
|
// is called, it will be a no-op. We also immediately invoke
|
|
|
|
|
|
|
|
// async_pick_done_locked() to propagate the error back to the caller.
|
|
|
|
|
|
|
|
args->finished = true; |
|
|
|
|
|
|
|
grpc_call_element* elem = args->elem; |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: cancelling pick waiting for resolver result", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Note: Although we are not in the call combiner here, we are
|
|
|
|
|
|
|
|
// basically stealing the call combiner from the pending pick, so
|
|
|
|
|
|
|
|
// it's safe to call async_pick_done_locked() here -- we are
|
|
|
|
|
|
|
|
// essentially calling it here instead of calling it in
|
|
|
|
|
|
|
|
// pick_after_resolver_result_done_locked().
|
|
|
|
|
|
|
|
async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
|
|
|
"Pick cancelled", &error, 1)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_done_locked(void* arg, |
|
|
|
|
|
|
|
grpc_error* error) { |
|
|
|
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
|
|
|
static_cast<pick_after_resolver_result_args*>(arg); |
|
|
|
|
|
|
|
if (GPR_UNLIKELY(args->finished)) { |
|
|
|
|
|
|
|
/* cancelled, do nothing */ |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "call cancelled before resolver result"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
gpr_free(args); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
args->finished = true; |
|
|
|
|
|
|
|
grpc_call_element* elem = args->elem; |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: deferring pick pending resolver result", |
|
|
|
chand, calld); |
|
|
|
chand, calld); |
|
|
|
} |
|
|
|
} |
|
|
|
async_pick_done_locked(elem, GRPC_ERROR_REF(error)); |
|
|
|
// Add closure to be run when a resolver result is available.
|
|
|
|
} else if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, |
|
|
|
// Shutting down.
|
|
|
|
grpc_combiner_scheduler(chand->combiner)); |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
AddToWaitingList(); |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, |
|
|
|
// Set cancellation closure, so that we abort if the call is cancelled.
|
|
|
|
calld); |
|
|
|
GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, |
|
|
|
|
|
|
|
this, grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
|
|
|
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, |
|
|
|
|
|
|
|
&cancel_closure_); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
|
|
|
// Adds closure_ to chand->waiting_for_resolver_result_closures.
|
|
|
|
|
|
|
|
void AddToWaitingList() { |
|
|
|
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem_->channel_data); |
|
|
|
|
|
|
|
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, |
|
|
|
|
|
|
|
&done_closure_, GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Invoked when a resolver result is available.
|
|
|
|
|
|
|
|
static void DoneLocked(void* arg, grpc_error* error) { |
|
|
|
|
|
|
|
ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); |
|
|
|
|
|
|
|
// If CancelLocked() has already run, delete ourselves without doing
|
|
|
|
|
|
|
|
// anything. Note that the call stack may have already been destroyed,
|
|
|
|
|
|
|
|
// so it's not safe to access anything in elem_.
|
|
|
|
|
|
|
|
if (GPR_UNLIKELY(self->finished_)) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "call cancelled before resolver result"); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
Delete(self); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
async_pick_done_locked( |
|
|
|
// Otherwise, process the resolver result.
|
|
|
|
elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
grpc_call_element* elem = self->elem_; |
|
|
|
} else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
// Transient resolver failure.
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
// If call has wait_for_ready=true, try again; otherwise, fail.
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
|
|
|
uint32_t send_initial_metadata_flags = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? calld->send_initial_metadata_flags |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
|
|
|
if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", |
|
|
|
"chand=%p calld=%p: resolver returned but no LB policy; " |
|
|
|
|
|
|
|
"wait_for_ready=true; trying again", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
chand, calld); |
|
|
|
} |
|
|
|
} |
|
|
|
pick_after_resolver_result_start_locked(elem); |
|
|
|
pick_done_locked(elem, GRPC_ERROR_REF(error)); |
|
|
|
|
|
|
|
} else if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
|
|
|
|
// Shutting down.
|
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, |
|
|
|
|
|
|
|
calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
pick_done_locked(elem, |
|
|
|
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
|
|
|
|
} else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { |
|
|
|
|
|
|
|
// Transient resolver failure.
|
|
|
|
|
|
|
|
// If call has wait_for_ready=true, try again; otherwise, fail.
|
|
|
|
|
|
|
|
uint32_t send_initial_metadata_flags = |
|
|
|
|
|
|
|
calld->seen_send_initial_metadata |
|
|
|
|
|
|
|
? calld->send_initial_metadata_flags |
|
|
|
|
|
|
|
: calld->pending_batches[0] |
|
|
|
|
|
|
|
.batch->payload->send_initial_metadata |
|
|
|
|
|
|
|
.send_initial_metadata_flags; |
|
|
|
|
|
|
|
if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: resolver returned but no LB policy; " |
|
|
|
|
|
|
|
"wait_for_ready=true; trying again", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Re-add ourselves to the waiting list.
|
|
|
|
|
|
|
|
self->AddToWaitingList(); |
|
|
|
|
|
|
|
// Return early so that we don't set finished_ to true below.
|
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: resolver returned but no LB policy; " |
|
|
|
|
|
|
|
"wait_for_ready=false; failing", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
pick_done_locked( |
|
|
|
|
|
|
|
elem, |
|
|
|
|
|
|
|
grpc_error_set_int( |
|
|
|
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), |
|
|
|
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); |
|
|
|
|
|
|
|
} |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", |
|
|
|
"chand=%p calld=%p: resolver returned but no LB policy; " |
|
|
|
|
|
|
|
"wait_for_ready=false; failing", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
chand, calld); |
|
|
|
} |
|
|
|
} |
|
|
|
async_pick_done_locked( |
|
|
|
process_service_config_and_start_lb_pick_locked(elem); |
|
|
|
elem, |
|
|
|
|
|
|
|
grpc_error_set_int( |
|
|
|
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), |
|
|
|
|
|
|
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
self->finished_ = true; |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
} |
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
// Invoked when the call is cancelled.
|
|
|
|
|
|
|
|
// Note: This runs under the client_channel combiner, but will NOT be
|
|
|
|
|
|
|
|
// holding the call combiner.
|
|
|
|
|
|
|
|
static void CancelLocked(void* arg, grpc_error* error) { |
|
|
|
|
|
|
|
ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); |
|
|
|
|
|
|
|
// If DoneLocked() has already run, delete ourselves without doing anything.
|
|
|
|
|
|
|
|
if (GPR_LIKELY(self->finished_)) { |
|
|
|
|
|
|
|
Delete(self); |
|
|
|
|
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
if (GPR_LIKELY(pick_callback_start_locked(elem))) { |
|
|
|
// If we are being cancelled, immediately invoke pick_done_locked()
|
|
|
|
// Even if the LB policy returns a result synchronously, we have
|
|
|
|
// to propagate the error back to the caller.
|
|
|
|
// already added our polling entity to chand->interested_parties
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
|
|
|
// in order to wait for the resolver result, so we need to
|
|
|
|
grpc_call_element* elem = self->elem_; |
|
|
|
// remove it here. Therefore, we call async_pick_done_locked()
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
// instead of pick_done_locked().
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
async_pick_done_locked(elem, GRPC_ERROR_NONE); |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
|
|
|
"chand=%p calld=%p: cancelling call waiting for name " |
|
|
|
|
|
|
|
"resolution", |
|
|
|
|
|
|
|
chand, calld); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Note: Although we are not in the call combiner here, we are
|
|
|
|
|
|
|
|
// basically stealing the call combiner from the pending pick, so
|
|
|
|
|
|
|
|
// it's safe to call pick_done_locked() here -- we are essentially
|
|
|
|
|
|
|
|
// calling it here instead of calling it in DoneLocked().
|
|
|
|
|
|
|
|
pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
|
|
|
"Pick cancelled", &error, 1)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
self->finished_ = true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { |
|
|
|
grpc_call_element* elem_; |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
grpc_closure done_closure_; |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
grpc_closure cancel_closure_; |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
bool finished_ = false; |
|
|
|
gpr_log(GPR_INFO, |
|
|
|
}; |
|
|
|
"chand=%p calld=%p: deferring pick pending resolver result", chand, |
|
|
|
|
|
|
|
calld); |
|
|
|
} // namespace grpc_core
|
|
|
|
} |
|
|
|
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
|
|
|
static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args))); |
|
|
|
|
|
|
|
args->elem = elem; |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, |
|
|
|
|
|
|
|
args, grpc_combiner_scheduler(chand->combiner)); |
|
|
|
|
|
|
|
grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, |
|
|
|
|
|
|
|
&args->closure, GRPC_ERROR_NONE); |
|
|
|
|
|
|
|
grpc_call_combiner_set_notify_on_cancel( |
|
|
|
|
|
|
|
calld->call_combiner, |
|
|
|
|
|
|
|
GRPC_CLOSURE_INIT(&args->cancel_closure, |
|
|
|
|
|
|
|
pick_after_resolver_result_cancel_locked, args, |
|
|
|
|
|
|
|
grpc_combiner_scheduler(chand->combiner))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void start_pick_locked(void* arg, grpc_error* ignored) { |
|
|
|
static void start_pick_locked(void* arg, grpc_error* ignored) { |
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
@ -2993,31 +3034,24 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { |
|
|
|
GPR_ASSERT(calld->pick.connected_subchannel == nullptr); |
|
|
|
GPR_ASSERT(calld->pick.connected_subchannel == nullptr); |
|
|
|
GPR_ASSERT(calld->subchannel_call == nullptr); |
|
|
|
GPR_ASSERT(calld->subchannel_call == nullptr); |
|
|
|
if (GPR_LIKELY(chand->lb_policy != nullptr)) { |
|
|
|
if (GPR_LIKELY(chand->lb_policy != nullptr)) { |
|
|
|
// We already have an LB policy, so ask it for a pick.
|
|
|
|
// We already have resolver results, so process the service config
|
|
|
|
if (GPR_LIKELY(pick_callback_start_locked(elem))) { |
|
|
|
// and start an LB pick.
|
|
|
|
// Pick completed synchronously.
|
|
|
|
process_service_config_and_start_lb_pick_locked(elem); |
|
|
|
pick_done_locked(elem, GRPC_ERROR_NONE); |
|
|
|
} else if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
return; |
|
|
|
pick_done_locked(elem, |
|
|
|
} |
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// We do not yet have an LB policy, so wait for a resolver result.
|
|
|
|
// We do not yet have an LB policy, so wait for a resolver result.
|
|
|
|
if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
|
|
|
|
pick_done_locked(elem, |
|
|
|
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
|
|
|
|
return; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (GPR_UNLIKELY(!chand->started_resolving)) { |
|
|
|
if (GPR_UNLIKELY(!chand->started_resolving)) { |
|
|
|
start_resolving_locked(chand); |
|
|
|
start_resolving_locked(chand); |
|
|
|
} |
|
|
|
} |
|
|
|
pick_after_resolver_result_start_locked(elem); |
|
|
|
// Create a new waiter, which will delete itself when done.
|
|
|
|
|
|
|
|
grpc_core::New<grpc_core::ResolverResultWaiter>(elem); |
|
|
|
|
|
|
|
// Add the polling entity from call_data to the channel_data's
|
|
|
|
|
|
|
|
// interested_parties, so that the I/O of the resolver can be done
|
|
|
|
|
|
|
|
// under it. It will be removed in pick_done_locked().
|
|
|
|
|
|
|
|
maybe_add_call_to_channel_interested_parties_locked(elem); |
|
|
|
} |
|
|
|
} |
|
|
|
// We need to wait for either a resolver result or for an async result
|
|
|
|
|
|
|
|
// from the LB policy. Add the polling entity from call_data to the
|
|
|
|
|
|
|
|
// channel_data's interested_parties, so that the I/O of the LB policy
|
|
|
|
|
|
|
|
// and resolver can be done under it. The polling entity will be
|
|
|
|
|
|
|
|
// removed in async_pick_done_locked().
|
|
|
|
|
|
|
|
grpc_polling_entity_add_to_pollset_set(calld->pollent, |
|
|
|
|
|
|
|
chand->interested_parties); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
//
|
|
|
|