@ -35,7 +35,6 @@
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/http/parser.h"
# include "src/core/lib/iomgr/timer.h"
# include "src/core/lib/iomgr/workqueue.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/slice/slice_internal.h"
# include "src/core/lib/slice/slice_string_helpers.h"
@ -53,7 +52,7 @@
# define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
# define MAX_WINDOW 0x7fffffffu
# define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
# define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
# define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024)
# define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX
# define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */
@ -77,6 +76,10 @@ static bool g_default_keepalive_permit_without_calls =
grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER ( false ) ;
grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER ( false ) ;
# ifndef NDEBUG
grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER ( false ) ;
# endif
static const grpc_transport_vtable vtable ;
/* forward declarations of various callbacks that we'll build closures around */
@ -92,8 +95,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *t,
static void complete_fetch_locked ( grpc_exec_ctx * exec_ctx , void * gs ,
grpc_error * error ) ;
/** Set a transport level setting, and push it to our peer */
static void push_setting ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) ;
static void queue_setting_update ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) ;
static void close_from_api ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_stream * s , grpc_error * error ) ;
@ -213,20 +217,26 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free ( t ) ;
}
# ifdef GRPC_CHTTP2_REFCOU NTING_ DEBUG
# ifn def NDEBUG
void grpc_chttp2_unref_transport ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t , const char * reason ,
const char * file , int line ) {
gpr_log ( GPR_DEBUG , " chttp2:unref:%p % " PRIdPTR " ->% " PRIdPTR " %s [%s:%d] " , t ,
t - > refs . count , t - > refs . count - 1 , reason , file , line ) ;
if ( GRPC_TRACER_ON ( grpc_trace_chttp2_refcount ) ) {
gpr_atm val = gpr_atm_no_barrier_load ( & t - > refs . count ) ;
gpr_log ( GPR_DEBUG , " chttp2:unref:%p % " PRIdPTR " ->% " PRIdPTR " %s [%s:%d] " ,
t , val , val - 1 , reason , file , line ) ;
}
if ( ! gpr_unref ( & t - > refs ) ) return ;
destruct_transport ( exec_ctx , t ) ;
}
void grpc_chttp2_ref_transport ( grpc_chttp2_transport * t , const char * reason ,
const char * file , int line ) {
gpr_log ( GPR_DEBUG , " chttp2: ref:%p % " PRIdPTR " ->% " PRIdPTR " %s [%s:%d] " , t ,
t - > refs . count , t - > refs . count + 1 , reason , file , line ) ;
if ( GRPC_TRACER_ON ( grpc_trace_chttp2_refcount ) ) {
gpr_atm val = gpr_atm_no_barrier_load ( & t - > refs . count ) ;
gpr_log ( GPR_DEBUG , " chttp2: ref:%p % " PRIdPTR " ->% " PRIdPTR " %s [%s:%d] " ,
t , val , val + 1 , reason , file , line ) ;
}
gpr_ref ( & t - > refs ) ;
}
# else
@ -252,7 +262,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t - > ep = ep ;
/* one ref is for destroy */
gpr_ref_init ( & t - > refs , 1 ) ;
t - > combiner = grpc_combiner_create ( grpc_endpoint_get_workqueue ( ep ) ) ;
t - > combiner = grpc_combiner_create ( ) ;
t - > peer_string = grpc_endpoint_get_peer ( ep ) ;
t - > endpoint_reading = 1 ;
t - > next_stream_id = is_client ? 1 : 2 ;
@ -270,32 +280,32 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_slice_buffer_init ( & t - > outbuf ) ;
grpc_chttp2_hpack_compressor_init ( & t - > hpack_compressor ) ;
grpc_closure_init ( & t - > write_action , write_action , t ,
GRPC_CLOSURE_INIT ( & t - > write_action , write_action , t ,
grpc_schedule_on_exec_ctx ) ;
grpc_closure_init ( & t - > read_action_locked , read_action_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > benign_reclaimer_locked , benign_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > destructive_reclaimer_locked ,
GRPC_CLOSURE_INIT ( & t - > read_action_locked , read_action_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > benign_reclaimer_locked , benign_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > destructive_reclaimer_locked ,
destructive_reclaimer_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > retry_initiate_ping_locked , retry_initiate_ping_locked ,
t , grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > start_bdp_ping_locked , start_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > finish_bdp_ping_locked , finish_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > init_keepalive_ping_locked , init_keepalive_ping_locked ,
t , grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > start_keepalive_ping_locked ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > retry_initiate_ping_locked , retry_initiate_ping_locked ,
t , grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > start_bdp_ping_locked , start_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > finish_bdp_ping_locked , finish_bdp_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > init_keepalive_ping_locked , init_keepalive_ping_locked ,
t , grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > start_keepalive_ping_locked ,
start_keepalive_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > finish_keepalive_ping_locked ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > finish_keepalive_ping_locked ,
finish_keepalive_ping_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_closure_init ( & t - > keepalive_watchdog_fired_locked ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CLOSURE_INIT ( & t - > keepalive_watchdog_fired_locked ,
keepalive_watchdog_fired_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
grpc_combiner_scheduler ( t - > combiner ) ) ;
grpc_bdp_estimator_init ( & t - > bdp_estimator , t - > peer_string ) ;
t - > last_pid_update = gpr_now ( GPR_CLOCK_MONOTONIC ) ;
@ -338,20 +348,21 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if ( is_client ) {
grpc_slice_buffer_add ( & t - > outbuf , grpc_slice_from_copied_string (
GRPC_CHTTP2_CLIENT_CONNECT_STRING ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " initial_write " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " initial_write " ) ;
}
/* configure http2 the way we like it */
if ( is_client ) {
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_ENABLE_PUSH , 0 ) ;
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS , 0 ) ;
queue_setting_update ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_ENABLE_PUSH , 0 ) ;
queue_setting_update ( exec_ctx , t ,
GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS , 0 ) ;
}
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
DEFAULT_WINDOW ) ;
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
DEFAULT_MAX_HEADER_LIST_SIZE ) ;
push_setting ( exec_ctx , t ,
GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA , 1 ) ;
queue_setting_update ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
DEFAULT_WINDOW ) ;
queue_setting_update ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
DEFAULT_MAX_HEADER_LIST_SIZE ) ;
queue_setting_update ( exec_ctx , t ,
GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA , 1 ) ;
t - > ping_policy = ( grpc_chttp2_repeated_ping_policy ) {
. max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA ,
@ -518,8 +529,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
int value = grpc_channel_arg_get_integer (
& channel_args - > args [ i ] , settings_map [ j ] . integer_options ) ;
if ( value > = 0 ) {
push_setting ( exec_ctx , t , settings_map [ j ] . setting_id ,
( uint32_t ) value ) ;
queue_setting_update ( exec_ctx , t , settings_map [ j ] . setting_id ,
( uint32_t ) value ) ;
}
}
break ;
@ -550,7 +561,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t - > keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED ;
}
grpc_chttp2_initiate_write ( exec_ctx , t , false , " init " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " init " ) ;
post_benign_reclaimer ( exec_ctx , t ) ;
}
@ -568,9 +579,9 @@ static void destroy_transport_locked(grpc_exec_ctx *exec_ctx, void *tp,
static void destroy_transport ( grpc_exec_ctx * exec_ctx , grpc_transport * gt ) {
grpc_chttp2_transport * t = ( grpc_chttp2_transport * ) gt ;
grpc_closure_sched ( exec_ctx , grpc_closure_create (
destroy_transport_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_CLOSURE_SCHED ( exec_ctx ,
GRPC_CLOSURE_CREATE ( destroy_transport_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
}
@ -621,7 +632,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF ( error ) ;
}
# ifdef GRPC_STREAM_REFCOU NT_ DEBUG
# ifn def NDEBUG
void grpc_chttp2_stream_ref ( grpc_chttp2_stream * s , const char * reason ) {
grpc_stream_ref ( s - > refcount , reason ) ;
}
@ -657,13 +668,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_data_parser_init ( & s - > data_parser ) ;
grpc_slice_buffer_init ( & s - > flow_controlled_buffer ) ;
s - > deadline = gpr_inf_future ( GPR_CLOCK_MONOTONIC ) ;
grpc_closure_init ( & s - > complete_fetch_locked , complete_fetch_locked , s ,
GRPC_CLOSURE_INIT ( & s - > complete_fetch_locked , complete_fetch_locked , s ,
grpc_schedule_on_exec_ctx ) ;
grpc_slice_buffer_init ( & s - > unprocessed_incoming_frames_buffer ) ;
grpc_slice_buffer_init ( & s - > frame_storage ) ;
s - > pending_byte_stream = false ;
grpc_closure_init ( & s - > reset_byte_stream , reset_byte_stream , s ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ;
GRPC_CLOSURE_INIT ( & s - > reset_byte_stream , reset_byte_stream , s ,
grpc_combiner_scheduler ( t - > combiner ) ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " stream " ) ;
@ -734,7 +745,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END ( " destroy_stream " , 0 ) ;
grpc_closure_sched ( exec_ctx , s - > destroy_stream_arg , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > destroy_stream_arg , GRPC_ERROR_NONE ) ;
}
static void destroy_stream ( grpc_exec_ctx * exec_ctx , grpc_transport * gt ,
@ -745,9 +756,9 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_chttp2_stream * s = ( grpc_chttp2_stream * ) gs ;
s - > destroy_stream_arg = then_schedule_closure ;
grpc_closure_sched (
exec_ctx , grpc_closure_init ( & s - > destroy_stream , destroy_stream_locked , s ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_CLOSURE_SCHED (
exec_ctx , GRPC_CLOSURE_INIT ( & s - > destroy_stream , destroy_stream_locked , s ,
grpc_combiner_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " destroy_stream " , 0 ) ;
}
@ -785,8 +796,6 @@ static const char *write_state_name(grpc_chttp2_write_state st) {
return " WRITING " ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE :
return " WRITING+MORE " ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER :
return " WRITING+MORE+COVERED " ;
}
GPR_UNREACHABLE_CODE ( return " UNKNOWN " ) ;
}
@ -799,7 +808,7 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
write_state_name ( st ) , reason ) ) ;
t - > write_state = st ;
if ( st = = GRPC_CHTTP2_WRITE_STATE_IDLE ) {
grpc_closure_list_sched ( exec_ctx , & t - > run_after_write ) ;
GRPC_CLOSURE_LIST_SCHED ( exec_ctx , & t - > run_after_write ) ;
if ( t - > close_transport_on_writes_finished ! = NULL ) {
grpc_error * err = t - > close_transport_on_writes_finished ;
t - > close_transport_on_writes_finished = NULL ;
@ -809,38 +818,25 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
void grpc_chttp2_initiate_write ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
bool covered_by_poller , const char * reason ) {
grpc_chttp2_transport * t , const char * reason ) {
GPR_TIMER_BEGIN ( " grpc_chttp2_initiate_write " , 0 ) ;
switch ( t - > write_state ) {
case GRPC_CHTTP2_WRITE_STATE_IDLE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING , reason ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_closure_sched (
GRPC_CLOSURE_SCHED (
exec_ctx ,
grpc_closure_init (
& t - > write_action_begin_locked , write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , covered_by_poll er ) ) ,
GRPC_CLOSURE_INIT ( & t - > write_action_begin_locked ,
write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING :
set_write_state (
exec_ctx , t ,
covered_by_poller
? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER
: GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE ,
reason ) ;
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE ,
reason ) ;
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE :
if ( covered_by_poller ) {
set_write_state (
exec_ctx , t ,
GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER ,
reason ) ;
}
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER :
break ;
}
GPR_TIMER_END ( " grpc_chttp2_initiate_write " , 0 ) ;
@ -856,10 +852,10 @@ void grpc_chttp2_become_writable(
case GRPC_CHTTP2_STREAM_WRITE_PIGGYBACK :
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , true , reason ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , reason ) ;
break ;
case GRPC_CHTTP2_STREAM_WRITE_INITIATE_UNCOVERED :
grpc_chttp2_initiate_write ( exec_ctx , t , false , reason ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , reason ) ;
break ;
}
}
@ -879,12 +875,12 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt,
case GRPC_CHTTP2_PARTIAL_WRITE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE ,
" begin writing partial " ) ;
grpc_closure_sched ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_FULL_WRITE :
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" begin writing " ) ;
grpc_closure_sched ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & t - > write_action , GRPC_ERROR_NONE ) ;
break ;
}
GPR_TIMER_END ( " write_action_begin_locked " , 0 ) ;
@ -895,8 +891,8 @@ static void write_action(grpc_exec_ctx *exec_ctx, void *gt, grpc_error *error) {
GPR_TIMER_BEGIN ( " write_action " , 0 ) ;
grpc_endpoint_write (
exec_ctx , t - > ep , & t - > outbuf ,
grpc_closure_init ( & t - > write_action_end_locked , write_action_end_locked , t ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ) ;
GRPC_CLOSURE_INIT ( & t - > write_action_end_locked , write_action_end_locked , t ,
grpc_combiner_scheduler ( t - > combiner ) ) ) ;
GPR_TIMER_END ( " write_action " , 0 ) ;
}
@ -930,23 +926,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" continue writing [!covered] " ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_closure_run (
GRPC_CLOSURE_RUN (
exec_ctx ,
grpc_closure_init (
& t - > write_action_begin_locked , write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
break ;
case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE_AND_COVERED_BY_POLLER :
GPR_TIMER_MARK ( " state=writing_stale_with_poller " , 0 ) ;
set_write_state ( exec_ctx , t , GRPC_CHTTP2_WRITE_STATE_WRITING ,
" continue writing [covered] " ) ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " writing " ) ;
grpc_closure_run (
exec_ctx ,
grpc_closure_init ( & t - > write_action_begin_locked ,
GRPC_CLOSURE_INIT ( & t - > write_action_begin_locked ,
write_action_begin_locked , t ,
grpc_combiner_finally_scheduler ( t - > combiner , true ) ) ,
grpc_combiner_finally_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
break ;
}
@ -957,8 +941,11 @@ static void write_action_end_locked(grpc_exec_ctx *exec_ctx, void *tp,
GPR_TIMER_END ( " terminate_writing_with_lock " , 0 ) ;
}
static void push_setting ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) {
// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
// If the change needs to occur immediately, manually initiate a write.
static void queue_setting_update ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) {
const grpc_chttp2_setting_parameters * sp =
& grpc_chttp2_settings_parameters [ id ] ;
uint32_t use_value = GPR_CLAMP ( value , sp - > min_value , sp - > max_value ) ;
@ -969,7 +956,6 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
if ( use_value ! = t - > settings [ GRPC_LOCAL_SETTINGS ] [ id ] ) {
t - > settings [ GRPC_LOCAL_SETTINGS ] [ id ] = use_value ;
t - > dirtied_local_settings = 1 ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " push_setting " ) ;
}
}
@ -1074,7 +1060,7 @@ static void null_then_run_closure(grpc_exec_ctx *exec_ctx,
grpc_closure * * closure , grpc_error * error ) {
grpc_closure * c = * closure ;
* closure = NULL ;
grpc_closure_run ( exec_ctx , c , error ) ;
GRPC_CLOSURE_RUN ( exec_ctx , c , error ) ;
}
void grpc_chttp2_complete_closure_step ( grpc_exec_ctx * exec_ctx ,
@ -1116,7 +1102,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
}
if ( ( t - > write_state = = GRPC_CHTTP2_WRITE_STATE_IDLE ) | |
! ( closure - > next_data . scratch & CLOSURE_BARRIER_MAY_COVER_WRITE ) ) {
grpc_closure_run ( exec_ctx , closure , closure - > error_data . error ) ;
GRPC_CLOSURE_RUN ( exec_ctx , closure , closure - > error_data . error ) ;
} else {
grpc_closure_list_append ( & t - > run_after_write , closure ,
closure - > error_data . error ) ;
@ -1252,7 +1238,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_closure * on_complete = op - > on_complete ;
if ( on_complete = = NULL ) {
on_complete =
grpc_closure_create ( do_nothing , NULL , grpc_schedule_on_exec_ctx ) ;
GRPC_CLOSURE_CREATE ( do_nothing , NULL , grpc_schedule_on_exec_ctx ) ;
}
/* use final_data as a barrier until enqueue time; the inital counter is
@ -1365,7 +1351,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
s - > next_message_end_offset = s - > flow_controlled_bytes_written +
( int64_t ) s - > flow_controlled_buffer . length +
( int64_t ) len ;
s - > complete_fetch_covered_by_poller = op - > covered_by_poller ;
if ( flags & GRPC_WRITE_BUFFER_HINT ) {
s - > next_message_end_offset - = t - > write_buffer_size ;
s - > write_buffering = true ;
@ -1432,6 +1417,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op_payload - > recv_initial_metadata . recv_initial_metadata_ready ;
s - > recv_initial_metadata =
op_payload - > recv_initial_metadata . recv_initial_metadata ;
s - > trailing_metadata_available =
op_payload - > recv_initial_metadata . trailing_metadata_available ;
grpc_chttp2_maybe_complete_recv_initial_metadata ( exec_ctx , t , s ) ;
}
@ -1485,11 +1472,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
op - > handler_private . extra_arg = gs ;
GRPC_CHTTP2_STREAM_REF ( s , " perform_stream_op " ) ;
grpc_closure_sched (
GRPC_CLOSURE_SCHED (
exec_ctx ,
grpc_closure_init (
& op - > handler_private . closure , perform_stream_op_locked , op ,
grpc_combiner_scheduler ( t - > combiner , op - > covered_by_poller ) ) ,
GRPC_CLOSURE_INIT ( & op - > handler_private . closure , perform_stream_op_locked ,
op , grpc_combiner_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " perform_stream_op " , 0 ) ;
}
@ -1502,7 +1488,7 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_ping_queue * pq = & t - > ping_queues [ i ] ;
for ( size_t j = 0 ; j < GRPC_CHTTP2_PCL_COUNT ; j + + ) {
grpc_closure_list_fail_all ( & pq - > lists [ j ] , GRPC_ERROR_REF ( error ) ) ;
grpc_closure_list_sched ( exec_ctx , & pq - > lists [ j ] ) ;
GRPC_CLOSURE_LIST_SCHED ( exec_ctx , & pq - > lists [ j ] ) ;
}
}
GRPC_ERROR_UNREF ( error ) ;
@ -1516,7 +1502,7 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_NONE ) ;
if ( grpc_closure_list_append ( & pq - > lists [ GRPC_CHTTP2_PCL_NEXT ] , on_ack ,
GRPC_ERROR_NONE ) ) {
grpc_chttp2_initiate_write ( exec_ctx , t , false , " send_ping " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " send_ping " ) ;
}
}
@ -1524,7 +1510,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp,
grpc_error * error ) {
grpc_chttp2_transport * t = tp ;
t - > ping_state . is_delayed_ping_timer_set = false ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " retry_send_ping " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " retry_send_ping " ) ;
}
void grpc_chttp2_ack_ping ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
@ -1537,9 +1523,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_free ( from ) ;
return ;
}
grpc_closure_list_sched ( exec_ctx , & pq - > lists [ GRPC_CHTTP2_PCL_INFLIGHT ] ) ;
GRPC_CLOSURE_LIST_SCHED ( exec_ctx , & pq - > lists [ GRPC_CHTTP2_PCL_INFLIGHT ] ) ;
if ( ! grpc_closure_list_empty ( pq - > lists [ GRPC_CHTTP2_PCL_NEXT ] ) ) {
grpc_chttp2_initiate_write ( exec_ctx , t , false , " continue_pings " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " continue_pings " ) ;
}
}
@ -1552,7 +1538,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
& slice , & http_error ) ;
grpc_chttp2_goaway_append ( t - > last_new_stream_id , ( uint32_t ) http_error ,
grpc_slice_ref_internal ( slice ) , & t - > qbuf ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " goaway_sent " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " goaway_sent " ) ;
GRPC_ERROR_UNREF ( error ) ;
}
@ -1578,12 +1564,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport * t = op - > handler_private . extra_arg ;
grpc_error * close_transport = op - > disconnect_with_error ;
if ( op - > on_connectivity_state_change ! = NULL ) {
grpc_connectivity_state_notify_on_state_change (
exec_ctx , & t - > channel_callback . state_tracker , op - > connectivity_state ,
op - > on_connectivity_state_change ) ;
}
if ( op - > goaway_error ) {
send_goaway ( exec_ctx , t , op - > goaway_error ) ;
}
@ -1607,11 +1587,17 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
op - > send_ping ) ;
}
if ( op - > on_connectivity_state_change ! = NULL ) {
grpc_connectivity_state_notify_on_state_change (
exec_ctx , & t - > channel_callback . state_tracker , op - > connectivity_state ,
op - > on_connectivity_state_change ) ;
}
if ( close_transport ! = GRPC_ERROR_NONE ) {
close_transport_locked ( exec_ctx , t , close_transport ) ;
}
grpc_closure_run ( exec_ctx , op - > on_consumed , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_RUN ( exec_ctx , op - > on_consumed , GRPC_ERROR_NONE ) ;
GRPC_CHTTP2_UNREF_TRANSPORT ( exec_ctx , t , " transport_op " ) ;
}
@ -1623,11 +1609,11 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_free ( msg ) ;
op - > handler_private . extra_arg = gt ;
GRPC_CHTTP2_REF_TRANSPORT ( t , " transport_op " ) ;
grpc_closure_sched (
exec_ctx , grpc_closure_init ( & op - > handler_private . closure ,
perform_transport_op_locked , op ,
grpc_combiner_scheduler ( t - > combiner , false ) ) ,
GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx ,
GRPC_CLOSURE_INIT ( & op - > handler_private . closure ,
perform_transport_op_locked , op ,
grpc_combiner_scheduler ( t - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
}
/*******************************************************************************
@ -1782,7 +1768,7 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_add (
& t - > qbuf , grpc_chttp2_rst_stream_create ( s - > id , ( uint32_t ) http_error ,
& s - > stats . outgoing ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " rst_stream " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " rst_stream " ) ;
}
}
if ( due_to_error ! = GRPC_ERROR_NONE & & ! s - > seen_error ) {
@ -2095,7 +2081,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
& s - > stats . outgoing ) ) ;
grpc_chttp2_mark_stream_closed ( exec_ctx , t , s , 1 , 1 , error ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , false , " close_from_api " ) ;
grpc_chttp2_initiate_write ( exec_ctx , t , " close_from_api " ) ;
}
typedef struct {
@ -2137,8 +2123,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log ( GPR_DEBUG , " %s: update initial window size to %d " , t - > peer_string ,
( int ) bdp ) ;
}
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
( uint32_t ) bdp ) ;
queue_setting_update ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
( uint32_t ) bdp ) ;
}
static void update_frame ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
@ -2157,8 +2143,8 @@ static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log ( GPR_DEBUG , " %s: update max_frame size to %d " , t - > peer_string ,
( int ) frame_size ) ;
}
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ,
( uint32_t ) frame_size ) ;
queue_setting_update ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ,
( uint32_t ) frame_size ) ;
}
static grpc_error * try_http_parsing ( grpc_exec_ctx * exec_ctx ,
@ -2502,7 +2488,7 @@ static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_maybe_complete_recv_trailing_metadata ( exec_ctx , s - > t , s ) ;
} else {
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
grpc_closure_sched ( exec_ctx , s - > on_next , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > on_next , GRPC_ERROR_REF ( error ) ) ;
s - > on_next = NULL ;
GRPC_ERROR_UNREF ( s - > byte_stream_error ) ;
s - > byte_stream_error = GRPC_ERROR_NONE ;
@ -2581,9 +2567,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if ( s - > frame_storage . length > 0 ) {
grpc_slice_buffer_swap ( & s - > frame_storage ,
& s - > unprocessed_incoming_frames_buffer ) ;
grpc_closure_sched ( exec_ctx , bs - > next_action . on_complete , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , bs - > next_action . on_complete , GRPC_ERROR_NONE ) ;
} else if ( s - > byte_stream_error ! = GRPC_ERROR_NONE ) {
grpc_closure_sched ( exec_ctx , bs - > next_action . on_complete ,
GRPC_CLOSURE_SCHED ( exec_ctx , bs - > next_action . on_complete ,
GRPC_ERROR_REF ( s - > byte_stream_error ) ) ;
if ( s - > data_parser . parsing_frame ! = NULL ) {
incoming_byte_stream_unref ( exec_ctx , s - > data_parser . parsing_frame ) ;
@ -2593,7 +2579,7 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
if ( bs - > remaining_bytes ! = 0 ) {
s - > byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Truncated message " ) ;
grpc_closure_sched ( exec_ctx , bs - > next_action . on_complete ,
GRPC_CLOSURE_SCHED ( exec_ctx , bs - > next_action . on_complete ,
GRPC_ERROR_REF ( s - > byte_stream_error ) ) ;
if ( s - > data_parser . parsing_frame ! = NULL ) {
incoming_byte_stream_unref ( exec_ctx , s - > data_parser . parsing_frame ) ;
@ -2624,11 +2610,11 @@ static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
gpr_ref ( & bs - > refs ) ;
bs - > next_action . max_size_hint = max_size_hint ;
bs - > next_action . on_complete = on_complete ;
grpc_closure_sched (
GRPC_CLOSURE_SCHED (
exec_ctx ,
grpc_closure_init (
& bs - > next_action . closure , incoming_byte_stream_next_locked , bs ,
grpc_combiner_scheduler ( bs - > transport - > combiner , false ) ) ,
GRPC_CLOSURE_INIT ( & bs - > next_action . closure ,
incoming_byte_stream_next_locked , bs ,
grpc_combiner_scheduler ( bs - > transport - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " incoming_byte_stream_next " , 0 ) ;
return false ;
@ -2653,7 +2639,7 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
} else {
grpc_error * error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Truncated message " ) ;
grpc_closure_sched ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
return error ;
}
GPR_TIMER_END ( " incoming_byte_stream_pull " , 0 ) ;
@ -2682,11 +2668,10 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN ( " incoming_byte_stream_destroy " , 0 ) ;
grpc_chttp2_incoming_byte_stream * bs =
( grpc_chttp2_incoming_byte_stream * ) byte_stream ;
grpc_closure_sched (
exec_ctx ,
grpc_closure_init (
& bs - > destroy_action , incoming_byte_stream_destroy_locked , bs ,
grpc_combiner_scheduler ( bs - > transport - > combiner , false ) ) ,
GRPC_CLOSURE_SCHED (
exec_ctx , GRPC_CLOSURE_INIT (
& bs - > destroy_action , incoming_byte_stream_destroy_locked ,
bs , grpc_combiner_scheduler ( bs - > transport - > combiner ) ) ,
GRPC_ERROR_NONE ) ;
GPR_TIMER_END ( " incoming_byte_stream_destroy " , 0 ) ;
}
@ -2697,7 +2682,7 @@ static void incoming_byte_stream_publish_error(
grpc_chttp2_stream * s = bs - > stream ;
GPR_ASSERT ( error ! = GRPC_ERROR_NONE ) ;
grpc_closure_sched ( exec_ctx , s - > on_next , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , s - > on_next , GRPC_ERROR_REF ( error ) ) ;
s - > on_next = NULL ;
GRPC_ERROR_UNREF ( s - > byte_stream_error ) ;
s - > byte_stream_error = GRPC_ERROR_REF ( error ) ;
@ -2714,7 +2699,7 @@ grpc_error *grpc_chttp2_incoming_byte_stream_push(
grpc_error * error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING ( " Too many bytes in stream " ) ;
grpc_closure_sched ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
grpc_slice_unref_internal ( exec_ctx , slice ) ;
return error ;
} else {
@ -2737,7 +2722,7 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished(
}
}
if ( error ! = GRPC_ERROR_NONE & & reset_on_error ) {
grpc_closure_sched ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & s - > reset_byte_stream , GRPC_ERROR_REF ( error ) ) ;
}
incoming_byte_stream_unref ( exec_ctx , bs ) ;
return error ;
@ -2757,6 +2742,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
gpr_ref_init ( & incoming_byte_stream - > refs , 2 ) ;
incoming_byte_stream - > transport = t ;
incoming_byte_stream - > stream = s ;
GRPC_ERROR_UNREF ( s - > byte_stream_error ) ;
s - > byte_stream_error = GRPC_ERROR_NONE ;
return incoming_byte_stream ;
}
@ -2971,5 +2957,5 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_move_into ( read_buffer , & t - > read_buffer ) ;
gpr_free ( read_buffer ) ;
}
grpc_closure_sched ( exec_ctx , & t - > read_action_locked , GRPC_ERROR_NONE ) ;
GRPC_CLOSURE_SCHED ( exec_ctx , & t - > read_action_locked , GRPC_ERROR_NONE ) ;
}