@ -71,6 +71,13 @@ typedef struct stream stream;
typedef enum {
/* streams that have pending writes */
WRITABLE = 0 ,
/* streams that have been selected to be written */
WRITING ,
/* streams that have just been written, and included a close */
WRITTEN_CLOSED ,
/* streams that have been cancelled and have some pending state updates
to perform */
CANCELLED ,
/* streams that want to send window updates */
WINDOW_UPDATE ,
/* streams that are waiting to start because there are too many concurrent
@ -258,7 +265,12 @@ struct stream {
gpr_uint32 outgoing_window ;
gpr_uint32 incoming_window ;
gpr_uint8 write_closed ;
/* when the application requests writes be closed, the write_closed is
' queued ' ; when the close is flow controlled into the send path , we are
' sending ' it ; when the write has been performed it is ' sent ' */
gpr_uint8 queued_write_closed ;
gpr_uint8 sending_write_closed ;
gpr_uint8 sent_write_closed ;
gpr_uint8 read_closed ;
gpr_uint8 cancelled ;
gpr_uint8 allow_window_updates ;
@ -267,7 +279,10 @@ struct stream {
stream_link links [ STREAM_LIST_COUNT ] ;
gpr_uint8 included [ STREAM_LIST_COUNT ] ;
/* sops from application */
grpc_stream_op_buffer outgoing_sopb ;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb ;
grpc_chttp2_data_parser parser ;
@ -284,7 +299,7 @@ static int prepare_callbacks(transport *t);
static void run_callbacks ( transport * t ) ;
static int prepare_write ( transport * t ) ;
static void finish_write ( void * t , grpc_endpoint_cb_status status ) ;
static void perform_write ( transport * t , grpc_endpoint * ep ) ;
static void lock ( transport * t ) ;
static void unlock ( transport * t ) ;
@ -303,6 +318,7 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
static void cancel_stream ( transport * t , stream * s ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code , int send_rst ) ;
static void finalize_cancellations ( transport * t ) ;
static stream * lookup_stream ( transport * t , gpr_uint32 id ) ;
static void remove_from_stream_map ( transport * t , stream * s ) ;
static void maybe_start_some_streams ( transport * t ) ;
@ -518,7 +534,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t - > settings [ PEER_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > incoming_window =
t - > settings [ SENT_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > write_closed = 0 ;
s - > queued_write_closed = 0 ;
s - > sending_write_closed = 0 ;
s - > sent_write_closed = 0 ;
s - > read_closed = 0 ;
s - > cancelled = 0 ;
s - > allow_window_updates = 0 ;
@ -526,8 +544,9 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
memset ( & s - > links , 0 , sizeof ( s - > links ) ) ;
memset ( & s - > included , 0 , sizeof ( s - > included ) ) ;
grpc_sopb_init ( & s - > outgoing_sopb ) ;
grpc_chttp2_data_parser _init ( & s - > parser ) ;
grpc_sopb _init ( & s - > writing_sopb ) ;
grpc_sopb_init ( & s - > callback_sopb ) ;
grpc_chttp2_data_parser_init ( & s - > parser ) ;
if ( ! server_data ) {
unlock ( t ) ;
@ -565,8 +584,9 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock ( & t - > mu ) ;
grpc_sopb_destroy ( & s - > outgoing_sopb ) ;
grpc_chttp2_data_parser _destroy ( & s - > parser ) ;
grpc_sopb _destroy ( & s - > writing_sopb ) ;
grpc_sopb_destroy ( & s - > callback_sopb ) ;
grpc_chttp2_data_parser_destroy ( & s - > parser ) ;
unref_transport ( t ) ;
}
@ -575,6 +595,10 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
* LIST MANAGEMENT
*/
static int stream_list_empty ( transport * t , stream_list_id id ) {
return t - > lists [ id ] . head = = NULL ;
}
static stream * stream_list_remove_head ( transport * t , stream_list_id id ) {
stream * s = t - > lists [ id ] . head ;
if ( s ) {
@ -666,6 +690,10 @@ static void unlock(transport *t) {
}
}
if ( ! t - > writing ) {
finalize_cancellations ( t ) ;
}
/* gather any callbacks that need to be made */
if ( ! t - > calling_back & & t - > cb ) {
perform_callbacks = prepare_callbacks ( t ) ;
@ -709,53 +737,9 @@ static void unlock(transport *t) {
}
/* write some bytes if necessary */
while ( start_write ) {
switch ( grpc_endpoint_write ( ep , t - > outbuf . slices , t - > outbuf . count ,
finish_write , t ) ) {
case GRPC_ENDPOINT_WRITE_DONE :
/* grab the lock directly without wrappers since we just want to
continue writes if we loop : no need to check read callbacks again */
gpr_mu_lock ( & t - > mu ) ;
t - > outbuf . count = 0 ;
t - > outbuf . length = 0 ;
t - > writing = start_write = prepare_write ( t ) ;
if ( ! start_write ) {
if ( ! t - > reading ) {
grpc_endpoint_destroy ( t - > ep ) ;
t - > ep = NULL ;
gpr_cv_broadcast ( & t - > cv ) ;
/* endpoint ref: safe because we'll still have the ref for write */
unref_transport ( t ) ;
}
}
gpr_mu_unlock ( & t - > mu ) ;
if ( ! start_write ) {
unref_transport ( t ) ;
}
break ;
case GRPC_ENDPOINT_WRITE_ERROR :
start_write = 0 ;
/* use the wrapper lock/unlock here as we drop_connection, causing
read callbacks to be queued ( which will be cleared during unlock ) */
lock ( t ) ;
t - > outbuf . count = 0 ;
t - > outbuf . length = 0 ;
t - > writing = 0 ;
drop_connection ( t ) ;
if ( ! t - > reading ) {
grpc_endpoint_destroy ( t - > ep ) ;
t - > ep = NULL ;
gpr_cv_broadcast ( & t - > cv ) ;
/* endpoint ref: safe because we'll still have the ref for write */
unref_transport ( t ) ;
}
unlock ( t ) ;
unref_transport ( t ) ;
break ;
case GRPC_ENDPOINT_WRITE_PENDING :
start_write = 0 ;
break ;
}
if ( start_write ) {
/* ultimately calls unref_transport(t); and clears t->writing */
perform_write ( t , ep ) ;
}
if ( perform_callbacks | | call_closed | | num_goaways ) {
@ -788,32 +772,10 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
}
}
static void finish_write ( void * tp , grpc_endpoint_cb_status error ) {
transport * t = tp ;
lock ( t ) ;
if ( error ! = GRPC_ENDPOINT_CB_OK ) {
drop_connection ( t ) ;
}
t - > outbuf . count = 0 ;
t - > outbuf . length = 0 ;
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t - > writing = 0 ;
if ( ! t - > reading ) {
grpc_endpoint_destroy ( t - > ep ) ;
t - > ep = NULL ;
gpr_cv_broadcast ( & t - > cv ) ;
unref_transport ( t ) ; /* safe because we'll still have the ref for write */
}
unlock ( t ) ;
unref_transport ( t ) ;
}
static int prepare_write ( transport * t ) {
stream * s ;
gpr_slice_buffer tempbuf ;
gpr_uint32 window_delta ;
/* simple writes are queued to qbuf, and flushed here */
tempbuf = t - > qbuf ;
@ -834,17 +796,16 @@ static int prepare_write(transport *t) {
/* for each stream that's become writable, frame it's data (according to
available window sizes ) and add to the output buffer */
while ( t - > outgoing_window & & ( s = stream_list_remove_head ( t , WRITABLE ) ) ) {
gpr_uint32 written = grpc_chttp2_encode_some (
s - > outgoing_sopb . ops , & s - > outgoing_sopb . nops , s - > write_closed ,
& t - > outbuf , GPR_MIN ( t - > outgoing_window , s - > outgoing_window ) , s - > id ,
& t - > hpack_compressor ) ;
t - > outgoing_window - = written ;
s - > outgoing_window - = written ;
/* if there are no more writes to do and writes are closed, we need to
queue a callback to let the application know */
if ( s - > write_closed & & s - > outgoing_sopb . nops = = 0 ) {
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
window_delta = grpc_chttp2_preencode (
s - > outgoing_sopb . ops , & s - > outgoing_sopb . nops ,
GPR_MIN ( t - > outgoing_window , s - > outgoing_window ) , & s - > writing_sopb ) ;
t - > outgoing_window - = window_delta ;
s - > outgoing_window - = window_delta ;
s - > sending_write_closed =
s - > queued_write_closed & & s - > outgoing_sopb . nops = = 0 ;
if ( s - > writing_sopb . nops > 0 | | s - > sending_write_closed ) {
stream_list_join ( t , s , WRITING ) ;
}
/* if there are still writes to do and the stream still has window
@ -857,25 +818,89 @@ static int prepare_write(transport *t) {
/* for each stream that wants to update its window, add that window here */
while ( ( s = stream_list_remove_head ( t , WINDOW_UPDATE ) ) ) {
gpr_uint32 window_add =
window_delta =
t - > settings [ LOCAL_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] -
s - > incoming_window ;
if ( ! s - > read_closed & & window_add ) {
gpr_slice_buffer_add ( & t - > outbuf ,
grpc_chttp2_window_update_create ( s - > id , window_add ) ) ;
s - > incoming_window + = window_add ;
if ( ! s - > read_closed & & window_delta ) {
gpr_slice_buffer_add (
& t - > outbuf , grpc_chttp2_window_update_create ( s - > id , window_delta ) ) ;
s - > incoming_window + = window_delta ;
}
}
/* if the transport is ready to send a window update, do so here also */
if ( t - > incoming_window < t - > connection_window_target * 3 / 4 ) {
gpr_uint32 window_add = t - > connection_window_target - t - > incoming_window ;
window_delta = t - > connection_window_target - t - > incoming_window ;
gpr_slice_buffer_add ( & t - > outbuf ,
grpc_chttp2_window_update_create ( 0 , window_add ) ) ;
t - > incoming_window + = window_add ;
grpc_chttp2_window_update_create ( 0 , window_delta ) ) ;
t - > incoming_window + = window_delta ;
}
return t - > outbuf . length > 0 ;
return t - > outbuf . length > 0 | | ! stream_list_empty ( t , WRITING ) ;
}
static void finalize_outbuf ( transport * t ) {
stream * s ;
while ( ( s = stream_list_remove_head ( t , WRITING ) ) ) {
grpc_chttp2_encode ( s - > writing_sopb . ops , s - > writing_sopb . nops ,
s - > sending_write_closed , s - > id , & t - > hpack_compressor ,
& t - > outbuf ) ;
s - > writing_sopb . nops = 0 ;
if ( s - > sending_write_closed ) {
stream_list_join ( t , s , WRITTEN_CLOSED ) ;
}
}
}
static void finish_write_common ( transport * t , int success ) {
stream * s ;
lock ( t ) ;
if ( ! success ) {
drop_connection ( t ) ;
}
while ( ( s = stream_list_remove_head ( t , WRITTEN_CLOSED ) ) ) {
s - > sent_write_closed = 1 ;
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
}
t - > outbuf . count = 0 ;
t - > outbuf . length = 0 ;
/* leave the writing flag up on shutdown to prevent further writes in unlock()
from starting */
t - > writing = 0 ;
if ( ! t - > reading ) {
grpc_endpoint_destroy ( t - > ep ) ;
t - > ep = NULL ;
gpr_cv_broadcast ( & t - > cv ) ;
unref_transport ( t ) ; /* safe because we'll still have the ref for write */
}
unlock ( t ) ;
unref_transport ( t ) ;
}
static void finish_write ( void * tp , grpc_endpoint_cb_status error ) {
transport * t = tp ;
finish_write_common ( t , error = = GRPC_ENDPOINT_CB_OK ) ;
}
static void perform_write ( transport * t , grpc_endpoint * ep ) {
finalize_outbuf ( t ) ;
GPR_ASSERT ( t - > outbuf . count > 0 ) ;
switch ( grpc_endpoint_write ( ep , t - > outbuf . slices , t - > outbuf . count ,
finish_write , t ) ) {
case GRPC_ENDPOINT_WRITE_DONE :
finish_write_common ( t , 1 ) ;
break ;
case GRPC_ENDPOINT_WRITE_ERROR :
finish_write_common ( t , 0 ) ;
break ;
case GRPC_ENDPOINT_WRITE_PENDING :
break ;
}
}
static void maybe_start_some_streams ( transport * t ) {
@ -901,19 +926,14 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
lock ( t ) ;
if ( is_last ) {
s - > write_closed = 1 ;
s - > queued_ write_closed = 1 ;
}
if ( ! s - > cancelled ) {
grpc_sopb_append ( & s - > outgoing_sopb , ops , ops_count ) ;
if ( is_last & & s - > outgoing_sopb . nops = = 0 ) {
if ( s - > id ! = 0 ) {
gpr_slice_buffer_add ( & t - > qbuf ,
grpc_chttp2_data_frame_create_empty_close ( s - > id ) ) ;
}
} else if ( s - > id = = 0 ) {
if ( s - > id = = 0 ) {
stream_list_join ( t , s , WAITING_FOR_CONCURRENCY ) ;
maybe_start_some_streams ( t ) ;
} else if ( s - > outgoing_window ) {
} else {
stream_list_join ( t , s , WRITABLE ) ;
}
} else {
@ -967,12 +987,22 @@ static void send_ping(grpc_transport *gt, void (*cb)(void *user_data),
* INPUT PROCESSING
*/
static void finalize_cancellations ( transport * t ) {
stream * s ;
while ( ( s = stream_list_remove_head ( t , CANCELLED ) ) ) {
s - > read_closed = 1 ;
s - > sent_write_closed = 1 ;
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
}
}
static void cancel_stream_inner ( transport * t , stream * s , gpr_uint32 id ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code ,
int send_rst ) {
char buffer [ 32 ] ;
int had_outgoing ;
char buffer [ 32 ] ;
if ( s ) {
/* clear out any unreported input & output: nobody cares anymore */
@ -981,10 +1011,9 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_sopb_reset ( & s - > outgoing_sopb ) ;
if ( s - > cancelled ) {
send_rst = 0 ;
} else if ( ! s - > read_closed | | ! s - > write_closed | | had_outgoing ) {
} else if ( ! s - > read_closed | | ! s - > sent_ write_closed | | had_outgoing ) {
s - > cancelled = 1 ;
s - > read_closed = 1 ;
s - > write_closed = 1 ;
stream_list_join ( t , s , CANCELLED ) ;
sprintf ( buffer , " %d " , local_status ) ;
grpc_sopb_add_metadata (
@ -1667,8 +1696,7 @@ static int prepare_callbacks(transport *t) {
s - > parser . incoming_sopb = s - > callback_sopb ;
s - > callback_sopb = temp_sopb ;
s - > callback_state = compute_state (
s - > write_closed & & s - > outgoing_sopb . nops = = 0 , s - > read_closed ) ;
s - > callback_state = compute_state ( s - > sent_write_closed , s - > read_closed ) ;
if ( s - > callback_state = = GRPC_STREAM_CLOSED ) {
remove_from_stream_map ( t , s ) ;
if ( s - > published_close ) {