|
|
|
@ -869,32 +869,34 @@ static bool pick_subchannel_locked( |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
grpc_transport_stream_op *op = arg; |
|
|
|
|
grpc_call_element *elem = op->handler_private.args[0]; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
static void cc_start_transport_stream_op_locked_inner( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
channel_data *chand = elem->channel_data; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
grpc_subchannel_call *call; |
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
|
/* need to recheck that another thread hasn't set the call */ |
|
|
|
|
call = GET_CALL(calld); |
|
|
|
|
if (call == CANCELLED_CALL) { |
|
|
|
|
grpc_transport_stream_op_finish_with_failure( |
|
|
|
|
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); |
|
|
|
|
goto done; |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (call != NULL) { |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, call, op); |
|
|
|
|
goto done; |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* if this is a cancellation, then we can raise our cancelled flag */ |
|
|
|
|
if (op->cancel_error != GRPC_ERROR_NONE) { |
|
|
|
|
if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, |
|
|
|
|
(gpr_atm)(uintptr_t)CANCELLED_CALL)) { |
|
|
|
|
goto retry; |
|
|
|
|
/* recurse to retry */ |
|
|
|
|
cc_start_transport_stream_op_locked_inner(exec_ctx, op, elem); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} else { |
|
|
|
|
// Stash a copy of cancel_error in our call data, so that we can use
|
|
|
|
|
// it for subsequent operations. This ensures that if the call is
|
|
|
|
@ -914,7 +916,8 @@ retry: |
|
|
|
|
} |
|
|
|
|
grpc_transport_stream_op_finish_with_failure( |
|
|
|
|
exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); |
|
|
|
|
goto done; |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* if we don't have a subchannel, try to get one */ |
|
|
|
@ -954,14 +957,29 @@ retry: |
|
|
|
|
gpr_atm_rel_store(&calld->subchannel_call, |
|
|
|
|
(gpr_atm)(uintptr_t)subchannel_call); |
|
|
|
|
retry_waiting_locked(exec_ctx, calld); |
|
|
|
|
goto retry; |
|
|
|
|
/* recurse to retry */ |
|
|
|
|
cc_start_transport_stream_op_locked_inner(exec_ctx, op, elem); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* nothing to be done but wait */ |
|
|
|
|
add_waiting_locked(calld, op); |
|
|
|
|
done: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, |
|
|
|
|
void *arg, |
|
|
|
|
grpc_error *error_ignored) { |
|
|
|
|
GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); |
|
|
|
|
|
|
|
|
|
grpc_transport_stream_op *op = arg; |
|
|
|
|
grpc_call_element *elem = op->handler_private.args[0]; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
|
|
|
|
|
cc_start_transport_stream_op_locked_inner(exec_ctx, op, elem); |
|
|
|
|
|
|
|
|
|
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, |
|
|
|
|
"start_transport_stream_op"); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op", 0); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The logic here is fairly complicated, due to (a) the fact that we
|
|
|
|
@ -983,11 +1001,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
grpc_transport_stream_op_finish_with_failure( |
|
|
|
|
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op", 0); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (call != NULL) { |
|
|
|
|
grpc_subchannel_call_process_op(exec_ctx, call, op); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op", 0); |
|
|
|
|
/* early out */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
/* we failed; lock and figure out what to do */ |
|
|
|
@ -999,6 +1019,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, |
|
|
|
|
cc_start_transport_stream_op_locked, op, |
|
|
|
|
grpc_combiner_scheduler(chand->combiner, false)), |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
GPR_TIMER_END("cc_start_transport_stream_op", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Gets data from the service config. Invoked when the resolver returns
|
|
|
|
|