Merge branch 'master' into FixChttp2ServerRace

pull/37683/head
Yash Tibrewal 4 months ago
commit 3d3c150900
  1. 11
      src/core/ext/filters/gcp_authentication/gcp_authentication_filter.cc
  2. 4
      src/core/ext/filters/gcp_authentication/gcp_authentication_filter.h
  3. 84
      src/core/ext/transport/chaotic_good/client_transport.cc
  4. 152
      src/core/ext/transport/chaotic_good/server_transport.cc
  5. 4
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  6. 2
      src/core/lib/event_engine/posix_engine/posix_endpoint.cc
  7. 77
      src/core/lib/transport/promise_endpoint.h
  8. 20
      src/core/util/latent_see.h

@ -153,7 +153,7 @@ absl::StatusOr<std::unique_ptr<GcpAuthenticationFilter>>
GcpAuthenticationFilter::Create(const ChannelArgs& args,
ChannelFilter::Args filter_args) {
// Get filter config.
auto* service_config = args.GetObject<ServiceConfig>();
auto service_config = args.GetObjectRef<ServiceConfig>();
if (service_config == nullptr) {
return absl::InvalidArgumentError(
"gcp_auth: no service config in channel args");
@ -184,15 +184,18 @@ GcpAuthenticationFilter::Create(const ChannelArgs& args,
// cache but it has the wrong size.
cache->SetMaxSize(filter_config->cache_size);
// Instantiate filter.
return std::unique_ptr<GcpAuthenticationFilter>(new GcpAuthenticationFilter(
filter_config, std::move(xds_config), std::move(cache)));
return std::unique_ptr<GcpAuthenticationFilter>(
new GcpAuthenticationFilter(std::move(service_config), filter_config,
std::move(xds_config), std::move(cache)));
}
GcpAuthenticationFilter::GcpAuthenticationFilter(
RefCountedPtr<ServiceConfig> service_config,
const GcpAuthenticationParsedConfig::Config* filter_config,
RefCountedPtr<const XdsConfig> xds_config,
RefCountedPtr<CallCredentialsCache> cache)
: filter_config_(filter_config),
: service_config_(std::move(service_config)),
filter_config_(filter_config),
xds_config_(std::move(xds_config)),
cache_(std::move(cache)) {}

@ -80,10 +80,14 @@ class GcpAuthenticationFilter
};
GcpAuthenticationFilter(
RefCountedPtr<ServiceConfig> service_config,
const GcpAuthenticationParsedConfig::Config* filter_config,
RefCountedPtr<const XdsConfig> xds_config,
RefCountedPtr<CallCredentialsCache> cache);
// TODO(roth): Consider having the channel stack hold this ref so that
// individual filters don't need to.
const RefCountedPtr<ServiceConfig> service_config_;
const GcpAuthenticationParsedConfig::Config* filter_config_;
const RefCountedPtr<const XdsConfig> xds_config_;
const RefCountedPtr<CallCredentialsCache> cache_;

@ -117,7 +117,9 @@ auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
});
// Wrap the actual sequence with something that owns the call handler so that
// its lifetime extends until the push completes.
return [call_handler, push = std::move(push)]() mutable { return push(); };
return GRPC_LATENT_SEE_PROMISE(
"PushFrameIntoCall",
([call_handler, push = std::move(push)]() mutable { return push(); }));
}
auto ChaoticGoodClientTransport::TransportReadLoop(
@ -205,11 +207,15 @@ ChaoticGoodClientTransport::ChaoticGoodClientTransport(
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get());
party_ = Party::Make(std::move(party_arena));
party_->Spawn("client-chaotic-writer", TransportWriteLoop(transport),
party_->Spawn("client-chaotic-writer",
GRPC_LATENT_SEE_PROMISE("ClientTransportWriteLoop",
TransportWriteLoop(transport)),
OnTransportActivityDone("write_loop"));
party_->Spawn("client-chaotic-reader",
TransportReadLoop(std::move(transport)),
OnTransportActivityDone("read_loop"));
party_->Spawn(
"client-chaotic-reader",
GRPC_LATENT_SEE_PROMISE("ClientTransportReadLoop",
TransportReadLoop(std::move(transport))),
OnTransportActivityDone("read_loop"));
}
ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { party_.reset(); }
@ -265,38 +271,42 @@ auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
return absl::OkStatus();
});
};
return TrySeq(
// Wait for initial metadata then send it out.
call_handler.PullClientInitialMetadata(),
[send_fragment](ClientMetadataHandle md) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Sending initial metadata: " << md->DebugString();
ClientFragmentFrame frame;
frame.headers = std::move(md);
return send_fragment(std::move(frame));
},
// Continuously send client frame with client to server messages.
ForEach(OutgoingMessages(call_handler),
[send_fragment,
aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
const uint32_t message_length = message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
return send_fragment(std::move(frame));
}),
[send_fragment]() mutable {
ClientFragmentFrame frame;
frame.end_of_stream = true;
return send_fragment(std::move(frame));
});
return GRPC_LATENT_SEE_PROMISE(
"CallOutboundLoop",
TrySeq(
// Wait for initial metadata then send it out.
call_handler.PullClientInitialMetadata(),
[send_fragment](ClientMetadataHandle md) mutable {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD: Sending initial metadata: "
<< md->DebugString();
ClientFragmentFrame frame;
frame.headers = std::move(md);
return send_fragment(std::move(frame));
},
// Continuously send client frame with client to server messages.
ForEach(OutgoingMessages(call_handler),
[send_fragment, aligned_bytes = aligned_bytes_](
MessageHandle message) mutable {
ClientFragmentFrame frame;
// Construct frame header (flags, header_length and
// trailer_length will be added in serialization).
const uint32_t message_length =
message->payload()->Length();
const uint32_t padding =
message_length % aligned_bytes == 0
? 0
: aligned_bytes - message_length % aligned_bytes;
CHECK_EQ((message_length + padding) % aligned_bytes, 0u);
frame.message = FragmentMessage(std::move(message), padding,
message_length);
return send_fragment(std::move(frame));
}),
[send_fragment]() mutable {
ClientFragmentFrame frame;
frame.end_of_stream = true;
return send_fragment(std::move(frame));
}));
}
void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {

@ -199,25 +199,27 @@ auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
auto ChaoticGoodServerTransport::CallOutboundLoop(
uint32_t stream_id, CallInitiator call_initiator) {
auto outgoing_frames = outgoing_frames_.MakeSender();
return Seq(
Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
call_initiator),
[stream_id](absl::Status main_body_result) {
GRPC_TRACE_VLOG(chaotic_good, 2)
<< "CHAOTIC_GOOD: CallOutboundLoop: stream_id=" << stream_id
<< " main_body_result=" << main_body_result;
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[stream_id, outgoing_frames,
call_initiator](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames, call_initiator);
});
return GRPC_LATENT_SEE_PROMISE(
"CallOutboundLoop",
Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
call_initiator),
[stream_id](absl::Status main_body_result) {
GRPC_TRACE_VLOG(chaotic_good, 2)
<< "CHAOTIC_GOOD: CallOutboundLoop: stream_id=" << stream_id
<< " main_body_result=" << main_body_result;
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),
// Capture the call_initator to ensure the underlying call_spine
// is alive until the SendFragment promise completes.
[stream_id, outgoing_frames,
call_initiator](ServerMetadataHandle md) mutable {
ServerFragmentFrame frame;
frame.trailers = std::move(md);
frame.stream_id = stream_id;
return SendFragment(std::move(frame), outgoing_frames,
call_initiator);
}));
}
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
@ -269,57 +271,59 @@ auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
}
auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
return TrySeq(
transport.ReadFrameBytes(),
[this, transport =
&transport](std::tuple<FrameHeader, BufferPair> frame_bytes) {
const auto& frame_header = std::get<0>(frame_bytes);
auto& buffers = std::get<1>(frame_bytes);
return Switch(
frame_header.type,
Case(FrameType::kSettings,
[]() -> absl::Status {
return absl::InternalError("Unexpected settings frame");
}),
Case(FrameType::kFragment,
[this, &frame_header, &buffers, transport]() {
return If(
frame_header.flags.is_set(0),
[this, &frame_header, &buffers, transport]() {
return DeserializeAndPushFragmentToNewCall(
frame_header, std::move(buffers), *transport);
},
[this, &frame_header, &buffers, transport]() {
return DeserializeAndPushFragmentToExistingCall(
frame_header, std::move(buffers), *transport);
});
}),
Case(FrameType::kCancel,
[this, &frame_header]() {
absl::optional<CallInitiator> call_initiator =
ExtractStream(frame_header.stream_id);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "Cancel stream " << frame_header.stream_id
<< (call_initiator.has_value() ? " (active)"
: " (not found)");
return If(
call_initiator.has_value(),
[&call_initiator]() {
auto c = std::move(*call_initiator);
return c.SpawnWaitable("cancel", [c]() mutable {
c.Cancel();
return absl::OkStatus();
});
},
[]() -> absl::Status { return absl::OkStatus(); });
}),
Default([frame_header]() {
return absl::InternalError(
absl::StrCat("Unexpected frame type: ",
static_cast<uint8_t>(frame_header.type)));
}));
},
[]() -> LoopCtl<absl::Status> { return Continue{}; });
return GRPC_LATENT_SEE_PROMISE(
"ReadOneFrame",
TrySeq(
transport.ReadFrameBytes(),
[this, transport = &transport](
std::tuple<FrameHeader, BufferPair> frame_bytes) {
const auto& frame_header = std::get<0>(frame_bytes);
auto& buffers = std::get<1>(frame_bytes);
return Switch(
frame_header.type,
Case(FrameType::kSettings,
[]() -> absl::Status {
return absl::InternalError("Unexpected settings frame");
}),
Case(FrameType::kFragment,
[this, &frame_header, &buffers, transport]() {
return If(
frame_header.flags.is_set(0),
[this, &frame_header, &buffers, transport]() {
return DeserializeAndPushFragmentToNewCall(
frame_header, std::move(buffers), *transport);
},
[this, &frame_header, &buffers, transport]() {
return DeserializeAndPushFragmentToExistingCall(
frame_header, std::move(buffers), *transport);
});
}),
Case(FrameType::kCancel,
[this, &frame_header]() {
absl::optional<CallInitiator> call_initiator =
ExtractStream(frame_header.stream_id);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "Cancel stream " << frame_header.stream_id
<< (call_initiator.has_value() ? " (active)"
: " (not found)");
return If(
call_initiator.has_value(),
[&call_initiator]() {
auto c = std::move(*call_initiator);
return c.SpawnWaitable("cancel", [c]() mutable {
c.Cancel();
return absl::OkStatus();
});
},
[]() -> absl::Status { return absl::OkStatus(); });
}),
Default([frame_header]() {
return absl::InternalError(
absl::StrCat("Unexpected frame type: ",
static_cast<uint8_t>(frame_header.type)));
}));
},
[]() -> LoopCtl<absl::Status> { return Continue{}; }));
}
auto ChaoticGoodServerTransport::TransportReadLoop(
@ -360,9 +364,13 @@ ChaoticGoodServerTransport::ChaoticGoodServerTransport(
party_arena->SetContext<grpc_event_engine::experimental::EventEngine>(
event_engine.get());
party_ = Party::Make(std::move(party_arena));
party_->Spawn("server-chaotic-writer", TransportWriteLoop(transport),
party_->Spawn("server-chaotic-writer",
GRPC_LATENT_SEE_PROMISE("ServerTransportWriteLoop",
TransportWriteLoop(transport)),
OnTransportActivityDone("writer"));
party_->Spawn("server-chaotic-reader", TransportReadLoop(transport),
party_->Spawn("server-chaotic-reader",
GRPC_LATENT_SEE_PROMISE("ServerTransportReadLoop",
TransportReadLoop(transport)),
OnTransportActivityDone("reader"));
}

@ -791,15 +791,11 @@ Chttp2ServerListener::Chttp2ServerListener(
}
Chttp2ServerListener::~Chttp2ServerListener() {
// Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref.
ExecCtx::Get()->Flush();
if (passive_listener_ != nullptr) {
passive_listener_->ListenerDestroyed();
}
if (on_destroy_done_ != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus());
ExecCtx::Get()->Flush();
}
}

@ -102,7 +102,7 @@ namespace {
// of bytes sent.
ssize_t TcpSend(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GRPC_LATENT_SEE_INNER_SCOPE("TcpSend");
GRPC_LATENT_SEE_PARENT_SCOPE("TcpSend");
ssize_t sent_length;
do {
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);

@ -106,24 +106,25 @@ class PromiseEndpoint {
return absl::OkStatus();
};
},
[this]() {
return [write_state = write_state_]() -> Poll<absl::Status> {
// If current write isn't finished return `Pending()`, else
// return write result.
WriteState::State expected = WriteState::kWritten;
if (write_state->state.compare_exchange_strong(
expected, WriteState::kIdle, std::memory_order_acquire,
std::memory_order_relaxed)) {
// State was Written, and we changed it to Idle. We can return
// the result.
return std::move(write_state->result);
}
// State was not Written; since we're polling it must be
// Writing. Assert that and return Pending.
CHECK(expected == WriteState::kWriting);
return Pending();
};
});
GRPC_LATENT_SEE_PROMISE(
"DelayedWrite", ([this]() {
return [write_state = write_state_]() -> Poll<absl::Status> {
// If current write isn't finished return `Pending()`, else
// return write result.
WriteState::State expected = WriteState::kWritten;
if (write_state->state.compare_exchange_strong(
expected, WriteState::kIdle, std::memory_order_acquire,
std::memory_order_relaxed)) {
// State was Written, and we changed it to Idle. We can return
// the result.
return std::move(write_state->result);
}
// State was not Written; since we're polling it must be
// Writing. Assert that and return Pending.
CHECK(expected == WriteState::kWriting);
return Pending();
};
})));
}
// Returns a promise that resolves to `SliceBuffer` with
@ -174,25 +175,27 @@ class PromiseEndpoint {
return std::move(ret);
};
},
[this, num_bytes]() {
return [read_state = read_state_,
num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
if (!read_state->complete.load(std::memory_order_acquire)) {
return Pending();
}
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
if (read_state->result.ok()) {
SliceBuffer ret;
grpc_slice_buffer_move_first_no_inline(
read_state->buffer.c_slice_buffer(), num_bytes,
ret.c_slice_buffer());
read_state->complete.store(false, std::memory_order_relaxed);
return std::move(ret);
}
read_state->complete.store(false, std::memory_order_relaxed);
return std::move(read_state->result);
};
});
GRPC_LATENT_SEE_PROMISE(
"DelayedRead", ([this, num_bytes]() {
return [read_state = read_state_,
num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
if (!read_state->complete.load(std::memory_order_acquire)) {
return Pending();
}
// If read succeeds, return `SliceBuffer` with `num_bytes`
// bytes.
if (read_state->result.ok()) {
SliceBuffer ret;
grpc_slice_buffer_move_first_no_inline(
read_state->buffer.c_slice_buffer(), num_bytes,
ret.c_slice_buffer());
read_state->complete.store(false, std::memory_order_relaxed);
return std::move(ret);
}
read_state->complete.store(false, std::memory_order_relaxed);
return std::move(read_state->result);
};
})));
}
// Returns a promise that resolves to `Slice` with at least

