@ -84,8 +84,6 @@ grpc_tracer_flag grpc_trace_chttp2_refcount =
GRPC_TRACER_INITIALIZER ( false , " chttp2_refcount " ) ;
# endif
static const grpc_transport_vtable vtable ;
/* forward declarations of various callbacks that we'll build closures around */
static void write_action_begin_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
@ -248,6 +246,8 @@ void grpc_chttp2_unref_transport(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_ref_transport ( grpc_chttp2_transport * t ) { gpr_ref ( & t - > refs ) ; }
# endif
static const grpc_transport_vtable * get_vtable ( void ) ;
static void init_transport ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
const grpc_channel_args * channel_args ,
grpc_endpoint * ep , bool is_client ) {
@ -257,7 +257,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT ( strlen ( GRPC_CHTTP2_CLIENT_CONNECT_STRING ) = =
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN ) ;
t - > base . vtable = & vtable ;
t - > base . vtable = get_vtable ( ) ;
t - > ep = ep ;
/* one ref is for destroy */
gpr_ref_init ( & t - > refs , 1 ) ;
@ -557,11 +557,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
}
GRPC_CLOSURE_INIT ( & t - > write_action , write_action , t ,
t - > opt_target = = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT
? grpc_executor_scheduler
: grpc_schedule_on_exec_ctx ) ;
t - > ping_state . pings_before_data_required =
t - > ping_policy . max_pings_without_data ;
t - > ping_state . is_delayed_ping_timer_set = false ;
@ -589,7 +584,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void destroy_transport_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
t - > destroying = 1 ;
close_transport_locked (
exec_ctx , t ,
@ -715,7 +710,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked ( grpc_exec_ctx * exec_ctx , void * sp ,
grpc_error * error ) {
grpc_chttp2_stream * s = sp ;
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) sp ;
grpc_chttp2_transport * t = s - > t ;
GPR_TIMER_BEGIN ( " destroy_stream " , 0 ) ;
@ -799,7 +794,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream * grpc_chttp2_parsing_lookup_stream ( grpc_chttp2_transport * t ,
uint32_t id ) {
return grpc_chttp2_stream_map_find ( & t - > stream_map , id ) ;
return ( grpc_chttp2_stream * ) grpc_chttp2_stream_map_find ( & t - > stream_map , id ) ;
}
grpc_chttp2_stream * grpc_chttp2_parsing_accept_stream ( grpc_exec_ctx * exec_ctx ,
@ -858,6 +853,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
switch ( t - > write_state ) {
case GRPC_CHTTP2_WRITE_STATE_IDLE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING , reason ) ;
t - > is_first_write_in_batch = true ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
GRPC_CLOSURE_SCHED (
exec_ctx ,
@ -876,52 +872,100 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END ( " grpc_chttp2_initiate_write " , 0 ) ;
}
void grpc_chttp2_become_writable (
grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t , grpc_chttp2_stream * s ,
grpc_chttp2_stream_write_type stream_write_type , const char * reason ) {
void grpc_chttp2_become_writable ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s ,
bool also_initiate_write , const char * reason ) {
if ( ! t - > closed & & grpc_chttp2_list_add_writable_stream ( t , s ) ) {
GRPC_CHTTP2_STREAM_REF ( s , " chttp2_writing:become " ) ;
}
switch ( stream_write_type ) {
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK :
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , reason ) ;
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , reason ) ;
break ;
if ( also_initiate_write ) {
grpc_chttp2_initiate_write ( exec_ctx , t , reason ) ;
}
}
static grpc_closure_scheduler * write_scheduler ( grpc_chttp2_transport * t ,
bool early_results_scheduled ,
bool partial_write ) {
/* if it's not the first write in a batch, always offload to the executor:
we ' ll probably end up queuing against the kernel anyway , so we ' ll likely
get better latency overall if we switch writing work elsewhere and continue
with application work above */
if ( ! t - > is_first_write_in_batch ) {
return grpc_executor_scheduler ( GRPC_EXECUTOR_SHORT ) ;
}
/* equivalently, if it's a partial write, we *know* we're going to be taking a
thread jump to write it because of the above , may as well do so
immediately */
if ( partial_write ) {
return grpc_executor_scheduler ( GRPC_EXECUTOR_SHORT ) ;
}
switch ( t - > opt_target ) {
case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT :
/* executor gives us the largest probability of being able to batch a
* write with others on this transport */
return grpc_executor_scheduler ( GRPC_EXECUTOR_SHORT ) ;
case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY :
return grpc_schedule_on_exec_ctx ;
}
GPR_UNREACHABLE_CODE ( return NULL ) ;
}
# define WRITE_STATE_TUPLE_TO_INT(p, i) (2 * (int)(p) + (int)(i))
static const char * begin_writing_desc ( bool partial , bool inlined ) {
switch ( WRITE_STATE_TUPLE_TO_INT ( partial , inlined ) ) {
case WRITE_STATE_TUPLE_TO_INT ( false , false ) :
return " begin write in background " ;
case WRITE_STATE_TUPLE_TO_INT ( false , true ) :
return " begin write in current thread " ;
case WRITE_STATE_TUPLE_TO_INT ( true , false ) :
return " begin partial write in background " ;
case WRITE_STATE_TUPLE_TO_INT ( true , true ) :
return " begin partial write in current thread " ;
}
GPR_UNREACHABLE_CODE ( return " bad state tuple " ) ;
}
static void write_action_begin_locked ( grpc_exec_ctx * exec_ctx , void * gt ,
grpc_error * error_ignored ) {
GPR_TIMER_BEGIN ( " write_action_begin_locked " , 0 ) ;
grpc_chttp2_transport * t = gt ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) gt ;
GPR_ASSERT ( t - > write_state ! = GRPC_CHTTP2_WRITE_STATE_IDLE ) ;
switch ( t - > closed ? GRPC_CHTTP2_NOTHING_TO_WRITE
: grpc_chttp2_begin_write ( exec_ctx , t ) ) {
case GRPC_CHTTP2_NOTHING_TO_WRITE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_IDLE ,
" begin writing nothing " ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " writing " ) ;
break ;
case GRPC_CHTTP2_PARTIAL_WRITE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE ,
" begin writing partial " ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_FULL_WRITE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" begin writing " ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
break ;
grpc_chttp2_begin_write_result r ;
if ( t - > closed ) {
r . writing = false ;
} else {
r = grpc_chttp2_begin_write ( exec_ctx , t ) ;
}
if ( r . writing ) {
if ( r . partial ) {
GRPC_STATS_INC_HTTP2_PARTIAL_WRITES ( exec_ctx ) ;
}
if ( ! t - > is_first_write_in_batch ) {
GRPC_STATS_INC_HTTP2_WRITES_CONTINUED ( exec_ctx ) ;
}
grpc_closure_scheduler * scheduler =
write_scheduler ( t , r . early_results_scheduled , r . partial ) ;
if ( scheduler ! = grpc_schedule_on_exec_ctx ) {
GRPC_STATS_INC_HTTP2_WRITES_OFFLOADED ( exec_ctx ) ;
}
set_write_state (
exec_ctx , t , r . partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
: GRPC_CHTTP2_WRITE_STATE_WRITING ,
begin_writing_desc ( r . partial , scheduler = = grpc_schedule_on_exec_ctx ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , GRPC_CLOSURE_INIT ( & t - > write_action ,
write_action , t , scheduler ) ,
GRPC_ERROR_NONE ) ;
} else {
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_IDLE ,
" begin writing nothing " ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " writing " ) ;
}
GPR_TIMER_END ( " write_action_begin_locked " , 0 ) ;
}
static void write_action ( grpc_exec_ctx * exec_ctx , void * gt , grpc_error * error ) {
grpc_chttp2_transport * t = gt ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) gt ;
GPR_TIMER_BEGIN ( " write_action " , 0 ) ;
grpc_endpoint_write (
exec_ctx , t - > ep , & t - > outbuf ,
@ -933,7 +977,7 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
static void write_action_end_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
GPR_TIMER_BEGIN ( " terminate_writing_with_lock " , 0 ) ;
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
if ( error ! = GRPC_ERROR_NONE ) {
close_transport_locked ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
@ -958,7 +1002,8 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE :
GPR_TIMER_MARK ( " state=writing_stale_no_poller " , 0 ) ;
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" continue writing [!covered] " ) ;
" continue writing " ) ;
t - > is_first_write_in_batch = false ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
GRPC_CLOSURE_RUN (
exec_ctx ,
@ -1060,9 +1105,7 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_add ( & t - > stream_map , s - > id , s ) ;
post_destructive_reclaimer ( exec_ctx , t ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" new_stream " ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s , true , " new_stream " ) ;
}
/* cancel out streams that will never be started */
while ( t - > next_stream_id > = MAX_CLIENT_STREAM_ID & &
@ -1111,12 +1154,14 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure - > next_data . scratch - = CLOSURE_BARRIER_FIRST_REF_BIT ;
if ( GRPC_TRACER_ON ( grpc_http_trace ) ) {
const char * errstr = grpc_error_string ( error ) ;
gpr_log ( GPR_DEBUG ,
" complete_closure_step: %p refs=%d flags=0x%04x desc=%s err=%s " ,
closure ,
( int ) ( closure - > next_data . scratch / CLOSURE_BARRIER_FIRST_REF_BIT ) ,
( int ) ( closure - > next_data . scratch % CLOSURE_BARRIER_FIRST_REF_BIT ) ,
desc , errstr ) ;
gpr_log (
GPR_DEBUG ,
" complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
" write_state=%s " ,
t , closure ,
( int ) ( closure - > next_data . scratch / CLOSURE_BARRIER_FIRST_REF_BIT ) ,
( int ) ( closure - > next_data . scratch % CLOSURE_BARRIER_FIRST_REF_BIT ) , desc ,
errstr , write_state_name ( t - > write_state ) ) ;
}
if ( error ! = GRPC_ERROR_NONE ) {
if ( closure - > error_data . error = = GRPC_ERROR_NONE ) {
@ -1157,9 +1202,7 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream * s ) {
if ( s - > id ! = 0 & & ( ! s - > write_buffering | |
s - > flow_controlled_buffer . length > t - > write_buffer_size ) ) {
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
" op.send_message " ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s , true , " op.send_message " ) ;
}
}
@ -1191,15 +1234,19 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
} else {
grpc_chttp2_write_cb * cb = t - > write_cb_pool ;
if ( cb = = NULL ) {
cb = gpr_malloc ( sizeof ( * cb ) ) ;
cb = ( grpc_chttp2_write_cb * ) gpr_malloc ( sizeof ( * cb ) ) ;
} else {
t - > write_cb_pool = cb - > next ;
}
cb - > call_at_byte = notify_offset ;
cb - > closure = s - > fetching_send_message_finished ;
s - > fetching_send_message_finished = NULL ;
cb - > next = s - > on_write_finished_cbs ;
s - > on_write_finished_cbs = cb ;
grpc_chttp2_write_cb * * list =
s - > fetching_send_message - > flags & GRPC_WRITE_THROUGH
? & s - > on_write_finished_cbs
: & s - > on_flow_controlled_cbs ;
cb - > next = * list ;
* list = cb ;
}
s - > fetching_send_message = NULL ;
return ; /* early out */
@ -1219,7 +1266,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
static void complete_fetch_locked ( grpc_exec_ctx * exec_ctx , void * gs ,
grpc_error * error ) {
grpc_chttp2_stream * s = gs ;
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) gs ;
grpc_chttp2_transport * t = s - > t ;
if ( error = = GRPC_ERROR_NONE ) {
error = grpc_byte_stream_pull ( exec_ctx , s - > fetching_send_message ,
@ -1254,8 +1301,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error * error_ignored ) {
GPR_TIMER_BEGIN ( " perform_stream_op_locked " , 0 ) ;
grpc_transport_stream_op_batch * op = stream_op ;
grpc_chttp2_stream * s = op - > handler_private . extra_arg ;
grpc_transport_stream_op_batch * op =
( grpc_transport_stream_op_batch * ) stream_op ;
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) op - > handler_private . extra_arg ;
grpc_transport_stream_op_batch_payload * op_payload = op - > payload ;
grpc_chttp2_transport * t = s - > t ;
@ -1308,7 +1356,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
if ( ( s - > stream_compression_send_enabled =
( op_payload - > send_initial_metadata . send_initial_metadata - > idx . named
. content_encoding ! = NULL ) ) = = true ) {
s - > compressed_data_buffer = gpr_malloc ( sizeof ( grpc_slice_buffer ) ) ;
s - > compressed_data_buffer =
( grpc_slice_buffer * ) gpr_malloc ( sizeof ( grpc_slice_buffer ) ) ;
grpc_slice_buffer_init ( s - > compressed_data_buffer ) ;
}
@ -1355,14 +1404,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
} else {
GPR_ASSERT ( s - > id ! = 0 ) ;
grpc_chttp2_stream_write_type write_type =
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ;
bool initiate_write = true ;
if ( op - > send_message & &
( op - > payload - > send_message . send_message - > flags &
GRPC_WRITE_BUFFER_HINT ) ) {
write_type = GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK ;
initiate_write = false ;
}
grpc_chttp2_become_writable ( exec_ctx , t , s , write_typ e,
grpc_chttp2_become_writable ( exec_ctx , t , s , initiate_writ e,
" op.send_initial_metadata " ) ;
}
} else {
@ -1471,8 +1519,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
} else if ( s - > id ! = 0 ) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
grpc_chttp2_become_writable ( exec_ctx , t , s , true ,
" op.send_trailing_metadata " ) ;
}
}
@ -1599,7 +1646,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void retry_initiate_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
t - > ping_state . is_delayed_ping_timer_set = false ;
grpc_chttp2_initiate_write ( exec_ctx , t , " retry_send_ping " ) ;
}
@ -1651,8 +1698,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx,
static void perform_transport_op_locked ( grpc_exec_ctx * exec_ctx ,
void * stream_op ,
grpc_error * error_ignored ) {
grpc_transport_op * op = stream_op ;
grpc_chttp2_transport * t = op - > handler_private . extra_arg ;
grpc_transport_op * op = ( grpc_transport_op * ) stream_op ;
grpc_chttp2_transport * t =
( grpc_chttp2_transport * ) op - > handler_private . extra_arg ;
grpc_error * close_transport = op - > disconnect_with_error ;
if ( op - > goaway_error ) {
@ -1864,7 +1912,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
static void remove_stream ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
uint32_t id , grpc_error * error ) {
grpc_chttp2_stream * s = grpc_chttp2_stream_map_delete ( & t - > stream_map , id ) ;
grpc_chttp2_stream * s =
( grpc_chttp2_stream * ) grpc_chttp2_stream_map_delete ( & t - > stream_map , id ) ;
GPR_ASSERT ( s ) ;
if ( t - > incoming_stream = = s ) {
t - > incoming_stream = NULL ;
@ -1995,6 +2044,21 @@ static grpc_error *removal_error(grpc_error *extra_error, grpc_chttp2_stream *s,
return error ;
}
static void flush_write_list ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , grpc_chttp2_write_cb * * list ,
grpc_error * error ) {
while ( * list ) {
grpc_chttp2_write_cb * cb = * list ;
* list = cb - > next ;
grpc_chttp2_complete_closure_step ( exec_ctx , t , s , & cb - > closure ,
GRPC_ERROR_REF ( error ) ,
" on_write_finished_cb " ) ;
cb - > next = t - > write_cb_pool ;
t - > write_cb_pool = cb ;
}
GRPC_ERROR_UNREF ( error ) ;
}
void grpc_chttp2_fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , grpc_error * error ) {
@ -2014,16 +2078,9 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_complete_closure_step (
exec_ctx , t , s , & s - > fetching_send_message_finished , GRPC_ERROR_REF ( error ) ,
" fetching_send_message_finished " ) ;
while ( s - > on_write_finished_cbs ) {
grpc_chttp2_write_cb * cb = s - > on_write_finished_cbs ;
s - > on_write_finished_cbs = cb - > next ;
grpc_chttp2_complete_closure_step ( exec_ctx , t , s , & cb - > closure ,
GRPC_ERROR_REF ( error ) ,
" on_write_finished_cb " ) ;
cb - > next = t - > write_cb_pool ;
t - > write_cb_pool = cb ;
}
GRPC_ERROR_UNREF ( error ) ;
flush_write_list ( exec_ctx , t , s , & s - > on_write_finished_cbs ,
GRPC_ERROR_REF ( error ) ) ;
flush_write_list ( exec_ctx , t , s , & s - > on_flow_controlled_cbs , error ) ;
}
void grpc_chttp2_mark_stream_closed ( grpc_exec_ctx * exec_ctx ,
@ -2242,8 +2299,8 @@ typedef struct {
} cancel_stream_cb_args ;
static void cancel_stream_cb ( void * user_data , uint32_t key , void * stream ) {
cancel_stream_cb_args * args = user_data ;
grpc_chttp2_stream * s = stream ;
cancel_stream_cb_args * args = ( cancel_stream_cb_args * ) user_data ;
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) stream ;
grpc_chttp2_cancel_stream ( args - > exec_ctx , args - > t , s ,
GRPC_ERROR_REF ( args - > error ) ) ;
}
@ -2267,13 +2324,11 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED :
break ;
case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY :
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED ,
grpc_chttp2_become_writable ( exec_ctx , t , s , true ,
" immediate stream flowctl " ) ;
break ;
case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE :
grpc_chttp2_become_writable ( exec_ctx , t , s ,
GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK ,
grpc_chttp2_become_writable ( exec_ctx , t , s , false ,
" queue stream flowctl " ) ;
break ;
}
@ -2345,7 +2400,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error * error ) {
GPR_TIMER_BEGIN ( " reading_action_locked " , 0 ) ;
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
GRPC_ERROR_REF ( error ) ;
@ -2386,9 +2441,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
if ( t - > flow_control . initial_window_update > 0 ) {
grpc_chttp2_stream * s ;
while ( grpc_chttp2_list_pop_stalled_by_stream ( t , & s ) ) {
grpc_chttp2_become_writable (
exec_ctx , t , s , GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED ,
" unstalled " ) ;
grpc_chttp2_become_writable ( exec_ctx , t , s , true , " unstalled " ) ;
}
}
t - > flow_control . initial_window_update = 0 ;
@ -2430,7 +2483,7 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void start_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
if ( GRPC_TRACER_ON ( grpc_http_trace ) ) {
gpr_log ( GPR_DEBUG , " %s: Start BDP ping " , t - > peer_string ) ;
}
@ -2443,7 +2496,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void finish_bdp_ping_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) tp ;
if ( GRPC_TRACER_ON ( grpc_http_trace ) ) {
gpr_log ( GPR_DEBUG , " %s: Complete BDP ping " , t - > peer_string ) ;
}
@ -2492,7 +2545,7 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args,
static void init_keepalive_ping_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
GPR_ASSERT ( t - > keepalive_state = = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING ) ;
if ( t - > destroying | | t - > closed ) {
t - > keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING ;
@ -2524,7 +2577,7 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void start_keepalive_ping_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " keepalive watchdog " ) ;
grpc_timer_init (
exec_ctx , & t - > keepalive_watchdog_timer ,
@ -2534,7 +2587,7 @@ static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void finish_keepalive_ping_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
if ( t - > keepalive_state = = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING ) {
if ( error = = GRPC_ERROR_NONE ) {
t - > keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING ;
@ -2551,7 +2604,7 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
if ( t - > keepalive_state = = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING ) {
if ( error = = GRPC_ERROR_NONE ) {
t - > keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING ;
@ -2632,7 +2685,8 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_next_locked ( grpc_exec_ctx * exec_ctx ,
void * argp ,
grpc_error * error_ignored ) {
grpc_chttp2_incoming_byte_stream * bs = argp ;
grpc_chttp2_incoming_byte_stream * bs =
( grpc_chttp2_incoming_byte_stream * ) argp ;
grpc_chttp2_transport * t = bs - > transport ;
grpc_chttp2_stream * s = bs - > stream ;
@ -2842,7 +2896,8 @@ static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
static void incoming_byte_stream_destroy_locked ( grpc_exec_ctx * exec_ctx ,
void * byte_stream ,
grpc_error * error_ignored ) {
grpc_chttp2_incoming_byte_stream * bs = byte_stream ;
grpc_chttp2_incoming_byte_stream * bs =
( grpc_chttp2_incoming_byte_stream * ) byte_stream ;
grpc_chttp2_stream * s = bs - > stream ;
grpc_chttp2_transport * t = s - > t ;
@ -2857,7 +2912,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t , grpc_chttp2_stream * s ,
uint32_t frame_size , uint32_t flags ) {
grpc_chttp2_incoming_byte_stream * incoming_byte_stream =
gpr_malloc ( sizeof ( * incoming_byte_stream ) ) ;
( grpc_chttp2_incoming_byte_stream * ) gpr_malloc (
sizeof ( * incoming_byte_stream ) ) ;
incoming_byte_stream - > base . length = frame_size ;
incoming_byte_stream - > remaining_bytes = frame_size ;
incoming_byte_stream - > base . flags = flags ;
@ -2898,7 +2954,7 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
static void benign_reclaimer_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
if ( error = = GRPC_ERROR_NONE & &
grpc_chttp2_stream_map_size ( & t - > stream_map ) = = 0 ) {
/* Channel with no active streams: send a goaway to try and make it
@ -2928,11 +2984,12 @@ static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void destructive_reclaimer_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) arg ;
size_t n = grpc_chttp2_stream_map_size ( & t - > stream_map ) ;
t - > destructive_reclaimer_registered = false ;
if ( error = = GRPC_ERROR_NONE & & n > 0 ) {
grpc_chttp2_stream * s = grpc_chttp2_stream_map_rand ( & t - > stream_map ) ;
grpc_chttp2_stream * s =
( grpc_chttp2_stream * ) grpc_chttp2_stream_map_rand ( & t - > stream_map ) ;
if ( GRPC_TRACER_ON ( grpc_resource_quota_trace ) ) {
gpr_log ( GPR_DEBUG , " HTTP2: %s - abandon stream id %d " , t - > peer_string ,
s - > id ) ;
@ -2976,10 +3033,13 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
destroy_transport ,
chttp2_get_endpoint } ;
static const grpc_transport_vtable * get_vtable ( void ) { return & vtable ; }
grpc_transport * grpc_create_chttp2_transport (
grpc_exec_ctx * exec_ctx , const grpc_channel_args * channel_args ,
grpc_endpoint * ep , int is_client ) {
grpc_chttp2_transport * t = gpr_zalloc ( sizeof ( grpc_chttp2_transport ) ) ;
grpc_chttp2_transport * t =
( grpc_chttp2_transport * ) gpr_zalloc ( sizeof ( grpc_chttp2_transport ) ) ;
init_transport ( exec_ctx , t , channel_args , ep , is_client ! = 0 ) ;
return & t - > base ;
}