@ -91,10 +91,9 @@ typedef enum {
/* streams that are waiting to start because there are too many concurrent
streams on the connection */
WAITING_FOR_CONCURRENCY ,
/* streams that want to callback the application */
PENDING_CALLBACKS ,
/* streams that *ARE* calling back to the application */
EXECUTING_CALLBACKS ,
/* streams that have finished reading: we wait until unlock to coalesce
all changes into one callback */
FINISHED_READ_OP ,
STREAM_LIST_COUNT /* must be last */
} stream_list_id ;
@ -141,6 +140,12 @@ typedef enum {
DTS_FRAME
} deframe_transport_state ;
typedef enum {
WRITE_STATE_OPEN ,
WRITE_STATE_QUEUED_CLOSE ,
WRITE_STATE_SENT_CLOSE
} WRITE_STATE ;
typedef struct {
stream * head ;
stream * tail ;
@ -182,6 +187,18 @@ typedef struct {
gpr_slice debug ;
} pending_goaway ;
typedef struct {
void ( * cb ) ( void * user_data , int success ) ;
void * user_data ;
int success ;
} op_closure ;
typedef struct {
op_closure * callbacks ;
size_t count ;
size_t capacity ;
} op_closure_array ;
struct transport {
grpc_transport base ; /* must be first */
const grpc_transport_callbacks * cb ;
@ -202,6 +219,10 @@ struct transport {
gpr_uint8 closed ;
error_state error_state ;
/* queued callbacks */
op_closure_array pending_callbacks ;
op_closure_array executing_callbacks ;
/* stream indexing */
gpr_uint32 next_stream_id ;
gpr_uint32 last_incoming_stream_id ;
@ -281,13 +302,13 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
' queued ' ; when the close is flow controlled into the send path , we are
' sending ' it ; when the write has been performed it is ' sent ' */
gpr_uint8 queued_write_closed ;
gpr_uint8 sending_write_closed ;
gpr_uint8 sent_write_closed ;
WRITE_STATE write_state ;
gpr_uint8 send_closed ;
gpr_uint8 read_closed ;
gpr_uint8 cancelled ;
gpr_uint8 allow_window_updates ;
gpr_uint8 published_close ;
op_closure send_done_closure ;
op_closure recv_done_closure ;
stream_link links [ STREAM_LIST_COUNT ] ;
gpr_uint8 included [ STREAM_LIST_COUNT ] ;
@ -296,10 +317,14 @@ struct stream {
grpc_linked_mdelem * incoming_metadata ;
size_t incoming_metadata_count ;
size_t incoming_metadata_capacity ;
grpc_linked_mdelem * old_incoming_metadata ;
gpr_timespec incoming_deadline ;
/* sops from application */
grpc_stream_op_buffer outgoing_sopb ;
grpc_stream_op_buffer * outgoing_sopb ;
grpc_stream_op_buffer * incoming_sopb ;
grpc_stream_state * publish_state ;
grpc_stream_state published_state ;
/* sops that have passed flow control to be written */
grpc_stream_op_buffer writing_sopb ;
@ -337,7 +362,8 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
grpc_chttp2_error_code error_code , int send_rst ) ;
static void cancel_stream ( transport * t , stream * s ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code , int send_rst ) ;
grpc_chttp2_error_code error_code ,
grpc_mdstr * optional_message , int send_rst ) ;
static void finalize_cancellations ( transport * t ) ;
static stream * lookup_stream ( transport * t , gpr_uint32 id ) ;
static void remove_from_stream_map ( transport * t , stream * s ) ;
@ -348,6 +374,14 @@ static void become_skip_parser(transport *t);
static void recv_data ( void * tp , gpr_slice * slices , size_t nslices ,
grpc_endpoint_cb_status error ) ;
static void schedule_cb ( transport * t , op_closure closure , int success ) ;
static void maybe_finish_read ( transport * t , stream * s ) ;
static void maybe_join_window_updates ( transport * t , stream * s ) ;
static void finish_reads ( transport * t ) ;
static void add_to_pollset_locked ( transport * t , grpc_pollset * pollset ) ;
static void perform_op_locked ( transport * t , stream * s , grpc_transport_op * op ) ;
static void add_metadata_batch ( transport * t , stream * s ) ;
/*
* CONSTRUCTION / DESTRUCTION / REFCOUNTING
*/
@ -387,6 +421,9 @@ static void destruct_transport(transport *t) {
}
gpr_free ( t - > pings ) ;
gpr_free ( t - > pending_callbacks . callbacks ) ;
gpr_free ( t - > executing_callbacks . callbacks ) ;
for ( i = 0 ; i < t - > num_pending_goaways ; i + + ) {
gpr_slice_unref ( t - > pending_goaways [ i ] . debug ) ;
}
@ -416,6 +453,8 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
GPR_ASSERT ( strlen ( CLIENT_CONNECT_STRING ) = = CLIENT_CONNECT_STRLEN ) ;
memset ( t , 0 , sizeof ( * t ) ) ;
t - > base . vtable = & vtable ;
t - > ep = ep ;
/* one ref is for destroy, the other for when ep becomes NULL */
@ -427,27 +466,16 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
t - > str_grpc_timeout =
grpc_mdstr_from_string ( t - > metadata_context , " grpc-timeout " ) ;
t - > reading = 1 ;
t - > writing = 0 ;
t - > error_state = ERROR_STATE_NONE ;
t - > next_stream_id = is_client ? 1 : 2 ;
t - > last_incoming_stream_id = 0 ;
t - > destroying = 0 ;
t - > closed = 0 ;
t - > is_client = is_client ;
t - > outgoing_window = DEFAULT_WINDOW ;
t - > incoming_window = DEFAULT_WINDOW ;
t - > connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET ;
t - > deframe_state = is_client ? DTS_FH_0 : DTS_CLIENT_PREFIX_0 ;
t - > expect_continuation_stream_id = 0 ;
t - > pings = NULL ;
t - > ping_count = 0 ;
t - > ping_capacity = 0 ;
t - > ping_counter = gpr_now ( ) . tv_nsec ;
grpc_chttp2_hpack_compressor_init ( & t - > hpack_compressor , mdctx ) ;
grpc_chttp2_goaway_parser_init ( & t - > goaway_parser ) ;
t - > pending_goaways = NULL ;
t - > num_pending_goaways = 0 ;
t - > cap_pending_goaways = 0 ;
gpr_slice_buffer_init ( & t - > outbuf ) ;
gpr_slice_buffer_init ( & t - > qbuf ) ;
grpc_sopb_init ( & t - > nuke_later_sopb ) ;
@ -462,7 +490,6 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
needed .
TODO ( ctiller ) : tune this */
grpc_chttp2_stream_map_init ( & t - > stream_map , 8 ) ;
memset ( & t - > lists , 0 , sizeof ( t - > lists ) ) ;
/* copy in initial settings to all setting sets */
for ( i = 0 ; i < NUM_SETTING_SETS ; i + + ) {
@ -503,7 +530,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock ( & t - > mu ) ;
t - > calling_back = 1 ;
ref_transport ( t ) ;
ref_transport ( t ) ; /* matches unref at end of this function */
gpr_mu_unlock ( & t - > mu ) ;
sr = setup ( arg , & t - > base , t - > metadata_context ) ;
@ -515,7 +542,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if ( t - > destroying ) gpr_cv_signal ( & t - > cv ) ;
unlock ( t ) ;
ref_transport ( t ) ;
ref_transport ( t ) ; /* matches unref inside recv_data */
recv_data ( t , slices , nslices , GRPC_ENDPOINT_CB_OK ) ;
unref_transport ( t ) ;
@ -573,16 +600,19 @@ static void goaway(grpc_transport *gt, grpc_status_code status,
}
static int init_stream ( grpc_transport * gt , grpc_stream * gs ,
const void * server_data ) {
const void * server_data , grpc_transport_op * initial_op ) {
transport * t = ( transport * ) gt ;
stream * s = ( stream * ) gs ;
memset ( s , 0 , sizeof ( * s ) ) ;
ref_transport ( t ) ;
if ( ! server_data ) {
lock ( t ) ;
s - > id = 0 ;
} else {
/* already locked */
s - > id = ( gpr_uint32 ) ( gpr_uintptr ) server_data ;
t - > incoming_stream = s ;
grpc_chttp2_stream_map_add ( & t - > stream_map , s - > id , s ) ;
@ -592,24 +622,13 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
t - > settings [ PEER_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > incoming_window =
t - > settings [ SENT_SETTINGS ] [ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] ;
s - > queued_write_closed = 0 ;
s - > sending_write_closed = 0 ;
s - > sent_write_closed = 0 ;
s - > read_closed = 0 ;
s - > cancelled = 0 ;
s - > allow_window_updates = 0 ;
s - > published_close = 0 ;
s - > incoming_metadata_count = 0 ;
s - > incoming_metadata_capacity = 0 ;
s - > incoming_metadata = NULL ;
s - > incoming_deadline = gpr_inf_future ;
memset ( & s - > links , 0 , sizeof ( s - > links ) ) ;
memset ( & s - > included , 0 , sizeof ( s - > included ) ) ;
grpc_sopb_init ( & s - > outgoing_sopb ) ;
grpc_sopb_init ( & s - > writing_sopb ) ;
grpc_sopb_init ( & s - > callback_sopb ) ;
grpc_chttp2_data_parser_init ( & s - > parser ) ;
if ( initial_op ) perform_op_locked ( t , s , initial_op ) ;
if ( ! server_data ) {
unlock ( t ) ;
}
@ -642,10 +661,16 @@ static void destroy_stream(grpc_transport *gt, grpc_stream *gs) {
gpr_mu_unlock ( & t - > mu ) ;
grpc_sopb_destroy ( & s - > outgoing_sopb ) ;
GPR_ASSERT ( s - > outgoing_sopb = = NULL ) ;
GPR_ASSERT ( s - > incoming_sopb = = NULL ) ;
grpc_sopb_destroy ( & s - > writing_sopb ) ;
grpc_sopb_destroy ( & s - > callback_sopb ) ;
grpc_chttp2_data_parser_destroy ( & s - > parser ) ;
for ( i = 0 ; i < s - > incoming_metadata_count ; i + + ) {
grpc_mdelem_unref ( s - > incoming_metadata [ i ] . md ) ;
}
gpr_free ( s - > incoming_metadata ) ;
gpr_free ( s - > old_incoming_metadata ) ;
unref_transport ( t ) ;
}
@ -708,8 +733,6 @@ static void stream_list_add_tail(transport *t, stream *s, stream_list_id id) {
}
static void stream_list_join ( transport * t , stream * s , stream_list_id id ) {
if ( id = = PENDING_CALLBACKS )
GPR_ASSERT ( t - > cb ! = NULL | | t - > error_state = = ERROR_STATE_NONE ) ;
if ( s - > included [ id ] ) {
return ;
}
@ -718,6 +741,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map ( transport * t , stream * s ) {
if ( s - > id = = 0 ) return ;
IF_TRACING ( gpr_log ( GPR_DEBUG , " HTTP:%s: Removing stream %d " ,
t - > is_client ? " CLI " : " SVR " , s - > id ) ) ;
if ( grpc_chttp2_stream_map_delete ( & t - > stream_map , s - > id ) ) {
maybe_start_some_streams ( t ) ;
}
@ -762,6 +787,8 @@ static void unlock(transport *t) {
finalize_cancellations ( t ) ;
}
finish_reads ( t ) ;
/* gather any callbacks that need to be made */
if ( ! t - > calling_back & & cb ) {
perform_callbacks = prepare_callbacks ( t ) ;
@ -865,21 +892,24 @@ static int prepare_write(transport *t) {
while ( t - > outgoing_window & & ( s = stream_list_remove_head ( t , WRITABLE ) ) & &
s - > outgoing_window > 0 ) {
window_delta = grpc_chttp2_preencode (
s - > outgoing_sopb . ops , & s - > outgoing_sopb . nops ,
s - > outgoing_sopb - > ops , & s - > outgoing_sopb - > nops ,
GPR_MIN ( t - > outgoing_window , s - > outgoing_window ) , & s - > writing_sopb ) ;
t - > outgoing_window - = window_delta ;
s - > outgoing_window - = window_delta ;
s - > sending_write_closed =
s - > queued_write_closed & & s - > outgoing_sopb . nops = = 0 ;
if ( s - > writing_sopb . nops > 0 | | s - > sending_write_closed ) {
if ( s - > write_state = = WRITE_STATE_QUEUED_CLOSE & &
s - > outgoing_sopb - > nops = = 0 ) {
s - > send_closed = 1 ;
}
if ( s - > writing_sopb . nops > 0 | | s - > send_closed ) {
stream_list_join ( t , s , WRITING ) ;
}
/* if there are still writes to do and the stream still has window
available , then schedule a further write */
if ( s - > outgoing_sopb . nops > 0 & & s - > outgoing_window > 0 ) {
GPR_ASSERT ( ! t - > outgoing_window ) ;
/* we should either exhaust window or have no ops left, but not both */
if ( s - > outgoing_sopb - > nops = = 0 ) {
s - > outgoing_sopb = NULL ;
schedule_cb ( t , s - > send_done_closure , 1 ) ;
} else if ( s - > outgoing_window ) {
stream_list_add_tail ( t , s , WRITABLE ) ;
}
}
@ -912,10 +942,9 @@ static void finalize_outbuf(transport *t) {
while ( ( s = stream_list_remove_head ( t , WRITING ) ) ) {
grpc_chttp2_encode ( s - > writing_sopb . ops , s - > writing_sopb . nops ,
s - > sending_write_closed , s - > id , & t - > hpack_compressor ,
& t - > outbuf ) ;
s - > send_closed , s - > id , & t - > hpack_compressor , & t - > outbuf ) ;
s - > writing_sopb . nops = 0 ;
if ( s - > sending_write _closed ) {
if ( s - > send_closed ) {
stream_list_join ( t , s , WRITTEN_CLOSED ) ;
}
}
@ -929,8 +958,10 @@ static void finish_write_common(transport *t, int success) {
drop_connection ( t ) ;
}
while ( ( s = stream_list_remove_head ( t , WRITTEN_CLOSED ) ) ) {
s - > sent_write_closed = 1 ;
if ( ! s - > cancelled ) stream_list_join ( t , s , PENDING_CALLBACKS ) ;
s - > write_state = WRITE_STATE_SENT_CLOSE ;
if ( 1 | | ! s - > cancelled ) {
maybe_finish_read ( t , s ) ;
}
}
t - > outbuf . count = 0 ;
t - > outbuf . length = 0 ;
@ -980,6 +1011,9 @@ static void maybe_start_some_streams(transport *t) {
stream * s = stream_list_remove_head ( t , WAITING_FOR_CONCURRENCY ) ;
if ( ! s ) break ;
IF_TRACING ( gpr_log ( GPR_DEBUG , " HTTP:%s: Allocating new stream %p to id %d " ,
t - > is_client ? " CLI " : " SVR " , s , t - > next_stream_id ) ) ;
GPR_ASSERT ( s - > id = = 0 ) ;
s - > id = t - > next_stream_id ;
t - > next_stream_id + = 2 ;
@ -988,43 +1022,63 @@ static void maybe_start_some_streams(transport *t) {
}
}
static void send_batch ( grpc_transport * gt , grpc_stream * gs , grpc_stream_op * ops ,
size_t ops_count , int is_last ) {
transport * t = ( transport * ) gt ;
stream * s = ( stream * ) gs ;
lock ( t ) ;
if ( is_last ) {
s - > queued_write_closed = 1 ;
}
if ( ! s - > cancelled ) {
grpc_sopb_append ( & s - > outgoing_sopb , ops , ops_count ) ;
if ( s - > id = = 0 ) {
stream_list_join ( t , s , WAITING_FOR_CONCURRENCY ) ;
maybe_start_some_streams ( t ) ;
static void perform_op_locked ( transport * t , stream * s , grpc_transport_op * op ) {
if ( op - > cancel_with_status ! = GRPC_STATUS_OK ) {
cancel_stream (
t , s , op - > cancel_with_status ,
grpc_chttp2_grpc_status_to_http2_error ( op - > cancel_with_status ) ,
op - > cancel_message , 1 ) ;
}
if ( op - > send_ops ) {
GPR_ASSERT ( s - > outgoing_sopb = = NULL ) ;
s - > send_done_closure . cb = op - > on_done_send ;
s - > send_done_closure . user_data = op - > send_user_data ;
if ( ! s - > cancelled ) {
s - > outgoing_sopb = op - > send_ops ;
if ( op - > is_last_send & & s - > write_state = = WRITE_STATE_OPEN ) {
s - > write_state = WRITE_STATE_QUEUED_CLOSE ;
}
if ( s - > id = = 0 ) {
IF_TRACING ( gpr_log ( GPR_DEBUG ,
" HTTP:%s: New stream %p waiting for concurrency " ,
t - > is_client ? " CLI " : " SVR " , s ) ) ;
stream_list_join ( t , s , WAITING_FOR_CONCURRENCY ) ;
maybe_start_some_streams ( t ) ;
} else if ( s - > outgoing_window > 0 ) {
stream_list_join ( t , s , WRITABLE ) ;
}
} else {
stream_list_join ( t , s , WRITABLE ) ;
schedule_nuke_sopb ( t , op - > send_ops ) ;
schedule_cb ( t , s - > send_done_closure , 0 ) ;
}
} else {
grpc_sopb_append ( & t - > nuke_later_sopb , ops , ops_count ) ;
}
if ( is_last & & s - > outgoing_sopb . nops = = 0 & & s - > read_closed & &
! s - > published_close ) {
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
if ( op - > recv_ops ) {
GPR_ASSERT ( s - > incoming_sopb = = NULL ) ;
s - > recv_done_closure . cb = op - > on_done_recv ;
s - > recv_done_closure . user_data = op - > recv_user_data ;
s - > incoming_sopb = op - > recv_ops ;
s - > incoming_sopb - > nops = 0 ;
s - > publish_state = op - > recv_state ;
gpr_free ( s - > old_incoming_metadata ) ;
s - > old_incoming_metadata = NULL ;
maybe_finish_read ( t , s ) ;
maybe_join_window_updates ( t , s ) ;
}
unlock ( t ) ;
if ( op - > bind_pollset ) {
add_to_pollset_locked ( t , op - > bind_pollset ) ;
}
}
static void abort_stream ( grpc_transport * gt , grpc_stream * gs ,
grpc_status_code status ) {
static void perform_op ( grpc_transport * gt , grpc_stream * gs ,
grpc_transport_op * op ) {
transport * t = ( transport * ) gt ;
stream * s = ( stream * ) gs ;
lock ( t ) ;
cancel_stream ( t , s , status , grpc_chttp2_grpc_status_to_http2_error ( status ) ,
1 ) ;
perform_op_locked ( t , s , op ) ;
unlock ( t ) ;
}
@ -1063,8 +1117,8 @@ static void finalize_cancellations(transport *t) {
while ( ( s = stream_list_remove_head ( t , CANCELLED ) ) ) {
s - > read_closed = 1 ;
s - > sent_write_closed = 1 ;
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
s - > write_state = WRITE_STATE_SENT_CLOSE ;
maybe_finish_read ( t , s ) ;
}
}
@ -1082,18 +1136,24 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner ( transport * t , stream * s , gpr_uint32 id ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code ,
int send_rst ) {
grpc_mdstr * optional_message , int send_rst ) {
int had_outgoing ;
char buffer [ GPR_LTOA_MIN_BUFSIZE ] ;
if ( s ) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s - > outgoing_sopb . nops ! = 0 ;
had_outgoing = s - > outgoing_sopb & & s - > outgoing_sopb - > nops ! = 0 ;
schedule_nuke_sopb ( t , & s - > parser . incoming_sopb ) ;
schedule_nuke_sopb ( t , & s - > outgoing_sopb ) ;
if ( s - > outgoing_sopb ) {
schedule_nuke_sopb ( t , s - > outgoing_sopb ) ;
s - > outgoing_sopb = NULL ;
stream_list_remove ( t , s , WRITABLE ) ;
schedule_cb ( t , s - > send_done_closure , 0 ) ;
}
if ( s - > cancelled ) {
send_rst = 0 ;
} else if ( ! s - > read_closed | | ! s - > sent_write_closed | | had_outgoing ) {
} else if ( ! s - > read_closed | | s - > write_state ! = WRITE_STATE_SENT_CLOSE | |
had_outgoing ) {
s - > cancelled = 1 ;
stream_list_join ( t , s , CANCELLED ) ;
@ -1101,17 +1161,26 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
add_incoming_metadata (
t , s ,
grpc_mdelem_from_strings ( t - > metadata_context , " grpc-status " , buffer ) ) ;
switch ( local_status ) {
case GRPC_STATUS_CANCELLED :
add_incoming_metadata (
t , s , grpc_mdelem_from_strings ( t - > metadata_context ,
" grpc-message " , " Cancelled " ) ) ;
break ;
default :
break ;
if ( ! optional_message ) {
switch ( local_status ) {
case GRPC_STATUS_CANCELLED :
add_incoming_metadata (
t , s , grpc_mdelem_from_strings ( t - > metadata_context ,
" grpc-message " , " Cancelled " ) ) ;
break ;
default :
break ;
}
} else {
add_incoming_metadata (
t , s ,
grpc_mdelem_from_metadata_strings (
t - > metadata_context ,
grpc_mdstr_from_string ( t - > metadata_context , " grpc-message " ) ,
grpc_mdstr_ref ( optional_message ) ) ) ;
}
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
add_metadata_batch ( t , s ) ;
maybe_finish_read ( t , s ) ;
}
}
if ( ! id ) send_rst = 0 ;
@ -1119,24 +1188,29 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
gpr_slice_buffer_add ( & t - > qbuf ,
grpc_chttp2_rst_stream_create ( id , error_code ) ) ;
}
if ( optional_message ) {
grpc_mdstr_unref ( optional_message ) ;
}
}
static void cancel_stream_id ( transport * t , gpr_uint32 id ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code , int send_rst ) {
cancel_stream_inner ( t , lookup_stream ( t , id ) , id , local_status , error_code ,
send_rst ) ;
NULL , send_rst ) ;
}
static void cancel_stream ( transport * t , stream * s ,
grpc_status_code local_status ,
grpc_chttp2_error_code error_code , int send_rst ) {
cancel_stream_inner ( t , s , s - > id , local_status , error_code , send_rst ) ;
grpc_chttp2_error_code error_code ,
grpc_mdstr * optional_message , int send_rst ) {
cancel_stream_inner ( t , s , s - > id , local_status , error_code , optional_message ,
send_rst ) ;
}
static void cancel_stream_cb ( void * user_data , gpr_uint32 id , void * stream ) {
cancel_stream ( user_data , stream , GRPC_STATUS_UNAVAILABLE ,
GRPC_CHTTP2_INTERNAL_ERROR , 0 ) ;
GRPC_CHTTP2_INTERNAL_ERROR , NULL , 0 ) ;
}
static void end_all_the_calls ( transport * t ) {
@ -1150,8 +1224,14 @@ static void drop_connection(transport *t) {
end_all_the_calls ( t ) ;
}
static void maybe_finish_read ( transport * t , stream * s ) {
if ( s - > incoming_sopb ) {
stream_list_join ( t , s , FINISHED_READ_OP ) ;
}
}
static void maybe_join_window_updates ( transport * t , stream * s ) {
if ( s - > allow_window_updates & &
if ( s - > incoming_sopb ! = NULL & &
s - > incoming_window <
t - > settings [ LOCAL_SETTINGS ]
[ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE ] *
@ -1160,21 +1240,6 @@ static void maybe_join_window_updates(transport *t, stream *s) {
}
}
static void set_allow_window_updates ( grpc_transport * tp , grpc_stream * sp ,
int allow ) {
transport * t = ( transport * ) tp ;
stream * s = ( stream * ) sp ;
lock ( t ) ;
s - > allow_window_updates = allow ;
if ( allow ) {
maybe_join_window_updates ( t , s ) ;
} else {
stream_list_remove ( t , s , WINDOW_UPDATE ) ;
}
unlock ( t ) ;
}
static grpc_chttp2_parse_error update_incoming_window ( transport * t , stream * s ) {
if ( t - > incoming_frame_size > t - > incoming_window ) {
gpr_log ( GPR_ERROR , " frame of size %d overflows incoming window of %d " ,
@ -1248,7 +1313,7 @@ static int init_data_frame_parser(transport *t) {
case GRPC_CHTTP2_STREAM_ERROR :
cancel_stream ( t , s , grpc_chttp2_http2_error_to_grpc_status (
GRPC_CHTTP2_INTERNAL_ERROR ) ,
GRPC_CHTTP2_INTERNAL_ERROR , 1 ) ;
GRPC_CHTTP2_INTERNAL_ERROR , NULL , 1 ) ;
return init_skip_frame ( t , 0 ) ;
case GRPC_CHTTP2_CONNECTION_ERROR :
drop_connection ( t ) ;
@ -1267,11 +1332,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT ( s ) ;
IF_TRACING ( gpr_log ( GPR_INFO , " HTTP:%d:HDR: %s: %s " , s - > id ,
grpc_mdstr_as_c_string ( md - > key ) ,
grpc_mdstr_as_c_string ( md - > value ) ) ) ;
IF_TRACING ( gpr_log (
GPR_INFO , " HTTP:%d:%s:HDR: %s: %s " , s - > id , t - > is_client ? " CLI " : " SVR " ,
grpc_mdstr_as_c_string ( md - > key ) , grpc_mdstr_as_c_string ( md - > value ) ) ) ;
stream_list_join ( t , s , PENDING_CALLBACKS ) ;
if ( md - > key = = t - > str_grpc_timeout ) {
gpr_timespec * cached_timeout = grpc_mdelem_get_user_data ( md , free_timeout ) ;
if ( ! cached_timeout ) {
@ -1290,6 +1354,7 @@ static void on_header(void *tp, grpc_mdelem *md) {
} else {
add_incoming_metadata ( t , s , md ) ;
}
maybe_finish_read ( t , s ) ;
}
static int init_header_frame_parser ( transport * t , int is_continuation ) {
@ -1327,7 +1392,10 @@ static int init_header_frame_parser(transport *t, int is_continuation) {
gpr_log ( GPR_ERROR ,
" ignoring out of order new stream request on server; last stream "
" id=%d, new stream id=%d " ,
t - > last_incoming_stream_id , t - > incoming_stream ) ;
t - > last_incoming_stream_id , t - > incoming_stream_id ) ;
return init_skip_frame ( t , 1 ) ;
} else if ( ( t - > incoming_stream_id & 1 ) = = 0 ) {
gpr_log ( GPR_ERROR , " ignoring stream with non-client generated index %d " , t - > incoming_stream_id ) ;
return init_skip_frame ( t , 1 ) ;
}
t - > incoming_stream = NULL ;
@ -1464,33 +1532,20 @@ static int is_window_update_legal(gpr_int64 window_update, gpr_int64 window) {
return window + window_update < MAX_WINDOW ;
}
static void free_md ( void * p , grpc_op_error result ) { gpr_free ( p ) ; }
static void add_metadata_batch ( transport * t , stream * s ) {
grpc_metadata_batch b ;
size_t i ;
b . list . head = & s - > incoming_metadata [ 0 ] ;
b . list . tail = & s - > incoming_metadata [ s - > incoming_metadata_count - 1 ] ;
b . list . head = NULL ;
/* Store away the last element of the list, so that in patch_metadata_ops
we can reconstitute the list .
We can ' t do list building here as later incoming metadata may reallocate
the underlying array . */
b . list . tail = ( void * ) ( gpr_intptr ) s - > incoming_metadata_count ;
b . garbage . head = b . garbage . tail = NULL ;
b . deadline = s - > incoming_deadline ;
for ( i = 1 ; i < s - > incoming_metadata_count ; i + + ) {
s - > incoming_metadata [ i ] . prev = & s - > incoming_metadata [ i - 1 ] ;
s - > incoming_metadata [ i - 1 ] . next = & s - > incoming_metadata [ i ] ;
}
s - > incoming_metadata [ 0 ] . prev = NULL ;
s - > incoming_metadata [ s - > incoming_metadata_count - 1 ] . next = NULL ;
s - > incoming_deadline = gpr_inf_future ;
grpc_sopb_add_metadata ( & s - > parser . incoming_sopb , b ) ;
grpc_sopb_add_flow_ctl_cb ( & s - > parser . incoming_sopb , free_md ,
s - > incoming_metadata ) ;
/* reset */
s - > incoming_deadline = gpr_inf_future ;
s - > incoming_metadata = NULL ;
s - > incoming_metadata_count = 0 ;
s - > incoming_metadata_capacity = 0 ;
}
static int parse_frame_slice ( transport * t , gpr_slice slice , int is_last ) {
@ -1501,14 +1556,14 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
case GRPC_CHTTP2_PARSE_OK :
if ( st . end_of_stream ) {
t - > incoming_stream - > read_closed = 1 ;
stream_list_join ( t , t - > incoming_stream , PENDING_CALLBACKS ) ;
maybe_finish_read ( t , t - > incoming_stream ) ;
}
if ( st . need_flush_reads ) {
stream_list_join ( t , t - > incoming_stream , PENDING_CALLBACKS ) ;
maybe_finish_read ( t , t - > incoming_stream ) ;
}
if ( st . metadata_boundary ) {
add_metadata_batch ( t , t - > incoming_stream ) ;
stream_list_join ( t , t - > incoming_stream , PENDING_CALLBACKS ) ;
maybe_finish_read ( t , t - > incoming_stream ) ;
}
if ( st . ack_settings ) {
gpr_slice_buffer_add ( & t - > qbuf , grpc_chttp2_settings_ack_create ( ) ) ;
@ -1545,11 +1600,11 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
}
if ( st . initial_window_update ) {
for ( i = 0 ; i < t - > stream_map . count ; i + + ) {
stream * s = ( stream * ) ( t - > stream_map . values [ i ] ) ;
stream * s = ( stream * ) ( t - > stream_map . values [ i ] ) ;
int was_window_empty = s - > outgoing_window < = 0 ;
s - > outgoing_window + = st . initial_window_update ;
if ( was_window_empty & & s - > outgoing_window > 0 & &
s - > outgoing_sopb . nops > 0 ) {
if ( was_window_empty & & s - > outgoing_window > 0 & & s - > outgoing_sopb & &
s - > outgoing_sopb - > nops > 0 ) {
stream_list_join ( t , s , WRITABLE ) ;
}
}
@ -1563,12 +1618,13 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if ( ! is_window_update_legal ( st . window_update , s - > outgoing_window ) ) {
cancel_stream ( t , s , grpc_chttp2_http2_error_to_grpc_status (
GRPC_CHTTP2_FLOW_CONTROL_ERROR ) ,
GRPC_CHTTP2_FLOW_CONTROL_ERROR , 1 ) ;
GRPC_CHTTP2_FLOW_CONTROL_ERROR , NULL , 1 ) ;
} else {
s - > outgoing_window + = st . window_update ;
/* if this window update makes outgoing ops writable again,
flag that */
if ( was_window_empty & & s - > outgoing_sopb . nops ) {
if ( was_window_empty & & s - > outgoing_sopb & &
s - > outgoing_sopb - > nops > 0 ) {
stream_list_join ( t , s , WRITABLE ) ;
}
}
@ -1830,53 +1886,135 @@ static grpc_stream_state compute_state(gpr_uint8 write_closed,
return GRPC_STREAM_OPEN ;
}
static int prepare_callbacks ( transport * t ) {
stream * s ;
int n = 0 ;
while ( ( s = stream_list_remove_head ( t , PENDING_CALLBACKS ) ) ) {
int execute = 1 ;
s - > callback_state = compute_state ( s - > sent_write_closed , s - > read_closed ) ;
if ( s - > callback_state = = GRPC_STREAM_CLOSED ) {
remove_from_stream_map ( t , s ) ;
if ( s - > published_close ) {
execute = 0 ;
} else if ( s - > incoming_metadata_count ) {
add_metadata_batch ( t , s ) ;
}
s - > published_close = 1 ;
static void patch_metadata_ops ( stream * s ) {
grpc_stream_op * ops = s - > incoming_sopb - > ops ;
size_t nops = s - > incoming_sopb - > nops ;
size_t i ;
size_t j ;
size_t mdidx = 0 ;
size_t last_mdidx ;
int found_metadata = 0 ;
/* rework the array of metadata into a linked list, making use
of the breadcrumbs we left in metadata batches during
add_metadata_batch */
for ( i = 0 ; i < nops ; i + + ) {
grpc_stream_op * op = & ops [ i ] ;
if ( op - > type ! = GRPC_OP_METADATA ) continue ;
found_metadata = 1 ;
/* we left a breadcrumb indicating where the end of this list is,
and since we add sequentially , we know from the end of the last
segment where this segment begins */
last_mdidx = ( size_t ) ( gpr_intptr ) ( op - > data . metadata . list . tail ) ;
GPR_ASSERT ( last_mdidx > mdidx ) ;
GPR_ASSERT ( last_mdidx < = s - > incoming_metadata_count ) ;
/* turn the array into a doubly linked list */
op - > data . metadata . list . head = & s - > incoming_metadata [ mdidx ] ;
op - > data . metadata . list . tail = & s - > incoming_metadata [ last_mdidx - 1 ] ;
for ( j = mdidx + 1 ; j < last_mdidx ; j + + ) {
s - > incoming_metadata [ j ] . prev = & s - > incoming_metadata [ j - 1 ] ;
s - > incoming_metadata [ j - 1 ] . next = & s - > incoming_metadata [ j ] ;
}
s - > incoming_metadata [ mdidx ] . prev = NULL ;
s - > incoming_metadata [ last_mdidx - 1 ] . next = NULL ;
/* track where we're up to */
mdidx = last_mdidx ;
}
if ( found_metadata ) {
s - > old_incoming_metadata = s - > incoming_metadata ;
if ( mdidx ! = s - > incoming_metadata_count ) {
/* we have a partially read metadata batch still in incoming_metadata */
size_t new_count = s - > incoming_metadata_count - mdidx ;
size_t copy_bytes = sizeof ( * s - > incoming_metadata ) * new_count ;
GPR_ASSERT ( mdidx < s - > incoming_metadata_count ) ;
s - > incoming_metadata = gpr_malloc ( copy_bytes ) ;
memcpy ( s - > old_incoming_metadata + mdidx , s - > incoming_metadata , copy_bytes ) ;
s - > incoming_metadata_count = s - > incoming_metadata_capacity = new_count ;
} else {
s - > incoming_metadata = NULL ;
s - > incoming_metadata_count = 0 ;
s - > incoming_metadata_capacity = 0 ;
}
}
}
grpc_sopb_swap ( & s - > parser . incoming_sopb , & s - > callback_sopb ) ;
static void finish_reads ( transport * t ) {
stream * s ;
if ( execute ) {
stream_list_add_tail ( t , s , EXECUTING_CALLBACKS ) ;
n = 1 ;
while ( ( s = stream_list_remove_head ( t , FINISHED_READ_OP ) ) ! = NULL ) {
int publish = 0 ;
GPR_ASSERT ( s - > incoming_sopb ) ;
* s - > publish_state =
compute_state ( s - > write_state = = WRITE_STATE_SENT_CLOSE , s - > read_closed ) ;
if ( * s - > publish_state ! = s - > published_state ) {
s - > published_state = * s - > publish_state ;
publish = 1 ;
if ( s - > published_state = = GRPC_STREAM_CLOSED ) {
remove_from_stream_map ( t , s ) ;
}
}
if ( s - > parser . incoming_sopb . nops > 0 ) {
grpc_sopb_swap ( s - > incoming_sopb , & s - > parser . incoming_sopb ) ;
publish = 1 ;
}
if ( publish ) {
if ( s - > incoming_metadata_count > 0 ) {
patch_metadata_ops ( s ) ;
}
s - > incoming_sopb = NULL ;
schedule_cb ( t , s - > recv_done_closure , 1 ) ;
}
}
return n ;
}
static void schedule_cb ( transport * t , op_closure closure , int success ) {
if ( t - > pending_callbacks . capacity = = t - > pending_callbacks . count ) {
t - > pending_callbacks . capacity =
GPR_MAX ( t - > pending_callbacks . capacity * 2 , 8 ) ;
t - > pending_callbacks . callbacks =
gpr_realloc ( t - > pending_callbacks . callbacks ,
t - > pending_callbacks . capacity *
sizeof ( * t - > pending_callbacks . callbacks ) ) ;
}
closure . success = success ;
t - > pending_callbacks . callbacks [ t - > pending_callbacks . count + + ] = closure ;
}
static int prepare_callbacks ( transport * t ) {
op_closure_array temp = t - > pending_callbacks ;
t - > pending_callbacks = t - > executing_callbacks ;
t - > executing_callbacks = temp ;
return t - > executing_callbacks . count > 0 ;
}
static void run_callbacks ( transport * t , const grpc_transport_callbacks * cb ) {
stream * s ;
while ( ( s = stream_list_remove_head ( t , EXECUTING_CALLBACKS ) ) ) {
size_t nops = s - > callback_sopb . nops ;
s - > callback_sopb . nops = 0 ;
cb - > recv_batch ( t - > cb_user_data , & t - > base , ( grpc_stream * ) s ,
s - > callback_sopb . ops , nops , s - > callback_state ) ;
size_t i ;
for ( i = 0 ; i < t - > executing_callbacks . count ; i + + ) {
op_closure c = t - > executing_callbacks . callbacks [ i ] ;
c . cb ( c . user_data , c . success ) ;
}
t - > executing_callbacks . count = 0 ;
}
static void call_cb_closed ( transport * t , const grpc_transport_callbacks * cb ) {
cb - > closed ( t - > cb_user_data , & t - > base ) ;
}
static void add_to_pollset ( grpc_transport * gt , grpc_pollset * pollset ) {
transport * t = ( transport * ) gt ;
lock ( t ) ;
/*
* POLLSET STUFF
*/
static void add_to_pollset_locked ( transport * t , grpc_pollset * pollset ) {
if ( t - > ep ) {
grpc_endpoint_add_to_pollset ( t - > ep , pollset ) ;
}
}
static void add_to_pollset ( grpc_transport * gt , grpc_pollset * pollset ) {
transport * t = ( transport * ) gt ;
lock ( t ) ;
add_to_pollset_locked ( t , pollset ) ;
unlock ( t ) ;
}
@ -1885,9 +2023,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) {
*/
static const grpc_transport_vtable vtable = {
sizeof ( stream ) , init_stream , send_batch , set_allow_window_updates ,
add_to_pollset , destroy_stream , abort_stream , goaway , close_transport ,
send_ping , destroy_transport } ;
sizeof ( stream ) , init_stream , perform_op ,
add_to_pollset , destroy_stream , goaway ,
close_transport , send_ping , destroy_transport } ;
void grpc_create_chttp2_transport ( grpc_transport_setup_callback setup ,
void * arg ,