Merge remote-tracking branch 'upstream/master' into dns-migration-chttp2-server

pull/37853/head
yijiem 4 months ago
commit b830720b20
  1. 88
      src/core/ext/transport/chaotic_good/server_transport.cc
  2. 4
      src/core/ext/transport/chaotic_good/server_transport.h
  3. 103
      src/core/server/server.cc
  4. 1
      test/cpp/end2end/orca_service_end2end_test.cc

@ -125,14 +125,13 @@ auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
}
auto ChaoticGoodServerTransport::SendFragment(
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator) {
ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: SendFragment: frame=" << frame.ToString();
// Capture the call_initiator to ensure the underlying call spine is alive
// until the outgoing_frames.Send promise completes.
return Map(outgoing_frames.Send(std::move(frame)),
[call_initiator](bool success) -> absl::Status {
[](bool success) -> absl::Status {
if (!success) {
// Failed to send outgoing frame.
return absl::UnavailableError("Transport closed.");
@ -146,27 +145,26 @@ auto ChaoticGoodServerTransport::SendCallBody(
CallInitiator call_initiator) {
// Continuously send client frame with client to server
// messages.
return ForEach(
OutgoingMessages(call_initiator),
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[stream_id, outgoing_frames, call_initiator,
aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message =
FragmentMessage(std::move(message), padding, message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator);
});
return ForEach(OutgoingMessages(call_initiator),
// Capture the call_initator to ensure the underlying call
// spine is alive until the SendFragment promise completes.
[stream_id, outgoing_frames, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ServerFragmentFrame frame;
// Construct frame header (flags, header_length
// and trailer_length will be added in
// serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - (message_length % aligned_bytes);
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames);
});
}
auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
@ -187,8 +185,7 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
frame.headers = std::move(*md);
frame.stream_id = stream_id;
return TrySeq(
SendFragment(std::move(frame), outgoing_frames,
call_initiator),
SendFragment(std::move(frame), outgoing_frames),
SendCallBody(stream_id, outgoing_frames, call_initiator));
},
[]() { return absl::OkStatus(); });
@ -209,15 +206,11 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[stream_id, outgoing_frames,
call_initiator](ServerMetadataHandle md) mutable {
[stream_id, outgoing_frames](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames,
call_initiator);
return SendFragment(std::move(frame), outgoing_frames);
}));
}
@ -309,10 +302,11 @@ auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
call_initiator.has_value(),
[&call_initiator]() {
auto c = std::move(*call_initiator);
return c.SpawnWaitable("cancel", [c]() mutable {
c.Cancel();
return absl::OkStatus();
});
return c.SpawnWaitable("cancel_from_read",
[c]() mutable {
c.Cancel();
return absl::OkStatus();
});
},
[]() -> absl::Status { return absl::OkStatus(); });
}),
@ -397,6 +391,8 @@ void ChaoticGoodServerTransport::AbortWithError() {
// Close all the available pipes.
outgoing_frames_.MarkClosed();
ReleasableMutexLock lock(&mu_);
if (aborted_with_error_) return;
aborted_with_error_ = true;
StreamMap stream_map = std::move(stream_map_);
stream_map_.clear();
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
@ -405,18 +401,19 @@ void ChaoticGoodServerTransport::AbortWithError() {
lock.Release();
for (const auto& pair : stream_map) {
auto call_initiator = pair.second;
call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
call_initiator.SpawnInfallible("cancel_from_transport_closed",
[call_initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
}
}
absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
uint32_t stream_id) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " LookupStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
return it->second;
@ -424,9 +421,9 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
uint32_t stream_id) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " ExtractStream " << stream_id;
MutexLock lock(&mu_);
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) return absl::nullopt;
auto r = std::move(it->second);
@ -436,9 +433,12 @@ absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
absl::Status ChaoticGoodServerTransport::NewStream(
uint32_t stream_id, CallInitiator call_initiator) {
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " NewStream " << stream_id;
MutexLock lock(&mu_);
if (aborted_with_error_) {
return absl::UnavailableError("Transport closed");
}
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return absl::InternalError("Stream already exists");
@ -454,7 +454,7 @@ absl::Status ChaoticGoodServerTransport::NewStream(
self->ExtractStream(stream_id);
if (call_initiator.has_value()) {
auto c = std::move(*call_initiator);
c.SpawnInfallible("cancel", [c]() mutable {
c.SpawnInfallible("cancel_from_on_done", [c]() mutable {
c.Cancel();
return Empty{};
});

@ -109,8 +109,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
auto SendCallBody(uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
static auto SendFragment(ServerFragmentFrame frame,
MpscSender<ServerFrame> outgoing_frames,
CallInitiator call_initiator);
MpscSender<ServerFrame> outgoing_frames);
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator);
auto OnTransportActivityDone(absl::string_view activity);
auto TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport);
@ -144,6 +143,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
// Map of stream incoming server frames, key is stream_id.
StreamMap stream_map_ ABSL_GUARDED_BY(mu_);
uint32_t last_seen_new_stream_id_ = 0;
bool aborted_with_error_ ABSL_GUARDED_BY(mu_) = false;
RefCountedPtr<Party> party_;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(mu_){
"chaotic_good_server", GRPC_CHANNEL_READY};

@ -813,55 +813,60 @@ absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
} // namespace
auto Server::MatchAndPublishCall(CallHandler call_handler) {
call_handler.SpawnGuarded("request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable { return call_handler.PullMessage(); },
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler, this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
call_handler.SpawnGuardedUntilCallCompletes(
"request_matcher", [this, call_handler]() mutable {
return TrySeq(
// Wait for initial metadata to pass through all filters
Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
// Match request with requested call
[this, call_handler](ClientMetadataHandle md) mutable {
auto* registered_method = static_cast<RegisteredMethod*>(
md->get(GrpcRegisteredMethod()).value_or(nullptr));
RequestMatcherInterface* rm;
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (registered_method == nullptr) {
rm = unregistered_request_matcher_.get();
} else {
payload_handling = registered_method->payload_handling;
rm = registered_method->matcher.get();
}
auto maybe_read_first_message = If(
payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
[call_handler]() mutable {
return call_handler.PullMessage();
},
[]() -> ValueOrFailure<absl::optional<MessageHandle>> {
return ValueOrFailure<absl::optional<MessageHandle>>(
absl::nullopt);
});
return TryJoin<absl::StatusOr>(
std::move(maybe_read_first_message), rm->MatchRequest(0),
[md = std::move(md)]() mutable {
return ValueOrFailure<ClientMetadataHandle>(std::move(md));
});
},
// Publish call to cq
[call_handler,
this](std::tuple<absl::optional<MessageHandle>,
RequestMatcherInterface::MatchResult,
ClientMetadataHandle>
r) {
RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
auto md = std::move(std::get<2>(r));
auto* rc = mr.TakeCall();
rc->Complete(std::move(std::get<0>(r)), *md);
grpc_call* call =
MakeServerCall(call_handler, std::move(md), this,
rc->cq_bound_to_call, rc->initial_metadata);
*rc->call = call;
return Map(
WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
[rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
return absl::OkStatus();
});
});
});
}
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>

@ -76,6 +76,7 @@ class OrcaServiceEnd2endTest : public ::testing::Test {
grpc_core::Duration::Milliseconds(750) *
grpc_test_slowdown_factor();
auto elapsed = now - *last_response_time_;
LOG(INFO) << "received ORCA response after " << elapsed;
EXPECT_GE(elapsed, requested_interval_ - fudge_factor)
<< elapsed.ToString();
EXPECT_LE(elapsed, requested_interval_ + fudge_factor)

Loading…
Cancel
Save