@ -252,12 +252,26 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
Log::CurrentThreadBin()->Append(md, EventType::kMark, 0);
}
template <typename P>
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto Promise(const Metadata* md_poll,
const Metadata* md_flow,
P promise) {
return [md_poll, md_flow, promise = std::move(promise),
flow = Flow(md_flow)]() mutable {
InnerScope scope(md_poll);
flow.End();
auto r = promise();
flow.Begin(md_flow);
return r;
};
}
} // namespace latent_see
} // namespace grpc_core
#define GRPC_LATENT_SEE_METADATA(name) \
[]() { \
static grpc_core::latent_see::Metadata metadata = {__FILE__, __LINE__, \
#name}; \
name}; \
return &metadata; \
}()
// Parent scope: logs a begin and end event, and flushes the thread log on scope
@ -277,6 +291,9 @@ GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline void Mark(const Metadata* md) {
// scope.
#define GRPC_LATENT_SEE_MARK(name) \
grpc_core::latent_see::Mark(GRPC_LATENT_SEE_METADATA(name))
#define GRPC_LATENT_SEE_PROMISE(name, promise) \
grpc_core::latent_see::Promise(GRPC_LATENT_SEE_METADATA("Poll:" name), \
GRPC_LATENT_SEE_METADATA(name), promise)
#else // !def(GRPC_ENABLE_LATENT_SEE)
namespace grpc_core {
namespace latent_see {
@ -305,6 +322,7 @@ struct InnerScope {
#define GRPC_LATENT_SEE_MARK(name) \
do { \
} while (0)
#define GRPC_LATENT_SEE_PROMISE(name, promise) promise
#endif // GRPC_ENABLE_LATENT_SEE
#endif // GRPC_SRC_CORE_UTIL_LATENT_SEE_H

Loading…
Cancel
Save