|
|
|
@ -19,6 +19,7 @@ |
|
|
|
|
#include <tuple> |
|
|
|
|
|
|
|
|
|
#include "absl/log/check.h" |
|
|
|
|
#include "absl/log/log.h" |
|
|
|
|
#include "absl/random/bit_gen_ref.h" |
|
|
|
|
#include "absl/random/random.h" |
|
|
|
|
#include "absl/status/status.h" |
|
|
|
@ -27,7 +28,6 @@ |
|
|
|
|
#include <grpc/event_engine/event_engine.h> |
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
|
#include <grpc/slice.h> |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/port_platform.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" |
|
|
|
@ -76,8 +76,8 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall( |
|
|
|
|
uint32_t stream_id) { |
|
|
|
|
DCHECK(frame.headers == nullptr); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_INFO, "CHAOTIC_GOOD: PushFragmentIntoCall: frame=%s", |
|
|
|
|
frame.ToString().c_str()); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: PushFragmentIntoCall: frame=" |
|
|
|
|
<< frame.ToString(); |
|
|
|
|
} |
|
|
|
|
return Seq(If( |
|
|
|
|
frame.message.has_value(), |
|
|
|
@ -89,7 +89,7 @@ auto ChaoticGoodServerTransport::PushFragmentIntoCall( |
|
|
|
|
[this, call_initiator, end_of_stream = frame.end_of_stream, |
|
|
|
|
stream_id](StatusFlag status) mutable -> StatusFlag { |
|
|
|
|
if (!status.ok() && GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_INFO, "CHAOTIC_GOOD: Failed PushFragmentIntoCall"); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: Failed PushFragmentIntoCall"; |
|
|
|
|
} |
|
|
|
|
if (end_of_stream || !status.ok()) { |
|
|
|
|
call_initiator.FinishSends(); |
|
|
|
@ -124,10 +124,8 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( |
|
|
|
|
// already been removed from the stream_map and hence the EOF frame
|
|
|
|
|
// cannot be pushed into the call. No need to log such frames.
|
|
|
|
|
if (!frame.end_of_stream) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s", |
|
|
|
|
error.ToString().c_str(), frame.ToString().c_str()); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: Cannot pass frame to stream. Error:" |
|
|
|
|
<< error.ToString() << " Frame:" << frame.ToString(); |
|
|
|
|
} |
|
|
|
|
return Immediate(std::move(error)); |
|
|
|
|
}); |
|
|
|
@ -137,8 +135,7 @@ auto ChaoticGoodServerTransport::SendFragment( |
|
|
|
|
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames, |
|
|
|
|
CallInitiator call_initiator) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s", |
|
|
|
|
frame.ToString().c_str()); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString(); |
|
|
|
|
} |
|
|
|
|
// Capture the call_initiator to ensure the underlying call spine is alive
|
|
|
|
|
// until the outgoing_frames.Send promise completes.
|
|
|
|
@ -189,9 +186,8 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( |
|
|
|
|
[stream_id, outgoing_frames, call_initiator, |
|
|
|
|
this](absl::optional<ServerMetadataHandle> md) mutable { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=%s", |
|
|
|
|
md.has_value() ? (*md)->DebugString().c_str() : "null"); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=" |
|
|
|
|
<< (md.has_value() ? (*md)->DebugString() : "null"); |
|
|
|
|
} |
|
|
|
|
return If( |
|
|
|
|
md.has_value(), |
|
|
|
@ -211,28 +207,26 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody( |
|
|
|
|
auto ChaoticGoodServerTransport::CallOutboundLoop( |
|
|
|
|
uint32_t stream_id, CallInitiator call_initiator) { |
|
|
|
|
auto outgoing_frames = outgoing_frames_.MakeSender(); |
|
|
|
|
return Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames, |
|
|
|
|
call_initiator), |
|
|
|
|
[stream_id](absl::Status main_body_result) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"CHAOTIC_GOOD: CallOutboundLoop: stream_id=%d " |
|
|
|
|
"main_body_result=%s", |
|
|
|
|
stream_id, main_body_result.ToString().c_str()); |
|
|
|
|
} |
|
|
|
|
return Empty{}; |
|
|
|
|
}), |
|
|
|
|
call_initiator.PullServerTrailingMetadata(), |
|
|
|
|
// Capture the call_initator to ensure the underlying call_spine
|
|
|
|
|
// is alive until the SendFragment promise completes.
|
|
|
|
|
[stream_id, outgoing_frames, |
|
|
|
|
call_initiator](ServerMetadataHandle md) mutable { |
|
|
|
|
ServerFragmentFrame frame; |
|
|
|
|
frame.trailers = std::move(md); |
|
|
|
|
frame.stream_id = stream_id; |
|
|
|
|
return SendFragment(std::move(frame), outgoing_frames, |
|
|
|
|
call_initiator); |
|
|
|
|
}); |
|
|
|
|
return Seq( |
|
|
|
|
Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames, |
|
|
|
|
call_initiator), |
|
|
|
|
[stream_id](absl::Status main_body_result) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
VLOG(2) << "CHAOTIC_GOOD: CallOutboundLoop: stream_id=" |
|
|
|
|
<< stream_id << " main_body_result=" << main_body_result; |
|
|
|
|
} |
|
|
|
|
return Empty{}; |
|
|
|
|
}), |
|
|
|
|
call_initiator.PullServerTrailingMetadata(), |
|
|
|
|
// Capture the call_initator to ensure the underlying call_spine
|
|
|
|
|
// is alive until the SendFragment promise completes.
|
|
|
|
|
[stream_id, outgoing_frames, |
|
|
|
|
call_initiator](ServerMetadataHandle md) mutable { |
|
|
|
|
ServerFragmentFrame frame; |
|
|
|
|
frame.trailers = std::move(md); |
|
|
|
|
frame.stream_id = stream_id; |
|
|
|
|
return SendFragment(std::move(frame), outgoing_frames, call_initiator); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( |
|
|
|
@ -347,9 +341,8 @@ auto ChaoticGoodServerTransport::OnTransportActivityDone( |
|
|
|
|
return [self = RefAsSubclass<ChaoticGoodServerTransport>(), |
|
|
|
|
activity](absl::Status status) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"CHAOTIC_GOOD: OnTransportActivityDone: activity=%s status=%s", |
|
|
|
|
std::string(activity).c_str(), status.ToString().c_str()); |
|
|
|
|
LOG(INFO) << "CHAOTIC_GOOD: OnTransportActivityDone: activity=" |
|
|
|
|
<< activity << " status=" << status; |
|
|
|
|
} |
|
|
|
|
self->AbortWithError(); |
|
|
|
|
}; |
|
|
|
|