pull/35400/head
Craig Tiller 1 year ago
parent edfe26fe2a
commit 14587568e5
  1. 36
      src/core/lib/transport/promise_endpoint.h

@ -69,24 +69,26 @@ class PromiseEndpoint {
auto Write(SliceBuffer data) {
// Assert previous write finishes.
GPR_ASSERT(!write_state_->complete.load(std::memory_order_relaxed));
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(),
data.c_slice_buffer());
// If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result.
write_state_->waker = Activity::current()->MakeNonOwningWaker();
const bool completed = endpoint_->Write(
[write_state = write_state_](absl::Status status) {
write_state->Complete(std::move(status));
},
&write_state_->buffer, nullptr /* uses default arguments */);
bool completed;
if (data.Length() == 0) {
completed = true;
} else {
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(),
data.c_slice_buffer());
// If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result.
write_state_->waker = Activity::current()->MakeNonOwningWaker();
completed = endpoint_->Write(
[write_state = write_state_](absl::Status status) {
write_state->Complete(std::move(status));
},
&write_state_->buffer, nullptr /* uses default arguments */);
if (completed) write_state_->waker = Waker();
}
return If(
completed,
[this]() {
write_state_->waker = Waker();
return []() { return absl::OkStatus(); };
},
completed, []() { return []() { return absl::OkStatus(); }; },
[this]() {
return [write_state = write_state_]() -> Poll<absl::Status> {
// If current write isn't finished return `Pending()`, else return

Loading…
Cancel
Save