Avoid lock/unlock for common client channel pick.

pull/3788/head
David Garcia Quintas 9 years ago
parent 504cc5e3d0
commit 5994bd167a
  1. 25
      src/core/channel/client_channel.c
  2. 17
      src/core/client_config/subchannel.c
  3. 22
      src/core/client_config/subchannel.h

@ -196,13 +196,12 @@ static int is_empty(void *p, int len) {
return 1;
}
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
@ -230,10 +229,18 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
}
}
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
gpr_mu_lock(&calld->mu_state);
started_call_locked(exec_ctx, arg, iomgr_success);
}
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
grpc_subchannel_call_create_status call_creation_status;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@ -248,11 +255,15 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
&calld->subchannel_call,
&calld->async_setup_task);
call_creation_status = grpc_subchannel_create_call(
exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
&calld->async_setup_task);
if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
started_call_locked(exec_ctx, calld, iomgr_success);
} else {
gpr_mu_unlock(&calld->mu_state);
}
}
}
}

@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
grpc_subchannel_call_create_status call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
w4c->target, w4c->notify);
call_creation_status = grpc_subchannel_create_call(
exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_closure *notify) {
grpc_subchannel_call_create_status grpc_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target, grpc_closure *notify) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con);
notify->cb(exec_ctx, notify->cb_arg, 1);
return GRPC_SUBCHANNEL_CALL_CREATE_READY;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
} else {
gpr_mu_unlock(&c->mu);
}
return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
}
}

@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_closure *notify);
typedef enum {
GRPC_SUBCHANNEL_CALL_CREATE_READY,
GRPC_SUBCHANNEL_CALL_CREATE_PENDING
} grpc_subchannel_call_create_status;
/** construct a subchannel call (possibly asynchronously).
*
* If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
* return immediately and \a target will point to a connected \a subchannel_call
* instance. Note that \a notify will \em not be invoked in this case.
* Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
* subchannel call will be created asynchronously, invoking the \a notify
* callback upon completion. */
grpc_subchannel_call_create_status grpc_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
grpc_subchannel_call **target, grpc_closure *notify);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,

Loading…
Cancel
Save