@ -174,343 +174,451 @@ static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) {
return initial_metadata - > list . default_count = = initial_metadata - > list . count ;
}
grpc_chttp2_begin_write_result grpc_chttp2_begin_write (
grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ) {
grpc_chttp2_stream * s ;
/* stats histogram counters: we increment these throughout this function,
and at the end publish to the central stats histograms */
int flow_control_writes = 0 ;
int initial_metadata_writes = 0 ;
int trailing_metadata_writes = 0 ;
int message_writes = 0 ;
namespace {
class StreamWriteContext ;
class WriteContext {
public :
WriteContext ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ) : t_ ( t ) {
GRPC_STATS_INC_HTTP2_WRITES_BEGUN ( exec_ctx ) ;
GPR_TIMER_BEGIN ( " grpc_chttp2_begin_write " , 0 ) ;
}
GRPC_STATS_INC_HTTP2_WRITES_BEGUN ( exec_ctx ) ;
// TODO(ctiller): make this the destructor
void FlushStats ( grpc_exec_ctx * exec_ctx ) {
GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE (
exec_ctx , initial_metadata_writes_ ) ;
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE ( exec_ctx , message_writes_ ) ;
GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE (
exec_ctx , trailing_metadata_writes_ ) ;
GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE ( exec_ctx , flow_control_writes_ ) ;
}
GPR_TIMER_BEGIN ( " grpc_chttp2_begin_write " , 0 ) ;
void FlushSettings ( grpc_exec_ctx * exec_ctx ) {
if ( t_ - > dirtied_local_settings & & ! t_ - > sent_local_settings ) {
grpc_slice_buffer_add (
& t_ - > outbuf , grpc_chttp2_settings_create (
t_ - > settings [ GRPC_SENT_SETTINGS ] ,
t_ - > settings [ GRPC_LOCAL_SETTINGS ] ,
t_ - > force_send_settings , GRPC_CHTTP2_NUM_SETTINGS ) ) ;
t_ - > force_send_settings = false ;
t_ - > dirtied_local_settings = false ;
t_ - > sent_local_settings = true ;
GRPC_STATS_INC_HTTP2_SETTINGS_WRITES ( exec_ctx ) ;
}
}
if ( t - > dirtied_local_settings & & ! t - > sent_local_settings ) {
grpc_slice_buffer_add (
& t - > outbuf ,
grpc_chttp2_settings_create (
t - > settings [ GRPC_SENT_SETTINGS ] , t - > settings [ GRPC_LOCAL_SETTINGS ] ,
t - > force_send_settings , GRPC_CHTTP2_NUM_SETTINGS ) ) ;
t - > force_send_settings = 0 ;
t - > dirtied_local_settings = 0 ;
t - > sent_local_settings = 1 ;
GRPC_STATS_INC_HTTP2_SETTINGS_WRITES ( exec_ctx ) ;
void FlushQueuedBuffers ( grpc_exec_ctx * exec_ctx ) {
/* simple writes are queued to qbuf, and flushed here */
grpc_slice_buffer_move_into ( & t_ - > qbuf , & t_ - > outbuf ) ;
GPR_ASSERT ( t_ - > qbuf . count = = 0 ) ;
}
for ( size_t i = 0 ; i < t - > ping_ack_count ; i + + ) {
grpc_slice_buffer_add ( & t - > outbuf ,
grpc_chttp2_ping_create ( 1 , t - > ping_acks [ i ] ) ) ;
void FlushWindowUpdates ( grpc_exec_ctx * exec_ctx ) {
uint32_t transport_announce =
grpc_chttp2_flowctl_maybe_send_transport_update ( & t_ - > flow_control ,
t_ - > outbuf . count > 0 ) ;
if ( transport_announce ) {
grpc_transport_one_way_stats throwaway_stats ;
grpc_slice_buffer_add (
& t_ - > outbuf , grpc_chttp2_window_update_create ( 0 , transport_announce ,
& throwaway_stats ) ) ;
ResetPingRecvClock ( ) ;
}
}
t - > ping_ack_count = 0 ;
/* simple writes are queued to qbuf, and flushed here */
grpc_slice_buffer_move_into ( & t - > qbuf , & t - > outbuf ) ;
GPR_ASSERT ( t - > qbuf . count = = 0 ) ;
void FlushPingAcks ( ) {
for ( size_t i = 0 ; i < t_ - > ping_ack_count ; i + + ) {
grpc_slice_buffer_add ( & t_ - > outbuf ,
grpc_chttp2_ping_create ( true , t_ - > ping_acks [ i ] ) ) ;
}
t_ - > ping_ack_count = 0 ;
}
grpc_chttp2_hpack_compressor_set_max_table_size (
& t - > hpack_compressor ,
t - > settings [ GRPC_PEER_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ] ) ;
void EnactHpackSettings ( grpc_exec_ctx * exec_ctx ) {
grpc_chttp2_hpack_compressor_set_max_table_size (
& t_ - > hpack_compressor ,
t_ - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE ] ) ;
}
if ( t - > flow_control . remote_window > 0 ) {
while ( grpc_chttp2_list_pop_stalled_by_transport ( t , & s ) ) {
if ( ! t - > closed & & grpc_chttp2_list_add_writable_stream ( t , s ) ) {
stream_ref_if_not_destroyed ( & s - > refcount - > refs ) ;
void UpdateStreamsNoLongerStalled ( ) {
grpc_chttp2_stream * s ;
while ( grpc_chttp2_list_pop_stalled_by_transport ( t_ , & s ) ) {
if ( ! t_ - > closed & & grpc_chttp2_list_add_writable_stream ( t_ , s ) ) {
if ( ! stream_ref_if_not_destroyed ( & s - > refcount - > refs ) ) {
grpc_chttp2_list_remove_writable_stream ( t_ , s ) ;
}
}
}
}
grpc_chttp2_begin_write_result result = { false , false , false } ;
grpc_chttp2_stream * NextStream ( ) {
if ( t_ - > outbuf . length > target_write_size ( t_ ) ) {
result_ . partial = true ;
return nullptr ;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
( according to available window sizes ) and add to the output buffer */
while ( true ) {
if ( t - > outbuf . length > target_write_size ( t ) ) {
result . partial = true ;
break ;
grpc_chttp2_stream * s ;
if ( ! grpc_chttp2_list_pop_writable_stream ( t_ , & s ) ) {
return nullptr ;
}
return s ;
}
void ResetPingRecvClock ( ) {
if ( ! t_ - > is_client ) {
t_ - > ping_recv_state . last_ping_recv_time = GRPC_MILLIS_INF_PAST ;
t_ - > ping_recv_state . ping_strikes = 0 ;
}
}
void IncInitialMetadataWrites ( ) { + + initial_metadata_writes_ ; }
void IncWindowUpdateWrites ( ) { + + flow_control_writes_ ; }
void IncMessageWrites ( ) { + + message_writes_ ; }
void IncTrailingMetadataWrites ( ) { + + trailing_metadata_writes_ ; }
void NoteScheduledResults ( ) { result_ . early_results_scheduled = true ; }
grpc_chttp2_transport * transport ( ) const { return t_ ; }
grpc_chttp2_begin_write_result Result ( ) {
result_ . writing = t_ - > outbuf . count > 0 ;
return result_ ;
}
private :
grpc_chttp2_transport * const t_ ;
/* stats histogram counters: we increment these throughout this function,
and at the end publish to the central stats histograms */
int flow_control_writes_ = 0 ;
int initial_metadata_writes_ = 0 ;
int trailing_metadata_writes_ = 0 ;
int message_writes_ = 0 ;
grpc_chttp2_begin_write_result result_ = { false , false , false } ;
} ;
class DataSendContext {
public :
DataSendContext ( WriteContext * write_context , grpc_chttp2_transport * t ,
grpc_chttp2_stream * s )
: write_context_ ( write_context ) ,
t_ ( t ) ,
s_ ( s ) ,
sending_bytes_before_ ( s_ - > sending_bytes ) { }
uint32_t stream_remote_window ( ) const {
return ( uint32_t ) GPR_MAX (
0 , s_ - > flow_control . remote_window_delta +
( int64_t ) t_ - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ) ;
}
uint32_t max_outgoing ( ) const {
return ( uint32_t ) GPR_MIN (
t_ - > settings [ GRPC_PEER_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] ,
GPR_MIN ( stream_remote_window ( ) , t_ - > flow_control . remote_window ) ) ;
}
bool AnyOutgoing ( ) const { return max_outgoing ( ) ! = 0 ; }
void FlushCompressedBytes ( ) {
uint32_t send_bytes =
( uint32_t ) GPR_MIN ( max_outgoing ( ) , s_ - > compressed_data_buffer . length ) ;
bool is_last_data_frame =
( send_bytes = = s_ - > compressed_data_buffer . length & &
s_ - > flow_controlled_buffer . length = = 0 & &
s_ - > fetching_send_message = = NULL ) ;
if ( is_last_data_frame & & s_ - > send_trailing_metadata ! = NULL & &
s_ - > stream_compression_ctx ! = NULL ) {
if ( ! grpc_stream_compress ( s_ - > stream_compression_ctx ,
& s_ - > flow_controlled_buffer ,
& s_ - > compressed_data_buffer , NULL , MAX_SIZE_T ,
GRPC_STREAM_COMPRESSION_FLUSH_FINISH ) ) {
gpr_log ( GPR_ERROR , " Stream compression failed. " ) ;
}
grpc_stream_compression_context_destroy ( s_ - > stream_compression_ctx ) ;
s_ - > stream_compression_ctx = NULL ;
/* After finish, bytes in s->compressed_data_buffer may be
* more than max_outgoing . Start another round of the current
* while loop so that send_bytes and is_last_data_frame are
* recalculated . */
return ;
}
is_last_frame_ = is_last_data_frame & & s_ - > send_trailing_metadata ! = NULL & &
grpc_metadata_batch_is_empty ( s_ - > send_trailing_metadata ) ;
grpc_chttp2_encode_data ( s_ - > id , & s_ - > compressed_data_buffer , send_bytes ,
is_last_frame_ , & s_ - > stats . outgoing , & t_ - > outbuf ) ;
grpc_chttp2_flowctl_sent_data ( & t_ - > flow_control , & s_ - > flow_control ,
send_bytes ) ;
if ( s_ - > compressed_data_buffer . length = = 0 ) {
s_ - > sending_bytes + = s_ - > uncompressed_data_size ;
}
}
if ( ! grpc_chttp2_list_pop_writable_stream ( t , & s ) ) {
break ;
void CompressMoreBytes ( ) {
if ( s_ - > stream_compression_ctx = = NULL ) {
s_ - > stream_compression_ctx =
grpc_stream_compression_context_create ( s_ - > stream_compression_method ) ;
}
s_ - > uncompressed_data_size = s_ - > flow_controlled_buffer . length ;
if ( ! grpc_stream_compress ( s_ - > stream_compression_ctx ,
& s_ - > flow_controlled_buffer ,
& s_ - > compressed_data_buffer , NULL , MAX_SIZE_T ,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC ) ) {
gpr_log ( GPR_ERROR , " Stream compression failed. " ) ;
}
}
bool is_last_frame ( ) const { return is_last_frame_ ; }
bool sent_initial_metadata = s - > sent_initial_metadata ;
bool now_writing = false ;
void CallCallbacks ( grpc_exec_ctx * exec_ctx ) {
if ( update_list ( exec_ctx , t_ , s_ ,
( int64_t ) ( s_ - > sending_bytes - sending_bytes_before_ ) ,
& s_ - > on_flow_controlled_cbs ,
& s_ - > flow_controlled_bytes_flowed , GRPC_ERROR_NONE ) ) {
write_context_ - > NoteScheduledResults ( ) ;
}
}
private :
WriteContext * write_context_ ;
grpc_chttp2_transport * t_ ;
grpc_chttp2_stream * s_ ;
const size_t sending_bytes_before_ ;
bool is_last_frame_ = false ;
} ;
class StreamWriteContext {
public :
StreamWriteContext ( WriteContext * write_context , grpc_chttp2_stream * s )
: write_context_ ( write_context ) , t_ ( write_context - > transport ( ) ) , s_ ( s ) {
GRPC_CHTTP2_IF_TRACING (
gpr_log ( GPR_DEBUG , " W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d " , t ,
t - > is_client ? " CLIENT " : " SERVER " , s - > id ,
sent_initial_metadata , s - > send_initial_metadata ! = NULL ,
gpr_log ( GPR_DEBUG , " W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d " , t_ ,
t_ - > is_client ? " CLIENT " : " SERVER " , s - > id ,
s - > s ent_initial_metadata, s - > send_initial_metadata ! = NULL ,
( int ) ( s - > flow_control . local_window_delta -
s - > flow_control . announced_window_delta ) ) ) ;
}
grpc_mdelem * extra_headers_for_trailing_metadata [ 2 ] ;
size_t num_extra_headers_for_trailing_metadata = 0 ;
void FlushInitialMetadata ( grpc_exec_ctx * exec_ctx ) {
/* send initial metadata if it's available */
if ( ! sent_initial_metadata & & s - > send_initial_metadata ! = NULL ) {
// We skip this on the server side if there is no custom initial
// metadata, there are no messages to send, and we are also sending
// trailing metadata. This results in a Trailers-Only response,
// which is required for retries, as per:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
if ( t - > is_client | | s - > fetching_send_message ! = NULL | |
s - > flow_controlled_buffer . length ! = 0 | |
s - > send_trailing_metadata = = NULL | |
! is_default_initial_metadata ( s - > send_initial_metadata ) ) {
grpc_encode_header_options hopt = {
s - > id , // stream_id
false , // is_eof
t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA ] ! =
0 , // use_true_binary_metadata
t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] , // max_frame_size
& s - > stats . outgoing // stats
} ;
grpc_chttp2_encode_header ( exec_ctx , & t - > hpack_compressor , NULL , 0 ,
s - > send_initial_metadata , & hopt , & t - > outbuf ) ;
now_writing = true ;
if ( ! t - > is_client ) {
t - > ping_recv_state . last_ping_recv_time = GRPC_MILLIS_INF_PAST ;
t - > ping_recv_state . ping_strikes = 0 ;
}
initial_metadata_writes + + ;
} else {
GRPC_CHTTP2_IF_TRACING (
gpr_log ( GPR_INFO , " not sending initial_metadata (Trailers-Only) " ) ) ;
// When sending Trailers-Only, we need to move the :status and
// content-type headers to the trailers.
if ( s - > send_initial_metadata - > idx . named . status ! = NULL ) {
extra_headers_for_trailing_metadata
[ num_extra_headers_for_trailing_metadata + + ] =
& s - > send_initial_metadata - > idx . named . status - > md ;
}
if ( s - > send_initial_metadata - > idx . named . content_type ! = NULL ) {
extra_headers_for_trailing_metadata
[ num_extra_headers_for_trailing_metadata + + ] =
& s - > send_initial_metadata - > idx . named . content_type - > md ;
}
trailing_metadata_writes + + ;
}
s - > send_initial_metadata = NULL ;
s - > sent_initial_metadata = true ;
sent_initial_metadata = true ;
result . early_results_scheduled = true ;
grpc_chttp2_complete_closure_step (
exec_ctx , t , s , & s - > send_initial_metadata_finished , GRPC_ERROR_NONE ,
" send_initial_metadata_finished " ) ;
if ( s_ - > sent_initial_metadata ) return ;
if ( s_ - > send_initial_metadata = = nullptr ) return ;
// We skip this on the server side if there is no custom initial
// metadata, there are no messages to send, and we are also sending
// trailing metadata. This results in a Trailers-Only response,
// which is required for retries, as per:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
if ( ! t_ - > is_client & & s_ - > fetching_send_message = = nullptr & &
s_ - > flow_controlled_buffer . length = = 0 & &
s_ - > compressed_data_buffer . length = = 0 & &
s_ - > send_trailing_metadata ! = nullptr & &
is_default_initial_metadata ( s_ - > send_initial_metadata ) ) {
ConvertInitialMetadataToTrailingMetadata ( ) ;
} else {
grpc_encode_header_options hopt = {
s_ - > id , // stream_id
false , // is_eof
t_ - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA ] ! =
0 , // use_true_binary_metadata
t_ - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] , // max_frame_size
& s_ - > stats . outgoing // stats
} ;
grpc_chttp2_encode_header ( exec_ctx , & t_ - > hpack_compressor , NULL , 0 ,
s_ - > send_initial_metadata , & hopt , & t_ - > outbuf ) ;
write_context_ - > ResetPingRecvClock ( ) ;
write_context_ - > IncInitialMetadataWrites ( ) ;
}
s_ - > send_initial_metadata = NULL ;
s_ - > sent_initial_metadata = true ;
write_context_ - > NoteScheduledResults ( ) ;
grpc_chttp2_complete_closure_step (
exec_ctx , t_ , s_ , & s_ - > send_initial_metadata_finished , GRPC_ERROR_NONE ,
" send_initial_metadata_finished " ) ;
}
void FlushWindowUpdates ( grpc_exec_ctx * exec_ctx ) {
/* send any window updates */
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update (
& t - > flow_control , & s - > flow_control ) ;
if ( stream_announce > 0 ) {
grpc_slice_buffer_add (
& t - > outbuf , grpc_chttp2_window_update_create ( s - > id , stream_announce ,
& s - > stats . outgoing ) ) ;
if ( ! t - > is_client ) {
t - > ping_recv_state . last_ping_recv_time = GRPC_MILLIS_INF_PAST ;
t - > ping_recv_state . ping_strikes = 0 ;
& t_ - > flow_control , & s_ - > flow_control ) ;
if ( stream_announce = = 0 ) return ;
grpc_slice_buffer_add (
& t_ - > outbuf , grpc_chttp2_window_update_create ( s_ - > id , stream_announce ,
& s_ - > stats . outgoing ) ) ;
write_context_ - > ResetPingRecvClock ( ) ;
write_context_ - > IncWindowUpdateWrites ( ) ;
}
void FlushData ( grpc_exec_ctx * exec_ctx ) {
if ( ! s_ - > sent_initial_metadata ) return ;
if ( s_ - > flow_controlled_buffer . length = = 0 & &
s_ - > compressed_data_buffer . length = = 0 ) {
return ; // early out: nothing to do
}
DataSendContext data_send_context ( write_context_ , t_ , s_ ) ;
if ( ! data_send_context . AnyOutgoing ( ) ) {
if ( t_ - > flow_control . remote_window = = 0 ) {
report_stall ( t_ , s_ , " transport " ) ;
grpc_chttp2_list_add_stalled_by_transport ( t_ , s_ ) ;
} else if ( data_send_context . stream_remote_window ( ) = = 0 ) {
report_stall ( t_ , s_ , " stream " ) ;
grpc_chttp2_list_add_stalled_by_stream ( t_ , s_ ) ;
}
flow_control_writes + + ;
return ; // early out: nothing to do
}
if ( sent_initial_metadata ) {
/* send any body bytes, if allowed by flow control */
if ( s - > flow_controlled_buffer . length > 0 | |
s - > compressed_data_buffer . length > 0 ) {
uint32_t stream_remote_window = ( uint32_t ) GPR_MAX (
0 ,
s - > flow_control . remote_window_delta +
( int64_t ) t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ) ;
uint32_t max_outgoing = ( uint32_t ) GPR_MIN (
t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] ,
GPR_MIN ( stream_remote_window , t - > flow_control . remote_window ) ) ;
if ( max_outgoing > 0 ) {
bool is_last_data_frame = false ;
bool is_last_frame = false ;
size_t sending_bytes_before = s - > sending_bytes ;
while ( ( s - > flow_controlled_buffer . length > 0 | |
s - > compressed_data_buffer . length > 0 ) & &
max_outgoing > 0 ) {
if ( s - > compressed_data_buffer . length > 0 ) {
uint32_t send_bytes = ( uint32_t ) GPR_MIN (
max_outgoing , s - > compressed_data_buffer . length ) ;
is_last_data_frame =
( send_bytes = = s - > compressed_data_buffer . length & &
s - > flow_controlled_buffer . length = = 0 & &
s - > fetching_send_message = = NULL ) ;
if ( is_last_data_frame & & s - > send_trailing_metadata ! = NULL & &
s - > stream_compression_ctx ! = NULL ) {
if ( ! grpc_stream_compress (
s - > stream_compression_ctx , & s - > flow_controlled_buffer ,
& s - > compressed_data_buffer , NULL , MAX_SIZE_T ,
GRPC_STREAM_COMPRESSION_FLUSH_FINISH ) ) {
gpr_log ( GPR_ERROR , " Stream compression failed. " ) ;
}
grpc_stream_compression_context_destroy (
s - > stream_compression_ctx ) ;
s - > stream_compression_ctx = NULL ;
/* After finish, bytes in s->compressed_data_buffer may be
* more than max_outgoing . Start another round of the current
* while loop so that send_bytes and is_last_data_frame are
* recalculated . */
continue ;
}
is_last_frame =
is_last_data_frame & & s - > send_trailing_metadata ! = NULL & &
grpc_metadata_batch_is_empty ( s - > send_trailing_metadata ) ;
grpc_chttp2_encode_data ( s - > id , & s - > compressed_data_buffer ,
send_bytes , is_last_frame ,
& s - > stats . outgoing , & t - > outbuf ) ;
grpc_chttp2_flowctl_sent_data ( & t - > flow_control , & s - > flow_control ,
send_bytes ) ;
max_outgoing - = send_bytes ;
if ( s - > compressed_data_buffer . length = = 0 ) {
s - > sending_bytes + = s - > uncompressed_data_size ;
}
} else {
if ( s - > stream_compression_ctx = = NULL ) {
s - > stream_compression_ctx =
grpc_stream_compression_context_create (
s - > stream_compression_method ) ;
}
s - > uncompressed_data_size = s - > flow_controlled_buffer . length ;
if ( ! grpc_stream_compress (
s - > stream_compression_ctx , & s - > flow_controlled_buffer ,
& s - > compressed_data_buffer , NULL , MAX_SIZE_T ,
GRPC_STREAM_COMPRESSION_FLUSH_SYNC ) ) {
gpr_log ( GPR_ERROR , " Stream compression failed. " ) ;
}
}
}
if ( ! t - > is_client ) {
t - > ping_recv_state . last_ping_recv_time = 0 ;
t - > ping_recv_state . ping_strikes = 0 ;
}
if ( is_last_frame ) {
s - > send_trailing_metadata = NULL ;
s - > sent_trailing_metadata = true ;
if ( ! t - > is_client & & ! s - > read_closed ) {
grpc_slice_buffer_add ( & t - > outbuf , grpc_chttp2_rst_stream_create (
s - > id , GRPC_HTTP2_NO_ERROR ,
& s - > stats . outgoing ) ) ;
}
grpc_chttp2_mark_stream_closed ( exec_ctx , t , s , ! t - > is_client , 1 ,
GRPC_ERROR_NONE ) ;
}
result . early_results_scheduled | =
update_list ( exec_ctx , t , s ,
( int64_t ) ( s - > sending_bytes - sending_bytes_before ) ,
& s - > on_flow_controlled_cbs ,
& s - > flow_controlled_bytes_flowed , GRPC_ERROR_NONE ) ;
now_writing = true ;
if ( s - > flow_controlled_buffer . length > 0 | |
s - > compressed_data_buffer . length > 0 ) {
GRPC_CHTTP2_STREAM_REF ( s , " chttp2_writing:fork " ) ;
grpc_chttp2_list_add_writable_stream ( t , s ) ;
}
message_writes + + ;
} else if ( t - > flow_control . remote_window = = 0 ) {
report_stall ( t , s , " transport " ) ;
grpc_chttp2_list_add_stalled_by_transport ( t , s ) ;
now_writing = true ;
} else if ( stream_remote_window = = 0 ) {
report_stall ( t , s , " stream " ) ;
grpc_chttp2_list_add_stalled_by_stream ( t , s ) ;
now_writing = true ;
}
while ( ( s_ - > flow_controlled_buffer . length > 0 | |
s_ - > compressed_data_buffer . length > 0 ) & &
data_send_context . max_outgoing ( ) > 0 ) {
if ( s_ - > compressed_data_buffer . length > 0 ) {
data_send_context . FlushCompressedBytes ( ) ;
} else {
data_send_context . CompressMoreBytes ( ) ;
}
if ( s - > send_trailing_metadata ! = NULL & &
s - > fetching_send_message = = NULL & &
s - > flow_controlled_buffer . length = = 0 & &
s - > compressed_data_buffer . length = = 0 ) {
GRPC_CHTTP2_IF_TRACING ( gpr_log ( GPR_INFO , " sending trailing_metadata " ) ) ;
if ( grpc_metadata_batch_is_empty ( s - > send_trailing_metadata ) ) {
grpc_chttp2_encode_data ( s - > id , & s - > flow_controlled_buffer , 0 , true ,
& s - > stats . outgoing , & t - > outbuf ) ;
} else {
grpc_encode_header_options hopt = {
s - > id , true ,
t - > settings
[ GRPC_PEER_SETTINGS ]
}
write_context_ - > ResetPingRecvClock ( ) ;
if ( data_send_context . is_last_frame ( ) ) {
SentLastFrame ( exec_ctx ) ;
}
data_send_context . CallCallbacks ( exec_ctx ) ;
stream_became_writable_ = true ;
if ( s_ - > flow_controlled_buffer . length > 0 | |
s_ - > compressed_data_buffer . length > 0 ) {
GRPC_CHTTP2_STREAM_REF ( s_ , " chttp2_writing:fork " ) ;
grpc_chttp2_list_add_writable_stream ( t_ , s_ ) ;
}
write_context_ - > IncMessageWrites ( ) ;
}
void FlushTrailingMetadata ( grpc_exec_ctx * exec_ctx ) {
if ( ! s_ - > sent_initial_metadata ) return ;
if ( s_ - > send_trailing_metadata = = NULL ) return ;
if ( s_ - > fetching_send_message ! = NULL ) return ;
if ( s_ - > flow_controlled_buffer . length ! = 0 ) return ;
if ( s_ - > compressed_data_buffer . length ! = 0 ) return ;
GRPC_CHTTP2_IF_TRACING ( gpr_log ( GPR_INFO , " sending trailing_metadata " ) ) ;
if ( grpc_metadata_batch_is_empty ( s_ - > send_trailing_metadata ) ) {
grpc_chttp2_encode_data ( s_ - > id , & s_ - > flow_controlled_buffer , 0 , true ,
& s_ - > stats . outgoing , & t_ - > outbuf ) ;
} else {
grpc_encode_header_options hopt = {
s_ - > id , true ,
t_ - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA ] ! =
0 ,
t - > settings [ GRPC_PEER_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] ,
& s - > stats . outgoing } ;
grpc_chttp2_encode_header ( exec_ctx , & t - > hpack_compressor ,
extra_headers_for_trailing_metadata ,
num_extra_headers_for_trailing_metadata ,
s - > send_trailing_metadata , & hopt ,
& t - > outbuf ) ;
trailing_metadata_writes + + ;
}
s - > send_trailing_metadata = NULL ;
s - > sent_trailing_metadata = true ;
if ( ! t - > is_client ) {
t - > ping_recv_state . last_ping_recv_time = GRPC_MILLIS_INF_PAST ;
t - > ping_recv_state . ping_strikes = 0 ;
}
if ( ! t - > is_client & & ! s - > read_closed ) {
grpc_slice_buffer_add (
& t - > outbuf , grpc_chttp2_rst_stream_create (
s - > id , GRPC_HTTP2_NO_ERROR , & s - > stats . outgoing ) ) ;
}
grpc_chttp2_mark_stream_closed ( exec_ctx , t , s , ! t - > is_client , 1 ,
GRPC_ERROR_NONE ) ;
now_writing = true ;
result . early_results_scheduled = true ;
grpc_chttp2_complete_closure_step (
exec_ctx , t , s , & s - > send_trailing_metadata_finished ,
GRPC_ERROR_NONE , " send_trailing_metadata_finished " ) ;
}
0 ,
t_ - > settings [ GRPC_PEER_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE ] ,
& s_ - > stats . outgoing } ;
grpc_chttp2_encode_header ( exec_ctx , & t_ - > hpack_compressor ,
extra_headers_for_trailing_metadata_ ,
num_extra_headers_for_trailing_metadata_ ,
s_ - > send_trailing_metadata , & hopt , & t_ - > outbuf ) ;
}
write_context_ - > IncTrailingMetadataWrites ( ) ;
write_context_ - > ResetPingRecvClock ( ) ;
SentLastFrame ( exec_ctx ) ;
write_context_ - > NoteScheduledResults ( ) ;
grpc_chttp2_complete_closure_step (
exec_ctx , t_ , s_ , & s_ - > send_trailing_metadata_finished , GRPC_ERROR_NONE ,
" send_trailing_metadata_finished " ) ;
}
bool stream_became_writable ( ) { return stream_became_writable_ ; }
private :
void ConvertInitialMetadataToTrailingMetadata ( ) {
GRPC_CHTTP2_IF_TRACING (
gpr_log ( GPR_INFO , " not sending initial_metadata (Trailers-Only) " ) ) ;
// When sending Trailers-Only, we need to move the :status and
// content-type headers to the trailers.
if ( s_ - > send_initial_metadata - > idx . named . status ! = NULL ) {
extra_headers_for_trailing_metadata_
[ num_extra_headers_for_trailing_metadata_ + + ] =
& s_ - > send_initial_metadata - > idx . named . status - > md ;
}
if ( s_ - > send_initial_metadata - > idx . named . content_type ! = NULL ) {
extra_headers_for_trailing_metadata_
[ num_extra_headers_for_trailing_metadata_ + + ] =
& s_ - > send_initial_metadata - > idx . named . content_type - > md ;
}
}
void SentLastFrame ( grpc_exec_ctx * exec_ctx ) {
s_ - > send_trailing_metadata = NULL ;
s_ - > sent_trailing_metadata = true ;
if ( ! t_ - > is_client & & ! s_ - > read_closed ) {
grpc_slice_buffer_add (
& t_ - > outbuf , grpc_chttp2_rst_stream_create (
s_ - > id , GRPC_HTTP2_NO_ERROR , & s_ - > stats . outgoing ) ) ;
}
grpc_chttp2_mark_stream_closed ( exec_ctx , t_ , s_ , ! t_ - > is_client , true ,
GRPC_ERROR_NONE ) ;
}
if ( now_writing ) {
GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE (
exec_ctx , initial_metadata_writes ) ;
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE ( exec_ctx , message_writes ) ;
GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE (
exec_ctx , trailing_metadata_writes ) ;
GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE ( exec_ctx ,
flow_control_writes ) ;
WriteContext * const write_context_ ;
grpc_chttp2_transport * const t_ ;
grpc_chttp2_stream * const s_ ;
bool stream_became_writable_ = false ;
grpc_mdelem * extra_headers_for_trailing_metadata_ [ 2 ] ;
size_t num_extra_headers_for_trailing_metadata_ = 0 ;
} ;
} // namespace
grpc_chttp2_begin_write_result grpc_chttp2_begin_write (
grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ) {
WriteContext ctx ( exec_ctx , t ) ;
ctx . FlushSettings ( exec_ctx ) ;
ctx . FlushPingAcks ( ) ;
ctx . FlushQueuedBuffers ( exec_ctx ) ;
ctx . EnactHpackSettings ( exec_ctx ) ;
if ( t - > flow_control . remote_window > 0 ) {
ctx . UpdateStreamsNoLongerStalled ( ) ;
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
( according to available window sizes ) and add to the output buffer */
while ( grpc_chttp2_stream * s = ctx . NextStream ( ) ) {
StreamWriteContext stream_ctx ( & ctx , s ) ;
stream_ctx . FlushInitialMetadata ( exec_ctx ) ;
stream_ctx . FlushWindowUpdates ( exec_ctx ) ;
stream_ctx . FlushData ( exec_ctx ) ;
stream_ctx . FlushTrailingMetadata ( exec_ctx ) ;
if ( stream_ctx . stream_became_writable ( ) ) {
if ( ! grpc_chttp2_list_add_writing_stream ( t , s ) ) {
/* already in writing list: drop ref */
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:already_writing " ) ;
} else {
/* ref will be dropped at end of write */
}
} else {
GRPC_CHTTP2_STREAM_UNREF ( exec_ctx , s , " chttp2_writing:no_write " ) ;
}
}
maybe_initiate_ping ( exec_ctx , t ) ;
ctx . FlushWindowUpdates ( exec_ctx ) ;
uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update (
& t - > flow_control , t - > outbuf . count > 0 ) ;
if ( transport_announce ) {
grpc_transport_one_way_stats throwaway_stats ;
grpc_slice_buffer_add (
& t - > outbuf , grpc_chttp2_window_update_create ( 0 , transport_announce ,
& throwaway_stats ) ) ;
if ( ! t - > is_client ) {
t - > ping_recv_state . last_ping_recv_time = GRPC_MILLIS_INF_PAST ;
t - > ping_recv_state . ping_strikes = 0 ;
}
}
maybe_initiate_ping ( exec_ctx , t ) ;
GPR_TIMER_END ( " grpc_chttp2_begin_write " , 0 ) ;
result . writing = t - > outbuf . count > 0 ;
return result ;
return ctx . Result ( ) ;
}
void grpc_chttp2_end_write ( grpc_exec_ctx * exec_ctx , grpc_chttp2_transport * t ,