pull/3643/head
David Garcia Quintas 9 years ago
parent 0fe72429a0
commit c22adbcd1f
  1. 29
      src/core/channel/client_uchannel.c

@ -54,7 +54,7 @@
typedef struct call_data call_data; typedef struct call_data call_data;
typedef struct { typedef struct client_uchannel_channel_data {
/** metadata context for this channel */ /** metadata context for this channel */
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
@ -171,13 +171,12 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
&chand->connectivity_cb); &chand->connectivity_cb);
} }
static void started_call(grpc_exec_ctx *exec_ctx, void *arg, static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) { int iomgr_success) {
call_data *calld = arg; call_data *calld = arg;
grpc_transport_stream_op op; grpc_transport_stream_op op;
int have_waiting; int have_waiting;
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED && iomgr_success == 0) { if (calld->state == CALL_CANCELLED && iomgr_success == 0) {
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
@ -215,6 +214,13 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
gpr_mu_lock(&calld->mu_state);
started_call_locked(exec_ctx, arg, iomgr_success);
}
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
grpc_transport_stream_op *new_op) { grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
@ -334,13 +340,18 @@ static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
calld->state = CALL_WAITING_FOR_SEND; calld->state = CALL_WAITING_FOR_SEND;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
} else { } else {
grpc_subchannel_call_create_status call_creation_status;
grpc_pollset *pollset = calld->waiting_op.bind_pollset; grpc_pollset *pollset = calld->waiting_op.bind_pollset;
calld->state = CALL_WAITING_FOR_CALL; calld->state = CALL_WAITING_FOR_CALL;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, started_call, calld); grpc_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(exec_ctx, chand->subchannel, pollset, call_creation_status = grpc_subchannel_create_call(
&calld->subchannel_call, exec_ctx, chand->subchannel, pollset, &calld->subchannel_call,
&calld->async_setup_task); &calld->async_setup_task);
if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
started_call_locked(exec_ctx, calld, 1);
} else {
gpr_mu_unlock(&calld->mu_state);
}
} }
} }
break; break;
@ -417,9 +428,7 @@ static void cmc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
break; break;
case CALL_WAITING_FOR_CALL: case CALL_WAITING_FOR_CALL:
case CALL_WAITING_FOR_SEND: case CALL_WAITING_FOR_SEND:
gpr_log(GPR_ERROR, "should never reach here"); GPR_UNREACHABLE_CODE(return );
abort();
break;
} }
} }

Loading…
Cancel
Save