@ -77,6 +77,14 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free ( wc ) ;
gpr_free ( wc ) ;
}
}
/* Linked list of pending pick requests. It stores all information needed to
* eventually call ( Round Robin ' s ) pick ( ) on them . They mainly stay pending
* waiting for the RR policy to be created / updated .
*
* One particularity is the wrapping of the user - provided \ a on_complete closure
* ( in \ a wrapped_on_complete and \ a wrapped_on_complete_arg ) . This is needed in
* order to correctly unref the RR policy instance upon completion of the pick .
* See \ a wrapped_rr_closure for details . */
typedef struct pending_pick {
typedef struct pending_pick {
struct pending_pick * next ;
struct pending_pick * next ;
grpc_polling_entity * pollent ;
grpc_polling_entity * pollent ;
@ -87,6 +95,7 @@ typedef struct pending_pick {
wrapped_rr_closure_arg * wrapped_on_complete_arg ;
wrapped_rr_closure_arg * wrapped_on_complete_arg ;
} pending_pick ;
} pending_pick ;
/* Same as the \a pending_pick struct but for ping operations */
typedef struct pending_ping {
typedef struct pending_ping {
struct pending_ping * next ;
struct pending_ping * next ;
grpc_closure * wrapped_notify ;
grpc_closure * wrapped_notify ;
@ -95,7 +104,7 @@ typedef struct pending_ping {
typedef struct glb_lb_policy glb_lb_policy ;
typedef struct glb_lb_policy glb_lb_policy ;
# define MAX_LBCD_OPS_LEN 6
/* Used internally for the client call to the LB */
typedef struct lb_client_data {
typedef struct lb_client_data {
gpr_mu mu ;
gpr_mu mu ;
grpc_closure md_sent ;
grpc_closure md_sent ;
@ -138,7 +147,7 @@ struct glb_lb_policy {
grpc_client_channel_factory * cc_factory ;
grpc_client_channel_factory * cc_factory ;
/** for communicating with the LB server */
/** for communicating with the LB server */
grpc_channel * lb_server_ channel ;
grpc_channel * lb_channel ;
/** the RR policy to use of the backend servers returned by the LB server */
/** the RR policy to use of the backend servers returned by the LB server */
grpc_lb_policy * rr_policy ;
grpc_lb_policy * rr_policy ;
@ -148,15 +157,17 @@ struct glb_lb_policy {
/** our connectivity state tracker */
/** our connectivity state tracker */
grpc_connectivity_state_tracker state_tracker ;
grpc_connectivity_state_tracker state_tracker ;
/** stores the deserialized response from the LB. May be NULL until one such
* response has arrived . */
grpc_grpclb_serverlist * serverlist ;
grpc_grpclb_serverlist * serverlist ;
/** list of picks that are waiting on connectivity */
/** list of picks that are waiting on RR's policy connectivity */
pending_pick * pending_picks ;
pending_pick * pending_picks ;
/** list of pings that are waiting on connectivity */
/** list of pings that are waiting on RR's policy connectivity */
pending_ping * pending_pings ;
pending_ping * pending_pings ;
/** data associated with the communication with the LB server */
/** client data associated with the LB server communication */
lb_client_data * lbcd ;
lb_client_data * lbcd ;
/** for tracking of the RR connectivity */
/** for tracking of the RR connectivity */
@ -175,12 +186,12 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
* perform a handover */
* perform a handover */
rr_handover ( exec_ctx , p , error ) ;
rr_handover ( exec_ctx , p , error ) ;
} else {
} else {
/* shutting down and no new serverlist available. b ail out. */
/* shutting down and no new serverlist available. B ail out. */
gpr_free ( rrcd ) ;
gpr_free ( rrcd ) ;
}
}
} else {
} else {
if ( error = = GRPC_ERROR_NONE ) {
if ( error = = GRPC_ERROR_NONE ) {
/* not shutting down. m imic the RR's policy state */
/* RR not shutting down. M imic the RR's policy state */
grpc_connectivity_state_set ( exec_ctx , & p - > state_tracker , rrcd - > state ,
grpc_connectivity_state_set ( exec_ctx , & p - > state_tracker , rrcd - > state ,
error , " rr_connectivity_changed " ) ;
error , " rr_connectivity_changed " ) ;
/* resubscribe */
/* resubscribe */
@ -281,50 +292,62 @@ static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
}
}
static void res_rcvd_cb ( grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) {
static void res_rcvd_cb ( grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) {
/* look inside lbcd->response_payload, ideally to send it back as the
* serverlist . */
lb_client_data * lbcd = arg ;
lb_client_data * lbcd = arg ;
grpc_op ops [ 2 ] ;
grpc_op ops [ 2 ] ;
memset ( ops , 0 , sizeof ( ops ) ) ;
memset ( ops , 0 , sizeof ( ops ) ) ;
grpc_op * op = ops ;
grpc_op * op = ops ;
if ( lbcd - > response_payload ) {
if ( lbcd - > response_payload ! = NULL ) {
/* Received data from the LB server. Look inside lbcd->response_payload, for
* a serverlist . */
grpc_byte_buffer_reader bbr ;
grpc_byte_buffer_reader bbr ;
grpc_byte_buffer_reader_init ( & bbr , lbcd - > response_payload ) ;
grpc_byte_buffer_reader_init ( & bbr , lbcd - > response_payload ) ;
gpr_slice response_slice = grpc_byte_buffer_reader_readall ( & bbr ) ;
gpr_slice response_slice = grpc_byte_buffer_reader_readall ( & bbr ) ;
grpc_byte_buffer_destroy ( lbcd - > response_payload ) ;
grpc_byte_buffer_destroy ( lbcd - > response_payload ) ;
grpc_grpclb_serverlist * serverlist =
grpc_grpclb_serverlist * serverlist =
grpc_grpclb_response_parse_serverlist ( response_slice ) ;
grpc_grpclb_response_parse_serverlist ( response_slice ) ;
if ( serverlist ) {
if ( serverlist ! = NULL ) {
gpr_slice_unref ( response_slice ) ;
gpr_slice_unref ( response_slice ) ;
if ( grpc_lb_glb_trace ) {
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO , " Serverlist with %zu servers received " ,
gpr_log ( GPR_INFO , " Serverlist with %zu servers received " ,
serverlist - > num_servers ) ;
serverlist - > num_servers ) ;
}
}
/* update serverlist */
/* update serverlist */
if ( serverlist - > num_servers > 0 ) {
if ( serverlist - > num_servers > 0 ) {
if ( grpc_grpclb_serverlist_equals ( lbcd - > p - > serverlist , serverlist ) ) {
if ( grpc_grpclb_serverlist_equals ( lbcd - > p - > serverlist , serverlist ) ) {
gpr_log ( GPR_INFO ,
if ( grpc_lb_glb_trace ) {
" Incoming server list identical to current, ignoring. " ) ;
gpr_log ( GPR_INFO ,
} else {
" Incoming server list identical to current, ignoring. " ) ;
}
} else { /* new serverlist */
if ( lbcd - > p - > serverlist ! = NULL ) {
if ( lbcd - > p - > serverlist ! = NULL ) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist ( lbcd - > p - > serverlist ) ;
grpc_grpclb_destroy_serverlist ( lbcd - > p - > serverlist ) ;
}
}
/* and update the copy in the glb_lb_policy instance */
lbcd - > p - > serverlist = serverlist ;
lbcd - > p - > serverlist = serverlist ;
}
}
}
if ( lbcd - > p - > rr_policy = = NULL ) {
if ( lbcd - > p - > rr_policy = = NULL ) {
/* initial "handover", in this case from a null RR policy, meaning
/* initial "handover", in this case from a null RR policy, meaning it'll
* it ' ll
* just create the first one */
* just create the first RR policy instance */
rr_handover ( exec_ctx , lbcd - > p , error ) ;
rr_handover ( exec_ctx , lbcd - > p , error ) ;
} else {
/* unref the RR policy, eventually leading to its substitution with a
* new one constructed from the received serverlist ( see
* rr_connectivity_changed ) */
GRPC_LB_POLICY_UNREF ( exec_ctx , lbcd - > p - > rr_policy ,
" serverlist_received " ) ;
}
} else {
} else {
/* unref the RR policy, eventually leading to its substitution with a
if ( grpc_lb_glb_trace ) {
* new one constructed from the received serverlist ( see
gpr_log ( GPR_INFO ,
* rr_connectivity_changed ) */
" Received empty server list. Picks will stay pending until a "
GRPC_LB_POLICY_UNREF ( exec_ctx , lbcd - > p - > rr_policy ,
" response with > 0 servers is received " ) ;
" serverlist_received " ) ;
}
}
}
/* listen for a potential serverlist update */
/* keep listening for serverlist updates */
op - > op = GRPC_OP_RECV_MESSAGE ;
op - > op = GRPC_OP_RECV_MESSAGE ;
op - > data . recv_message = & lbcd - > response_payload ;
op - > data . recv_message = & lbcd - > response_payload ;
op - > flags = 0 ;
op - > flags = 0 ;
@ -335,24 +358,26 @@ static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
& lbcd - > res_rcvd ) ; /* loop */
& lbcd - > res_rcvd ) ; /* loop */
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
return ;
return ;
} else {
gpr_log ( GPR_ERROR , " Invalid LB response received: '%s' " ,
gpr_dump_slice ( response_slice , GPR_DUMP_ASCII ) ) ;
gpr_slice_unref ( response_slice ) ;
/* Disconnect from server returning invalid response. */
op - > op = GRPC_OP_SEND_CLOSE_FROM_CLIENT ;
op - > flags = 0 ;
op - > reserved = NULL ;
op + + ;
grpc_call_error call_error = grpc_call_start_batch_and_execute (
exec_ctx , lbcd - > c , ops , ( size_t ) ( op - ops ) , & lbcd - > close_sent ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
}
}
GPR_ASSERT ( serverlist = = NULL ) ;
gpr_log ( GPR_ERROR , " Invalid LB response received: '%s' " ,
gpr_dump_slice ( response_slice , GPR_DUMP_ASCII ) ) ;
gpr_slice_unref ( response_slice ) ;
/* Disconnect from server returning invalid response. */
op - > op = GRPC_OP_SEND_CLOSE_FROM_CLIENT ;
op - > flags = 0 ;
op - > reserved = NULL ;
op + + ;
grpc_call_error call_error = grpc_call_start_batch_and_execute (
exec_ctx , lbcd - > c , ops , ( size_t ) ( op - ops ) , & lbcd - > close_sent ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
}
}
/* empty payload: call cancelled by server. Cleanups happening in
/* empty payload: call cancelled by server. Cleanups happening in
* srv_status_rcvd_cb */
* srv_status_rcvd_cb */
}
}
static void close_sent_cb ( grpc_exec_ctx * exec_ctx , void * arg ,
static void close_sent_cb ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_error * error ) {
if ( grpc_lb_glb_trace ) {
if ( grpc_lb_glb_trace ) {
@ -360,21 +385,22 @@ static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg,
" Close from LB client sent. Waiting from server status now " ) ;
" Close from LB client sent. Waiting from server status now " ) ;
}
}
}
}
static void srv_status_rcvd_cb ( grpc_exec_ctx * exec_ctx , void * arg ,
static void srv_status_rcvd_cb ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_error * error ) {
lb_client_data * lbcd = arg ;
lb_client_data * lbcd = arg ;
glb_lb_policy * p = lbcd - > p ;
glb_lb_policy * p = lbcd - > p ;
if ( grpc_lb_glb_trace ) {
if ( grpc_lb_glb_trace ) {
gpr_log (
gpr_log ( GPR_INFO ,
GPR_INFO ,
" status from lb server received. Status = %d, Details = '%s', "
" status from lb server received. Status = %d, Details = '%s', Capaticy "
" Capaticy "
" = %zu " ,
" = %zu " ,
lbcd - > status , lbcd - > status_details , lbcd - > status_details_capacity ) ;
lbcd - > status , lbcd - > status_details , lbcd - > status_details_capacity ) ;
}
}
grpc_call_destroy ( lbcd - > c ) ;
grpc_call_destroy ( lbcd - > c ) ;
grpc_channel_destroy ( lbcd - > p - > lb_server_ channel ) ;
grpc_channel_destroy ( lbcd - > p - > lb_channel ) ;
lbcd - > p - > lb_server_ channel = NULL ;
lbcd - > p - > lb_channel = NULL ;
lb_client_data_destroy ( lbcd ) ;
lb_client_data_destroy ( lbcd ) ;
p - > lbcd = NULL ;
p - > lbcd = NULL ;
}
}
@ -392,16 +418,14 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *p) {
grpc_closure_init ( & lbcd - > close_sent , close_sent_cb , lbcd ) ;
grpc_closure_init ( & lbcd - > close_sent , close_sent_cb , lbcd ) ;
grpc_closure_init ( & lbcd - > srv_status_rcvd , srv_status_rcvd_cb , lbcd ) ;
grpc_closure_init ( & lbcd - > srv_status_rcvd , srv_status_rcvd_cb , lbcd ) ;
/* TODO(dgq): get the deadline from the client/user instead of fabricating
/* TODO(dgq): get the deadline from the client config instead of fabricating
* one
* one here . */
* here . Make it a policy arg ? */
lbcd - > deadline = gpr_time_add ( gpr_now ( GPR_CLOCK_MONOTONIC ) ,
lbcd - > deadline = gpr_time_add ( gpr_now ( GPR_CLOCK_MONOTONIC ) ,
gpr_time_from_seconds ( 3 , GPR_TIMESPAN ) ) ;
gpr_time_from_seconds ( 3 , GPR_TIMESPAN ) ) ;
lbcd - > c = grpc_channel_create_pollset_set_call (
lbcd - > c = grpc_channel_create_pollset_set_call (
p - > lb_server_channel , NULL , GRPC_PROPAGATE_DEFAULTS ,
p - > lb_channel , NULL , GRPC_PROPAGATE_DEFAULTS , p - > base . interested_parties ,
p - > base . interested_parties , " /BalanceLoad " ,
" /BalanceLoad " , NULL , /* FIXME(dgq): which "host" value to use? */
NULL , /* FIXME(dgq): which "host" value to use? */
lbcd - > deadline , NULL ) ;
lbcd - > deadline , NULL ) ;
grpc_metadata_array_init ( & lbcd - > initial_metadata_recv ) ;
grpc_metadata_array_init ( & lbcd - > initial_metadata_recv ) ;
@ -409,7 +433,9 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *p) {
grpc_grpclb_request * request = grpc_grpclb_request_create (
grpc_grpclb_request * request = grpc_grpclb_request_create (
" load.balanced.service.name " ) ; /* FIXME(dgq): get the name of the load
" load.balanced.service.name " ) ; /* FIXME(dgq): get the name of the load
balanced service from above . */
balanced service from somewhere
( client
config ? ) . */
gpr_slice request_payload_slice = grpc_grpclb_request_encode ( request ) ;
gpr_slice request_payload_slice = grpc_grpclb_request_encode ( request ) ;
lbcd - > request_payload =
lbcd - > request_payload =
grpc_raw_byte_buffer_create ( & request_payload_slice , 1 ) ;
grpc_raw_byte_buffer_create ( & request_payload_slice , 1 ) ;
@ -536,7 +562,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
static void query_for_backends ( grpc_exec_ctx * exec_ctx , glb_lb_policy * p ) {
static void query_for_backends ( grpc_exec_ctx * exec_ctx , glb_lb_policy * p ) {
GPR_ASSERT ( p - > lb_server_ channel ! = NULL ) ;
GPR_ASSERT ( p - > lb_channel ! = NULL ) ;
p - > lbcd = lb_client_data_create ( p ) ;
p - > lbcd = lb_client_data_create ( p ) ;
grpc_call_error call_error ;
grpc_call_error call_error ;
@ -612,10 +638,8 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
gpr_free ( host_ports [ i ] ) ;
gpr_free ( host_ports [ i ] ) ;
}
}
gpr_free ( host_ports ) ;
gpr_free ( host_ports ) ;
gpr_free ( args . addresses - > addrs ) ;
gpr_free ( args . addresses - > addrs ) ;
gpr_free ( args . addresses ) ;
gpr_free ( args . addresses ) ;
return rr ;
return rr ;
}
}
@ -709,7 +733,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
wrapped_on_complete ) ;
wrapped_on_complete ) ;
if ( r ! = 0 ) {
if ( r ! = 0 ) {
/* the call to grpc_lb_policy_pick has been sychronous. Invoke a neutered
/* the call to grpc_lb_policy_pick has been sychronous. Invoke a neutered
* wrapped closure */
* wrapped closure : it ' ll only take care of unreffing the RR policy */
warg - > wrapped_closure = NULL ;
warg - > wrapped_closure = NULL ;
grpc_exec_ctx_sched ( exec_ctx , wrapped_on_complete , GRPC_ERROR_NONE , NULL ) ;
grpc_exec_ctx_sched ( exec_ctx , wrapped_on_complete , GRPC_ERROR_NONE , NULL ) ;
}
}
@ -808,7 +832,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
( const char * * ) addr_strs , args - > addresses - > naddrs , " , " , & uri_path_len ) ;
( const char * * ) addr_strs , args - > addresses - > naddrs , " , " , & uri_path_len ) ;
/* will pick using pick_first */
/* will pick using pick_first */
p - > lb_server_ channel = grpc_client_channel_factory_create_channel (
p - > lb_channel = grpc_client_channel_factory_create_channel (
exec_ctx , p - > cc_factory , target_uri_str ,
exec_ctx , p - > cc_factory , target_uri_str ,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING , NULL ) ;
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING , NULL ) ;
@ -818,7 +842,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
}
}
gpr_free ( addr_strs ) ;
gpr_free ( addr_strs ) ;
if ( p - > lb_server_ channel = = NULL ) {
if ( p - > lb_channel = = NULL ) {
gpr_free ( p ) ;
gpr_free ( p ) ;
return NULL ;
return NULL ;
}
}