@ -95,8 +95,7 @@
headers . Therefore , sockaddr . h must always be included first */
# include "src/core/lib/iomgr/sockaddr.h"
# include <errno.h>
# include <limits.h>
# include <string.h>
# include <grpc/byte_buffer_reader.h>
@ -108,13 +107,16 @@
# include "src/core/ext/filters/client_channel/client_channel.h"
# include "src/core/ext/filters/client_channel/client_channel_factory.h"
# include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
# include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
# include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
# include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
# include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
# include "src/core/ext/filters/client_channel/lb_policy_factory.h"
# include "src/core/ext/filters/client_channel/lb_policy_registry.h"
# include "src/core/ext/filters/client_channel/parse_address.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/channel/channel_stack.h"
# include "src/core/lib/iomgr/combiner.h"
# include "src/core/lib/iomgr/sockaddr.h"
# include "src/core/lib/iomgr/sockaddr_utils.h"
@ -126,6 +128,7 @@
# include "src/core/lib/support/string.h"
# include "src/core/lib/surface/call.h"
# include "src/core/lib/surface/channel.h"
# include "src/core/lib/surface/channel_init.h"
# include "src/core/lib/transport/static_metadata.h"
# define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
@ -147,6 +150,10 @@ static grpc_error *initial_metadata_add_lb_token(
lb_token_mdelem_storage , lb_token ) ;
}
static void destroy_client_stats ( void * arg ) {
grpc_grpclb_client_stats_unref ( arg ) ;
}
typedef struct wrapped_rr_closure_arg {
/* the closure instance using this struct as argument */
grpc_closure wrapper_closure ;
@ -163,6 +170,13 @@ typedef struct wrapped_rr_closure_arg {
* initial metadata */
grpc_connected_subchannel * * target ;
/* the context to be populated for the subchannel call */
grpc_call_context_element * context ;
/* Stats for client-side load reporting. Note that this holds a
* reference , which must be either passed on via context or unreffed . */
grpc_grpclb_client_stats * client_stats ;
/* the LB token associated with the pick */
grpc_mdelem lb_token ;
@ -202,6 +216,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
( void * ) * wc_arg - > target , ( void * ) wc_arg - > rr_policy ) ;
abort ( ) ;
}
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT ( wc_arg - > client_stats ! = NULL ) ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . value = wc_arg - > client_stats ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy = destroy_client_stats ;
} else {
grpc_grpclb_client_stats_unref ( wc_arg - > client_stats ) ;
}
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO , " Unreffing RR %p " , ( void * ) wc_arg - > rr_policy ) ;
@ -237,6 +257,7 @@ typedef struct pending_pick {
static void add_pending_pick ( pending_pick * * root ,
const grpc_lb_policy_pick_args * pick_args ,
grpc_connected_subchannel * * target ,
grpc_call_context_element * context ,
grpc_closure * on_complete ) {
pending_pick * pp = gpr_zalloc ( sizeof ( * pp ) ) ;
pp - > next = * root ;
@ -244,6 +265,7 @@ static void add_pending_pick(pending_pick **root,
pp - > target = target ;
pp - > wrapped_on_complete_arg . wrapped_closure = on_complete ;
pp - > wrapped_on_complete_arg . target = target ;
pp - > wrapped_on_complete_arg . context = context ;
pp - > wrapped_on_complete_arg . initial_metadata = pick_args - > initial_metadata ;
pp - > wrapped_on_complete_arg . lb_token_mdelem_storage =
pick_args - > lb_token_mdelem_storage ;
@ -287,8 +309,8 @@ typedef struct glb_lb_policy {
grpc_client_channel_factory * cc_factory ;
grpc_channel_args * args ;
/** deadline for the LB's call */
gpr_timespec deadline ;
/** timeout in milliseconds for the LB call. 0 means no deadline. */
int lb_call_timeout_ms ;
/** for communicating with the LB server */
grpc_channel * lb_channel ;
@ -316,6 +338,10 @@ typedef struct glb_lb_policy {
/************************************************************/
/* client data associated with the LB server communication */
/************************************************************/
/* Finished sending initial request. */
grpc_closure lb_on_sent_initial_request ;
/* Status from the LB server has been received. This signals the end of the LB
* call . */
grpc_closure lb_on_server_status_received ;
@ -348,6 +374,23 @@ typedef struct glb_lb_policy {
/** LB call retry timer */
grpc_timer lb_call_retry_timer ;
bool initial_request_sent ;
bool seen_initial_response ;
/* Stats for client-side load reporting. Should be unreffed and
* recreated whenever lb_call is replaced . */
grpc_grpclb_client_stats * client_stats ;
/* Interval and timer for next client load report. */
gpr_timespec client_stats_report_interval ;
grpc_timer client_load_report_timer ;
bool client_load_report_timer_pending ;
bool last_client_load_report_counters_were_zero ;
/* Closure used for either the load report timer or the callback for
* completion of sending the load report . */
grpc_closure client_load_report_closure ;
/* Client load report message payload. */
grpc_byte_buffer * client_load_report_payload ;
} glb_lb_policy ;
/* Keeps track and reacts to changes in connectivity of the RR instance */
@ -552,8 +595,8 @@ static bool pick_from_internal_rr_locked(
grpc_connected_subchannel * * target , wrapped_rr_closure_arg * wc_arg ) {
GPR_ASSERT ( rr_policy ! = NULL ) ;
const bool pick_done = grpc_lb_policy_pick_locked (
exec_ctx , rr_policy , pick_args , target , ( void * * ) & wc_arg - > lb_token ,
& wc_arg - > wrapper_closure ) ;
exec_ctx , rr_policy , pick_args , target , wc_arg - > context ,
( void * * ) & wc_arg - > lb_token , & wc_arg - > wrapper_closure ) ;
if ( pick_done ) {
/* synchronous grpc_lb_policy_pick call. Unref the RR policy. */
if ( grpc_lb_glb_trace ) {
@ -567,7 +610,12 @@ static bool pick_from_internal_rr_locked(
pick_args - > lb_token_mdelem_storage ,
GRPC_MDELEM_REF ( wc_arg - > lb_token ) ) ;
gpr_free ( wc_arg ) ;
// Pass on client stats via context. Passes ownership of the reference.
GPR_ASSERT ( wc_arg - > client_stats ! = NULL ) ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . value = wc_arg - > client_stats ;
wc_arg - > context [ GRPC_GRPCLB_CLIENT_STATS ] . destroy = destroy_client_stats ;
gpr_free ( wc_arg - > free_when_done ) ;
}
/* else, the pending pick will be registered and taken care of by the
* pending pick list inside the RR policy ( glb_policy - > rr_policy ) .
@ -690,6 +738,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
glb_policy - > pending_picks = pp - > next ;
GRPC_LB_POLICY_REF ( glb_policy - > rr_policy , " rr_handover_pending_pick " ) ;
pp - > wrapped_on_complete_arg . rr_policy = glb_policy - > rr_policy ;
pp - > wrapped_on_complete_arg . client_stats =
grpc_grpclb_client_stats_ref ( glb_policy - > client_stats ) ;
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO , " Pending pick about to PICK from 0x% " PRIxPTR " " ,
( intptr_t ) glb_policy - > rr_policy ) ;
@ -750,18 +800,11 @@ static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
gpr_free ( balancer_name ) ;
}
static void * copy_balancer_name ( void * balancer_name ) {
return gpr_strdup ( balancer_name ) ;
}
static grpc_slice_hash_table_entry targets_info_entry_create (
const char * address , const char * balancer_name ) {
static const grpc_slice_hash_table_vtable vtable = { destroy_balancer_name ,
copy_balancer_name } ;
grpc_slice_hash_table_entry entry ;
entry . key = grpc_slice_from_copied_string ( address ) ;
entry . value = ( void * ) balancer_name ;
entry . vtable = & vtable ;
entry . value = gpr_strdup ( balancer_name ) ;
return entry ;
}
@ -825,11 +868,8 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
uri_path ) ;
gpr_free ( uri_path ) ;
* targets_info =
grpc_slice_hash_table_create ( num_grpclb_addrs , targets_info_entries ) ;
for ( size_t i = 0 ; i < num_grpclb_addrs ; i + + ) {
grpc_slice_unref_internal ( exec_ctx , targets_info_entries [ i ] . key ) ;
}
* targets_info = grpc_slice_hash_table_create (
num_grpclb_addrs , targets_info_entries , destroy_balancer_name ) ;
gpr_free ( targets_info_entries ) ;
return target_uri_str ;
@ -841,10 +881,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO ( roth ) : For now , we ignore non - balancer addresses , but in the
* future , we may change the behavior such that we fall back to using
* the non - balancer addresses if we cannot reach any balancers . At that
* time , this should be changed to allow a list with no balancer addresses ,
* since the resolver might fail to return a balancer address even when
* this is the right LB policy to use . */
* the non - balancer addresses if we cannot reach any balancers . In the
* fallback case , we should use the LB policy indicated by
* GRPC_ARG_LB_POLICY_NAME ( although if that specifies grpclb or is
* unset , we should default to pick_first ) . */
const grpc_arg * arg =
grpc_channel_args_find ( args - > args , GRPC_ARG_LB_ADDRESSES ) ;
if ( arg = = NULL | | arg - > type ! = GRPC_ARG_POINTER ) {
@ -874,9 +914,22 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_uri_destroy ( uri ) ;
glb_policy - > cc_factory = args - > client_channel_factory ;
glb_policy - > args = grpc_channel_args_copy ( args - > args ) ;
GPR_ASSERT ( glb_policy - > cc_factory ! = NULL ) ;
arg = grpc_channel_args_find ( args - > args , GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS ) ;
glb_policy - > lb_call_timeout_ms =
grpc_channel_arg_get_integer ( arg , ( grpc_integer_options ) { 0 , 0 , INT_MAX } ) ;
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg ;
new_arg . key = GRPC_ARG_LB_POLICY_NAME ;
new_arg . type = GRPC_ARG_STRING ;
new_arg . value . string = " grpclb " ;
static const char * args_to_remove [ ] = { GRPC_ARG_LB_POLICY_NAME } ;
glb_policy - > args = grpc_channel_args_copy_and_add_and_remove (
args - > args , args_to_remove , GPR_ARRAY_SIZE ( args_to_remove ) , & new_arg , 1 ) ;
grpc_slice_hash_table * targets_info = NULL ;
/* Create a client channel over them to communicate with a LB service */
char * lb_service_target_addresses =
@ -890,6 +943,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_channel_args_destroy ( exec_ctx , lb_channel_args ) ;
gpr_free ( lb_service_target_addresses ) ;
if ( glb_policy - > lb_channel = = NULL ) {
gpr_free ( ( void * ) glb_policy - > server_name ) ;
grpc_channel_args_destroy ( exec_ctx , glb_policy - > args ) ;
gpr_free ( glb_policy ) ;
return NULL ;
}
@ -905,6 +960,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT ( glb_policy - > pending_pings = = NULL ) ;
gpr_free ( ( void * ) glb_policy - > server_name ) ;
grpc_channel_args_destroy ( exec_ctx , glb_policy - > args ) ;
if ( glb_policy - > client_stats ! = NULL ) {
grpc_grpclb_client_stats_unref ( glb_policy - > client_stats ) ;
}
grpc_channel_destroy ( glb_policy - > lb_channel ) ;
glb_policy - > lb_channel = NULL ;
grpc_connectivity_state_destroy ( exec_ctx , & glb_policy - > state_tracker ) ;
@ -1021,7 +1079,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
static int glb_pick_locked ( grpc_exec_ctx * exec_ctx , grpc_lb_policy * pol ,
const grpc_lb_policy_pick_args * pick_args ,
grpc_connected_subchannel * * target , void * * user_data ,
grpc_connected_subchannel * * target ,
grpc_call_context_element * context , void * * user_data ,
grpc_closure * on_complete ) {
if ( pick_args - > lb_token_mdelem_storage = = NULL ) {
* target = NULL ;
@ -1033,7 +1092,6 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
glb_lb_policy * glb_policy = ( glb_lb_policy * ) pol ;
glb_policy - > deadline = pick_args - > deadline ;
bool pick_done ;
if ( glb_policy - > rr_policy ! = NULL ) {
@ -1049,6 +1107,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_schedule_on_exec_ctx ) ;
wc_arg - > rr_policy = glb_policy - > rr_policy ;
wc_arg - > target = target ;
wc_arg - > context = context ;
GPR_ASSERT ( glb_policy - > client_stats ! = NULL ) ;
wc_arg - > client_stats =
grpc_grpclb_client_stats_ref ( glb_policy - > client_stats ) ;
wc_arg - > wrapped_closure = on_complete ;
wc_arg - > lb_token_mdelem_storage = pick_args - > lb_token_mdelem_storage ;
wc_arg - > initial_metadata = pick_args - > initial_metadata ;
@ -1062,7 +1124,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
" picks " ,
( void * ) ( glb_policy ) ) ;
}
add_pending_pick ( & glb_policy - > pending_picks , pick_args , target ,
add_pending_pick ( & glb_policy - > pending_picks , pick_args , target , context ,
on_complete ) ;
if ( ! glb_policy - > started_picking ) {
@ -1103,6 +1165,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
exec_ctx , & glb_policy - > state_tracker , current , notify ) ;
}
static void send_client_load_report_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) ;
static void schedule_next_client_load_report ( grpc_exec_ctx * exec_ctx ,
glb_lb_policy * glb_policy ) {
const gpr_timespec now = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
const gpr_timespec next_client_load_report_time =
gpr_time_add ( now , glb_policy - > client_stats_report_interval ) ;
grpc_closure_init ( & glb_policy - > client_load_report_closure ,
send_client_load_report_locked , glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner , false ) ) ;
grpc_timer_init ( exec_ctx , & glb_policy - > client_load_report_timer ,
next_client_load_report_time ,
& glb_policy - > client_load_report_closure , now ) ;
}
static void client_load_report_done_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
glb_lb_policy * glb_policy = arg ;
grpc_byte_buffer_destroy ( glb_policy - > client_load_report_payload ) ;
glb_policy - > client_load_report_payload = NULL ;
if ( error ! = GRPC_ERROR_NONE | | glb_policy - > lb_call = = NULL ) {
glb_policy - > client_load_report_timer_pending = false ;
GRPC_LB_POLICY_WEAK_UNREF ( exec_ctx , & glb_policy - > base ,
" client_load_report " ) ;
return ;
}
schedule_next_client_load_report ( exec_ctx , glb_policy ) ;
}
static void do_send_client_load_report_locked ( grpc_exec_ctx * exec_ctx ,
glb_lb_policy * glb_policy ) {
grpc_op op ;
memset ( & op , 0 , sizeof ( op ) ) ;
op . op = GRPC_OP_SEND_MESSAGE ;
op . data . send_message . send_message = glb_policy - > client_load_report_payload ;
grpc_closure_init ( & glb_policy - > client_load_report_closure ,
client_load_report_done_locked , glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner , false ) ) ;
grpc_call_error call_error = grpc_call_start_batch_and_execute (
exec_ctx , glb_policy - > lb_call , & op , 1 ,
& glb_policy - > client_load_report_closure ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
}
static bool load_report_counters_are_zero ( grpc_grpclb_request * request ) {
return request - > client_stats . num_calls_started = = 0 & &
request - > client_stats . num_calls_finished = = 0 & &
request - > client_stats . num_calls_finished_with_drop_for_rate_limiting = =
0 & &
request - > client_stats
. num_calls_finished_with_drop_for_load_balancing = = 0 & &
request - > client_stats . num_calls_finished_with_client_failed_to_send = =
0 & &
request - > client_stats . num_calls_finished_known_received = = 0 ;
}
static void send_client_load_report_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
glb_lb_policy * glb_policy = arg ;
if ( error = = GRPC_ERROR_CANCELLED | | glb_policy - > lb_call = = NULL ) {
glb_policy - > client_load_report_timer_pending = false ;
GRPC_LB_POLICY_WEAK_UNREF ( exec_ctx , & glb_policy - > base ,
" client_load_report " ) ;
return ;
}
// Construct message payload.
GPR_ASSERT ( glb_policy - > client_load_report_payload = = NULL ) ;
grpc_grpclb_request * request =
grpc_grpclb_load_report_request_create ( glb_policy - > client_stats ) ;
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if ( load_report_counters_are_zero ( request ) ) {
if ( glb_policy - > last_client_load_report_counters_were_zero ) {
grpc_grpclb_request_destroy ( request ) ;
schedule_next_client_load_report ( exec_ctx , glb_policy ) ;
return ;
}
glb_policy - > last_client_load_report_counters_were_zero = true ;
} else {
glb_policy - > last_client_load_report_counters_were_zero = false ;
}
grpc_slice request_payload_slice = grpc_grpclb_request_encode ( request ) ;
glb_policy - > client_load_report_payload =
grpc_raw_byte_buffer_create ( & request_payload_slice , 1 ) ;
grpc_slice_unref_internal ( exec_ctx , request_payload_slice ) ;
grpc_grpclb_request_destroy ( request ) ;
// If we've already sent the initial request, then we can go ahead and
// sent the load report. Otherwise, we need to wait until the initial
// request has been sent to send this
// (see lb_on_sent_initial_request_locked() below).
if ( glb_policy - > initial_request_sent ) {
do_send_client_load_report_locked ( exec_ctx , glb_policy ) ;
}
}
static void lb_on_sent_initial_request_locked ( grpc_exec_ctx * exec_ctx ,
void * arg , grpc_error * error ) ;
static void lb_on_server_status_received_locked ( grpc_exec_ctx * exec_ctx ,
void * arg , grpc_error * error ) ;
static void lb_on_response_received_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
@ -1117,13 +1277,24 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
* glb_policy - > base . interested_parties , which is comprised of the polling
* entities from \ a client_channel . */
grpc_slice host = grpc_slice_from_copied_string ( glb_policy - > server_name ) ;
gpr_timespec deadline =
glb_policy - > lb_call_timeout_ms = = 0
? gpr_inf_future ( GPR_CLOCK_MONOTONIC )
: gpr_time_add ( gpr_now ( GPR_CLOCK_MONOTONIC ) ,
gpr_time_from_millis ( glb_policy - > lb_call_timeout_ms ,
GPR_TIMESPAN ) ) ;
glb_policy - > lb_call = grpc_channel_create_pollset_set_call (
exec_ctx , glb_policy - > lb_channel , NULL , GRPC_PROPAGATE_DEFAULTS ,
glb_policy - > base . interested_parties ,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD ,
& host , glb_policy - > deadline , NULL ) ;
& host , deadline , NULL ) ;
grpc_slice_unref_internal ( exec_ctx , host ) ;
if ( glb_policy - > client_stats ! = NULL ) {
grpc_grpclb_client_stats_unref ( glb_policy - > client_stats ) ;
}
glb_policy - > client_stats = grpc_grpclb_client_stats_create ( ) ;
grpc_metadata_array_init ( & glb_policy - > lb_initial_metadata_recv ) ;
grpc_metadata_array_init ( & glb_policy - > lb_trailing_metadata_recv ) ;
@ -1135,6 +1306,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
grpc_slice_unref_internal ( exec_ctx , request_payload_slice ) ;
grpc_grpclb_request_destroy ( request ) ;
grpc_closure_init ( & glb_policy - > lb_on_sent_initial_request ,
lb_on_sent_initial_request_locked , glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner , false ) ) ;
grpc_closure_init ( & glb_policy - > lb_on_server_status_received ,
lb_on_server_status_received_locked , glb_policy ,
grpc_combiner_scheduler ( glb_policy - > base . combiner , false ) ) ;
@ -1148,6 +1322,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
GRPC_GRPCLB_RECONNECT_JITTER ,
GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000 ,
GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000 ) ;
glb_policy - > initial_request_sent = false ;
glb_policy - > seen_initial_response = false ;
glb_policy - > last_client_load_report_counters_were_zero = false ;
}
static void lb_call_destroy_locked ( grpc_exec_ctx * exec_ctx ,
@ -1161,6 +1339,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_byte_buffer_destroy ( glb_policy - > lb_request_payload ) ;
grpc_slice_unref_internal ( exec_ctx , glb_policy - > lb_call_status_details ) ;
if ( ! glb_policy - > client_load_report_timer_pending ) {
grpc_timer_cancel ( exec_ctx , & glb_policy - > client_load_report_timer ) ;
}
}
/*
@ -1189,21 +1371,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
op - > flags = 0 ;
op - > reserved = NULL ;
op + + ;
op - > op = GRPC_OP_RECV_INITIAL_METADATA ;
op - > data . recv_initial_metadata . recv_initial_metadata =
& glb_policy - > lb_initial_metadata_recv ;
op - > flags = 0 ;
op - > reserved = NULL ;
op + + ;
GPR_ASSERT ( glb_policy - > lb_request_payload ! = NULL ) ;
op - > op = GRPC_OP_SEND_MESSAGE ;
op - > data . send_message . send_message = glb_policy - > lb_request_payload ;
op - > flags = 0 ;
op - > reserved = NULL ;
op + + ;
/* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref
* count goes to zero ) to be unref ' d in lb_on_sent_initial_request_locked ( ) */
GRPC_LB_POLICY_WEAK_REF ( & glb_policy - > base , " lb_on_server_status_received " ) ;
call_error = grpc_call_start_batch_and_execute (
exec_ctx , glb_policy - > lb_call , ops , ( size_t ) ( op - ops ) ,
& glb_policy - > lb_on_sent_initial_request ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
op = ops ;
op - > op = GRPC_OP_RECV_STATUS_ON_CLIENT ;
op - > data . recv_status_on_client . trailing_metadata =
& glb_policy - > lb_trailing_metadata_recv ;
@ -1235,6 +1423,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
}
static void lb_on_sent_initial_request_locked ( grpc_exec_ctx * exec_ctx ,
void * arg , grpc_error * error ) {
glb_lb_policy * glb_policy = arg ;
glb_policy - > initial_request_sent = true ;
// If we attempted to send a client load report before the initial
// request was sent, send the load report now.
if ( glb_policy - > client_load_report_payload ! = NULL ) {
do_send_client_load_report_locked ( exec_ctx , glb_policy ) ;
}
GRPC_LB_POLICY_WEAK_UNREF ( exec_ctx , & glb_policy - > base ,
" lb_on_response_received_locked " ) ;
}
static void lb_on_response_received_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
glb_lb_policy * glb_policy = arg ;
@ -1250,58 +1451,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_byte_buffer_reader_init ( & bbr , glb_policy - > lb_response_payload ) ;
grpc_slice response_slice = grpc_byte_buffer_reader_readall ( & bbr ) ;
grpc_byte_buffer_destroy ( glb_policy - > lb_response_payload ) ;
grpc_grpclb_serverlist * serverlist =
grpc_grpclb_response_parse_serverlist ( response_slice ) ;
if ( serverlist ! = NULL ) {
GPR_ASSERT ( glb_policy - > lb_call ! = NULL ) ;
grpc_slice_unref_internal ( exec_ctx , response_slice ) ;
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO , " Serverlist with %lu servers received " ,
( unsigned long ) serverlist - > num_servers ) ;
for ( size_t i = 0 ; i < serverlist - > num_servers ; + + i ) {
grpc_resolved_address addr ;
parse_server ( serverlist - > servers [ i ] , & addr ) ;
char * ipport ;
grpc_sockaddr_to_string ( & ipport , & addr , false ) ;
gpr_log ( GPR_INFO , " Serverlist[%lu]: %s " , ( unsigned long ) i , ipport ) ;
gpr_free ( ipport ) ;
grpc_grpclb_initial_response * response = NULL ;
if ( ! glb_policy - > seen_initial_response & &
( response = grpc_grpclb_initial_response_parse ( response_slice ) ) ! =
NULL ) {
if ( response - > has_client_stats_report_interval ) {
glb_policy - > client_stats_report_interval =
gpr_time_max ( gpr_time_from_seconds ( 1 , GPR_TIMESPAN ) ,
grpc_grpclb_duration_to_timespec (
& response - > client_stats_report_interval ) ) ;
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO ,
" received initial LB response message; "
" client load reporting interval = % " PRId64 " .%09d sec " ,
glb_policy - > client_stats_report_interval . tv_sec ,
glb_policy - > client_stats_report_interval . tv_nsec ) ;
}
/* take a weak ref (won't prevent calling of \a glb_shutdown() if the
* strong ref count goes to zero ) to be unref ' d in
* send_client_load_report ( ) */
glb_policy - > client_load_report_timer_pending = true ;
GRPC_LB_POLICY_WEAK_REF ( & glb_policy - > base , " client_load_report " ) ;
schedule_next_client_load_report ( exec_ctx , glb_policy ) ;
} else if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO ,
" received initial LB response message; "
" client load reporting NOT enabled " ) ;
}
grpc_grpclb_initial_response_destroy ( response ) ;
glb_policy - > seen_initial_response = true ;
} else {
grpc_grpclb_serverlist * serverlist =
grpc_grpclb_response_parse_serverlist ( response_slice ) ;
if ( serverlist ! = NULL ) {
GPR_ASSERT ( glb_policy - > lb_call ! = NULL ) ;
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO , " Serverlist with %lu servers received " ,
( unsigned long ) serverlist - > num_servers ) ;
for ( size_t i = 0 ; i < serverlist - > num_servers ; + + i ) {
grpc_resolved_address addr ;
parse_server ( serverlist - > servers [ i ] , & addr ) ;
char * ipport ;
grpc_sockaddr_to_string ( & ipport , & addr , false ) ;
gpr_log ( GPR_INFO , " Serverlist[%lu]: %s " , ( unsigned long ) i , ipport ) ;
gpr_free ( ipport ) ;
}
}
/* update serverlist */
if ( serverlist - > num_servers > 0 ) {
if ( grpc_grpclb_serverlist_equals ( glb_policy - > serverlist , serverlist ) ) {
/* update serverlist */
if ( serverlist - > num_servers > 0 ) {
if ( grpc_grpclb_serverlist_equals ( glb_policy - > serverlist ,
serverlist ) ) {
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO ,
" Incoming server list identical to current, ignoring. " ) ;
}
grpc_grpclb_destroy_serverlist ( serverlist ) ;
} else { /* new serverlist */
if ( glb_policy - > serverlist ! = NULL ) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist ( glb_policy - > serverlist ) ;
}
/* and update the copy in the glb_lb_policy instance. This
* serverlist instance will be destroyed either upon the next
* update or in glb_destroy ( ) */
glb_policy - > serverlist = serverlist ;
rr_handover_locked ( exec_ctx , glb_policy ) ;
}
} else {
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO ,
" Incoming server list identical to current, ignoring. " ) ;
" Received empty server list. Picks will stay pending until "
" a response with > 0 servers is received " ) ;
}
grpc_grpclb_destroy_serverlist ( serverlist ) ;
} else { /* new serverlist */
if ( glb_policy - > serverlist ! = NULL ) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist ( glb_policy - > serverlist ) ;
}
/* and update the copy in the glb_lb_policy instance. This serverlist
* instance will be destroyed either upon the next update or in
* glb_destroy ( ) */
glb_policy - > serverlist = serverlist ;
rr_handover_locked ( exec_ctx , glb_policy ) ;
}
} else {
if ( grpc_lb_glb_trace ) {
gpr_log ( GPR_INFO ,
" Received empty server list. Picks will stay pending until a "
" response with > 0 servers is received " ) ;
}
grpc_grpclb_destroy_serverlist ( glb_policy - > serverlist ) ;
} else { /* serverlist == NULL */
gpr_log ( GPR_ERROR , " Invalid LB response received: '%s'. Ignoring. " ,
grpc_dump_slice ( response_slice , GPR_DUMP_ASCII | GPR_DUMP_HEX ) ) ;
}
} else { /* serverlist == NULL */
gpr_log ( GPR_ERROR , " Invalid LB response received: '%s'. Ignoring. " ,
grpc_dump_slice ( response_slice , GPR_DUMP_ASCII | GPR_DUMP_HEX ) ) ;
grpc_slice_unref_internal ( exec_ctx , response_slice ) ;
}
grpc_slice_unref_internal ( exec_ctx , response_slice ) ;
if ( ! glb_policy - > shutting_down ) {
/* keep listening for serverlist updates */
op - > op = GRPC_OP_RECV_MESSAGE ;
@ -1413,9 +1647,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() {
}
/* Plugin registration */
// Only add client_load_reporting filter if the grpclb LB policy is used.
static bool maybe_add_client_load_reporting_filter (
grpc_exec_ctx * exec_ctx , grpc_channel_stack_builder * builder , void * arg ) {
const grpc_channel_args * args =
grpc_channel_stack_builder_get_channel_arguments ( builder ) ;
const grpc_arg * channel_arg =
grpc_channel_args_find ( args , GRPC_ARG_LB_POLICY_NAME ) ;
if ( channel_arg ! = NULL & & channel_arg - > type = = GRPC_ARG_STRING & &
strcmp ( channel_arg - > value . string , " grpclb " ) = = 0 ) {
return grpc_channel_stack_builder_append_filter (
builder , ( const grpc_channel_filter * ) arg , NULL , NULL ) ;
}
return true ;
}
void grpc_lb_policy_grpclb_init ( ) {
grpc_register_lb_policy ( grpc_glb_lb_factory_create ( ) ) ;
grpc_register_tracer ( " glb " , & grpc_lb_glb_trace ) ;
grpc_channel_init_register_stage ( GRPC_CLIENT_SUBCHANNEL ,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY ,
maybe_add_client_load_reporting_filter ,
( void * ) & grpc_client_load_reporting_filter ) ;
}
void grpc_lb_policy_grpclb_shutdown ( ) { }