@ -125,13 +125,14 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}
auto ChaoticGoodServerTransport : : SendFragment (
ServerFragmentFrame frame , MpscSender < ServerFrame > outgoing_frames ) {
ServerFragmentFrame frame , MpscSender < ServerFrame > outgoing_frames ,
CallInitiator call_initiator ) {
GRPC_TRACE_LOG ( chaotic_good , INFO )
< < " CHAOTIC_GOOD: SendFragment: frame= " < < frame . ToString ( ) ;
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map ( outgoing_frames . Send ( std : : move ( frame ) ) ,
[ ] ( bool success ) - > absl : : Status {
[ call_initiator ] ( bool success ) - > absl : : Status {
if ( ! success ) {
// Failed to send outgoing frame.
return absl : : UnavailableError ( " Transport closed. " ) ;
@ -145,26 +146,27 @@ auto ChaoticGoodServerTransport::SendCallBody(
CallInitiator call_initiator ) {
// Continuously send client frame with client to server
// messages.
return ForEach ( OutgoingMessages ( call_initiator ) ,
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[ stream_id , outgoing_frames , aligned_bytes = aligned_bytes_ ] (
MessageHandle message ) mutable {
ServerFragmentFrame frame ;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message - > payload ( ) - > Length ( ) ;
const uint32_t padding =
message_length % aligned_bytes = = 0
? 0
: aligned_bytes - ( message_length % aligned_bytes ) ;
CHECK_EQ ( ( message_length + padding ) % aligned_bytes , 0u ) ;
frame . message = FragmentMessage ( std : : move ( message ) , padding ,
message_length ) ;
frame . stream_id = stream_id ;
return SendFragment ( std : : move ( frame ) , outgoing_frames ) ;
} ) ;
return ForEach (
OutgoingMessages ( call_initiator ) ,
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[ stream_id , outgoing_frames , call_initiator ,
aligned_bytes = aligned_bytes_ ] ( MessageHandle message ) mutable {
ServerFragmentFrame frame ;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message - > payload ( ) - > Length ( ) ;
const uint32_t padding =
message_length % aligned_bytes = = 0
? 0
: aligned_bytes - message_length % aligned_bytes ;
CHECK_EQ ( ( message_length + padding ) % aligned_bytes , 0u ) ;
frame . message =
FragmentMessage ( std : : move ( message ) , padding , message_length ) ;
frame . stream_id = stream_id ;
return SendFragment ( std : : move ( frame ) , outgoing_frames , call_initiator ) ;
} ) ;
}
auto ChaoticGoodServerTransport : : SendCallInitialMetadataAndBody (
@ -185,7 +187,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
frame . headers = std : : move ( * md ) ;
frame . stream_id = stream_id ;
return TrySeq (
SendFragment ( std : : move ( frame ) , outgoing_frames ) ,
SendFragment ( std : : move ( frame ) , outgoing_frames ,
call_initiator ) ,
SendCallBody ( stream_id , outgoing_frames , call_initiator ) ) ;
} ,
[ ] ( ) { return absl : : OkStatus ( ) ; } ) ;
@ -206,11 +209,15 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
return Empty { } ;
} ) ,
call_initiator . PullServerTrailingMetadata ( ) ,
[ stream_id , outgoing_frames ] ( ServerMetadataHandle md ) mutable {
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[ stream_id , outgoing_frames ,
call_initiator ] ( ServerMetadataHandle md ) mutable {
ServerFragmentFrame frame ;
frame . trailers = std : : move ( md ) ;
frame . stream_id = stream_id ;
return SendFragment ( std : : move ( frame ) , outgoing_frames ) ;
return SendFragment ( std : : move ( frame ) , outgoing_frames ,
call_initiator ) ;
} ) ) ;
}
@ -302,11 +309,10 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
call_initiator . has_value ( ) ,
[ & call_initiator ] ( ) {
auto c = std : : move ( * call_initiator ) ;
return c . SpawnWaitable ( " cancel_from_read " ,
[ c ] ( ) mutable {
c . Cancel ( ) ;
return absl : : OkStatus ( ) ;
} ) ;
return c . SpawnWaitable ( " cancel " , [ c ] ( ) mutable {
c . Cancel ( ) ;
return absl : : OkStatus ( ) ;
} ) ;
} ,
[ ] ( ) - > absl : : Status { return absl : : OkStatus ( ) ; } ) ;
} ) ,
@ -391,8 +397,6 @@ void ChaoticGoodServerTransport::AbortWithError() {
// Close all the available pipes.
outgoing_frames_ . MarkClosed ( ) ;
ReleasableMutexLock lock ( & mu_ ) ;
if ( aborted_with_error_ ) return ;
aborted_with_error_ = true ;
StreamMap stream_map = std : : move ( stream_map_ ) ;
stream_map_ . clear ( ) ;
state_tracker_ . SetState ( GRPC_CHANNEL_SHUTDOWN ,
@ -401,19 +405,18 @@ void ChaoticGoodServerTransport::AbortWithError() {
lock . Release ( ) ;
for ( const auto & pair : stream_map ) {
auto call_initiator = pair . second ;
call_initiator . SpawnInfallible ( " cancel_from_transport_closed " ,
[ call_initiator ] ( ) mutable {
call_initiator . Cancel ( ) ;
return Empty { } ;
} ) ;
call_initiator . SpawnInfallible ( " cancel " , [ call_initiator ] ( ) mutable {
call_initiator . Cancel ( ) ;
return Empty { } ;
} ) ;
}
}
absl : : optional < CallInitiator > ChaoticGoodServerTransport : : LookupStream (
uint32_t stream_id ) {
MutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( chaotic_good , INFO )
< < " CHAOTIC_GOOD " < < this < < " LookupStream " < < stream_id ;
MutexLock lock ( & mu_ ) ;
auto it = stream_map_ . find ( stream_id ) ;
if ( it = = stream_map_ . end ( ) ) return absl : : nullopt ;
return it - > second ;
@ -421,9 +424,9 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
absl : : optional < CallInitiator > ChaoticGoodServerTransport : : ExtractStream (
uint32_t stream_id ) {
MutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( chaotic_good , INFO )
< < " CHAOTIC_GOOD " < < this < < " ExtractStream " < < stream_id ;
MutexLock lock ( & mu_ ) ;
auto it = stream_map_ . find ( stream_id ) ;
if ( it = = stream_map_ . end ( ) ) return absl : : nullopt ;
auto r = std : : move ( it - > second ) ;
@ -433,12 +436,9 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
absl : : Status ChaoticGoodServerTransport : : NewStream (
uint32_t stream_id , CallInitiator call_initiator ) {
MutexLock lock ( & mu_ ) ;
GRPC_TRACE_LOG ( chaotic_good , INFO )
< < " CHAOTIC_GOOD " < < this < < " NewStream " < < stream_id ;
if ( aborted_with_error_ ) {
return absl : : UnavailableError ( " Transport closed " ) ;
}
MutexLock lock ( & mu_ ) ;
auto it = stream_map_ . find ( stream_id ) ;
if ( it ! = stream_map_ . end ( ) ) {
return absl : : InternalError ( " Stream already exists " ) ;
@ -454,7 +454,7 @@ absl::Status ChaoticGoodServerTransport::NewStream(
self - > ExtractStream ( stream_id ) ;
if ( call_initiator . has_value ( ) ) {
auto c = std : : move ( * call_initiator ) ;
c . SpawnInfallible ( " cancel_from_on_done " , [ c ] ( ) mutable {
c . SpawnInfallible ( " cancel " , [ c ] ( ) mutable {
c . Cancel ( ) ;
return Empty { } ;
} ) ;