@ -44,6 +44,7 @@ int grpc_chttp2_unlocking_check_writes(
grpc_chttp2_transport_writing * transport_writing ) {
grpc_chttp2_stream_global * stream_global ;
grpc_chttp2_stream_writing * stream_writing ;
grpc_chttp2_stream_global * first_reinserted_stream = NULL ;
gpr_uint32 window_delta ;
/* simple writes are queued to qbuf, and flushed here */
@ -64,51 +65,53 @@ int grpc_chttp2_unlocking_check_writes(
}
/* for each grpc_chttp2_stream that's become writable, frame it's data
( according to
available window sizes ) and add to the output buffer */
while ( transport_global - > outgoing_window > 0 & &
grpc_chttp2_list_pop_writable_stream ( transport_global ,
transport_writing , & stream_global ,
& stream_writing ) ) {
( according to available window sizes ) and add to the output buffer */
while ( grpc_chttp2_list_pop_writable_stream (
transport_global , transport_writing , & stream_global , & stream_writing ) ) {
if ( stream_global = = first_reinserted_stream ) {
/* prevent infinite loop */
grpc_chttp2_list_add_first_writable_stream ( transport_global ,
stream_global ) ;
break ;
}
stream_writing - > id = stream_global - > id ;
window_delta = grpc_chttp2_preencode (
stream_global - > outgoing_sopb - > ops , & stream_global - > outgoing_sopb - > nops ,
GPR_MIN ( transport_global - > outgoing_window ,
stream_global - > outgoing_window ) ,
& stream_writing - > sopb ) ;
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT (
" write " , transport_global , outgoing_window , - ( gpr_int64 ) window_delta ) ;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ( " write " , transport_global , stream_global ,
outgoing_window , - ( gpr_int64 ) window_delta ) ;
transport_global - > outgoing_window - = window_delta ;
stream_global - > outgoing_window - = window_delta ;
if ( stream_global - > write_state = = GRPC_WRITE_STATE_QUEUED_CLOSE & &
stream_global - > outgoing_sopb - > nops = = 0 ) {
if ( ! transport_global - > is_client & & ! stream_global - > read_closed ) {
stream_writing - > send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM ;
} else {
stream_writing - > send_closed = GRPC_SEND_CLOSED ;
stream_writing - > send_closed = GRPC_DONT_SEND_CLOSED ;
if ( stream_global - > outgoing_sopb ) {
window_delta =
grpc_chttp2_preencode ( stream_global - > outgoing_sopb - > ops ,
& stream_global - > outgoing_sopb - > nops ,
GPR_MIN ( transport_global - > outgoing_window ,
stream_global - > outgoing_window ) ,
& stream_writing - > sopb ) ;
GRPC_CHTTP2_FLOWCTL_TRACE_TRANSPORT (
" write " , transport_global , outgoing_window , - ( gpr_int64 ) window_delta ) ;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ( " write " , transport_global , stream_global ,
outgoing_window ,
- ( gpr_int64 ) window_delta ) ;
transport_global - > outgoing_window - = window_delta ;
stream_global - > outgoing_window - = window_delta ;
if ( stream_global - > write_state = = GRPC_WRITE_STATE_QUEUED_CLOSE & &
stream_global - > outgoing_sopb - > nops = = 0 ) {
if ( ! transport_global - > is_client & & ! stream_global - > read_closed ) {
stream_writing - > send_closed = GRPC_SEND_CLOSED_WITH_RST_STREAM ;
} else {
stream_writing - > send_closed = GRPC_SEND_CLOSED ;
}
}
}
if ( stream_writing - > sopb . nops > 0 | |
stream_writing - > send_closed ! = GRPC_DONT_SEND_CLOSED ) {
grpc_chttp2_list_add_writing_stream ( transport_writing , stream_writing ) ;
}
if ( stream_global - > outgoing_window > 0 & &
stream_global - > outgoing_sopb - > nops ! = 0 ) {
grpc_chttp2_list_add_writable_stream ( transport_global , stream_global ) ;
if ( stream_global - > outgoing_window > 0 & &
stream_global - > outgoing_sopb - > nops ! = 0 ) {
grpc_chttp2_list_add_writable_stream ( transport_global , stream_global ) ;
if ( first_reinserted_stream = = NULL & &
transport_global - > outgoing_window = = 0 ) {
first_reinserted_stream = stream_global ;
}
}
}
}
/* for each grpc_chttp2_stream that wants to update its window, add that
* window here */
while ( grpc_chttp2_list_pop_writable_window_update_stream ( transport_global ,
transport_writing ,
& stream_global ,
& stream_writing ) ) {
stream_writing - > id = stream_global - > id ;
if ( ! stream_global - > read_closed & & stream_global - > unannounced_incoming_window > 0 ) {
stream_writing - > announce_window = stream_global - > unannounced_incoming_window ;
GRPC_CHTTP2_FLOWCTL_TRACE_STREAM ( " write " , transport_global , stream_global ,
@ -119,6 +122,11 @@ int grpc_chttp2_unlocking_check_writes(
stream_global - > unannounced_incoming_window = 0 ;
grpc_chttp2_list_add_incoming_window_updated ( transport_global ,
stream_global ) ;
stream_global - > writing_now = 1 ;
grpc_chttp2_list_add_writing_stream ( transport_writing , stream_writing ) ;
} else if ( stream_writing - > sopb . nops > 0 | |
stream_writing - > send_closed ! = GRPC_DONT_SEND_CLOSED ) {
stream_global - > writing_now = 1 ;
grpc_chttp2_list_add_writing_stream ( transport_writing , stream_writing ) ;
}
}
@ -206,6 +214,8 @@ void grpc_chttp2_cleanup_writing(
while ( grpc_chttp2_list_pop_written_stream (
transport_global , transport_writing , & stream_global , & stream_writing ) ) {
GPR_ASSERT ( stream_global - > writing_now ) ;
stream_global - > writing_now = 0 ;
if ( stream_global - > outgoing_sopb ! = NULL & &
stream_global - > outgoing_sopb - > nops = = 0 ) {
stream_global - > outgoing_sopb = NULL ;
@ -219,6 +229,9 @@ void grpc_chttp2_cleanup_writing(
}
grpc_chttp2_list_add_read_write_state_changed ( transport_global ,
stream_global ) ;
} else if ( stream_global - > read_closed ) {
grpc_chttp2_list_add_read_write_state_changed ( transport_global ,
stream_global ) ;
}
}
transport_writing - > outbuf . count = 0 ;