@ -48,6 +48,7 @@
# include "src/core/ext/transport/chttp2/transport/status_conversion.h"
# include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
# include "src/core/lib/http/parser.h"
# include "src/core/lib/iomgr/workqueue.h"
# include "src/core/lib/profiling/timers.h"
# include "src/core/lib/support/string.h"
# include "src/core/lib/transport/static_metadata.h"
@ -60,9 +61,9 @@
# define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
# define MAX_CLIENT_STREAM_ID 0x7fffffffu
int grpc_http_trace = 0 ;
int grpc_flowctl_trace = 0 ;
int grpc_http_write_state_trace = 0 ;
# define TRANSPORT_FROM_WRITING(tw) \
( ( grpc_chttp2_transport * ) ( ( char * ) ( tw ) - offsetof ( grpc_chttp2_transport , \
@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable;
static void writing_action ( grpc_exec_ctx * exec_ctx , void * t , grpc_error * error ) ;
static void reading_action ( grpc_exec_ctx * exec_ctx , void * t , grpc_error * error ) ;
static void parsing_action ( grpc_exec_ctx * exec_ctx , void * t , grpc_error * error ) ;
static void initiate_writing ( grpc_exec_ctx * exec_ctx , void * t ,
grpc_error * error ) ;
static void start_writing ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ) ;
static void end_waiting_for_write ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t , grpc_error * error ) ;
/** Set a transport level setting, and push it to our peer */
static void push_setting ( grpc_chttp2_transport * t , grpc_chttp2_setting_id id ,
uint32_t value ) ;
static void push_setting ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) ;
/** Start disconnection chain */
static void drop_connection ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global * transport_global ) ;
static void incoming_byte_stream_update_flow_control (
grpc_chttp2_transport_global * transport_global ,
grpc_exec_ctx * exec_ctx , grpc_ chttp2_transport_global * transport_global ,
grpc_chttp2_stream_global * stream_global , size_t max_size_hint ,
size_t have_already ) ;
static void incoming_byte_stream_destroy_locked ( grpc_exec_ctx * exec_ctx ,
@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
gpr_free ( t ) ;
}
/*#define REFCOUNTING_DEBUG 1*/
# ifdef REFCOUNTING_DEBUG
# define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
# define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
static void init_transport ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
const grpc_channel_args * channel_args ,
grpc_endpoint * ep , uint8_t is_client ) {
grpc_endpoint * ep , bool is_client ) {
size_t i ;
int j ;
@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_closure_init ( & t - > writing_action , writing_action , t ) ;
grpc_closure_init ( & t - > reading_action , reading_action , t ) ;
grpc_closure_init ( & t - > parsing_action , parsing_action , t ) ;
grpc_closure_init ( & t - > initiate_writing , initiate_writing , t ) ;
gpr_slice_buffer_init ( & t - > parsing . qbuf ) ;
grpc_chttp2_goaway_parser_init ( & t - > parsing . goaway_parser ) ;
@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_slice_buffer_add (
& t - > global . qbuf ,
gpr_slice_from_copied_string ( GRPC_CHTTP2_CLIENT_CONNECT_STRING ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , & t - > global , false , " initial_write " ) ;
}
/* 8 is a random stab in the dark as to a good initial size: it's small enough
that it shouldn ' t waste memory for infrequently used connections , yet
@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
/* configure http2 the way we like it */
if ( is_client ) {
push_setting ( t , GRPC_CHTTP2_SETTINGS_ENABLE_PUSH , 0 ) ;
push_setting ( t , GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS , 0 ) ;
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_ENABLE_PUSH , 0 ) ;
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS , 0 ) ;
}
push_setting ( t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE , DEFAULT_WINDOW ) ;
push_setting ( t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ,
DEFAULT_WINDOW ) ;
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
DEFAULT_MAX_HEADER_LIST_SIZE ) ;
if ( channel_args ) {
@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log ( GPR_ERROR , " %s: must be an integer " ,
GRPC_ARG_MAX_CONCURRENT_STREAMS ) ;
} else {
push_setting ( t , GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS ,
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS ,
( uint32_t ) channel_args - > args [ i ] . value . integer ) ;
}
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log ( GPR_ERROR , " %s: must be non-negative " ,
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER ) ;
} else {
push_setting ( t , GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ,
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ,
( uint32_t ) channel_args - > args [ i ] . value . integer ) ;
}
} else if ( 0 = = strcmp ( channel_args - > args [ i ] . key ,
@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log ( GPR_ERROR , " %s: must be non-negative " ,
GRPC_ARG_MAX_METADATA_SIZE ) ;
} else {
push_setting ( t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
push_setting ( exec_ctx , t , GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE ,
( uint32_t ) channel_args - > args [ i ] . value . integer ) ;
}
}
@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport * t ,
grpc_error * error ) {
if ( ! t - > closed ) {
if ( grpc_http_write_state_trace ) {
gpr_log ( GPR_DEBUG , " W:%p close transport " , t ) ;
}
t - > closed = 1 ;
connectivity_state_set ( exec_ctx , & t - > global , GRPC_CHANNEL_SHUTDOWN ,
GRPC_ERROR_REF ( error ) , " close_transport " ) ;
@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_destroy (
& s - > global . received_trailing_metadata ) ;
gpr_slice_buffer_destroy ( & s - > writing . flow_controlled_buffer ) ;
GRPC_ERROR_UNREF ( s - > global . removal_error ) ;
GRPC_ERROR_UNREF ( s - > global . read_closed_error ) ;
GRPC_ERROR_UNREF ( s - > global . write_closed_error ) ;
UNREF_TRANSPORT ( exec_ctx , t , " stream " ) ;
@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
* LOCK MANAGEMENT
*/
static const char * write_state_name ( grpc_chttp2_write_state state ) {
switch ( state ) {
case GRPC_CHTTP2_WRITING_INACTIVE :
return " INACTIVE " ;
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER :
return " REQUESTED[p=0] " ;
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER :
return " REQUESTED[p=1] " ;
case GRPC_CHTTP2_WRITE_SCHEDULED :
return " SCHEDULED " ;
case GRPC_CHTTP2_WRITING :
return " WRITING " ;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER :
return " WRITING[p=1] " ;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER :
return " WRITING[p=0] " ;
}
GPR_UNREACHABLE_CODE ( return " UNKNOWN " ) ;
}
static void set_write_state ( grpc_chttp2_transport * t ,
grpc_chttp2_write_state state , const char * reason ) {
if ( grpc_http_write_state_trace ) {
gpr_log ( GPR_DEBUG , " W:%p %s -> %s because %s " , t ,
write_state_name ( t - > executor . write_state ) , write_state_name ( state ) ,
reason ) ;
}
t - > executor . write_state = state ;
}
static void finish_global_actions ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ) {
grpc_chttp2_executor_action_header * hdr ;
@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN ( " finish_global_actions " , 0 ) ;
for ( ; ; ) {
if ( ! t - > executor . writing_active & & ! t - > closed & &
grpc_chttp2_unlocking_check_writes ( exec_ctx , & t - > global , & t - > writing ) ) {
t - > executor . writing_active = 1 ;
REF_TRANSPORT ( t , " writing " ) ;
prevent_endpoint_shutdown ( t ) ;
grpc_exec_ctx_sched ( exec_ctx , & t - > writing_action , GRPC_ERROR_NONE , NULL ) ;
}
check_read_ops ( exec_ctx , & t - > global ) ;
gpr_mu_lock ( & t - > executor . mu ) ;
@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
continue ;
} else {
t - > executor . global_active = false ;
switch ( t - > executor . write_state ) {
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER :
set_write_state ( t , GRPC_CHTTP2_WRITE_SCHEDULED , " unlocking " ) ;
REF_TRANSPORT ( t , " initiate_writing " ) ;
gpr_mu_unlock ( & t - > executor . mu ) ;
grpc_exec_ctx_sched (
exec_ctx , & t - > initiate_writing , GRPC_ERROR_NONE ,
t - > ep ! = NULL ? grpc_endpoint_get_workqueue ( t - > ep ) : NULL ) ;
break ;
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER :
start_writing ( exec_ctx , t ) ;
gpr_mu_unlock ( & t - > executor . mu ) ;
break ;
case GRPC_CHTTP2_WRITING_INACTIVE :
case GRPC_CHTTP2_WRITING :
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER :
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER :
case GRPC_CHTTP2_WRITE_SCHEDULED :
gpr_mu_unlock ( & t - > executor . mu ) ;
break ;
}
}
gpr_mu_unlock ( & t - > executor . mu ) ;
break ;
}
@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
* OUTPUT PROCESSING
*/
void grpc_chttp2_become_writable ( grpc_chttp2_transport_global * transport_global ,
grpc_chttp2_stream_global * stream_global ) {
void grpc_chttp2_initiate_write ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport_global * transport_global ,
bool covered_by_poller , const char * reason ) {
/* Perform state checks, and transition to a scheduled state if appropriate.
Each time we finish the global lock execution , we check if we need to
write . If we do :
- ( if there is a poller surrounding the write ) schedule
initiate_writing , which locks and calls initiate_writing_locked to . . .
- call start_writing , which verifies ( under the global lock ) that there
are things that need to be written by calling
grpc_chttp2_unlocking_check_writes , and if so schedules writing_action
against the current exec_ctx , to be executed OUTSIDE of the global lock
- eventually writing_action results in grpc_chttp2_terminate_writing being
called , which re - takes the global lock , updates state , checks if we need
to do * another * write immediately , and if so loops back to
start_writing .
Current problems :
- too much lock entry / exiting
- the writing thread can become stuck indefinitely ( punt through the
workqueue periodically to fix ) */
grpc_chttp2_transport * t = TRANSPORT_FROM_GLOBAL ( transport_global ) ;
switch ( t - > executor . write_state ) {
case GRPC_CHTTP2_WRITING_INACTIVE :
set_write_state ( t , covered_by_poller
? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER
: GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER ,
reason ) ;
break ;
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER :
/* nothing to do: write already requested */
break ;
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER :
if ( covered_by_poller ) {
/* upgrade to note poller is available to cover the write */
set_write_state ( t , GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER , reason ) ;
}
break ;
case GRPC_CHTTP2_WRITE_SCHEDULED :
/* nothing to do: write already scheduled */
break ;
case GRPC_CHTTP2_WRITING :
set_write_state ( t ,
covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER
: GRPC_CHTTP2_WRITING_STALE_NO_POLLER ,
reason ) ;
break ;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER :
/* nothing to do: write already requested */
break ;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER :
if ( covered_by_poller ) {
/* upgrade to note poller is available to cover the write */
set_write_state ( t , GRPC_CHTTP2_WRITING_STALE_WITH_POLLER , reason ) ;
}
break ;
}
}
static void start_writing ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ) {
GPR_ASSERT ( t - > executor . write_state = = GRPC_CHTTP2_WRITE_SCHEDULED | |
t - > executor . write_state = = GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER ) ;
if ( ! t - > closed & &
grpc_chttp2_unlocking_check_writes ( exec_ctx , & t - > global , & t - > writing ) ) {
set_write_state ( t , GRPC_CHTTP2_WRITING , " start_writing " ) ;
REF_TRANSPORT ( t , " writing " ) ;
prevent_endpoint_shutdown ( t ) ;
grpc_exec_ctx_sched ( exec_ctx , & t - > writing_action , GRPC_ERROR_NONE , NULL ) ;
} else {
if ( t - > closed ) {
set_write_state ( t , GRPC_CHTTP2_WRITING_INACTIVE ,
" start_writing:transport_closed " ) ;
} else {
set_write_state ( t , GRPC_CHTTP2_WRITING_INACTIVE ,
" start_writing:nothing_to_write " ) ;
}
end_waiting_for_write ( exec_ctx , t , GRPC_ERROR_CREATE ( " Nothing to write " ) ) ;
if ( t - > ep & & ! t - > endpoint_reading ) {
destroy_endpoint ( exec_ctx , t ) ;
}
}
}
static void initiate_writing_locked ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s_unused ,
void * arg_ignored ) {
start_writing ( exec_ctx , t ) ;
UNREF_TRANSPORT ( exec_ctx , t , " initiate_writing " ) ;
}
static void initiate_writing ( grpc_exec_ctx * exec_ctx , void * arg ,
grpc_error * error ) {
grpc_chttp2_run_with_global_lock ( exec_ctx , arg , NULL , initiate_writing_locked ,
NULL , 0 ) ;
}
void grpc_chttp2_become_writable ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport_global * transport_global ,
grpc_chttp2_stream_global * stream_global ,
bool covered_by_poller , const char * reason ) {
if ( ! TRANSPORT_FROM_GLOBAL ( transport_global ) - > closed & &
grpc_chttp2_list_add_writable_stream ( transport_global , stream_global ) ) {
GRPC_CHTTP2_STREAM_REF ( stream_global , " chttp2_writing " ) ;
grpc_chttp2_initiate_write ( exec_ctx , transport_global , covered_by_poller ,
reason ) ;
}
}
static void push_setting ( grpc_chttp2_transport * t , grpc_chttp2_setting_id id ,
uint32_t value ) {
static void push_setting ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_chttp2_setting_id id , uint32_t value ) {
const grpc_chttp2_setting_parameters * sp =
& grpc_chttp2_settings_parameters [ id ] ;
uint32_t use_value = GPR_CLAMP ( value , sp - > min_value , sp - > max_value ) ;
@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
if ( use_value ! = t - > global . settings [ GRPC_LOCAL_SETTINGS ] [ id ] ) {
t - > global . settings [ GRPC_LOCAL_SETTINGS ] [ id ] = use_value ;
t - > global . dirtied_local_settings = 1 ;
grpc_chttp2_initiate_write ( exec_ctx , & t - > global , false , " push_setting " ) ;
}
}
static void end_waiting_for_write ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t , grpc_error * error ) {
grpc_chttp2_stream_global * stream_global ;
while ( grpc_chttp2_list_pop_closed_waiting_for_writing ( & t - > global ,
& stream_global ) ) {
fail_pending_writes ( exec_ctx , & t - > global , stream_global ,
GRPC_ERROR_REF ( error ) ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , stream_global , " finish_writes " ) ;
}
GRPC_ERROR_UNREF ( error ) ;
}
static void terminate_writing_with_lock ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport * t ,
grpc_chttp2_stream * s_ignored ,
@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx,
grpc_chttp2_cleanup_writing ( exec_ctx , & t - > global , & t - > writing ) ;
grpc_chttp2_stream_global * stream_global ;
while ( grpc_chttp2_list_pop_closed_waiting_for_writing ( & t - > global ,
& stream_global ) ) {
fail_pending_writes ( exec_ctx , & t - > global , stream_global ,
GRPC_ERROR_REF ( error ) ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , stream_global , " finish_writes " ) ;
end_waiting_for_write ( exec_ctx , t , error ) ;
switch ( t - > executor . write_state ) {
case GRPC_CHTTP2_WRITING_INACTIVE :
case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER :
case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER :
case GRPC_CHTTP2_WRITE_SCHEDULED :
GPR_UNREACHABLE_CODE ( break ) ;
case GRPC_CHTTP2_WRITING :
set_write_state ( t , GRPC_CHTTP2_WRITING_INACTIVE , " terminate_writing " ) ;
break ;
case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER :
set_write_state ( t , GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER ,
" terminate_writing " ) ;
break ;
case GRPC_CHTTP2_WRITING_STALE_NO_POLLER :
set_write_state ( t , GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER ,
" terminate_writing " ) ;
break ;
}
/* leave the writing flag up on shutdown to prevent further writes in
unlock ( )
from starting */
t - > executor . writing_active = 0 ;
if ( t - > ep & & ! t - > endpoint_reading ) {
destroy_endpoint ( exec_ctx , t ) ;
}
UNREF_TRANSPORT ( exec_ctx , t , " writing " ) ;
GRPC_ERROR_UNREF ( error ) ;
}
void grpc_chttp2_terminate_writing ( grpc_exec_ctx * exec_ctx ,
@ -878,7 +1059,8 @@ static void maybe_start_some_streams(
stream_global - > id , STREAM_FROM_GLOBAL ( stream_global ) ) ;
stream_global - > in_stream_map = true ;
transport_global - > concurrent_stream_count + + ;
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( exec_ctx , transport_global , stream_global , true ,
" new_stream " ) ;
}
/* cancel out streams that will never be started */
while ( transport_global - > next_stream_id > = MAX_CLIENT_STREAM_ID & &
@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
maybe_start_some_streams ( exec_ctx , transport_global ) ;
} else {
GPR_ASSERT ( stream_global - > id ! = 0 ) ;
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( exec_ctx , transport_global , stream_global ,
true , " op.send_initial_metadata " ) ;
}
} else {
stream_global - > send_trailing_metadata = NULL ;
grpc_chttp2_complete_closure_step (
exec_ctx , transport_global , stream_global ,
& stream_global - > send_initial_metadata_finished ,
@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else {
stream_global - > send_message = op - > send_message ;
if ( stream_global - > id ! = 0 ) {
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( exec_ctx , transport_global , stream_global ,
true , " op.send_message " ) ;
}
}
}
@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_check_read_ops ( transport_global , stream_global ) ;
}
if ( stream_global - > write_closed ) {
stream_global - > send_trailing_metadata = NULL ;
grpc_chttp2_complete_closure_step (
exec_ctx , transport_global , stream_global ,
& stream_global - > send_trailing_metadata_finished ,
@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
} else if ( stream_global - > id ! = 0 ) {
/* TODO(ctiller): check if there's flow control for any outstanding
bytes before going writable */
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( exec_ctx , transport_global , stream_global ,
true , " op.send_trailing_metadata " ) ;
}
}
}
@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
( stream_global - > incoming_frames . head = = NULL | |
stream_global - > incoming_frames . head - > is_tail ) ) {
incoming_byte_stream_update_flow_control (
transport_global , stream_global , transport_global - > stream_lookahead ,
0 ) ;
exec_ctx , transport_global , stream_global ,
transport_global - > stream_lookahead , 0 ) ;
}
grpc_chttp2_list_add_check_read_ops ( transport_global , stream_global ) ;
}
@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
sizeof ( * op ) ) ;
}
static void send_ping_locked ( grpc_chttp2_transport * t , grpc_closure * on_recv ) {
static void send_ping_locked ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
grpc_closure * on_recv ) {
grpc_chttp2_outstanding_ping * p = gpr_malloc ( sizeof ( * p ) ) ;
p - > next = & t - > global . pings ;
p - > prev = p - > next - > prev ;
@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
p - > id [ 7 ] = ( uint8_t ) ( t - > global . ping_counter & 0xff ) ;
p - > on_recv = on_recv ;
gpr_slice_buffer_add ( & t - > global . qbuf , grpc_chttp2_ping_create ( 0 , p - > id ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , & t - > global , true , " send_ping " ) ;
}
static void ack_ping_locked ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,
@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
close_transport = grpc_chttp2_has_streams ( t )
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE ( " GOAWAY sent " ) ;
grpc_chttp2_initiate_write ( exec_ctx , & t - > global , false , " goaway_sent " ) ;
}
if ( op - > set_accept_stream ) {
@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
}
if ( op - > send_ping ) {
send_ping_locked ( t , op - > send_ping ) ;
send_ping_locked ( exec_ctx , t , op - > send_ping ) ;
}
if ( close_transport ! = GRPC_ERROR_NONE ) {
@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
& transport_global - > qbuf ,
grpc_chttp2_rst_stream_create ( stream_global - > id , ( uint32_t ) http_error ,
& stream_global - > stats . outgoing ) ) ;
grpc_chttp2_initiate_write ( exec_ctx , transport_global , false ,
" rst_stream " ) ;
}
const char * msg =
@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
}
}
static void add_error ( grpc_error * error , grpc_error * * refs , size_t * nrefs ) {
if ( error = = GRPC_ERROR_NONE ) return ;
for ( size_t i = 0 ; i < * nrefs ; i + + ) {
if ( error = = refs [ i ] ) {
return ;
}
}
refs [ * nrefs ] = error ;
+ + * nrefs ;
}
static grpc_error * removal_error ( grpc_error * extra_error ,
grpc_chttp2_stream_global * stream_global ) {
grpc_error * refs [ 3 ] ;
size_t nrefs = 0 ;
add_error ( stream_global - > read_closed_error , refs , & nrefs ) ;
add_error ( stream_global - > write_closed_error , refs , & nrefs ) ;
add_error ( extra_error , refs , & nrefs ) ;
grpc_error * error = GRPC_ERROR_NONE ;
if ( nrefs > 0 ) {
error = GRPC_ERROR_CREATE_REFERENCING ( " Failed due to stream removal " , refs ,
nrefs ) ;
}
GRPC_ERROR_UNREF ( extra_error ) ;
return error ;
}
static void fail_pending_writes ( grpc_exec_ctx * exec_ctx ,
grpc_chttp2_transport_global * transport_global ,
grpc_chttp2_stream_global * stream_global ,
grpc_error * error ) {
error = removal_error ( error , stream_global ) ;
stream_global - > send_message = NULL ;
grpc_chttp2_complete_closure_step (
exec_ctx , transport_global , stream_global ,
& stream_global - > send_initial_metadata_finished , GRPC_ERROR_REF ( error ) ) ;
@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed(
}
grpc_chttp2_list_add_check_read_ops ( transport_global , stream_global ) ;
if ( close_reads & & ! stream_global - > read_closed ) {
stream_global - > read_closed_error = GRPC_ERROR_REF ( error ) ;
stream_global - > read_closed = true ;
stream_global - > published_initial_metadata = true ;
stream_global - > published_trailing_metadata = true ;
decrement_active_streams_locked ( exec_ctx , transport_global , stream_global ) ;
}
if ( close_writes & & ! stream_global - > write_closed ) {
stream_global - > write_closed_error = GRPC_ERROR_REF ( error ) ;
stream_global - > write_closed = true ;
if ( TRANSPORT_FROM_GLOBAL ( transport_global ) - > executor . writing_active ) {
if ( TRANSPORT_FROM_GLOBAL ( transport_global ) - > executor . write_state ! =
GRPC_CHTTP2_WRITING_INACTIVE ) {
GRPC_CHTTP2_STREAM_REF ( stream_global , " finish_writes " ) ;
grpc_chttp2_list_add_closed_waiting_for_writing ( transport_global ,
stream_global ) ;
@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed(
}
}
if ( stream_global - > read_closed & & stream_global - > write_closed ) {
stream_global - > removal_error = GRPC_ERROR_REF ( error ) ;
if ( stream_global - > id ! = 0 & &
TRANSPORT_FROM_GLOBAL ( transport_global ) - > executor . parsing_active ) {
grpc_chttp2_list_add_closed_waiting_for_parsing ( transport_global ,
@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed(
} else {
if ( stream_global - > id ! = 0 ) {
remove_stream ( exec_ctx , TRANSPORT_FROM_GLOBAL ( transport_global ) ,
stream_global - > id , GRPC_ERROR_REF ( error ) ) ;
stream_global - > id ,
removal_error ( GRPC_ERROR_REF ( error ) , stream_global ) ) ;
}
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , stream_global , " chttp2 " ) ;
}
@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_mark_stream_closed ( exec_ctx , transport_global , stream_global , 1 ,
1 , error ) ;
grpc_chttp2_initiate_write ( exec_ctx , transport_global , false ,
" close_from_api " ) ;
}
typedef struct {
@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
/** update window from a settings change */
typedef struct {
grpc_chttp2_transport * t ;
grpc_exec_ctx * exec_ctx ;
} update_global_window_args ;
static void update_global_window ( void * args , uint32_t id , void * stream ) {
grpc_chttp2_transport * t = args ;
update_global_window_args * a = args ;
grpc_chttp2_transport * t = a - > t ;
grpc_chttp2_stream * s = stream ;
grpc_chttp2_transport_global * transport_global = & t - > global ;
grpc_chttp2_stream_global * stream_global = & s - > global ;
@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) {
is_zero = stream_global - > outgoing_window < = 0 ;
if ( was_zero & & ! is_zero ) {
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( a - > exec_ctx , transport_global , stream_global ,
true , " update_global_window " ) ;
}
}
@ -1801,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_transport_global * transport_global = & t - > global ;
grpc_chttp2_transport_parsing * transport_parsing = & t - > parsing ;
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into ( & t - > parsing . qbuf , & t - > global . qbuf ) ;
if ( t - > parsing . qbuf . count > 0 ) {
gpr_slice_buffer_move_into ( & t - > parsing . qbuf , & t - > global . qbuf ) ;
grpc_chttp2_initiate_write ( exec_ctx , transport_global , false ,
" parsing_qbuf " ) ;
}
/* merge stream lists */
grpc_chttp2_stream_map_move_into ( & t - > new_stream_map , & t - > parsing_stream_map ) ;
transport_global - > concurrent_stream_count =
( uint32_t ) grpc_chttp2_stream_map_size ( & t - > parsing_stream_map ) ;
if ( transport_parsing - > initial_window_update ! = 0 ) {
update_global_window_args args = { t , exec_ctx } ;
grpc_chttp2_stream_map_for_each ( & t - > parsing_stream_map ,
update_global_window , t ) ;
update_global_window , & args ) ;
transport_parsing - > initial_window_update = 0 ;
}
/* handle higher level things */
@ -1831,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT ( stream_global - > write_closed ) ;
GPR_ASSERT ( stream_global - > read_closed ) ;
remove_stream ( exec_ctx , t , stream_global - > id ,
GRPC_ERROR_REF ( stream_global - > removal_error ) ) ;
removal_error ( GRPC_ERROR_NONE , stream_global ) ) ;
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , stream_global , " chttp2 " ) ;
}
@ -1854,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
}
drop_connection ( exec_ctx , t , GRPC_ERROR_REF ( error ) ) ;
t - > endpoint_reading = 0 ;
if ( ! t - > executor . writing_active & & t - > ep ) {
grpc_endpoint_destroy ( exec_ctx , t - > ep ) ;
t - > ep = NULL ;
/* safe as we still have a ref for read */
UNREF_TRANSPORT ( exec_ctx , t , " disconnect " ) ;
if ( grpc_http_write_state_trace ) {
gpr_log ( GPR_DEBUG , " R:%p -> 0 ws=%s " , t ,
write_state_name ( t - > executor . write_state ) ) ;
}
if ( t - > executor . write_state = = GRPC_CHTTP2_WRITING_INACTIVE & & t - > ep ) {
destroy_endpoint ( exec_ctx , t ) ;
}
} else if ( ! t - > closed ) {
keep_reading = true ;
@ -1942,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
}
static void incoming_byte_stream_update_flow_control (
grpc_chttp2_transport_global * transport_global ,
grpc_exec_ctx * exec_ctx , grpc_ chttp2_transport_global * transport_global ,
grpc_chttp2_stream_global * stream_global , size_t max_size_hint ,
size_t have_already ) {
uint32_t max_recv_bytes ;
@ -1977,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control(
add_max_recv_bytes ) ;
grpc_chttp2_list_add_unannounced_incoming_window_available ( transport_global ,
stream_global ) ;
grpc_chttp2_become_writable ( transport_global , stream_global ) ;
grpc_chttp2_become_writable ( exec_ctx , transport_global , stream_global ,
false , " read_incoming_stream " ) ;
}
}
@ -1999,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global * stream_global = & bs - > stream - > global ;
if ( bs - > is_tail ) {
incoming_byte_stream_update_flow_control (
transport_global , stream_global , arg - > max_size_hint , bs - > slices . length ) ;
incoming_byte_stream_update_flow_control ( exec_ctx , transport_global ,
stream_global , arg - > max_size_hint ,
bs - > slices . length ) ;
}
if ( bs - > slices . count > 0 ) {
* arg - > slice = gpr_slice_buffer_take_first ( & bs - > slices ) ;
@ -2184,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
if ( context = = NULL ) {
* scope = NULL ;
gpr_asprintf ( & buf , " %s(% " PRId64 " ) " , var , val ) ;
result = gpr_leftpad ( buf , ' ' , 4 0) ;
result = gpr_leftpad ( buf , ' ' , 6 0) ;
gpr_free ( buf ) ;
return result ;
}
@ -2197,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_free ( tmp ) ;
}
gpr_asprintf ( & buf , " %s.%s(% " PRId64 " ) " , underscore_pos + 1 , var , val ) ;
result = gpr_leftpad ( buf , ' ' , 4 0) ;
result = gpr_leftpad ( buf , ' ' , 6 0) ;
gpr_free ( buf ) ;
return result ;
}
@ -2230,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
tmp_phase = gpr_leftpad ( phase , ' ' , 8 ) ;
tmp_scope1 = gpr_leftpad ( scope1 , ' ' , 11 ) ;
gpr_asprintf ( & prefix , " FLOW %s: %s %s " , phase , clisvr , scope1 ) ;
gpr_asprintf ( & prefix , " FLOW %s: %s %s " , tmp_ phase, clisvr , scope1 ) ;
gpr_free ( tmp_phase ) ;
gpr_free ( tmp_scope1 ) ;