|
|
|
@ -1123,7 +1123,8 @@ static void pending_batches_add(grpc_call_element* elem, |
|
|
|
|
if (batch->send_trailing_metadata) { |
|
|
|
|
calld->pending_send_trailing_metadata = true; |
|
|
|
|
} |
|
|
|
|
if (calld->bytes_buffered_for_retry > chand->per_rpc_retry_buffer_size) { |
|
|
|
|
if (GPR_UNLIKELY(calld->bytes_buffered_for_retry > |
|
|
|
|
chand->per_rpc_retry_buffer_size)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: exceeded retry buffer size, committing", |
|
|
|
@ -1470,7 +1471,7 @@ static bool maybe_retry(grpc_call_element* elem, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Check status.
|
|
|
|
|
if (status == GRPC_STATUS_OK) { |
|
|
|
|
if (GPR_LIKELY(status == GRPC_STATUS_OK)) { |
|
|
|
|
if (calld->retry_throttle_data != nullptr) { |
|
|
|
|
calld->retry_throttle_data->RecordSuccess(); |
|
|
|
|
} |
|
|
|
@ -1664,8 +1665,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { |
|
|
|
|
// the recv_trailing_metadata on_complete callback, then defer
|
|
|
|
|
// propagating this callback back to the surface. We can evaluate whether
|
|
|
|
|
// to retry when recv_trailing_metadata comes back.
|
|
|
|
|
if ((batch_data->trailing_metadata_available || error != GRPC_ERROR_NONE) && |
|
|
|
|
!retry_state->completed_recv_trailing_metadata) { |
|
|
|
|
if (GPR_UNLIKELY((batch_data->trailing_metadata_available || |
|
|
|
|
error != GRPC_ERROR_NONE) && |
|
|
|
|
!retry_state->completed_recv_trailing_metadata)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: deferring recv_initial_metadata_ready " |
|
|
|
@ -1753,8 +1755,9 @@ static void recv_message_ready(void* arg, grpc_error* error) { |
|
|
|
|
// the recv_trailing_metadata on_complete callback, then defer
|
|
|
|
|
// propagating this callback back to the surface. We can evaluate whether
|
|
|
|
|
// to retry when recv_trailing_metadata comes back.
|
|
|
|
|
if ((batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && |
|
|
|
|
!retry_state->completed_recv_trailing_metadata) { |
|
|
|
|
if (GPR_UNLIKELY( |
|
|
|
|
(batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) && |
|
|
|
|
!retry_state->completed_recv_trailing_metadata)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: deferring recv_message_ready (nullptr " |
|
|
|
@ -1867,7 +1870,8 @@ static void add_closures_for_deferred_recv_callbacks( |
|
|
|
|
closure_to_execute* closures, size_t* num_closures) { |
|
|
|
|
if (batch_data->batch.recv_trailing_metadata) { |
|
|
|
|
// Add closure for deferred recv_initial_metadata_ready.
|
|
|
|
|
if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) { |
|
|
|
|
if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != |
|
|
|
|
nullptr)) { |
|
|
|
|
closure_to_execute* closure = &closures[(*num_closures)++]; |
|
|
|
|
closure->closure = GRPC_CLOSURE_INIT( |
|
|
|
|
&batch_data->recv_initial_metadata_ready, |
|
|
|
@ -1879,7 +1883,8 @@ static void add_closures_for_deferred_recv_callbacks( |
|
|
|
|
retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; |
|
|
|
|
} |
|
|
|
|
// Add closure for deferred recv_message_ready.
|
|
|
|
|
if (retry_state->recv_message_ready_deferred_batch != nullptr) { |
|
|
|
|
if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch != |
|
|
|
|
nullptr)) { |
|
|
|
|
closure_to_execute* closure = &closures[(*num_closures)++]; |
|
|
|
|
closure->closure = GRPC_CLOSURE_INIT( |
|
|
|
|
&batch_data->recv_message_ready, invoke_recv_message_callback, |
|
|
|
@ -2048,7 +2053,7 @@ static void on_complete(void* arg, grpc_error* error) { |
|
|
|
|
// an error or (b) we receive status.
|
|
|
|
|
grpc_status_code status = GRPC_STATUS_OK; |
|
|
|
|
grpc_mdelem* server_pushback_md = nullptr; |
|
|
|
|
if (error != GRPC_ERROR_NONE) { // Case (a).
|
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
|
|
|
|
|
call_finished = true; |
|
|
|
|
grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr, |
|
|
|
|
nullptr); |
|
|
|
@ -2224,13 +2229,13 @@ static void add_retriable_send_initial_metadata_op( |
|
|
|
|
grpc_metadata_batch_copy(&calld->send_initial_metadata, |
|
|
|
|
&batch_data->send_initial_metadata, |
|
|
|
|
batch_data->send_initial_metadata_storage); |
|
|
|
|
if (batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts != |
|
|
|
|
nullptr) { |
|
|
|
|
if (GPR_UNLIKELY(batch_data->send_initial_metadata.idx.named |
|
|
|
|
.grpc_previous_rpc_attempts != nullptr)) { |
|
|
|
|
grpc_metadata_batch_remove( |
|
|
|
|
&batch_data->send_initial_metadata, |
|
|
|
|
batch_data->send_initial_metadata.idx.named.grpc_previous_rpc_attempts); |
|
|
|
|
} |
|
|
|
|
if (calld->num_attempts_completed > 0) { |
|
|
|
|
if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) { |
|
|
|
|
grpc_mdelem retry_md = grpc_mdelem_from_slices( |
|
|
|
|
GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS, |
|
|
|
|
*retry_count_strings[calld->num_attempts_completed - 1]); |
|
|
|
@ -2480,7 +2485,8 @@ static void add_subchannel_batches_for_pending_batches( |
|
|
|
|
// If we previously completed a recv_trailing_metadata op
|
|
|
|
|
// initiated by start_internal_recv_trailing_metadata(), use the
|
|
|
|
|
// result of that instead of trying to re-start this op.
|
|
|
|
|
if (retry_state->recv_trailing_metadata_internal_batch != nullptr) { |
|
|
|
|
if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch != |
|
|
|
|
nullptr))) { |
|
|
|
|
// If the batch completed, then trigger the completion callback
|
|
|
|
|
// directly, so that we return the previously returned results to
|
|
|
|
|
// the application. Otherwise, just unref the internally
|
|
|
|
@ -2628,7 +2634,7 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", |
|
|
|
|
chand, calld, calld->subchannel_call, grpc_error_string(new_error)); |
|
|
|
|
} |
|
|
|
|
if (new_error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { |
|
|
|
|
new_error = grpc_error_add_child(new_error, error); |
|
|
|
|
pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
|
|
|
|
} else { |
|
|
|
@ -2649,7 +2655,7 @@ static void pick_done(void* arg, grpc_error* error) { |
|
|
|
|
grpc_call_element* elem = static_cast<grpc_call_element*>(arg); |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
if (calld->pick.connected_subchannel == nullptr) { |
|
|
|
|
if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) { |
|
|
|
|
// Failed to create subchannel.
|
|
|
|
|
// If there was no error, this is an LB policy drop, in which case
|
|
|
|
|
// we return an error; otherwise, we may retry.
|
|
|
|
@ -2708,7 +2714,7 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { |
|
|
|
|
// Note: chand->lb_policy may have changed since we started our pick,
|
|
|
|
|
// in which case we will be cancelling the pick on a policy other than
|
|
|
|
|
// the one we started it on. However, this will just be a no-op.
|
|
|
|
|
if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { |
|
|
|
|
if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", |
|
|
|
|
chand, calld, chand->lb_policy.get()); |
|
|
|
@ -2780,7 +2786,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { |
|
|
|
|
calld, chand->lb_policy.get()); |
|
|
|
|
} |
|
|
|
|
// Only get service config data on the first attempt.
|
|
|
|
|
if (calld->num_attempts_completed == 0) { |
|
|
|
|
if (GPR_LIKELY(calld->num_attempts_completed == 0)) { |
|
|
|
|
apply_service_config_to_call_locked(elem); |
|
|
|
|
} |
|
|
|
|
// If the application explicitly set wait_for_ready, use that.
|
|
|
|
@ -2807,7 +2813,8 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { |
|
|
|
|
calld->method_params != nullptr && |
|
|
|
|
calld->method_params->wait_for_ready() != |
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_UNSET; |
|
|
|
|
if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { |
|
|
|
|
if (GPR_UNLIKELY(!wait_for_ready_set_from_api && |
|
|
|
|
wait_for_ready_set_from_service_config)) { |
|
|
|
|
if (calld->method_params->wait_for_ready() == |
|
|
|
|
ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { |
|
|
|
|
send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; |
|
|
|
@ -2821,7 +2828,7 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { |
|
|
|
|
calld->pick.on_complete = &calld->pick_closure; |
|
|
|
|
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); |
|
|
|
|
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); |
|
|
|
|
if (pick_done) { |
|
|
|
|
if (GPR_LIKELY(pick_done)) { |
|
|
|
|
// Pick completed synchronously.
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", |
|
|
|
@ -2852,7 +2859,7 @@ static void pick_after_resolver_result_cancel_locked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
static_cast<pick_after_resolver_result_args*>(arg); |
|
|
|
|
if (args->finished) { |
|
|
|
|
if (GPR_LIKELY(args->finished)) { |
|
|
|
|
gpr_free(args); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -2885,7 +2892,7 @@ static void pick_after_resolver_result_done_locked(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
pick_after_resolver_result_args* args = |
|
|
|
|
static_cast<pick_after_resolver_result_args*>(arg); |
|
|
|
|
if (args->finished) { |
|
|
|
|
if (GPR_UNLIKELY(args->finished)) { |
|
|
|
|
/* cancelled, do nothing */ |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "call cancelled before resolver result"); |
|
|
|
@ -2897,13 +2904,13 @@ static void pick_after_resolver_result_done_locked(void* arg, |
|
|
|
|
grpc_call_element* elem = args->elem; |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
if (error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
async_pick_done_locked(elem, GRPC_ERROR_REF(error)); |
|
|
|
|
} else if (chand->resolver == nullptr) { |
|
|
|
|
} else if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
|
// Shutting down.
|
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, |
|
|
|
@ -2911,7 +2918,7 @@ static void pick_after_resolver_result_done_locked(void* arg, |
|
|
|
|
} |
|
|
|
|
async_pick_done_locked( |
|
|
|
|
elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
|
} else if (chand->lb_policy == nullptr) { |
|
|
|
|
} else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { |
|
|
|
|
// Transient resolver failure.
|
|
|
|
|
// If call has wait_for_ready=true, try again; otherwise, fail.
|
|
|
|
|
uint32_t send_initial_metadata_flags = |
|
|
|
@ -2946,7 +2953,7 @@ static void pick_after_resolver_result_done_locked(void* arg, |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick", |
|
|
|
|
chand, calld); |
|
|
|
|
} |
|
|
|
|
if (pick_callback_start_locked(elem)) { |
|
|
|
|
if (GPR_LIKELY(pick_callback_start_locked(elem))) { |
|
|
|
|
// Even if the LB policy returns a result synchronously, we have
|
|
|
|
|
// already added our polling entity to chand->interested_parties
|
|
|
|
|
// in order to wait for the resolver result, so we need to
|
|
|
|
@ -2985,21 +2992,21 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
GPR_ASSERT(calld->pick.connected_subchannel == nullptr); |
|
|
|
|
GPR_ASSERT(calld->subchannel_call == nullptr); |
|
|
|
|
if (chand->lb_policy != nullptr) { |
|
|
|
|
if (GPR_LIKELY(chand->lb_policy != nullptr)) { |
|
|
|
|
// We already have an LB policy, so ask it for a pick.
|
|
|
|
|
if (pick_callback_start_locked(elem)) { |
|
|
|
|
if (GPR_LIKELY(pick_callback_start_locked(elem))) { |
|
|
|
|
// Pick completed synchronously.
|
|
|
|
|
pick_done_locked(elem, GRPC_ERROR_NONE); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// We do not yet have an LB policy, so wait for a resolver result.
|
|
|
|
|
if (chand->resolver == nullptr) { |
|
|
|
|
if (GPR_UNLIKELY(chand->resolver == nullptr)) { |
|
|
|
|
pick_done_locked(elem, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (!chand->started_resolving) { |
|
|
|
|
if (GPR_UNLIKELY(!chand->started_resolving)) { |
|
|
|
|
start_resolving_locked(chand); |
|
|
|
|
} |
|
|
|
|
pick_after_resolver_result_start_locked(elem); |
|
|
|
@ -3022,11 +3029,11 @@ static void cc_start_transport_stream_op_batch( |
|
|
|
|
GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0); |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
|
|
|
|
grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); |
|
|
|
|
} |
|
|
|
|
// If we've previously been cancelled, immediately fail any new batches.
|
|
|
|
|
if (calld->cancel_error != GRPC_ERROR_NONE) { |
|
|
|
|
if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s", |
|
|
|
|
chand, calld, grpc_error_string(calld->cancel_error)); |
|
|
|
@ -3037,7 +3044,7 @@ static void cc_start_transport_stream_op_batch( |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Handle cancellation.
|
|
|
|
|
if (batch->cancel_stream) { |
|
|
|
|
if (GPR_UNLIKELY(batch->cancel_stream)) { |
|
|
|
|
// Stash a copy of cancel_error in our call data, so that we can use
|
|
|
|
|
// it for subsequent operations. This ensures that if the call is
|
|
|
|
|
// cancelled before any batches are passed down (e.g., if the deadline
|
|
|
|
@ -3083,7 +3090,7 @@ static void cc_start_transport_stream_op_batch( |
|
|
|
|
// We do not yet have a subchannel call.
|
|
|
|
|
// For batches containing a send_initial_metadata op, enter the channel
|
|
|
|
|
// combiner to start a pick.
|
|
|
|
|
if (batch->send_initial_metadata) { |
|
|
|
|
if (GPR_LIKELY(batch->send_initial_metadata)) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner", |
|
|
|
|
chand, calld); |
|
|
|
@ -3116,7 +3123,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, |
|
|
|
|
calld->arena = args->arena; |
|
|
|
|
calld->owning_call = args->call_stack; |
|
|
|
|
calld->call_combiner = args->call_combiner; |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
|
|
|
|
grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, |
|
|
|
|
calld->deadline); |
|
|
|
|
} |
|
|
|
@ -3131,14 +3138,14 @@ static void cc_destroy_call_elem(grpc_call_element* elem, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
if (chand->deadline_checking_enabled) { |
|
|
|
|
if (GPR_LIKELY(chand->deadline_checking_enabled)) { |
|
|
|
|
grpc_deadline_state_destroy(elem); |
|
|
|
|
} |
|
|
|
|
grpc_slice_unref_internal(calld->path); |
|
|
|
|
calld->retry_throttle_data.reset(); |
|
|
|
|
calld->method_params.reset(); |
|
|
|
|
GRPC_ERROR_UNREF(calld->cancel_error); |
|
|
|
|
if (calld->subchannel_call != nullptr) { |
|
|
|
|
if (GPR_LIKELY(calld->subchannel_call != nullptr)) { |
|
|
|
|
grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, |
|
|
|
|
then_schedule_closure); |
|
|
|
|
then_schedule_closure = nullptr; |
|
|
|
@ -3148,7 +3155,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, |
|
|
|
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
|
|
|
|
GPR_ASSERT(calld->pending_batches[i].batch == nullptr); |
|
|
|
|
} |
|
|
|
|
if (calld->pick.connected_subchannel != nullptr) { |
|
|
|
|
if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) { |
|
|
|
|
calld->pick.connected_subchannel.reset(); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { |
|
|
|
|