@ -51,12 +51,13 @@ typedef struct { grpc_channel_element *back; } lb_channel_data;
typedef struct { grpc_call_element * back ; } lb_call_data ;
static void lb_call_op ( grpc_call_element * elem , grpc_call_op * op ) {
static void lb_call_op ( grpc_call_element * elem , grpc_call_element * from_elem ,
grpc_call_op * op ) {
lb_call_data * calld = elem - > call_data ;
switch ( op - > dir ) {
case GRPC_CALL_UP :
calld - > back - > filter - > call_op ( calld - > back , op ) ;
calld - > back - > filter - > call_op ( calld - > back , elem , op ) ;
break ;
case GRPC_CALL_DOWN :
grpc_call_next_op ( elem , op ) ;
@ -65,12 +66,14 @@ static void lb_call_op(grpc_call_element *elem, grpc_call_op *op) {
}
/* Currently we assume all channel operations should just be pushed up. */
static void lb_channel_op ( grpc_channel_element * elem , grpc_channel_op * op ) {
static void lb_channel_op ( grpc_channel_element * elem ,
grpc_channel_element * from_elem ,
grpc_channel_op * op ) {
lb_channel_data * chand = elem - > channel_data ;
switch ( op - > dir ) {
case GRPC_CALL_UP :
chand - > back - > filter - > channel_op ( chand - > back , op ) ;
chand - > back - > filter - > channel_op ( chand - > back , elem , op ) ;
break ;
case GRPC_CALL_DOWN :
grpc_channel_next_op ( elem , op ) ;
@ -201,8 +204,9 @@ static int prepare_activate(call_data *calld, child_entry *on_child) {
static void do_nothing ( void * ignored , grpc_op_error error ) { }
static void complete_activate ( call_data * calld , child_entry * on_child ,
static void complete_activate ( grpc_call_element * elem , child_entry * on_child ,
grpc_call_op * op ) {
call_data * calld = elem - > call_data ;
grpc_call_element * child_elem =
grpc_call_stack_element ( calld - > s . active . child_stack , 0 ) ;
@ -219,15 +223,17 @@ static void complete_activate(call_data *calld, child_entry *on_child,
dop . data . deadline = calld - > deadline ;
dop . done_cb = do_nothing ;
dop . user_data = NULL ;
child_elem - > filter - > call_op ( child_elem , & dop ) ;
child_elem - > filter - > call_op ( child_elem , elem , & dop ) ;
}
/* continue the start call down the stack, this nees to happen after metadata
are flushed */
child_elem - > filter - > call_op ( child_elem , op ) ;
child_elem - > filter - > call_op ( child_elem , elem , op ) ;
}
static void start_rpc ( call_data * calld , channel_data * chand , grpc_call_op * op ) {
static void start_rpc ( grpc_call_element * elem , grpc_call_op * op ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
gpr_mu_lock ( & chand - > mu ) ;
if ( calld - > state = = CALL_CANCELLED ) {
gpr_mu_unlock ( & chand - > mu ) ;
@ -241,7 +247,7 @@ static void start_rpc(call_data *calld, channel_data *chand, grpc_call_op *op) {
if ( prepare_activate ( calld , chand - > active_child ) ) {
gpr_mu_unlock ( & chand - > mu ) ;
/* activate the request (pass it down) outside the lock */
complete_activate ( calld , chand - > active_child , op ) ;
complete_activate ( elem , chand - > active_child , op ) ;
} else {
gpr_mu_unlock ( & chand - > mu ) ;
}
@ -299,7 +305,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE :
child_elem = grpc_call_stack_element ( calld - > s . active . child_stack , 0 ) ;
gpr_mu_unlock ( & chand - > mu ) ;
child_elem - > filter - > call_op ( child_elem , op ) ;
child_elem - > filter - > call_op ( child_elem , elem , op ) ;
return ; /* early out */
case CALL_WAITING :
remove_waiting_child ( chand , calld ) ;
@ -333,9 +339,9 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort ( ) ;
}
static void call_op ( grpc_call_element * elem , grpc_call_op * op ) {
static void call_op ( grpc_call_element * elem , grpc_call_element * from_elem ,
grpc_call_op * op ) {
call_data * calld = elem - > call_data ;
channel_data * chand = elem - > channel_data ;
grpc_call_element * child_elem ;
GPR_ASSERT ( elem - > filter = = & grpc_client_channel_filter ) ;
GRPC_CALL_LOG_OP ( GPR_INFO , elem , op ) ;
@ -350,7 +356,7 @@ static void call_op(grpc_call_element *elem, grpc_call_op *op) {
break ;
case GRPC_SEND_START :
/* filter out the start event to find which child to send on */
start_rpc ( calld , chand , op ) ;
start_rpc ( elem , op ) ;
break ;
case GRPC_CANCEL_OP :
cancel_rpc ( elem , op ) ;
@ -363,7 +369,7 @@ static void call_op(grpc_call_element *elem, grpc_call_op *op) {
case GRPC_CALL_DOWN :
child_elem = grpc_call_stack_element ( calld - > s . active . child_stack , 0 ) ;
GPR_ASSERT ( calld - > state = = CALL_ACTIVE ) ;
child_elem - > filter - > call_op ( child_elem , op ) ;
child_elem - > filter - > call_op ( child_elem , elem , op ) ;
break ;
}
break ;
@ -395,7 +401,7 @@ static void broadcast_channel_op_down(grpc_channel_element *elem,
if ( op - > type = = GRPC_CHANNEL_GOAWAY ) {
gpr_slice_ref ( op - > data . goaway . message ) ;
}
child_elem - > filter - > channel_op ( child_elem , op ) ;
child_elem - > filter - > channel_op ( child_elem , elem , op ) ;
}
if ( op - > type = = GRPC_CHANNEL_GOAWAY ) {
gpr_slice_unref ( op - > data . goaway . message ) ;
@ -411,7 +417,8 @@ static void broadcast_channel_op_down(grpc_channel_element *elem,
gpr_free ( children ) ;
}
static void channel_op ( grpc_channel_element * elem , grpc_channel_op * op ) {
static void channel_op ( grpc_channel_element * elem ,
grpc_channel_element * from_elem , grpc_channel_op * op ) {
GPR_ASSERT ( elem - > filter = = & grpc_client_channel_filter ) ;
switch ( op - > type ) {
@ -627,7 +634,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
that guarantee we need to do some curly locking here */
for ( i = 0 ; i < waiting_child_count ; i + + ) {
if ( waiting_children [ i ] ) {
complete_activate ( waiting_children [ i ] , child_ent , & call_ops [ i ] ) ;
complete_activate ( waiting_children [ i ] - > elem , child_ent , & call_ops [ i ] ) ;
}
}
gpr_free ( waiting_children ) ;