|
|
|
@ -394,7 +394,7 @@ struct subchannel_batch_data { |
|
|
|
|
|
|
|
|
|
gpr_refcount refs; |
|
|
|
|
grpc_call_element* elem; |
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call; |
|
|
|
|
grpc_subchannel_call* subchannel_call; // Holds a ref.
|
|
|
|
|
// The batch to use in the subchannel call.
|
|
|
|
|
// Its payload field points to subchannel_call_retry_state.batch_payload.
|
|
|
|
|
grpc_transport_stream_op_batch batch; |
|
|
|
@ -478,7 +478,7 @@ struct pending_batch { |
|
|
|
|
bool send_ops_cached; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/** Call data. Holds a pointer to SubchannelCall and the
|
|
|
|
|
/** Call data. Holds a pointer to grpc_subchannel_call and the
|
|
|
|
|
associated machinery to create such a pointer. |
|
|
|
|
Handles queueing of stream ops until a call object is ready, waiting |
|
|
|
|
for initial metadata before trying to create a call object, |
|
|
|
@ -504,6 +504,10 @@ struct call_data { |
|
|
|
|
last_attempt_got_server_pushback(false) {} |
|
|
|
|
|
|
|
|
|
~call_data() { |
|
|
|
|
if (GPR_LIKELY(subchannel_call != nullptr)) { |
|
|
|
|
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, |
|
|
|
|
"client_channel_destroy_call"); |
|
|
|
|
} |
|
|
|
|
grpc_slice_unref_internal(path); |
|
|
|
|
GRPC_ERROR_UNREF(cancel_error); |
|
|
|
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { |
|
|
|
@ -532,7 +536,7 @@ struct call_data { |
|
|
|
|
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; |
|
|
|
|
grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params; |
|
|
|
|
|
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call; |
|
|
|
|
grpc_subchannel_call* subchannel_call = nullptr; |
|
|
|
|
|
|
|
|
|
// Set when we get a cancel_stream op.
|
|
|
|
|
grpc_error* cancel_error = GRPC_ERROR_NONE; |
|
|
|
@ -803,8 +807,8 @@ static void pending_batches_add(grpc_call_element* elem, |
|
|
|
|
calld->subchannel_call == nullptr |
|
|
|
|
? nullptr |
|
|
|
|
: static_cast<subchannel_call_retry_state*>( |
|
|
|
|
|
|
|
|
|
calld->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
calld->subchannel_call)); |
|
|
|
|
retry_commit(elem, retry_state); |
|
|
|
|
// If we are not going to retry and have not yet started, pretend
|
|
|
|
|
// retries are disabled so that we don't bother with retry overhead.
|
|
|
|
@ -892,10 +896,10 @@ static void resume_pending_batch_in_call_combiner(void* arg, |
|
|
|
|
grpc_error* ignored) { |
|
|
|
|
grpc_transport_stream_op_batch* batch = |
|
|
|
|
static_cast<grpc_transport_stream_op_batch*>(arg); |
|
|
|
|
grpc_core::SubchannelCall* subchannel_call = |
|
|
|
|
static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg); |
|
|
|
|
grpc_subchannel_call* subchannel_call = |
|
|
|
|
static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
|
|
|
|
// Note: This will release the call combiner.
|
|
|
|
|
subchannel_call->StartTransportStreamOpBatch(batch); |
|
|
|
|
grpc_subchannel_call_process_op(subchannel_call, batch); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This is called via the call combiner, so access to calld is synchronized.
|
|
|
|
@ -915,7 +919,7 @@ static void pending_batches_resume(grpc_call_element* elem) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting %" PRIuPTR |
|
|
|
|
" pending batches on subchannel_call=%p", |
|
|
|
|
chand, calld, num_batches, calld->subchannel_call.get()); |
|
|
|
|
chand, calld, num_batches, calld->subchannel_call); |
|
|
|
|
} |
|
|
|
|
grpc_core::CallCombinerClosureList closures; |
|
|
|
|
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { |
|
|
|
@ -926,7 +930,7 @@ static void pending_batches_resume(grpc_call_element* elem) { |
|
|
|
|
maybe_inject_recv_trailing_metadata_ready_for_lb( |
|
|
|
|
*calld->request->pick(), batch); |
|
|
|
|
} |
|
|
|
|
batch->handler_private.extra_arg = calld->subchannel_call.get(); |
|
|
|
|
batch->handler_private.extra_arg = calld->subchannel_call; |
|
|
|
|
GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
|
|
|
|
resume_pending_batch_in_call_combiner, batch, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
@ -1015,7 +1019,12 @@ static void do_retry(grpc_call_element* elem, |
|
|
|
|
const ClientChannelMethodParams::RetryPolicy* retry_policy = |
|
|
|
|
calld->method_params->retry_policy(); |
|
|
|
|
GPR_ASSERT(retry_policy != nullptr); |
|
|
|
|
calld->subchannel_call.reset(); |
|
|
|
|
// Reset subchannel call and connected subchannel.
|
|
|
|
|
if (calld->subchannel_call != nullptr) { |
|
|
|
|
GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, |
|
|
|
|
"client_channel_call_retry"); |
|
|
|
|
calld->subchannel_call = nullptr; |
|
|
|
|
} |
|
|
|
|
if (calld->have_request) { |
|
|
|
|
calld->have_request = false; |
|
|
|
|
calld->request.Destroy(); |
|
|
|
@ -1069,7 +1078,8 @@ static bool maybe_retry(grpc_call_element* elem, |
|
|
|
|
subchannel_call_retry_state* retry_state = nullptr; |
|
|
|
|
if (batch_data != nullptr) { |
|
|
|
|
retry_state = static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
if (retry_state->retry_dispatched) { |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, |
|
|
|
@ -1170,10 +1180,13 @@ namespace { |
|
|
|
|
subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, |
|
|
|
|
call_data* calld, int refcount, |
|
|
|
|
bool set_on_complete) |
|
|
|
|
: elem(elem), subchannel_call(calld->subchannel_call) { |
|
|
|
|
: elem(elem), |
|
|
|
|
subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, |
|
|
|
|
"batch_data_create")) { |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
calld->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
calld->subchannel_call)); |
|
|
|
|
batch.payload = &retry_state->batch_payload; |
|
|
|
|
gpr_ref_init(&refs, refcount); |
|
|
|
|
if (set_on_complete) { |
|
|
|
@ -1187,7 +1200,7 @@ subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, |
|
|
|
|
void subchannel_batch_data::destroy() { |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data(subchannel_call)); |
|
|
|
|
if (batch.send_initial_metadata) { |
|
|
|
|
grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); |
|
|
|
|
} |
|
|
|
@ -1200,7 +1213,7 @@ void subchannel_batch_data::destroy() { |
|
|
|
|
if (batch.recv_trailing_metadata) { |
|
|
|
|
grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); |
|
|
|
|
} |
|
|
|
|
subchannel_call.reset(); |
|
|
|
|
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); |
|
|
|
|
} |
|
|
|
@ -1247,7 +1260,8 @@ static void invoke_recv_initial_metadata_callback(void* arg, |
|
|
|
|
// Return metadata.
|
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
grpc_metadata_batch_move( |
|
|
|
|
&retry_state->recv_initial_metadata, |
|
|
|
|
pending->batch->payload->recv_initial_metadata.recv_initial_metadata); |
|
|
|
@ -1279,7 +1293,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
retry_state->completed_recv_initial_metadata = true; |
|
|
|
|
// If a retry was already dispatched, then we're not going to use the
|
|
|
|
|
// result of this recv_initial_metadata op, so do nothing.
|
|
|
|
@ -1340,7 +1355,8 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { |
|
|
|
|
// Return payload.
|
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
*pending->batch->payload->recv_message.recv_message = |
|
|
|
|
std::move(retry_state->recv_message); |
|
|
|
|
// Update bookkeeping.
|
|
|
|
@ -1368,7 +1384,8 @@ static void recv_message_ready(void* arg, grpc_error* error) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
++retry_state->completed_recv_message_count; |
|
|
|
|
// If a retry was already dispatched, then we're not going to use the
|
|
|
|
|
// result of this recv_message op, so do nothing.
|
|
|
|
@ -1456,7 +1473,8 @@ static void add_closure_for_recv_trailing_metadata_ready( |
|
|
|
|
// Return metadata.
|
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
grpc_metadata_batch_move( |
|
|
|
|
&retry_state->recv_trailing_metadata, |
|
|
|
|
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); |
|
|
|
@ -1558,7 +1576,8 @@ static void run_closures_for_completed_call(subchannel_batch_data* batch_data, |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
// Construct list of closures to execute.
|
|
|
|
|
grpc_core::CallCombinerClosureList closures; |
|
|
|
|
// First, add closure for recv_trailing_metadata_ready.
|
|
|
|
@ -1592,7 +1611,8 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
retry_state->completed_recv_trailing_metadata = true; |
|
|
|
|
// Get the call's status and check for server pushback metadata.
|
|
|
|
|
grpc_status_code status = GRPC_STATUS_OK; |
|
|
|
@ -1715,7 +1735,8 @@ static void on_complete(void* arg, grpc_error* error) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
batch_data->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
batch_data->subchannel_call)); |
|
|
|
|
// Update bookkeeping in retry_state.
|
|
|
|
|
if (batch_data->batch.send_initial_metadata) { |
|
|
|
|
retry_state->completed_send_initial_metadata = true; |
|
|
|
@ -1771,10 +1792,10 @@ static void on_complete(void* arg, grpc_error* error) { |
|
|
|
|
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { |
|
|
|
|
grpc_transport_stream_op_batch* batch = |
|
|
|
|
static_cast<grpc_transport_stream_op_batch*>(arg); |
|
|
|
|
grpc_core::SubchannelCall* subchannel_call = |
|
|
|
|
static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg); |
|
|
|
|
grpc_subchannel_call* subchannel_call = |
|
|
|
|
static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg); |
|
|
|
|
// Note: This will release the call combiner.
|
|
|
|
|
subchannel_call->StartTransportStreamOpBatch(batch); |
|
|
|
|
grpc_subchannel_call_process_op(subchannel_call, batch); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Adds a closure to closures that will execute batch in the call combiner.
|
|
|
|
@ -1783,7 +1804,7 @@ static void add_closure_for_subchannel_batch( |
|
|
|
|
grpc_core::CallCombinerClosureList* closures) { |
|
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
batch->handler_private.extra_arg = calld->subchannel_call.get(); |
|
|
|
|
batch->handler_private.extra_arg = calld->subchannel_call; |
|
|
|
|
GRPC_CLOSURE_INIT(&batch->handler_private.closure, |
|
|
|
|
start_batch_in_call_combiner, batch, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
@ -1957,7 +1978,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
calld->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
calld->subchannel_call)); |
|
|
|
|
// Create batch_data with 2 refs, since this batch will be unreffed twice:
|
|
|
|
|
// once for the recv_trailing_metadata_ready callback when the subchannel
|
|
|
|
|
// batch returns, and again when we actually get a recv_trailing_metadata
|
|
|
|
@ -1967,7 +1989,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { |
|
|
|
|
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); |
|
|
|
|
retry_state->recv_trailing_metadata_internal_batch = batch_data; |
|
|
|
|
// Note: This will release the call combiner.
|
|
|
|
|
calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch); |
|
|
|
|
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If there are any cached send ops that need to be replayed on the
|
|
|
|
@ -2174,7 +2196,8 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { |
|
|
|
|
} |
|
|
|
|
subchannel_call_retry_state* retry_state = |
|
|
|
|
static_cast<subchannel_call_retry_state*>( |
|
|
|
|
calld->subchannel_call->GetParentData()); |
|
|
|
|
grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
calld->subchannel_call)); |
|
|
|
|
// Construct list of closures to execute, one for each pending batch.
|
|
|
|
|
grpc_core::CallCombinerClosureList closures; |
|
|
|
|
// Replay previously-returned send_* ops if needed.
|
|
|
|
@ -2197,7 +2220,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting %" PRIuPTR |
|
|
|
|
" retriable batches on subchannel_call=%p", |
|
|
|
|
chand, calld, closures.size(), calld->subchannel_call.get()); |
|
|
|
|
chand, calld, closures.size(), calld->subchannel_call); |
|
|
|
|
} |
|
|
|
|
// Note: This will yield the call combiner.
|
|
|
|
|
closures.RunClosures(calld->call_combiner); |
|
|
|
@ -2222,22 +2245,22 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { |
|
|
|
|
calld->call_combiner, // call_combiner
|
|
|
|
|
parent_data_size // parent_data_size
|
|
|
|
|
}; |
|
|
|
|
grpc_error* new_error = GRPC_ERROR_NONE; |
|
|
|
|
calld->subchannel_call = |
|
|
|
|
calld->request->pick()->connected_subchannel->CreateCall(call_args, |
|
|
|
|
&new_error); |
|
|
|
|
grpc_error* new_error = |
|
|
|
|
calld->request->pick()->connected_subchannel->CreateCall( |
|
|
|
|
call_args, &calld->subchannel_call); |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", |
|
|
|
|
chand, calld, calld->subchannel_call.get(), |
|
|
|
|
grpc_error_string(new_error)); |
|
|
|
|
chand, calld, calld->subchannel_call, grpc_error_string(new_error)); |
|
|
|
|
} |
|
|
|
|
if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { |
|
|
|
|
new_error = grpc_error_add_child(new_error, error); |
|
|
|
|
pending_batches_fail(elem, new_error, true /* yield_call_combiner */); |
|
|
|
|
} else { |
|
|
|
|
if (parent_data_size > 0) { |
|
|
|
|
new (calld->subchannel_call->GetParentData()) subchannel_call_retry_state( |
|
|
|
|
calld->request->pick()->subchannel_call_context); |
|
|
|
|
new (grpc_connected_subchannel_call_get_parent_data( |
|
|
|
|
calld->subchannel_call)) |
|
|
|
|
subchannel_call_retry_state( |
|
|
|
|
calld->request->pick()->subchannel_call_context); |
|
|
|
|
} |
|
|
|
|
pending_batches_resume(elem); |
|
|
|
|
} |
|
|
|
@ -2465,7 +2488,7 @@ static void cc_start_transport_stream_op_batch( |
|
|
|
|
batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); |
|
|
|
|
} else { |
|
|
|
|
// Note: This will release the call combiner.
|
|
|
|
|
calld->subchannel_call->StartTransportStreamOpBatch(batch); |
|
|
|
|
grpc_subchannel_call_process_op(calld->subchannel_call, batch); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -2479,7 +2502,7 @@ static void cc_start_transport_stream_op_batch( |
|
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"chand=%p calld=%p: starting batch on subchannel_call=%p", chand, |
|
|
|
|
calld, calld->subchannel_call.get()); |
|
|
|
|
calld, calld->subchannel_call); |
|
|
|
|
} |
|
|
|
|
pending_batches_resume(elem); |
|
|
|
|
return; |
|
|
|
@ -2522,7 +2545,8 @@ static void cc_destroy_call_elem(grpc_call_element* elem, |
|
|
|
|
grpc_closure* then_schedule_closure) { |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
if (GPR_LIKELY(calld->subchannel_call != nullptr)) { |
|
|
|
|
calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure); |
|
|
|
|
grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, |
|
|
|
|
then_schedule_closure); |
|
|
|
|
then_schedule_closure = nullptr; |
|
|
|
|
} |
|
|
|
|
calld->~call_data(); |
|
|
|
@ -2728,8 +2752,8 @@ void grpc_client_channel_watch_connectivity_state( |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> |
|
|
|
|
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) { |
|
|
|
|
grpc_subchannel_call* grpc_client_channel_get_subchannel_call( |
|
|
|
|
grpc_call_element* elem) { |
|
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
|
return calld->subchannel_call; |
|
|
|
|
} |
|
|
|
|