mirror of https://github.com/grpc/grpc.git
[chaotic-good] Multi-connection support (#38032)
Builds upon #37765 to support arbitrary connection counts in the transport.
(note: at this point the number of connections is determined at connection establishment - future work will be autotuning this)
Closes #38032
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38032 from ctiller:tiefling-buffer c7520fd7a9
PiperOrigin-RevId: 698952890
pull/37889/merge
parent
c333d60fcd
commit
394118d04d
41 changed files with 2328 additions and 392 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,68 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/control_endpoint.h" |
||||
|
||||
#include "src/core/lib/event_engine/event_engine_context.h" |
||||
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
auto ControlEndpoint::Buffer::Pull() { |
||||
return [this]() -> Poll<SliceBuffer> { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
MutexLock lock(&mu_); |
||||
if (queued_output_.Length() == 0) { |
||||
flush_waker_ = GetContext<Activity>()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
waker = std::move(write_waker_); |
||||
return std::move(queued_output_); |
||||
}; |
||||
} |
||||
|
||||
ControlEndpoint::ControlEndpoint( |
||||
PromiseEndpoint endpoint, |
||||
grpc_event_engine::experimental::EventEngine* event_engine) |
||||
: endpoint_(std::make_shared<PromiseEndpoint>(std::move(endpoint))) { |
||||
CHECK(event_engine != nullptr); |
||||
write_party_->arena()->SetContext(event_engine); |
||||
write_party_->Spawn( |
||||
"flush-control", |
||||
GRPC_LATENT_SEE_PROMISE( |
||||
"FlushLoop", Loop([endpoint = endpoint_, buffer = buffer_]() { |
||||
return TrySeq( |
||||
// Pull one set of buffered writes
|
||||
buffer->Pull(), |
||||
// And write them
|
||||
[endpoint, buffer = buffer.get()](SliceBuffer flushing) { |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Flush " << flushing.Length() |
||||
<< " bytes from " << buffer << " to " |
||||
<< ResolvedAddressToString(endpoint->GetPeerAddress()) |
||||
.value_or("<<unknown peer address>>"); |
||||
return endpoint->Write(std::move(flushing)); |
||||
}, |
||||
// Then repeat
|
||||
[]() -> LoopCtl<absl::Status> { return Continue{}; }); |
||||
})), |
||||
[](absl::Status) {}); |
||||
} |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,99 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CONTROL_ENDPOINT_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CONTROL_ENDPOINT_H |
||||
|
||||
#include "absl/cleanup/cleanup.h" |
||||
#include "src/core/lib/promise/party.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
#include "src/core/util/sync.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
// Wrapper around PromiseEndpoint.
|
||||
// Buffers all of the small writes that get enqueued to this endpoint, and then
|
||||
// uses a separate party to flush them to the wire.
|
||||
// In doing so we get to batch up effectively all the writes from the transport
|
||||
// (since party wakeups are sticky), and then flush all the writes in one go.
|
||||
class ControlEndpoint { |
||||
private: |
||||
class Buffer : public RefCounted<Buffer> { |
||||
public: |
||||
// Queue some buffer to be written.
|
||||
// We cap the queue size so that we don't infinitely buffer on one
|
||||
// connection - if the cap is hit, this queue operation will not resolve
|
||||
// until it empties.
|
||||
// Returns a promise that resolves to Empty{} when the data has been queued.
|
||||
auto Queue(SliceBuffer&& buffer) { |
||||
return [buffer = std::move(buffer), this]() mutable -> Poll<Empty> { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
MutexLock lock(&mu_); |
||||
if (queued_output_.Length() != 0 && |
||||
queued_output_.Length() + buffer.Length() > MaxQueued()) { |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Delay control write" |
||||
<< " write_length=" << buffer.Length() |
||||
<< " already_buffered=" << queued_output_.Length() |
||||
<< " queue=" << this; |
||||
write_waker_ = GetContext<Activity>()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Queue control write " << buffer.Length() |
||||
<< " bytes on " << this; |
||||
waker = std::move(flush_waker_); |
||||
queued_output_.Append(buffer); |
||||
return Empty{}; |
||||
}; |
||||
} |
||||
|
||||
auto Pull(); |
||||
|
||||
private: |
||||
size_t MaxQueued() const { return 1024 * 1024; } |
||||
|
||||
Mutex mu_; |
||||
Waker write_waker_ ABSL_GUARDED_BY(mu_); |
||||
Waker flush_waker_ ABSL_GUARDED_BY(mu_); |
||||
SliceBuffer queued_output_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
public: |
||||
ControlEndpoint(PromiseEndpoint endpoint, |
||||
grpc_event_engine::experimental::EventEngine* event_engine); |
||||
|
||||
// Write some data to the control endpoint; returns a promise that resolves
|
||||
// to Empty{} -- it's not possible to see errors from this api.
|
||||
auto Write(SliceBuffer&& bytes) { return buffer_->Queue(std::move(bytes)); } |
||||
|
||||
// Read operations are simply passthroughs to the underlying promise endpoint.
|
||||
auto ReadSlice(size_t length) { return endpoint_->ReadSlice(length); } |
||||
auto Read(size_t length) { return endpoint_->Read(length); } |
||||
auto GetPeerAddress() const { return endpoint_->GetPeerAddress(); } |
||||
auto GetLocalAddress() const { return endpoint_->GetLocalAddress(); } |
||||
|
||||
private: |
||||
std::shared_ptr<PromiseEndpoint> endpoint_; |
||||
RefCountedPtr<Party> write_party_ = |
||||
Party::Make(SimpleArenaAllocator(0)->MakeArena()); |
||||
RefCountedPtr<Buffer> buffer_ = MakeRefCounted<Buffer>(); |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CONTROL_ENDPOINT_H
|
@ -0,0 +1,236 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/data_endpoints.h" |
||||
|
||||
#include <cstddef> |
||||
|
||||
#include "absl/cleanup/cleanup.h" |
||||
#include "absl/strings/escaping.h" |
||||
#include "src/core/lib/event_engine/event_engine_context.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
namespace data_endpoints_detail { |
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// OutputBuffer
|
||||
|
||||
bool OutputBuffer::Accept(SliceBuffer& buffer) { |
||||
if (pending_.Length() != 0 && |
||||
pending_.Length() + buffer.Length() > pending_max_) { |
||||
return false; |
||||
} |
||||
pending_.Append(buffer); |
||||
return true; |
||||
} |
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// OutputBuffers
|
||||
|
||||
OutputBuffers::OutputBuffers(uint32_t num_connections) |
||||
: buffers_(num_connections) {} |
||||
|
||||
Poll<uint32_t> OutputBuffers::PollWrite(SliceBuffer& output_buffer) { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
const auto length = output_buffer.Length(); |
||||
MutexLock lock(&mu_); |
||||
for (size_t i = 0; i < buffers_.size(); ++i) { |
||||
if (buffers_[i].Accept(output_buffer)) { |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Queue " << length << " data onto endpoint " << i |
||||
<< " queue " << this; |
||||
waker = buffers_[i].TakeWaker(); |
||||
return i; |
||||
} |
||||
} |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: No data endpoint ready for " << length |
||||
<< " bytes on queue " << this; |
||||
write_waker_ = GetContext<Activity>()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
|
||||
Poll<SliceBuffer> OutputBuffers::PollNext(uint32_t connection_id) { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
MutexLock lock(&mu_); |
||||
auto& buffer = buffers_[connection_id]; |
||||
if (buffer.HavePending()) { |
||||
waker = std::move(write_waker_); |
||||
return buffer.TakePending(); |
||||
} |
||||
buffer.SetWaker(); |
||||
return Pending{}; |
||||
} |
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// InputQueues
|
||||
|
||||
InputQueues::InputQueues(uint32_t num_connections) |
||||
: read_requests_(num_connections), read_request_waker_(num_connections) {} |
||||
|
||||
absl::StatusOr<uint64_t> InputQueues::CreateTicket(uint32_t connection_id, |
||||
size_t length) { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
MutexLock lock(&mu_); |
||||
if (connection_id >= read_requests_.size()) { |
||||
return absl::UnavailableError( |
||||
absl::StrCat("Invalid connection id: ", connection_id)); |
||||
} |
||||
uint64_t ticket = next_ticket_id_; |
||||
++next_ticket_id_; |
||||
auto r = ReadRequest{length, ticket}; |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: New read ticket on #" << connection_id << " " << r; |
||||
read_requests_[connection_id].push_back(r); |
||||
outstanding_reads_.emplace(ticket, Waker{}); |
||||
waker = std::move(read_request_waker_[connection_id]); |
||||
return ticket; |
||||
} |
||||
|
||||
Poll<absl::StatusOr<SliceBuffer>> InputQueues::PollRead(uint64_t ticket) { |
||||
MutexLock lock(&mu_); |
||||
auto it = outstanding_reads_.find(ticket); |
||||
CHECK(it != outstanding_reads_.end()) << " ticket=" << ticket; |
||||
if (auto* waker = absl::get_if<Waker>(&it->second)) { |
||||
*waker = GetContext<Activity>()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
auto result = std::move(absl::get<absl::StatusOr<SliceBuffer>>(it->second)); |
||||
outstanding_reads_.erase(it); |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Poll for ticket #" << ticket |
||||
<< " completes: " << result.status(); |
||||
return result; |
||||
} |
||||
|
||||
Poll<std::vector<InputQueues::ReadRequest>> InputQueues::PollNext( |
||||
uint32_t connection_id) { |
||||
MutexLock lock(&mu_); |
||||
auto& q = read_requests_[connection_id]; |
||||
if (q.empty()) { |
||||
read_request_waker_[connection_id] = |
||||
GetContext<Activity>()->MakeNonOwningWaker(); |
||||
return Pending{}; |
||||
} |
||||
auto r = std::move(q); |
||||
q.clear(); |
||||
return r; |
||||
} |
||||
|
||||
void InputQueues::CompleteRead(uint64_t ticket, |
||||
absl::StatusOr<SliceBuffer> buffer) { |
||||
Waker waker; |
||||
auto cleanup = absl::MakeCleanup([&waker]() { waker.Wakeup(); }); |
||||
MutexLock lock(&mu_); |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Complete ticket #" << ticket << ": " << buffer.status(); |
||||
auto it = outstanding_reads_.find(ticket); |
||||
if (it == outstanding_reads_.end()) return; // cancelled
|
||||
waker = std::move(absl::get<Waker>(it->second)); |
||||
it->second.emplace<absl::StatusOr<SliceBuffer>>(std::move(buffer)); |
||||
} |
||||
|
||||
void InputQueues::CancelTicket(uint64_t ticket) { |
||||
MutexLock lock(&mu_); |
||||
outstanding_reads_.erase(ticket); |
||||
} |
||||
|
||||
} // namespace data_endpoints_detail
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// DataEndpoints
|
||||
|
||||
DataEndpoints::DataEndpoints( |
||||
std::vector<PromiseEndpoint> endpoints_vec, |
||||
grpc_event_engine::experimental::EventEngine* event_engine) |
||||
: output_buffers_(MakeRefCounted<data_endpoints_detail::OutputBuffers>( |
||||
endpoints_vec.size())), |
||||
input_queues_(MakeRefCounted<data_endpoints_detail::InputQueues>( |
||||
endpoints_vec.size())) { |
||||
CHECK(event_engine != nullptr); |
||||
for (auto& endpoint : endpoints_vec) { |
||||
// Enable RxMemoryAlignment and RPC receive coalescing after the transport
|
||||
// setup is complete. At this point all the settings frames should have
|
||||
// been read.
|
||||
endpoint.EnforceRxMemoryAlignmentAndCoalescing(); |
||||
} |
||||
auto endpoints = MakeRefCounted<data_endpoints_detail::Endpoints>(); |
||||
endpoints->endpoints = std::move(endpoints_vec); |
||||
parties_.reserve(2 * endpoints->endpoints.size()); |
||||
auto arena = SimpleArenaAllocator(0)->MakeArena(); |
||||
arena->SetContext(event_engine); |
||||
for (size_t i = 0; i < endpoints->endpoints.size(); ++i) { |
||||
auto write_party = Party::Make(arena); |
||||
auto read_party = Party::Make(arena); |
||||
write_party->Spawn( |
||||
"flush-data", |
||||
[i, endpoints, output_buffers = output_buffers_]() { |
||||
return Loop([i, endpoints, output_buffers]() { |
||||
return TrySeq( |
||||
output_buffers->Next(i), |
||||
[endpoints = endpoints.get(), i](SliceBuffer buffer) { |
||||
GRPC_TRACE_LOG(chaotic_good, INFO) |
||||
<< "CHAOTIC_GOOD: Write " << buffer.Length() |
||||
<< "b to data endpoint #" << i; |
||||
return endpoints->endpoints[i].Write(std::move(buffer)); |
||||
}, |
||||
[]() -> LoopCtl<absl::Status> { return Continue{}; }); |
||||
}); |
||||
}, |
||||
[](absl::Status) {}); |
||||
read_party->Spawn( |
||||
"read-data", |
||||
[i, endpoints, input_queues = input_queues_]() { |
||||
return Loop([i, endpoints, input_queues]() { |
||||
return TrySeq( |
||||
input_queues->Next(i), |
||||
[endpoints, i, input_queues]( |
||||
std::vector<data_endpoints_detail::InputQueues::ReadRequest> |
||||
requests) { |
||||
return TrySeqContainer( |
||||
std::move(requests), Empty{}, |
||||
[endpoints, i, input_queues]( |
||||
data_endpoints_detail::InputQueues::ReadRequest |
||||
read_request, |
||||
Empty) { |
||||
return Seq( |
||||
endpoints->endpoints[i].Read(read_request.length), |
||||
[ticket = read_request.ticket, |
||||
|
||||
input_queues](absl::StatusOr<SliceBuffer> buffer) { |
||||
input_queues->CompleteRead(ticket, |
||||
std::move(buffer)); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
}, |
||||
[]() -> LoopCtl<absl::Status> { return Continue{}; }); |
||||
}); |
||||
}, |
||||
[](absl::Status) {}); |
||||
parties_.emplace_back(std::move(write_party)); |
||||
parties_.emplace_back(std::move(read_party)); |
||||
} |
||||
} |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,199 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_DATA_ENDPOINTS_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_DATA_ENDPOINTS_H |
||||
|
||||
#include <cstdint> |
||||
|
||||
#include "src/core/lib/promise/party.h" |
||||
#include "src/core/lib/promise/promise.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
namespace data_endpoints_detail { |
||||
struct Endpoints : public RefCounted<Endpoints> { |
||||
std::vector<PromiseEndpoint> endpoints; |
||||
}; |
||||
|
||||
// Buffered writes for one data endpoint
|
||||
class OutputBuffer { |
||||
public: |
||||
bool Accept(SliceBuffer& buffer); |
||||
Waker TakeWaker() { return std::move(flush_waker_); } |
||||
void SetWaker() { |
||||
flush_waker_ = GetContext<Activity>()->MakeNonOwningWaker(); |
||||
} |
||||
bool HavePending() const { return pending_.Length() > 0; } |
||||
SliceBuffer TakePending() { return std::move(pending_); } |
||||
|
||||
private: |
||||
Waker flush_waker_; |
||||
size_t pending_max_ = 1024 * 1024; |
||||
SliceBuffer pending_; |
||||
}; |
||||
|
||||
// The set of output buffers for all connected data endpoints
|
||||
class OutputBuffers : public RefCounted<OutputBuffers> { |
||||
public: |
||||
explicit OutputBuffers(uint32_t num_connections); |
||||
|
||||
auto Write(SliceBuffer output_buffer) { |
||||
return [output_buffer = std::move(output_buffer), this]() mutable { |
||||
return PollWrite(output_buffer); |
||||
}; |
||||
} |
||||
|
||||
auto Next(uint32_t connection_id) { |
||||
return [this, connection_id]() { return PollNext(connection_id); }; |
||||
} |
||||
|
||||
private: |
||||
Poll<uint32_t> PollWrite(SliceBuffer& output_buffer); |
||||
Poll<SliceBuffer> PollNext(uint32_t connection_id); |
||||
|
||||
Mutex mu_; |
||||
std::vector<OutputBuffer> buffers_ ABSL_GUARDED_BY(mu_); |
||||
Waker write_waker_ ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
|
||||
class InputQueues : public RefCounted<InputQueues> { |
||||
public: |
||||
// One outstanding read.
|
||||
// ReadTickets get filed by read requests, and all tickets are fullfilled
|
||||
// by an endpoint.
|
||||
// A call may Await a ticket to get the bytes back later (or it may skip that
|
||||
// step - in which case the bytes are thrown away after reading).
|
||||
// This decoupling is necessary to ensure that cancelled reads by calls do not
|
||||
// cause data corruption for other calls.
|
||||
class ReadTicket { |
||||
public: |
||||
ReadTicket(absl::StatusOr<uint64_t> ticket, |
||||
RefCountedPtr<InputQueues> input_queues) |
||||
: ticket_(std::move(ticket)), input_queues_(std::move(input_queues)) {} |
||||
|
||||
ReadTicket(const ReadTicket&) = delete; |
||||
ReadTicket& operator=(const ReadTicket&) = delete; |
||||
ReadTicket(ReadTicket&& other) noexcept |
||||
: ticket_(std::move(other.ticket_)), |
||||
input_queues_(std::move(other.input_queues_)) {} |
||||
ReadTicket& operator=(ReadTicket&& other) noexcept { |
||||
ticket_ = std::move(other.ticket_); |
||||
input_queues_ = std::move(other.input_queues_); |
||||
return *this; |
||||
} |
||||
|
||||
~ReadTicket() { |
||||
if (input_queues_ != nullptr && ticket_.ok()) { |
||||
input_queues_->CancelTicket(*ticket_); |
||||
} |
||||
} |
||||
|
||||
auto Await() { |
||||
return If( |
||||
ticket_.ok(), |
||||
[&]() { |
||||
return |
||||
[ticket = *ticket_, input_queues = std::move(input_queues_)]() { |
||||
return input_queues->PollRead(ticket); |
||||
}; |
||||
}, |
||||
[&]() { |
||||
return Immediate(absl::StatusOr<SliceBuffer>(ticket_.status())); |
||||
}); |
||||
} |
||||
|
||||
private: |
||||
absl::StatusOr<uint64_t> ticket_; |
||||
RefCountedPtr<InputQueues> input_queues_; |
||||
}; |
||||
|
||||
struct ReadRequest { |
||||
size_t length; |
||||
uint64_t ticket; |
||||
|
||||
template <typename Sink> |
||||
friend void AbslStringify(Sink& sink, const ReadRequest& req) { |
||||
sink.Append(absl::StrCat("read#", req.ticket, ":", req.length, "b")); |
||||
} |
||||
}; |
||||
|
||||
explicit InputQueues(uint32_t num_connections); |
||||
|
||||
ReadTicket Read(uint32_t connection_id, size_t length) { |
||||
return ReadTicket(CreateTicket(connection_id, length), Ref()); |
||||
} |
||||
|
||||
auto Next(uint32_t connection_id) { |
||||
return [this, connection_id]() { return PollNext(connection_id); }; |
||||
} |
||||
|
||||
void CompleteRead(uint64_t ticket, absl::StatusOr<SliceBuffer> buffer); |
||||
|
||||
void CancelTicket(uint64_t ticket); |
||||
|
||||
private: |
||||
using ReadState = absl::variant<absl::StatusOr<SliceBuffer>, Waker>; |
||||
|
||||
absl::StatusOr<uint64_t> CreateTicket(uint32_t connection_id, size_t length); |
||||
Poll<absl::StatusOr<SliceBuffer>> PollRead(uint64_t ticket); |
||||
Poll<std::vector<ReadRequest>> PollNext(uint32_t connection_id); |
||||
|
||||
Mutex mu_; |
||||
uint64_t next_ticket_id_ ABSL_GUARDED_BY(mu_) = 0; |
||||
std::vector<std::vector<ReadRequest>> read_requests_ ABSL_GUARDED_BY(mu_); |
||||
std::vector<Waker> read_request_waker_; |
||||
absl::flat_hash_map<uint64_t, ReadState> outstanding_reads_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
}; |
||||
} // namespace data_endpoints_detail
|
||||
|
||||
// Collection of data connections.
|
||||
class DataEndpoints { |
||||
public: |
||||
using ReadTicket = data_endpoints_detail::InputQueues::ReadTicket; |
||||
|
||||
explicit DataEndpoints( |
||||
std::vector<PromiseEndpoint> endpoints, |
||||
grpc_event_engine::experimental::EventEngine* event_engine); |
||||
|
||||
// Try to queue output_buffer against a data endpoint.
|
||||
// Returns a promise that resolves to the data endpoint connection id
|
||||
// selected.
|
||||
// Connection ids returned by this class are 0 based (which is different
|
||||
// to how chaotic good communicates them on the wire - those are 1 based
|
||||
// to allow for the control channel identification)
|
||||
auto Write(SliceBuffer output_buffer) { |
||||
return output_buffers_->Write(std::move(output_buffer)); |
||||
} |
||||
|
||||
ReadTicket Read(uint32_t connection_id, uint32_t length) { |
||||
return input_queues_->Read(connection_id, length); |
||||
} |
||||
|
||||
bool empty() const { return parties_.empty(); } |
||||
|
||||
private: |
||||
RefCountedPtr<data_endpoints_detail::OutputBuffers> output_buffers_; |
||||
RefCountedPtr<data_endpoints_detail::InputQueues> input_queues_; |
||||
std::vector<RefCountedPtr<Party>> parties_; |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_DATA_ENDPOINTS_H
|
@ -0,0 +1,45 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/control_endpoint.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "gtest/gtest.h" |
||||
#include "test/core/call/yodel/yodel_test.h" |
||||
#include "test/core/transport/util/mock_promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ControlEndpointTest : public YodelTest { |
||||
protected: |
||||
using YodelTest::YodelTest; |
||||
}; |
||||
|
||||
#define CONTROL_ENDPOINT_TEST(name) YODEL_TEST(ControlEndpointTest, name) |
||||
|
||||
CONTROL_ENDPOINT_TEST(CanWrite) { |
||||
chaotic_good::testing::MockPromiseEndpoint ep(1234); |
||||
chaotic_good::ControlEndpoint control_endpoint(std::move(ep.promise_endpoint), |
||||
event_engine().get()); |
||||
ep.ExpectWrite( |
||||
{grpc_event_engine::experimental::Slice::FromCopiedString("hello")}, |
||||
nullptr); |
||||
SpawnTestSeqWithoutContext( |
||||
"write", |
||||
control_endpoint.Write(SliceBuffer(Slice::FromCopiedString("hello")))); |
||||
WaitForAllPendingWork(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,119 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/data_endpoints.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "gtest/gtest.h" |
||||
#include "test/core/call/yodel/yodel_test.h" |
||||
#include "test/core/transport/util/mock_promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class DataEndpointsTest : public YodelTest { |
||||
protected: |
||||
using YodelTest::YodelTest; |
||||
}; |
||||
|
||||
#define DATA_ENDPOINTS_TEST(name) YODEL_TEST(DataEndpointsTest, name) |
||||
|
||||
DATA_ENDPOINTS_TEST(CanWrite) { |
||||
chaotic_good::testing::MockPromiseEndpoint ep(1234); |
||||
std::vector<PromiseEndpoint> endpoints; |
||||
endpoints.push_back(std::move(ep.promise_endpoint)); |
||||
chaotic_good::DataEndpoints data_endpoints(std::move(endpoints), |
||||
event_engine().get()); |
||||
ep.ExpectWrite( |
||||
{grpc_event_engine::experimental::Slice::FromCopiedString("hello")}, |
||||
event_engine().get()); |
||||
SpawnTestSeqWithoutContext( |
||||
"write", |
||||
data_endpoints.Write(SliceBuffer(Slice::FromCopiedString("hello"))), |
||||
[](uint32_t id) { |
||||
EXPECT_EQ(id, 0); |
||||
return Empty{}; |
||||
}); |
||||
WaitForAllPendingWork(); |
||||
} |
||||
|
||||
DATA_ENDPOINTS_TEST(CanMultiWrite) { |
||||
chaotic_good::testing::MockPromiseEndpoint ep1(1234); |
||||
chaotic_good::testing::MockPromiseEndpoint ep2(1235); |
||||
std::vector<PromiseEndpoint> endpoints; |
||||
endpoints.push_back(std::move(ep1.promise_endpoint)); |
||||
endpoints.push_back(std::move(ep2.promise_endpoint)); |
||||
chaotic_good::DataEndpoints data_endpoints(std::move(endpoints), |
||||
event_engine().get()); |
||||
SliceBuffer writes1; |
||||
SliceBuffer writes2; |
||||
ep1.CaptureWrites(writes1, event_engine().get()); |
||||
ep2.CaptureWrites(writes2, event_engine().get()); |
||||
uint32_t write1_ep = 42; |
||||
uint32_t write2_ep = 42; |
||||
SpawnTestSeqWithoutContext( |
||||
"write", |
||||
data_endpoints.Write(SliceBuffer(Slice::FromCopiedString("hello"))), |
||||
[&write1_ep](uint32_t id) { |
||||
write1_ep = id; |
||||
return Empty{}; |
||||
}, |
||||
data_endpoints.Write(SliceBuffer(Slice::FromCopiedString("world"))), |
||||
[&write2_ep](uint32_t id) { |
||||
write2_ep = id; |
||||
return Empty{}; |
||||
}); |
||||
WaitForAllPendingWork(); |
||||
EXPECT_THAT(write1_ep, ::testing::AnyOf(0, 1)); |
||||
EXPECT_THAT(write2_ep, ::testing::AnyOf(0, 1)); |
||||
std::string expect[2]; |
||||
expect[write1_ep] += "hello"; |
||||
expect[write2_ep] += "world"; |
||||
EXPECT_EQ(writes1.JoinIntoString(), expect[0]); |
||||
EXPECT_EQ(writes2.JoinIntoString(), expect[1]); |
||||
} |
||||
|
||||
DATA_ENDPOINTS_TEST(CanRead) { |
||||
chaotic_good::testing::MockPromiseEndpoint ep(1234); |
||||
std::vector<PromiseEndpoint> endpoints; |
||||
endpoints.push_back(std::move(ep.promise_endpoint)); |
||||
chaotic_good::DataEndpoints data_endpoints(std::move(endpoints), |
||||
event_engine().get()); |
||||
ep.ExpectRead( |
||||
{grpc_event_engine::experimental::Slice::FromCopiedString("hello")}, |
||||
event_engine().get()); |
||||
SpawnTestSeqWithoutContext("read", data_endpoints.Read(0, 5).Await(), |
||||
[](absl::StatusOr<SliceBuffer> result) { |
||||
EXPECT_TRUE(result.ok()); |
||||
EXPECT_EQ(result->JoinIntoString(), "hello"); |
||||
return Empty{}; |
||||
}); |
||||
WaitForAllPendingWork(); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
Loading…
Reference in new issue