@ -124,6 +124,21 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
static void end_all_the_calls ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_error * error ) ;
static void start_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) ;
static void finish_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) ;
static void cancel_pings ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_error * error ) ;
static void send_ping_locked ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_ping_type ping_type ,
grpc_closure * on_initiate ,
grpc_closure * on_complete ) ;
# define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0
# define DEFAULT_MAX_PINGS_BETWEEN_DATA 3
/*******************************************************************************
* CONSTRUCTION / DESTRUCTION / REFCOUNTING
*/
@ -155,16 +170,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
grpc_combiner_destroy ( exec_ctx , t - > combiner ) ;
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
while ( t - > pings . next ! = & t - > pings ) {
grpc_chttp2_outstanding_ping * ping = t - > pings . next ;
grpc_closure_sched ( exec_ctx , ping - > on_recv ,
GRPC_ERROR_CREATE ( " Transport closed " ) ) ;
ping - > next - > prev = ping - > prev ;
ping - > prev - > next = ping - > next ;
gpr_free ( ping ) ;
}
cancel_pings ( exec_ctx , t , GRPC_ERROR_CREATE ( " Transport destroyed " ) ) ;
while ( t - > write_cb_pool ) {
grpc_chttp2_write_cb * next = t - > write_cb_pool - > next ;
@ -172,6 +178,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
t - > write_cb_pool = next ;
}
gpr_free ( t - > ping_acks ) ;
gpr_free ( t - > peer_string ) ;
gpr_free ( t ) ;
}
@ -224,10 +231,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t - > is_client = is_client ;
t - > outgoing_window = DEFAULT_WINDOW ;
t - > incoming_window = DEFAULT_WINDOW ;
t - > stream_lookahead = DEFAULT_WINDOW ;
t - > connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET ;
t - > ping_counter = 1 ;
t - > pings . next = t - > pings . prev = & t - > pings ;
t - > deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0 ;
t - > is_first_frame = true ;
grpc_connectivity_state_init (
@ -248,6 +251,22 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init ( & t - > destructive_reclaimer_locked ,
destructive_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > start_bdp_ping_locked , start_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > finish_bdp_ping_locked , finish_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_bdp_estimator_init ( & t - > bdp_estimator , t - > peer_string ) ;
t - > last_pid_update = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
grpc_pid_controller_init (
& t - > pid_controller ,
( grpc_pid_controller_args ) { . gain_p = 4 ,
. gain_i = 8 ,
. gain_d = 0 ,
. initial_control_value = log2 ( DEFAULT_WINDOW ) ,
. min_control_value = - 1 ,
. max_control_value = 22 ,
. integral_range = 10 } ) ;
grpc_chttp2_goaway_parser_init ( & t - > goaway_parser ) ;
grpc_chttp2_hpack_parser_init ( exec_ctx , & t - > hpack_parser ) ;
@ -290,6 +309,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
DEFAULT_MAX_HEADER_LIST_SIZE ) ;
t - > ping_policy = ( grpc_chttp2_repeated_ping_policy ) {
. max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA ,
. min_time_between_pings =
gpr_time_from_millis ( DEFAULT_MIN_TIME_BETWEEN_PINGS_MS , GPR_TIMESPAN ) ,
} ;
if ( channel_args ) {
for ( i = 0 ; i < channel_args - > num_args ; i + + ) {
if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
@ -306,14 +331,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t - > next_stream_id = ( uint32_t ) value ;
}
}
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES ) ) {
const grpc_integer_options options = { - 1 , 5 , INT_MAX } ;
const int value =
grpc_channel_arg_get_integer ( & channel_args - > args [ i ] , options ) ;
if ( value > = 0 ) {
t - > stream_lookahead = ( uint32_t ) value ;
}
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER ) ) {
const grpc_integer_options options = { - 1 , 0 , INT_MAX } ;
@ -323,6 +340,19 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_hpack_compressor_set_max_usable_size ( & t - > hpack_compressor ,
( uint32_t ) value ) ;
}
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA ) ) {
t - > ping_policy . max_pings_without_data = grpc_channel_arg_get_integer (
& channel_args - > args [ i ] ,
( grpc_integer_options ) { DEFAULT_MAX_PINGS_BETWEEN_DATA , 0 , INT_MAX } ) ;
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS ) ) {
t - > ping_policy . min_time_between_pings = gpr_time_from_millis (
grpc_channel_arg_get_integer (
& channel_args - > args [ i ] ,
( grpc_integer_options ) { DEFAULT_MIN_TIME_BETWEEN_PINGS_MS , 0 ,
INT_MAX } ) ,
GPR_TIMESPAN ) ;
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE ) ) {
t - > write_buffer_size = ( uint32_t ) grpc_channel_arg_get_integer (
@ -334,24 +364,26 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_setting_id setting_id ;
grpc_integer_options integer_options ;
bool availability [ 2 ] /* server, client */ ;
} settings_map [ ] = {
{ GRPC_ARG_MAX_CONCURRENT_STREAMS ,
GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS ,
{ - 1 , 0 , INT_MAX } ,
{ true , false } } ,
{ GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER ,
GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ,
{ - 1 , 0 , INT_MAX } ,
{ true , true } } ,
{ GRPC_ARG_MAX_METADATA_SIZE ,
GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
{ - 1 , 0 , INT_MAX } ,
{ true , true } } ,
{ GRPC_ARG_HTTP2_MAX_FRAME_SIZE ,
GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ,
{ - 1 , 16384 , 16777215 } ,
{ true , true } } ,
} ;
} settings_map [ ] = { { GRPC_ARG_MAX_CONCURRENT_STREAMS ,
GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS ,
{ - 1 , 0 , INT32_MAX } ,
{ true , false } } ,
{ GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER ,
GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ,
{ - 1 , 0 , INT32_MAX } ,
{ true , true } } ,
{ GRPC_ARG_MAX_METADATA_SIZE ,
GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
{ - 1 , 0 , INT32_MAX } ,
{ true , true } } ,
{ GRPC_ARG_HTTP2_MAX_FRAME_SIZE ,
GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ,
{ - 1 , 16384 , 16777215 } ,
{ true , true } } ,
{ GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES ,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
{ - 1 , 5 , INT32_MAX } ,
{ true , true } } } ;
for ( j = 0 ; j < ( int ) GPR_ARRAY_SIZE ( settings_map ) ; j + + ) {
if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
settings_map [ j ] . channel_arg_name ) ) {
@ -374,6 +406,9 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
t - > ping_state . pings_before_data_required =
t - > ping_policy . max_pings_without_data ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " init " ) ;
post_benign_reclaimer ( exec_ctx , t ) ;
}
@ -425,6 +460,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:close " ) ;
}
end_all_the_calls ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
cancel_pings ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
@ -475,11 +511,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
if ( server_data ) {
s - > id = ( uint32_t ) ( uintptr_t ) server_data ;
s - > outgoing_window = t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > incoming_window = s - > max_recv_bytes =
t - > settings [ GRPC_SENT_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
* t - > accepting_stream = s ;
grpc_chttp2_stream_map_add ( & t - > stream_map , s - > id , s ) ;
post_destructive_reclaimer ( exec_ctx , t ) ;
@ -508,6 +539,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
}
grpc_chttp2_list_remove_stalled_by_transport ( t , s ) ;
grpc_chttp2_list_remove_stalled_by_stream ( t , s ) ;
for ( int i = 0 ; i < STREAM_LIST_COUNT ; i + + ) {
if ( s - > included [ i ] ) {
@ -647,13 +679,21 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END ( " grpc_chttp2_initiate_write " , 0 ) ;
}
void grpc_chttp2_become_writable ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , bool covered_by_poller ,
const char * reason ) {
void grpc_chttp2_become_writable (
grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t , grpc_chttp2_stream * s ,
grpc_chttp2_stream_write_type stream_write_type , const char * reason ) {
if ( ! t - > closed & & grpc_chttp2_list_add_writable_stream ( t , s ) ) {
GRPC_CHTTP2_STREAM_REF ( s , " chttp2_writing:become " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , covered_by_poller , reason ) ;
}
switch ( stream_write_type ) {
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK :
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , true , reason ) ;
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , false , reason ) ;
break ;
}
}
@ -781,7 +821,6 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx,
static void maybe_start_some_streams ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ) {
grpc_chttp2_stream * s ;
uint32_t stream_incoming_window ;
/* start streams where we have free grpc_chttp2_stream ids and free
* concurrency */
while ( t - > next_stream_id < = MAX_CLIENT_STREAM_ID & &
@ -804,15 +843,11 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
" no_more_stream_ids " ) ;
}
s - > outgoing_window = t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > incoming_window = stream_incoming_window =
t - > settings [ GRPC_SENT_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > max_recv_bytes = GPR_MAX ( stream_incoming_window , s - > max_recv_bytes ) ;
grpc_chttp2_stream_map_add ( & t - > stream_map , s - > id , s ) ;
post_destructive_reclaimer ( exec_ctx , t ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s , true , " new_stream " ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" new_stream " ) ;
}
/* cancel out streams that will never be started */
while ( t - > next_stream_id > = MAX_CLIENT_STREAM_ID & &
@ -907,7 +942,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream * s ) {
if ( s - > id ! = 0 & & ( ! s - > write_buffering | |
s - > flow_controlled_buffer . length > t - > write_buffer_size ) ) {
grpc_chttp2_become_writable ( exec_ctx , t , s , true , " op.send_message " ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" op.send_message " ) ;
}
}
@ -1069,7 +1106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT ( s - > id ! = 0 ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s , true ,
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" op.send_initial_metadata " ) ;
}
} else {
@ -1160,7 +1198,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if ( s - > id ! = 0 ) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_become_writable ( exec_ctx , t , s , true ,
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" op.send_trailing_metadata " ) ;
}
}
@ -1179,8 +1218,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s - > recv_message = op - > recv_message ;
if ( s - > id ! = 0 & &
( s - > incoming_frames . head = = NULL | | s - > incoming_frames . head - > is_tail ) ) {
incoming_byte_stream_update_flow_control ( exec_ctx , t , s ,
t - > stream_lookahead , 0 ) ;
incoming_byte_stream_update_flow_control ( exec_ctx , t , s , 5 , 0 ) ;
}
grpc_chttp2_maybe_complete_recv_message ( exec_ctx , t , s ) ;
}
@ -1224,43 +1262,46 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_TIMER_END ( " perform_stream_op " , 0 ) ;
}
static void cancel_pings ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_error * error ) {
/* callback remaining pings: they're not allowed to call into the transpot,
and maybe they hold resources that need to be freed */
for ( size_t i = 0 ; i < GRPC_CHTTP2_PING_TYPE_COUNT ; i + + ) {
grpc_chttp2_ping_queue * pq = & t - > ping_queues [ i ] ;
for ( size_t j = 0 ; j < GRPC_CHTTP2_PCL_COUNT ; j + + ) {
grpc_closure_list_fail_all ( & pq - > lists [ j ] , GRPC_ERROR_REF ( error ) ) ;
grpc_closure_list_sched ( exec_ctx , & pq - > lists [ j ] ) ;
}
}
GRPC_ERROR_UNREF ( error ) ;
}
static void send_ping_locked ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_closure * on_recv ) {
grpc_chttp2_outstanding_ping * p = gpr_malloc ( sizeof ( * p ) ) ;
p - > next = & t - > pings ;
p - > prev = p - > next - > prev ;
p - > prev - > next = p - > next - > prev = p ;
p - > id [ 0 ] = ( uint8_t ) ( ( t - > ping_counter > > 56 ) & 0xff ) ;
p - > id [ 1 ] = ( uint8_t ) ( ( t - > ping_counter > > 48 ) & 0xff ) ;
p - > id [ 2 ] = ( uint8_t ) ( ( t - > ping_counter > > 40 ) & 0xff ) ;
p - > id [ 3 ] = ( uint8_t ) ( ( t - > ping_counter > > 32 ) & 0xff ) ;
p - > id [ 4 ] = ( uint8_t ) ( ( t - > ping_counter > > 24 ) & 0xff ) ;
p - > id [ 5 ] = ( uint8_t ) ( ( t - > ping_counter > > 16 ) & 0xff ) ;
p - > id [ 6 ] = ( uint8_t ) ( ( t - > ping_counter > > 8 ) & 0xff ) ;
p - > id [ 7 ] = ( uint8_t ) ( t - > ping_counter & 0xff ) ;
t - > ping_counter + + ;
p - > on_recv = on_recv ;
grpc_slice_buffer_add ( & t - > qbuf , grpc_chttp2_ping_create ( 0 , p - > id ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , true , " send_ping " ) ;
grpc_chttp2_ping_type ping_type ,
grpc_closure * on_initiate , grpc_closure * on_ack ) {
grpc_chttp2_ping_queue * pq = & t - > ping_queues [ ping_type ] ;
grpc_closure_list_append ( & pq - > lists [ GRPC_CHTTP2_PCL_INITIATE ] , on_initiate ,
GRPC_ERROR_NONE ) ;
if ( grpc_closure_list_append ( & pq - > lists [ GRPC_CHTTP2_PCL_NEXT ] , on_ack ,
GRPC_ERROR_NONE ) ) {
grpc_chttp2_initiate_write ( exec_ctx , t , false , " send_ping " ) ;
}
}
void grpc_chttp2_ack_ping ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
const uint8_t * opaque_8bytes ) {
grpc_chttp2_outstanding_ping * ping ;
for ( ping = t - > pings . next ; ping ! = & t - > pings ; ping = ping - > next ) {
if ( 0 = = memcmp ( opaque_8bytes , ping - > id , 8 ) ) {
grpc_closure_sched ( exec_ctx , ping - > on_recv , GRPC_ERROR_NONE ) ;
ping - > next - > prev = ping - > prev ;
ping - > prev - > next = ping - > next ;
gpr_free ( ping ) ;
return ;
}
uint64_t id ) {
grpc_chttp2_ping_queue * pq =
& t - > ping_queues [ id % GRPC_CHTTP2_PING_TYPE_COUNT ] ;
if ( pq - > inflight_id ! = id ) {
char * from = grpc_endpoint_get_peer ( t - > ep ) ;
gpr_log ( GPR_DEBUG , " Unknown ping response from %s: % " PRIx64 , from , id ) ;
gpr_free ( from ) ;
return ;
}
grpc_closure_list_sched ( exec_ctx , & pq - > lists [ GRPC_CHTTP2_PCL_INFLIGHT ] ) ;
if ( ! grpc_closure_list_empty ( pq - > lists [ GRPC_CHTTP2_PCL_NEXT ] ) ) {
grpc_chttp2_initiate_write ( exec_ctx , t , false , " continue_pings " ) ;
}
char * msg = gpr_dump ( ( const char * ) opaque_8bytes , 8 , GPR_DUMP_HEX ) ;
char * from = grpc_endpoint_get_peer ( t - > ep ) ;
gpr_log ( GPR_DEBUG , " Unknown ping response from %s: %s " , from , msg ) ;
gpr_free ( from ) ;
gpr_free ( msg ) ;
}
static void send_goaway ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
@ -1308,7 +1349,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if ( op - > send_ping ) {
send_ping_locked ( exec_ctx , t , op - > send_ping ) ;
send_ping_locked ( exec_ctx , t , GRPC_CHTTP2_PING_ON_NEXT_WRITE , NULL ,
op - > send_ping ) ;
}
if ( close_transport ! = GRPC_ERROR_NONE ) {
@ -1733,34 +1775,28 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF ( error ) ;
}
/** update window from a settings change */
typedef struct {
grpc_chttp2_transport * t ;
grpc_exec_ctx * exec_ctx ;
} update_global_window_args ;
/*******************************************************************************
* INPUT PROCESSING - PARSING
*/
static void update_global_window ( void * args , uint32_t id , void * stream ) {
update_global_window_args * a = args ;
grpc_chttp2_transport * t = a - > t ;
grpc_chttp2_stream * s = stream ;
int was_zero ;
int is_zero ;
int64_t initial_window_update = t - > initial_window_update ;
if ( initial_window_update > 0 ) {
was_zero = s - > outgoing_window < = 0 ;
GRPC_CHTTP2_FLOW_CREDIT_STREAM ( " settings " , t , s , outgoing_window ,
initial_window_update ) ;
is_zero = s - > outgoing_window < = 0 ;
if ( was_zero & & ! is_zero ) {
grpc_chttp2_become_writable ( a - > exec_ctx , t , s , true ,
" update_global_window " ) ;
}
static void update_bdp ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
double bdp_dbl ) {
uint32_t bdp ;
if ( bdp_dbl < = 0 ) {
bdp = 0 ;
} else if ( bdp_dbl > UINT32_MAX ) {
bdp = UINT32_MAX ;
} else {
GRPC_CHTTP2_FLOW_DEBIT_STREAM ( " settings " , t , s , outgoing_window ,
- initial_window_update ) ;
bdp = ( uint32_t ) ( bdp_dbl ) ;
}
int64_t delta =
( int64_t ) bdp -
( int64_t ) t - > settings [ GRPC_LOCAL_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
if ( delta = = 0 | | ( bdp ! = 0 & & delta > - 1024 & & delta < 1024 ) ) {
return ;
}
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE , bdp ) ;
}
/*******************************************************************************
@ -1802,6 +1838,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN ( " reading_action_locked " , 0 ) ;
grpc_chttp2_transport * t = tp ;
bool need_bdp_ping = false ;
GRPC_ERROR_REF ( error ) ;
@ -1819,9 +1856,14 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error * errors [ 3 ] = { GRPC_ERROR_REF ( error ) , GRPC_ERROR_NONE ,
GRPC_ERROR_NONE } ;
for ( ; i < t - > read_buffer . count & & errors [ 1 ] = = GRPC_ERROR_NONE ; i + + ) {
if ( grpc_bdp_estimator_add_incoming_bytes (
& t - > bdp_estimator ,
( int64_t ) GRPC_SLICE_LENGTH ( t - > read_buffer . slices [ i ] ) ) ) {
need_bdp_ping = true ;
}
errors [ 1 ] =
grpc_chttp2_perform_read ( exec_ctx , t , t - > read_buffer . slices [ i ] ) ;
} ;
}
if ( errors [ 1 ] ! = GRPC_ERROR_NONE ) {
errors [ 2 ] = try_http_parsing ( exec_ctx , t ) ;
GRPC_ERROR_UNREF ( error ) ;
@ -1835,21 +1877,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_BEGIN ( " post_parse_locked " , 0 ) ;
if ( t - > initial_window_update ! = 0 ) {
update_global_window_args args = { t , exec_ctx } ;
grpc_chttp2_stream_map_for_each ( & t - > stream_map , update_global_window ,
& args ) ;
if ( t - > initial_window_update > 0 ) {
grpc_chttp2_stream * s ;
while ( grpc_chttp2_list_pop_stalled_by_stream ( t , & s ) ) {
grpc_chttp2_become_writable (
exec_ctx , t , s , GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED ,
" unstalled " ) ;
}
}
t - > initial_window_update = 0 ;
}
/* handle higher level things */
if ( t - > incoming_window < t - > connection_window_target * 3 / 4 ) {
int64_t announce_bytes = t - > connection_window_target - t - > incoming_window ;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT ( " parsed " , t , announce_incoming_window ,
announce_bytes ) ;
GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT ( " parsed " , t , incoming_window ,
announce_bytes ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " global incoming window " ) ;
}
GPR_TIMER_END ( " post_parse_locked " , 0 ) ;
}
@ -1870,6 +1907,35 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if ( keep_reading ) {
grpc_endpoint_read ( exec_ctx , t - > ep , & t - > read_buffer ,
& t - > read_action_locked ) ;
if ( need_bdp_ping ) {
GRPC_CHTTP2_REF_TRANSPORT ( t , " bdp_ping " ) ;
grpc_bdp_estimator_schedule_ping ( & t - > bdp_estimator ) ;
send_ping_locked ( exec_ctx , t ,
GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE ,
& t - > start_bdp_ping_locked , & t - > finish_bdp_ping_locked ) ;
}
int64_t estimate = - 1 ;
if ( grpc_bdp_estimator_get_estimate ( & t - > bdp_estimator , & estimate ) ) {
double target = 1 + log2 ( ( double ) estimate ) ;
double memory_pressure = grpc_resource_quota_get_memory_pressure (
grpc_resource_user_quota ( grpc_endpoint_get_resource_user ( t - > ep ) ) ) ;
if ( memory_pressure > 0.8 ) {
target * = 1 - GPR_MIN ( 1 , ( memory_pressure - 0.8 ) / 0.1 ) ;
}
double bdp_error = target - grpc_pid_controller_last ( & t - > pid_controller ) ;
gpr_timespec now = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
gpr_timespec dt_timespec = gpr_time_sub ( now , t - > last_pid_update ) ;
double dt = ( double ) dt_timespec . tv_sec + dt_timespec . tv_nsec * 1e-9 ;
if ( dt > 0.1 ) {
dt = 0.1 ;
}
double log2_bdp_guess =
grpc_pid_controller_update ( & t - > pid_controller , bdp_error , dt ) ;
update_bdp ( exec_ctx , t , pow ( 2 , log2_bdp_guess ) ) ;
t - > last_pid_update = now ;
}
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " keep_reading " ) ;
} else {
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " reading_action " ) ;
@ -1882,6 +1948,26 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END ( " reading_action_locked " , 0 ) ;
}
static void start_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
if ( grpc_http_trace ) {
gpr_log ( GPR_DEBUG , " %s: Start BDP ping " , t - > peer_string ) ;
}
grpc_bdp_estimator_start_ping ( & t - > bdp_estimator ) ;
}
static void finish_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
if ( grpc_http_trace ) {
gpr_log ( GPR_DEBUG , " %s: Complete BDP ping " , t - > peer_string ) ;
}
grpc_bdp_estimator_complete_ping ( & t - > bdp_estimator ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " bdp_ping " ) ;
}
/*******************************************************************************
* CALLBACK LOOP
*/
@ -1932,10 +2018,12 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
size_t max_size_hint ,
size_t have_already ) {
uint32_t max_recv_bytes ;
uint32_t initial_window_size =
t - > settings [ GRPC_SENT_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
/* clamp max recv hint to an allowable size */
if ( max_size_hint > = UINT32_MAX - t - > stream_lookahead ) {
max_recv_bytes = UINT32_MAX - t - > stream_lookahead ;
if ( max_size_hint > = UINT32_MAX - initial_window_size ) {
max_recv_bytes = UINT32_MAX - initial_window_size ;
} else {
max_recv_bytes = ( uint32_t ) max_size_hint ;
}
@ -1948,20 +2036,26 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT ( max_recv_bytes < = UINT32_MAX - t - > stream_lookahead ) ;
max_recv_bytes + = t - > stream_lookahead ;
if ( s - > max_recv_bytes < max_recv_bytes ) {
uint32_t add_max_recv_bytes = max_recv_bytes - s - > max_recv_bytes ;
bool new_window_write_is_covered_by_poller =
s - > max_recv_bytes < have_already ;
GRPC_CHTTP2_FLOW_CREDIT_STREAM ( " op " , t , s , max_recv_bytes ,
add_max_recv_bytes ) ;
GRPC_CHTTP2_FLOW_CREDIT_STREAM ( " op " , t , s , incoming_window ,
GPR_ASSERT ( max_recv_bytes < = UINT32_MAX - initial_window_size ) ;
if ( s - > incoming_window_delta < max_recv_bytes & & ! s - > read_closed ) {
uint32_t add_max_recv_bytes =
( uint32_t ) ( max_recv_bytes - s - > incoming_window_delta ) ;
grpc_chttp2_stream_write_type write_type =
GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED ;
if ( s - > incoming_window_delta + initial_window_size <
( int64_t ) have_already ) {
write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ;
}
GRPC_CHTTP2_FLOW_CREDIT_STREAM ( " op " , t , s , incoming_window_delta ,
add_max_recv_bytes ) ;
GRPC_CHTTP2_FLOW_CREDIT_STREAM ( " op " , t , s , announce_window ,
add_max_recv_bytes ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s ,
new_window_write_is_covered_by_poller ,
if ( ( int64_t ) s - > incoming_window_delta + ( int64_t ) initial_window_size -
( int64_t ) s - > announce_window >
( int64_t ) initial_window_size / 2 ) {
write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK ;
}
grpc_chttp2_become_writable ( exec_ctx , t , s , write_type ,
" read_incoming_stream " ) ;
}
}