@ -73,20 +73,14 @@ static const grpc_transport_vtable vtable;
static void write_action_begin_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void write_action ( grpc_exec_ctx * exec_ctx , void * t , grpc_error * error ) ;
static void write_action_end ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void write_action_end_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void read_action_begin ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void read_action_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void complete_fetch_locked ( grpc_exec_ctx * exec_ctx , void * gs ,
grpc_error * error ) ;
static void complete_fetch ( grpc_exec_ctx * exec_ctx , void * gs ,
grpc_error * error ) ;
/** Set a transport level setting, and push it to our peer */
static void push_setting ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) ;
@ -112,12 +106,8 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void * byte_stream ,
grpc_error * error_ignored ) ;
static void benign_reclaimer ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void benign_reclaimer_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void destructive_reclaimer ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void destructive_reclaimer_locked ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
@ -166,8 +156,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while ( t - > pings . next ! = & t - > pings ) {
grpc_chttp2_outstanding_ping * ping = t - > pings . next ;
grpc_exec_ctx _sched ( exec_ctx , ping - > on_recv ,
GRPC_ERROR_CREATE ( " Transport closed " ) , NULL ) ;
grpc_closur e_sched ( exec_ctx , ping - > on_recv ,
GRPC_ERROR_CREATE ( " Transport closed " ) ) ;
ping - > next - > prev = ping - > prev ;
ping - > prev - > next = ping - > next ;
gpr_free ( ping ) ;
@ -246,18 +236,15 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init ( & t - > outbuf ) ;
grpc_chttp2_hpack_compressor_init ( & t - > hpack_compressor ) ;
grpc_closure_init ( & t - > write_action_begin_locked , write_action_begin_locked ,
t ) ;
grpc_closure_init ( & t - > write_action , write_action , t ) ;
grpc_closure_init ( & t - > write_action_end , write_action_end , t ) ;
grpc_closure_init ( & t - > write_action_end_locked , write_action_end_locked , t ) ;
grpc_closure_init ( & t - > read_action_begin , read_action_begin , t ) ;
grpc_closure_init ( & t - > read_action_locked , read_action_locked , t ) ;
grpc_closure_init ( & t - > benign_reclaimer , benign_reclaimer , t ) ;
grpc_closure_init ( & t - > destructive_reclaimer , destructive_reclaimer , t ) ;
grpc_closure_init ( & t - > benign_reclaimer_locked , benign_reclaimer_locked , t ) ;
grpc_closure_init ( & t - > write_action , write_action , t ,
grpc_schedule_on_exec_ctx ) ;
grpc_closure_init ( & t - > read_action_locked , read_action_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > benign_reclaimer_locked , benign_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > destructive_reclaimer_locked ,
destructive_reclaimer_locked , t ) ;
destructive_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_chttp2_goaway_parser_init ( & t - > goaway_parser ) ;
grpc_chttp2_hpack_parser_init ( & t - > hpack_parser ) ;
@ -395,9 +382,10 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport ( grpc_exec_ctx * exec_ctx , grpc_transport * gt ) {
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) gt ;
grpc_combiner_execute ( exec_ctx , t - > combiner ,
grpc_closure_create ( destroy_transport_locked , t ) ,
GRPC_ERROR_NONE , false ) ;
grpc_closure_sched ( exec_ctx , grpc_closure_create (
destroy_transport_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
}
static void close_transport_locked ( grpc_exec_ctx * exec_ctx ,
@ -471,8 +459,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init ( & s - > data_parser ) ;
grpc_slice_buffer_init ( & s - > flow_controlled_buffer ) ;
s - > deadline = gpr_inf_future ( GPR_CLOCK_MONOTONIC ) ;
grpc_closure_init ( & s - > complete_fetch , complete_fetch , s ) ;
grpc_closure_init ( & s - > complete_fetch_locked , complete_fetch_locked , s ) ;
grpc_closure_init ( & s - > complete_fetch_locked , complete_fetch_locked , s ,
grpc_schedule_on_exec_ctx ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " stream " ) ;
@ -547,9 +535,10 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) gs ;
s - > destroy_stream_arg = and_free_memory ;
grpc_closure_init ( & s - > destroy_stream , destroy_stream_locked , s ) ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & s - > destroy_stream ,
GRPC_ERROR_NONE , false ) ;
grpc_closure_sched (
exec_ctx , grpc_closure_init ( & s - > destroy_stream , destroy_stream_locked , s ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " destroy_stream " , 0 ) ;
}
@ -600,7 +589,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name ( st ) , reason ) ) ;
t - > write_state = st ;
if ( st = = GRPC_CHTTP2_WRITE_STATE_IDLE ) {
grpc_exec_ctx_enqueue_list ( exec_ctx , & t - > run_after_write , NULL ) ;
grpc_closure_list_sched ( exec_ctx , & t - > run_after_write ) ;
if ( t - > close_transport_on_writes_finished ! = NULL ) {
grpc_error * err = t - > close_transport_on_writes_finished ;
t - > close_transport_on_writes_finished = NULL ;
@ -618,9 +607,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx,
case GRPC_CHTTP2_WRITE_STATE_IDLE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING , reason ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_combiner_execute_finally ( exec_ctx , t - > combiner ,
& t - > write_action_begin_locked ,
GRPC_ERROR_NONE , covered_by_poller ) ;
grpc_closure_sched (
exec_ctx ,
grpc_closure_init (
& t - > write_action_begin_locked , write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , covered_by_poller ) ) ,
GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING :
set_write_state (
@ -662,7 +654,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
if ( ! t - > closed & & grpc_chttp2_begin_write ( exec_ctx , t ) ) {
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" begin writing " ) ;
grpc_exec_ctx _sched ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE , NULL ) ;
grpc_closur e_sched ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
} else {
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_IDLE ,
" begin writing nothing " ) ;
@ -674,19 +666,13 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
static void write_action ( grpc_exec_ctx * exec_ctx , void * gt , grpc_error * error ) {
grpc_chttp2_transport * t = gt ;
GPR_TIMER_BEGIN ( " write_action " , 0 ) ;
grpc_endpoint_write ( exec_ctx , t - > ep , & t - > outbuf , & t - > write_action_end ) ;
grpc_endpoint_write (
exec_ctx , t - > ep , & t - > outbuf ,
grpc_closure_init ( & t - > write_action_end_locked , write_action_end_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ) ;
GPR_TIMER_END ( " write_action " , 0 ) ;
}
static void write_action_end ( grpc_exec_ctx * exec_ctx , void * gt ,
grpc_error * error ) {
grpc_chttp2_transport * t = gt ;
GPR_TIMER_BEGIN ( " write_action_end " , 0 ) ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & t - > write_action_end_locked ,
GRPC_ERROR_REF ( error ) , false ) ;
GPR_TIMER_END ( " write_action_end " , 0 ) ;
}
static void write_action_end_locked ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
GPR_TIMER_BEGIN ( " terminate_writing_with_lock " , 0 ) ;
@ -716,18 +702,24 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" continue writing [!covered] " ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_combiner_execute_finally ( exec_ctx , t - > combiner ,
& t - > write_action_begin_locked ,
GRPC_ERROR_NONE , false ) ;
grpc_closure_run (
exec_ctx ,
grpc_closure_init (
& t - > write_action_begin_locked , write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER :
GPR_TIMER_MARK ( " state=writing_stale_with_poller " , 0 ) ;
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" continue writing [covered] " ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_combiner_execute_finally ( exec_ctx , t - > combiner ,
& t - > write_action_begin_locked ,
GRPC_ERROR_NONE , true ) ;
grpc_closure_run (
exec_ctx ,
grpc_closure_init ( & t - > write_action_begin_locked ,
write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , true ) ) ,
GRPC_ERROR_NONE ) ;
break ;
}
@ -965,15 +957,6 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
}
}
static void complete_fetch ( grpc_exec_ctx * exec_ctx , void * gs ,
grpc_error * error ) {
grpc_chttp2_stream * s = gs ;
grpc_chttp2_transport * t = s - > t ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & s - > complete_fetch_locked ,
GRPC_ERROR_REF ( error ) ,
s - > complete_fetch_covered_by_poller ) ;
}
static void do_nothing ( grpc_exec_ctx * exec_ctx , void * arg , grpc_error * error ) { }
static void log_metadata ( const grpc_metadata_batch * md_batch , uint32_t id ,
@ -1009,7 +992,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure * on_complete = op - > on_complete ;
if ( on_complete = = NULL ) {
on_complete = grpc_closure_create ( do_nothing , NULL ) ;
on_complete =
grpc_closure_create ( do_nothing , NULL , grpc_schedule_on_exec_ctx ) ;
}
/* use final_data as a barrier until enqueue time; the inital counter is
@ -1212,13 +1196,15 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free ( str ) ;
}
grpc_closure_init ( & op - > transport_private . closure , perform_stream_op_locked ,
op ) ;
op - > transport_private . args [ 0 ] = gt ;
op - > transport_private . args [ 1 ] = gs ;
GRPC_CHTTP2_STREAM_REF ( s , " perform_stream_op " ) ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & op - > transport_private . closure ,
GRPC_ERROR_NONE , op - > covered_by_poller ) ;
grpc_closure_sched (
exec_ctx ,
grpc_closure_init (
& op - > transport_private . closure , perform_stream_op_locked , op ,
grpc_combiner_scheduler ( t - > combiner , op - > covered_by_poller ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " perform_stream_op " , 0 ) ;
}
@ -1247,7 +1233,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_outstanding_ping * ping ;
for ( ping = t - > pings . next ; ping ! = & t - > pings ; ping = ping - > next ) {
if ( 0 = = memcmp ( opaque_8bytes , ping - > id , 8 ) ) {
grpc_exec_ctx _sched ( exec_ctx , ping - > on_recv , GRPC_ERROR_NONE , NULL ) ;
grpc_closur e_sched ( exec_ctx , ping - > on_recv , GRPC_ERROR_NONE ) ;
ping - > next - > prev = ping - > prev ;
ping - > prev - > next = ping - > next ;
gpr_free ( ping ) ;
@ -1321,11 +1307,12 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
char * msg = grpc_transport_op_string ( op ) ;
gpr_free ( msg ) ;
op - > transport_private . args [ 0 ] = gt ;
grpc_closure_init ( & op - > transport_private . closure , perform_transport_op_locked ,
op ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " transport_op " ) ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & op - > transport_private . closure ,
GRPC_ERROR_NONE , false ) ;
grpc_closure_sched (
exec_ctx , grpc_closure_init ( & op - > transport_private . closure ,
perform_transport_op_locked , op ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
}
/*******************************************************************************
@ -1801,19 +1788,6 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
* INPUT PROCESSING - PARSING
*/
static void read_action_begin ( grpc_exec_ctx * exec_ctx , void * tp ,
grpc_error * error ) {
/* Control flow:
reading_action_locked - >
( parse_unlocked - > post_parse_locked ) ? - >
post_reading_action_locked */
GPR_TIMER_BEGIN ( " reading_action " , 0 ) ;
grpc_chttp2_transport * t = tp ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & t - > read_action_locked ,
GRPC_ERROR_REF ( error ) , false ) ;
GPR_TIMER_END ( " reading_action " , 0 ) ;
}
static grpc_error * try_http_parsing ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ) {
grpc_http_parser parser ;
@ -1913,7 +1887,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_slice_buffer_reset_and_unref ( & t - > read_buffer ) ;
if ( keep_reading ) {
grpc_endpoint_read ( exec_ctx , t - > ep , & t - > read_buffer , & t - > read_action_begin ) ;
grpc_endpoint_read ( exec_ctx , t - > ep , & t - > read_buffer ,
& t - > read_action_locked ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " keep_reading " ) ;
} else {
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " reading_action " ) ;
@ -2050,10 +2025,12 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
bs - > next_action . slice = slice ;
bs - > next_action . max_size_hint = max_size_hint ;
bs - > next_action . on_complete = on_complete ;
grpc_closure_init ( & bs - > next_action . closure , incoming_byte_stream_next_locked ,
bs ) ;
grpc_combiner_execute ( exec_ctx , bs - > transport - > combiner ,
& bs - > next_action . closure , GRPC_ERROR_NONE , false ) ;
grpc_closure_sched (
exec_ctx ,
grpc_closure_init (
& bs - > next_action . closure , incoming_byte_stream_next_locked , bs ,
grpc_combiner_scheduler ( bs - > transport - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " incoming_byte_stream_next " , 0 ) ;
return 0 ;
}
@ -2075,10 +2052,12 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN ( " incoming_byte_stream_destroy " , 0 ) ;
grpc_chttp2_incoming_byte_stream * bs =
( grpc_chttp2_incoming_byte_stream * ) byte_stream ;
grpc_closure_init ( & bs - > destroy_action , incoming_byte_stream_destroy_locked ,
bs ) ;
grpc_combiner_execute ( exec_ctx , bs - > transport - > combiner , & bs - > destroy_action ,
GRPC_ERROR_NONE , false ) ;
grpc_closure_sched (
exec_ctx ,
grpc_closure_init (
& bs - > destroy_action , incoming_byte_stream_destroy_locked , bs ,
grpc_combiner_scheduler ( bs - > transport - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " incoming_byte_stream_destroy " , 0 ) ;
}
@ -2086,7 +2065,7 @@ static void incoming_byte_stream_publish_error(
grpc_exec_ctx * exec_ctx , grpc_chttp2_incoming_byte_stream * bs ,
grpc_error * error ) {
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
grpc_exec_ctx _sched ( exec_ctx , bs - > on_next , GRPC_ERROR_REF ( error ) , NULL ) ;
grpc_closur e_sched ( exec_ctx , bs - > on_next , GRPC_ERROR_REF ( error ) ) ;
bs - > on_next = NULL ;
GRPC_ERROR_UNREF ( bs - > error ) ;
bs - > error = error ;
@ -2103,7 +2082,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
bs - > remaining_bytes - = ( uint32_t ) GRPC_SLICE_LENGTH ( slice ) ;
if ( bs - > on_next ! = NULL ) {
* bs - > next = slice ;
grpc_exec_ctx _sched ( exec_ctx , bs - > on_next , GRPC_ERROR_NONE , NULL ) ;
grpc_closur e_sched ( exec_ctx , bs - > on_next , GRPC_ERROR_NONE ) ;
bs - > on_next = NULL ;
} else {
grpc_slice_buffer_add ( & bs - > slices , slice ) ;
@ -2171,7 +2150,7 @@ static void post_benign_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT ( t , " benign_reclaimer " ) ;
grpc_resource_user_post_reclaimer ( exec_ctx ,
grpc_endpoint_get_resource_user ( t - > ep ) ,
false , & t - > benign_reclaimer ) ;
false , & t - > benign_reclaimer_locked ) ;
}
}
@ -2182,24 +2161,10 @@ static void post_destructive_reclaimer(grpc_exec_ctx *exec_ctx,
GRPC_CHTTP2_REF_TRANSPORT ( t , " destructive_reclaimer " ) ;
grpc_resource_user_post_reclaimer ( exec_ctx ,
grpc_endpoint_get_resource_user ( t - > ep ) ,
true , & t - > destructive_reclaimer ) ;
true , & t - > destructive_reclaimer_locked ) ;
}
}
static void benign_reclaimer ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & t - > benign_reclaimer_locked ,
GRPC_ERROR_REF ( error ) , false ) ;
}
static void destructive_reclaimer ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
grpc_combiner_execute ( exec_ctx , t - > combiner , & t - > destructive_reclaimer_locked ,
GRPC_ERROR_REF ( error ) , false ) ;
}
static void benign_reclaimer_locked ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_transport * t = arg ;
@ -2380,5 +2345,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into ( read_buffer , & t - > read_buffer ) ;
gpr_free ( read_buffer ) ;
}
read_action_begin ( exec_ctx , t , GRPC_ERROR_NONE ) ;
grpc_closure_sched ( exec_ctx , & t - > read_action_locked , GRPC_ERROR_NONE ) ;
}