[chaotic-good] Fix write ordering (#35900)

It could happen that we start two writes on the same endpoint simultaneously, leading to awesome results.

Closes #35900

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35900 from ctiller:write-order 1f7559d7b9
PiperOrigin-RevId: 606707912
pull/35800/head
Craig Tiller 10 months ago committed by Copybara-Service
parent 98a96c5068
commit 46a261e96a
  1. 1
      src/core/BUILD
  2. 36
      src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc
  3. 46
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
  4. 49
      src/core/lib/transport/promise_endpoint.h

@ -6562,6 +6562,7 @@ grpc_cc_library(
],
deps = [
"activity",
"cancel_callback",
"event_engine_common",
"if",
"map",

@ -105,16 +105,14 @@ auto ChaoticGoodConnector::DataEndpointReadSettingsFrame(
auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame(
RefCountedPtr<ChaoticGoodConnector> self) {
return [self]() {
// Serialize setting frame.
SettingsFrame frame;
// frame.header set connectiion_type: control
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kData,
self->connection_id_, kDataAlignmentBytes}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->hpack_compressor_);
return self->data_endpoint_.Write(std::move(write_buffer.control));
};
// Serialize setting frame.
SettingsFrame frame;
// frame.header set connectiion_type: control
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kData,
self->connection_id_, kDataAlignmentBytes}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->hpack_compressor_);
return self->data_endpoint_.Write(std::move(write_buffer.control));
}
auto ChaoticGoodConnector::WaitForDataEndpointSetup(
@ -200,16 +198,14 @@ auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame(
auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame(
RefCountedPtr<ChaoticGoodConnector> self) {
return [self]() {
// Serialize setting frame.
SettingsFrame frame;
// frame.header set connectiion_type: control
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kControl,
absl::nullopt, absl::nullopt}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->hpack_compressor_);
return self->control_endpoint_.Write(std::move(write_buffer.control));
};
// Serialize setting frame.
SettingsFrame frame;
// frame.header set connectiion_type: control
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kControl,
absl::nullopt, absl::nullopt}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->hpack_compressor_);
return self->control_endpoint_.Write(std::move(write_buffer.control));
}
void ChaoticGoodConnector::Connect(const Args& args, Result* result,

@ -331,37 +331,29 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
ControlEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
self->connection_->NewConnectionID();
SettingsFrame frame;
frame.headers =
SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
absl::nullopt}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
return TrySeq(
[self]() {
self->connection_->NewConnectionID();
SettingsFrame frame;
frame.headers =
SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
absl::nullopt}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer =
frame.Serialize(&self->connection_->hpack_compressor_);
return self->connection_->endpoint_.Write(
std::move(write_buffer.control));
},
self->connection_->endpoint_.Write(std::move(write_buffer.control)),
WaitForDataEndpointSetup(self));
}
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) {
// Send data endpoint setting frame
SettingsFrame frame;
frame.headers =
SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
self->connection_->data_alignment_}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer = frame.Serialize(&self->connection_->hpack_compressor_);
return TrySeq(
[self]() {
// Send data endpoint setting frame
SettingsFrame frame;
frame.headers =
SettingsMetadata{absl::nullopt, self->connection_->connection_id_,
self->connection_->data_alignment_}
.ToMetadataBatch(GetContext<Arena>());
auto write_buffer =
frame.Serialize(&self->connection_->hpack_compressor_);
return self->connection_->endpoint_.Write(
std::move(write_buffer.control));
},
self->connection_->endpoint_.Write(std::move(write_buffer.control)),
[self]() mutable {
MutexLock lock(&self->connection_->listener_->mu_);
// Set endpoint to latch
@ -380,8 +372,10 @@ auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState::
EndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self,
bool is_control_endpoint) {
return If(is_control_endpoint, ControlEndpointWriteSettingsFrame(self),
DataEndpointWriteSettingsFrame(self));
return If(
is_control_endpoint,
[&self] { return ControlEndpointWriteSettingsFrame(self); },
[&self] { return DataEndpointWriteSettingsFrame(self); });
}
void ChaoticGoodServerListener::ActiveConnection::HandshakingState::

@ -39,6 +39,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
@ -69,8 +70,10 @@ class PromiseEndpoint {
// `Write()` before the previous write finishes. Doing that results in
// undefined behavior.
auto Write(SliceBuffer data) {
// Assert previous write finishes.
GPR_ASSERT(!write_state_->complete.load(std::memory_order_relaxed));
// Start write and assert previous write finishes.
auto prev = write_state_->state.exchange(WriteState::kWriting,
std::memory_order_relaxed);
GPR_ASSERT(prev == WriteState::kIdle);
bool completed;
if (data.Length() == 0) {
completed = true;
@ -92,16 +95,31 @@ class PromiseEndpoint {
if (completed) write_state_->waker = Waker();
}
return If(
completed, []() { return []() { return absl::OkStatus(); }; },
completed,
[this]() {
return [write_state = write_state_]() {
auto prev = write_state->state.exchange(WriteState::kIdle,
std::memory_order_relaxed);
GPR_ASSERT(prev == WriteState::kWriting);
return absl::OkStatus();
};
},
[this]() {
return [write_state = write_state_]() -> Poll<absl::Status> {
// If current write isn't finished return `Pending()`, else return
// write result.
if (!write_state->complete.load(std::memory_order_acquire)) {
return Pending();
// If current write isn't finished return `Pending()`, else
// return write result.
WriteState::State expected = WriteState::kWritten;
if (write_state->state.compare_exchange_strong(
expected, WriteState::kIdle, std::memory_order_acquire,
std::memory_order_relaxed)) {
// State was Written, and we changed it to Idle. We can return
// the result.
return std::move(write_state->result);
}
write_state->complete.store(false, std::memory_order_relaxed);
return std::move(write_state->result);
// State was not Written; since we're polling it must be
// Writing. Assert that and return Pending.
GPR_ASSERT(expected == WriteState::kWriting);
return Pending();
};
});
}
@ -228,7 +246,13 @@ class PromiseEndpoint {
};
struct WriteState : public RefCounted<WriteState> {
std::atomic<bool> complete{false};
enum State : uint8_t {
kIdle, // Not writing.
kWriting, // Write started, but not completed.
kWritten, // Write completed.
};
std::atomic<State> state{kIdle};
// Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
// memory behind the buffer is not lost.
grpc_event_engine::experimental::SliceBuffer buffer;
@ -239,7 +263,10 @@ class PromiseEndpoint {
void Complete(absl::Status status) {
result = std::move(status);
auto w = std::move(waker);
complete.store(true, std::memory_order_release);
auto prev = state.exchange(kWritten, std::memory_order_release);
// Previous state should be Writing. If we got anything else we've entered
// the callback path twice.
GPR_ASSERT(prev == kWriting);
w.Wakeup();
}
};

Loading…
Cancel
Save