@ -1308,15 +1308,14 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
return ;
}
closure - > next_data . scratch - = CLOSURE_BARRIER_FIRST_REF_BIT ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
LOG ( INFO ) < < " complete_closure_step: t= " < < t < < " " < < closure < < " refs= "
GRPC_TRACE_LOG ( http , INFO )
< < " complete_closure_step: t= " < < t < < " " < < closure < < " refs= "
< < ( closure - > next_data . scratch / CLOSURE_BARRIER_FIRST_REF_BIT )
< < " flags= "
< < ( closure - > next_data . scratch % CLOSURE_BARRIER_FIRST_REF_BIT )
< < " desc= " < < desc < < " err= " < < grpc_core : : StatusToString ( error )
< < " write_state= " < < write_state_name ( t - > write_state )
< < " whence= " < < whence . file ( ) < < " : " < < whence . line ( ) ;
}
if ( ! error . ok ( ) ) {
grpc_error_handle cl_err =
@ -1362,49 +1361,10 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
} ) ;
}
static void perform_stream_op_locked ( void * stream_op ,
grpc_error_handle /*error_ignored*/ ) {
grpc_transport_stream_op_batch * op =
static_cast < grpc_transport_stream_op_batch * > ( stream_op ) ;
grpc_chttp2_stream * s =
static_cast < grpc_chttp2_stream * > ( op - > handler_private . extra_arg ) ;
grpc_transport_stream_op_batch_payload * op_payload = op - > payload ;
grpc_chttp2_transport * t = s - > t . get ( ) ;
s - > traced = op - > is_traced ;
if ( ! grpc_core : : IsCallTracerInTransportEnabled ( ) ) {
s - > call_tracer = CallTracerIfSampled ( s ) ;
}
s - > tcp_tracer = TcpTracerIfSampled ( s ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
LOG ( INFO ) < < " perform_stream_op_locked[s= " < < s < < " ; op= " < < op
< < " ]: " < < grpc_transport_stream_op_batch_string ( op , false )
< < " ; on_complete = " < < op - > on_complete ;
if ( op - > send_initial_metadata ) {
log_metadata ( op_payload - > send_initial_metadata . send_initial_metadata ,
s - > id , t - > is_client , true ) ;
}
if ( op - > send_trailing_metadata ) {
log_metadata ( op_payload - > send_trailing_metadata . send_trailing_metadata ,
s - > id , t - > is_client , false ) ;
}
}
grpc_closure * on_complete = op - > on_complete ;
// on_complete will be null if and only if there are no send ops in the batch.
if ( on_complete ! = nullptr ) {
// This batch has send ops. Use final_data as a barrier until enqueue time;
// the initial counter is dropped at the end of this function.
on_complete - > next_data . scratch = CLOSURE_BARRIER_FIRST_REF_BIT ;
on_complete - > error_data . error = 0 ;
}
if ( op - > cancel_stream ) {
grpc_chttp2_cancel_stream ( t , s , op_payload - > cancel_stream . cancel_error ,
op_payload - > cancel_stream . tarpit ) ;
}
if ( op - > send_initial_metadata ) {
static void send_initial_metadata_locked (
grpc_transport_stream_op_batch * op , grpc_chttp2_stream * s ,
grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t , grpc_closure * on_complete ) {
if ( ! grpc_core : : IsCallTracerInTransportEnabled ( ) ) {
if ( s - > call_tracer ! = nullptr ) {
s - > call_tracer - > RecordAnnotation (
@ -1414,8 +1374,7 @@ static void perform_stream_op_locked(void* stream_op,
. Add ( s - > flow_control . stats ( ) ) ) ;
}
} else if ( grpc_core : : IsTraceRecordCallopsEnabled ( ) ) {
auto * call_tracer =
s - > arena - > GetContext < grpc_core : : CallTracerInterface > ( ) ;
auto * call_tracer = s - > arena - > GetContext < grpc_core : : CallTracerInterface > ( ) ;
if ( call_tracer ! = nullptr & & call_tracer - > IsSampled ( ) ) {
call_tracer - > RecordAnnotation (
grpc_core : : HttpAnnotation ( grpc_core : : HttpAnnotation : : Type : : kStart ,
@ -1434,8 +1393,8 @@ static void perform_stream_op_locked(void* stream_op,
s - > send_initial_metadata =
op_payload - > send_initial_metadata . send_initial_metadata ;
if ( t - > is_client ) {
s - > deadline = std : : min (
s - > deadline ,
s - > deadline =
std : : min ( s - > deadline ,
s - > send_initial_metadata - > get ( grpc_core : : GrpcTimeoutMetadata ( ) )
. value_or ( grpc_core : : Timestamp : : InfFuture ( ) ) ) ;
}
@ -1479,9 +1438,12 @@ static void perform_stream_op_locked(void* stream_op,
& s - > write_closed_error , 1 ) ,
" send_initial_metadata_finished " ) ;
}
}
}
if ( op - > send_message ) {
static void send_message_locked (
grpc_transport_stream_op_batch * op , grpc_chttp2_stream * s ,
grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t , grpc_closure * on_complete ) {
t - > num_messages_in_next_write + + ;
grpc_core : : global_stats ( ) . IncrementHttp2SendMessageSize (
op - > payload - > send_message . send_message - > Length ( ) ) ;
@ -1497,8 +1459,8 @@ static void perform_stream_op_locked(void* stream_op,
absl : : OkStatus ( ) ,
" fetching_send_message_finished " ) ;
} else {
uint8_t * frame_hdr = grpc_slice_buffer_tiny_add (
& s - > flow_controlled_buffer , GRPC_HEADER_SIZE_IN_BYTES ) ;
uint8_t * frame_hdr = grpc_slice_buffer_tiny_add ( & s - > flow_controlled_buffer ,
GRPC_HEADER_SIZE_IN_BYTES ) ;
frame_hdr [ 0 ] = ( flags & GRPC_WRITE_INTERNAL_COMPRESS ) ! = 0 ;
size_t len = op_payload - > send_message . send_message - > Length ( ) ;
frame_hdr [ 1 ] = static_cast < uint8_t > ( len > > 24 ) ;
@ -1550,16 +1512,18 @@ static void perform_stream_op_locked(void* stream_op,
* list = cb ;
}
if ( s - > id ! = 0 & &
( ! s - > write_buffering | |
s - > flow_controlled_buffer . length > t - > write_buffer_size ) ) {
if ( s - > id ! = 0 & & ( ! s - > write_buffering | | s - > flow_controlled_buffer . length >
t - > write_buffer_size ) ) {
grpc_chttp2_mark_stream_writable ( t , s ) ;
grpc_chttp2_initiate_write ( t , GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE ) ;
}
}
}
}
if ( op - > send_trailing_metadata ) {
static void send_trailing_metadata_locked (
grpc_transport_stream_op_batch * op , grpc_chttp2_stream * s ,
grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t , grpc_closure * on_complete ) {
CHECK_EQ ( s - > send_trailing_metadata_finished , nullptr ) ;
on_complete - > next_data . scratch | = t - > closure_barrier_may_cover_write ;
s - > send_trailing_metadata_finished = add_closure_barrier ( on_complete ) ;
@ -1587,9 +1551,11 @@ static void perform_stream_op_locked(void* stream_op,
grpc_chttp2_initiate_write (
t , GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA ) ;
}
}
}
if ( op - > recv_initial_metadata ) {
static void recv_initial_metadata_locked (
grpc_chttp2_stream * s , grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t ) {
CHECK_EQ ( s - > recv_initial_metadata_ready , nullptr ) ;
s - > recv_initial_metadata_ready =
op_payload - > recv_initial_metadata . recv_initial_metadata_ready ;
@ -1601,9 +1567,11 @@ static void perform_stream_op_locked(void* stream_op,
* s - > trailing_metadata_available = true ;
}
grpc_chttp2_maybe_complete_recv_initial_metadata ( t , s ) ;
}
}
if ( op - > recv_message ) {
static void recv_message_locked (
grpc_chttp2_stream * s , grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t ) {
CHECK_EQ ( s - > recv_message_ready , nullptr ) ;
s - > recv_message_ready = op_payload - > recv_message . recv_message_ready ;
s - > recv_message = op_payload - > recv_message . recv_message ;
@ -1612,9 +1580,11 @@ static void perform_stream_op_locked(void* stream_op,
s - > call_failed_before_recv_message =
op_payload - > recv_message . call_failed_before_recv_message ;
grpc_chttp2_maybe_complete_recv_trailing_metadata ( t , s ) ;
}
}
if ( op - > recv_trailing_metadata ) {
static void recv_trailing_metadata_locked (
grpc_chttp2_stream * s , grpc_transport_stream_op_batch_payload * op_payload ,
grpc_chttp2_transport * t ) {
CHECK_EQ ( s - > collecting_stats , nullptr ) ;
s - > collecting_stats = op_payload - > recv_trailing_metadata . collect_stats ;
CHECK_EQ ( s - > recv_trailing_metadata_finished , nullptr ) ;
@ -1624,6 +1594,72 @@ static void perform_stream_op_locked(void* stream_op,
op_payload - > recv_trailing_metadata . recv_trailing_metadata ;
s - > final_metadata_requested = true ;
grpc_chttp2_maybe_complete_recv_trailing_metadata ( t , s ) ;
}
static void perform_stream_op_locked ( void * stream_op ,
grpc_error_handle /*error_ignored*/ ) {
grpc_transport_stream_op_batch * op =
static_cast < grpc_transport_stream_op_batch * > ( stream_op ) ;
grpc_chttp2_stream * s =
static_cast < grpc_chttp2_stream * > ( op - > handler_private . extra_arg ) ;
grpc_transport_stream_op_batch_payload * op_payload = op - > payload ;
grpc_chttp2_transport * t = s - > t . get ( ) ;
s - > traced = op - > is_traced ;
if ( ! grpc_core : : IsCallTracerInTransportEnabled ( ) ) {
s - > call_tracer = CallTracerIfSampled ( s ) ;
}
s - > tcp_tracer = TcpTracerIfSampled ( s ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
LOG ( INFO ) < < " perform_stream_op_locked[s= " < < s < < " ; op= " < < op
< < " ]: " < < grpc_transport_stream_op_batch_string ( op , false )
< < " ; on_complete = " < < op - > on_complete ;
if ( op - > send_initial_metadata ) {
log_metadata ( op_payload - > send_initial_metadata . send_initial_metadata ,
s - > id , t - > is_client , true ) ;
}
if ( op - > send_trailing_metadata ) {
log_metadata ( op_payload - > send_trailing_metadata . send_trailing_metadata ,
s - > id , t - > is_client , false ) ;
}
}
grpc_closure * on_complete = op - > on_complete ;
// on_complete will be null if and only if there are no send ops in the batch.
if ( on_complete ! = nullptr ) {
// This batch has send ops. Use final_data as a barrier until enqueue time;
// the initial counter is dropped at the end of this function.
on_complete - > next_data . scratch = CLOSURE_BARRIER_FIRST_REF_BIT ;
on_complete - > error_data . error = 0 ;
}
if ( op - > cancel_stream ) {
grpc_chttp2_cancel_stream ( t , s , op_payload - > cancel_stream . cancel_error ,
op_payload - > cancel_stream . tarpit ) ;
}
if ( op - > send_initial_metadata ) {
send_initial_metadata_locked ( op , s , op_payload , t , on_complete ) ;
}
if ( op - > send_message ) {
send_message_locked ( op , s , op_payload , t , on_complete ) ;
}
if ( op - > send_trailing_metadata ) {
send_trailing_metadata_locked ( op , s , op_payload , t , on_complete ) ;
}
if ( op - > recv_initial_metadata ) {
recv_initial_metadata_locked ( s , op_payload , t ) ;
}
if ( op - > recv_message ) {
recv_message_locked ( s , op_payload , t ) ;
}
if ( op - > recv_trailing_metadata ) {
recv_trailing_metadata_locked ( s , op_payload , t ) ;
}
if ( on_complete ! = nullptr ) {
@ -2048,11 +2084,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
// Lambda is immediately invoked as a big scoped section that can be
// exited out of at any point by returning.
[ & ] ( ) {
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
VLOG ( 2 ) < < " maybe_complete_recv_message " < < s
GRPC_TRACE_VLOG ( http , 2 )
< < " maybe_complete_recv_message " < < s
< < " final_metadata_requested= " < < s - > final_metadata_requested
< < " seen_error= " < < s - > seen_error ;
}
if ( s - > final_metadata_requested & & s - > seen_error ) {
grpc_slice_buffer_reset_and_unref ( & s - > frame_storage ) ;
s - > recv_message - > reset ( ) ;
@ -2063,11 +2098,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
int64_t min_progress_size ;
auto r = grpc_deframe_unprocessed_incoming_frames (
s , & min_progress_size , & * * s - > recv_message , s - > recv_message_flags ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
VLOG ( 2 ) < < " Deframe data frame: "
GRPC_TRACE_VLOG ( http , 2 )
< < " Deframe data frame: "
< < grpc_core : : PollToString (
r , [ ] ( absl : : Status r ) { return r . ToString ( ) ; } ) ;
}
if ( r . pending ( ) ) {
if ( s - > read_closed ) {
grpc_slice_buffer_reset_and_unref ( & s - > frame_storage ) ;
@ -2118,13 +2152,12 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
void grpc_chttp2_maybe_complete_recv_trailing_metadata ( grpc_chttp2_transport * t ,
grpc_chttp2_stream * s ) {
grpc_chttp2_maybe_complete_recv_message ( t , s ) ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
VLOG ( 2 ) < < " maybe_complete_recv_trailing_metadata cli= " < < t - > is_client
< < " s= " < < s < < " closure= " < < s - > recv_trailing_metadata_finished
GRPC_TRACE_VLOG ( http , 2 ) < < " maybe_complete_recv_trailing_metadata cli= "
< < t - > is_client < < " s= " < < s
< < " closure= " < < s - > recv_trailing_metadata_finished
< < " read_closed= " < < s - > read_closed
< < " write_closed= " < < s - > write_closed < < " "
< < s - > frame_storage . length ;
}
if ( s - > recv_trailing_metadata_finished ! = nullptr & & s - > read_closed & &
s - > write_closed ) {
if ( s - > seen_error | | ! t - > is_client ) {
@ -2328,15 +2361,12 @@ grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
grpc_chttp2_transport * t , grpc_chttp2_stream * s , int close_reads ,
int close_writes , grpc_error_handle error ) {
grpc_chttp2_transport : : RemovedStreamHandle rsh ;
if ( GRPC_TRACE_FLAG_ENABLED ( http ) ) {
VLOG ( 2 ) < < " MARK_STREAM_CLOSED: t= " < < t < < " s= " < < s < < " (id= " < < s - > id
< < " ) "
GRPC_TRACE_VLOG ( http , 2 )
< < " MARK_STREAM_CLOSED: t= " < < t < < " s= " < < s < < " (id= " < < s - > id < < " ) "
< < ( ( close_reads & & close_writes )
? " read+write "
: ( close_reads ? " read "
: ( close_writes ? " write " : " nothing?? " ) ) )
: ( close_reads ? " read " : ( close_writes ? " write " : " nothing?? " ) ) )
< < " [ " < < grpc_core : : StatusToString ( error ) < < " ] " ;
}
if ( s - > read_closed & & s - > write_closed ) {
// already closed, but we should still fake the status if needed.
grpc_error_handle overall_error = removal_error ( error , s , " Stream removed " ) ;