pull/35409/head
Craig Tiller 1 year ago
parent 023b377513
commit 4f9588101a
  1. 19
      src/core/lib/transport/promise_endpoint.cc
  2. 12
      src/core/lib/transport/promise_endpoint.h

@ -37,9 +37,10 @@ namespace grpc_core {
PromiseEndpoint::PromiseEndpoint( PromiseEndpoint::PromiseEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint, endpoint,
SliceBuffer already_received) { SliceBuffer already_received)
GPR_ASSERT(endpoint != nullptr); : endpoint_(std::move(endpoint)) {
read_state_->endpoint = std::move(endpoint); GPR_ASSERT(endpoint_ != nullptr);
read_state_->endpoint = endpoint_;
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available. // available.
grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(), grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(),
@ -48,12 +49,12 @@ PromiseEndpoint::PromiseEndpoint(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
PromiseEndpoint::GetPeerAddress() const { PromiseEndpoint::GetPeerAddress() const {
return read_state_->endpoint->GetPeerAddress(); return endpoint_->GetPeerAddress();
} }
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
PromiseEndpoint::GetLocalAddress() const { PromiseEndpoint::GetLocalAddress() const {
return read_state_->endpoint->GetLocalAddress(); return endpoint_->GetLocalAddress();
} }
void PromiseEndpoint::ReadState::Complete(absl::Status status, void PromiseEndpoint::ReadState::Complete(absl::Status status,
@ -83,7 +84,13 @@ void PromiseEndpoint::ReadState::Complete(absl::Status status,
// If `Read()` returns true immediately, the callback will not be // If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and // called. We still need to call our callback to pick up the result and
// maybe do further reads. // maybe do further reads.
if (endpoint->Read( auto ep = endpoint.lock();
if (ep == nullptr) {
Complete(absl::UnavailableError("Endpoint closed during read."),
num_bytes_requested);
return;
}
if (ep->Read(
[self = Ref(), num_bytes_requested](absl::Status status) { [self = Ref(), num_bytes_requested](absl::Status status) {
self->Complete(std::move(status), num_bytes_requested); self->Complete(std::move(status), num_bytes_requested);
}, },

@ -76,7 +76,7 @@ class PromiseEndpoint {
// If `Write()` returns true immediately, the callback will not be called. // If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result. // We still need to call our callback to pick up the result.
write_state_->waker = Activity::current()->MakeNonOwningWaker(); write_state_->waker = Activity::current()->MakeNonOwningWaker();
const bool completed = read_state_->endpoint->Write( const bool completed = endpoint_->Write(
[write_state = write_state_](absl::Status status) { [write_state = write_state_](absl::Status status) {
write_state->Complete(std::move(status)); write_state->Complete(std::move(status));
}, },
@ -120,7 +120,7 @@ class PromiseEndpoint {
// If `Read()` returns true immediately, the callback will not be // If `Read()` returns true immediately, the callback will not be
// called. // called.
read_state_->waker = Activity::current()->MakeNonOwningWaker(); read_state_->waker = Activity::current()->MakeNonOwningWaker();
if (read_state_->endpoint->Read( if (endpoint_->Read(
[read_state = read_state_, num_bytes](absl::Status status) { [read_state = read_state_, num_bytes](absl::Status status) {
read_state->Complete(std::move(status), num_bytes); read_state->Complete(std::move(status), num_bytes);
}, },
@ -194,6 +194,9 @@ class PromiseEndpoint {
GetLocalAddress() const; GetLocalAddress() const;
private: private:
std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint_;
struct ReadState : public RefCounted<ReadState> { struct ReadState : public RefCounted<ReadState> {
std::atomic<bool> complete{false}; std::atomic<bool> complete{false};
// Read buffer used for storing successful reads given by // Read buffer used for storing successful reads given by
@ -208,8 +211,9 @@ class PromiseEndpoint {
Waker waker; Waker waker;
// Backing endpoint: we keep this on ReadState as reads will need to // Backing endpoint: we keep this on ReadState as reads will need to
// repeatedly read until the target size is hit, and we don't want to access // repeatedly read until the target size is hit, and we don't want to access
// the main object during this. // the main object during this dance (indeed the main object may be
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> // deleted).
std::weak_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint; endpoint;
void Complete(absl::Status status, size_t num_bytes_requested); void Complete(absl::Status status, size_t num_bytes_requested);

Loading…
Cancel
Save