diff --git a/src/core/BUILD b/src/core/BUILD index f39d235d711..63b603d6bc6 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6371,6 +6371,8 @@ grpc_cc_library( deps = [ "activity", "event_engine_common", + "if", + "map", "poll", "slice", "slice_buffer", diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc index e9bc70a2e16..32e4d9d58f9 100644 --- a/src/core/lib/transport/promise_endpoint.cc +++ b/src/core/lib/transport/promise_endpoint.cc @@ -16,6 +16,7 @@ #include "src/core/lib/transport/promise_endpoint.h" +#include #include #include #include @@ -39,19 +40,13 @@ PromiseEndpoint::PromiseEndpoint( SliceBuffer already_received) : endpoint_(std::move(endpoint)) { GPR_ASSERT(endpoint_ != nullptr); + read_state_->endpoint = endpoint_; // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is // available. - grpc_slice_buffer_swap(read_buffer_.c_slice_buffer(), + grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(), already_received.c_slice_buffer()); } -PromiseEndpoint::~PromiseEndpoint() { - // Promise endpoint close when last write result has not been polled. - write_result_.reset(); - // Promise endpoint close when last read result has not been polled. - read_result_.reset(); -} - const grpc_event_engine::experimental::EventEngine::ResolvedAddress& PromiseEndpoint::GetPeerAddress() const { return endpoint_->GetPeerAddress(); @@ -62,60 +57,52 @@ PromiseEndpoint::GetLocalAddress() const { return endpoint_->GetLocalAddress(); } -void PromiseEndpoint::WriteCallback(absl::Status status) { - MutexLock lock(&write_mutex_); - write_result_ = status; - write_waker_.Wakeup(); -} +void PromiseEndpoint::ReadState::Complete(absl::Status status, + size_t num_bytes_requested) { + gpr_log(GPR_ERROR, "PromiseEndpoint::ReadState::Complete: status:%s", + status.ToString().c_str()); -void PromiseEndpoint::ReadCallback(absl::Status status, - size_t num_bytes_requested) { if (!status.ok()) { // Invalidates all previous reads. - pending_read_buffer_.Clear(); - read_buffer_.Clear(); - MutexLock lock(&read_mutex_); - read_result_ = status; - read_waker_.Wakeup(); - } else { - // Appends `pending_read_buffer_` to `read_buffer_`. - pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer( - pending_read_buffer_.Length(), read_buffer_); - GPR_DEBUG_ASSERT(pending_read_buffer_.Count() == 0u); - if (read_buffer_.Length() < num_bytes_requested) { - // A further read is needed. - // Set read args with number of bytes needed as hint. - grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs - read_args = {static_cast(num_bytes_requested - - read_buffer_.Length())}; - // If `Read()` returns true immediately, the callback will not be - // called. We still need to call our callback to pick up the result and - // maybe do further reads. - if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this, - std::placeholders::_1, num_bytes_requested), - &pending_read_buffer_, &read_args)) { - ReadCallback(absl::OkStatus(), num_bytes_requested); - } - } else { - MutexLock lock(&read_mutex_); - read_result_ = status; - read_waker_.Wakeup(); - } + pending_buffer.Clear(); + buffer.Clear(); + result = status; + auto w = std::move(waker); + complete.store(true, std::memory_order_release); + w.Wakeup(); + return; } -} - -void PromiseEndpoint::ReadByteCallback(absl::Status status) { - if (!status.ok()) { - // invalidates all previous reads - pending_read_buffer_.Clear(); - read_buffer_.Clear(); - } else { - pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer( - pending_read_buffer_.Length(), read_buffer_); + // Appends `pending_buffer` to `buffer`. + pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(), + buffer); + GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u); + if (buffer.Length() < num_bytes_requested) { + // A further read is needed. + // Set read args with number of bytes needed as hint. + grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs read_args = + {static_cast(num_bytes_requested - buffer.Length())}; + // If `Read()` returns true immediately, the callback will not be + // called. We still need to call our callback to pick up the result and + // maybe do further reads. + auto ep = endpoint.lock(); + if (ep == nullptr) { + Complete(absl::UnavailableError("Endpoint closed during read."), + num_bytes_requested); + return; + } + if (ep->Read( + [self = Ref(), num_bytes_requested](absl::Status status) { + self->Complete(std::move(status), num_bytes_requested); + }, + &pending_buffer, &read_args)) { + Complete(std::move(status), num_bytes_requested); + } + return; } - MutexLock lock(&read_mutex_); - read_result_ = status; - read_waker_.Wakeup(); + result = status; + auto w = std::move(waker); + complete.store(true, std::memory_order_release); + w.Wakeup(); } } // namespace grpc_core diff --git a/src/core/lib/transport/promise_endpoint.h b/src/core/lib/transport/promise_endpoint.h index 50358fceb1b..9627df75320 100644 --- a/src/core/lib/transport/promise_endpoint.h +++ b/src/core/lib/transport/promise_endpoint.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -37,6 +38,8 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/map.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" @@ -50,10 +53,12 @@ class PromiseEndpoint { std::unique_ptr endpoint, SliceBuffer already_received); - ~PromiseEndpoint(); - /// Prevent copying and moving of PromiseEndpoint. + ~PromiseEndpoint() = default; + /// Prevent copying of PromiseEndpoint; moving is fine. PromiseEndpoint(const PromiseEndpoint&) = delete; - PromiseEndpoint(PromiseEndpoint&&) = delete; + PromiseEndpoint& operator=(const PromiseEndpoint&) = delete; + PromiseEndpoint(PromiseEndpoint&&) = default; + PromiseEndpoint& operator=(PromiseEndpoint&&) = default; // Returns a promise that resolves to a `absl::Status` indicating the result // of the write operation. @@ -62,36 +67,37 @@ class PromiseEndpoint { // `Write()` before the previous write finishes. Doing that results in // undefined behavior. auto Write(SliceBuffer data) { - { - MutexLock lock(&write_mutex_); - // Assert previous write finishes. - GPR_ASSERT(!write_result_.has_value()); - // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is - // available. - grpc_slice_buffer_swap(write_buffer_.c_slice_buffer(), - data.c_slice_buffer()); - } + // Assert previous write finishes. + GPR_ASSERT(!write_state_->complete.load(std::memory_order_relaxed)); + // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is + // available. + grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(), + data.c_slice_buffer()); // If `Write()` returns true immediately, the callback will not be called. // We still need to call our callback to pick up the result. - if (endpoint_->Write(std::bind(&PromiseEndpoint::WriteCallback, this, - std::placeholders::_1), - &write_buffer_, - nullptr /* uses default arguments */)) { - WriteCallback(absl::OkStatus()); - } - return [this]() -> Poll { - MutexLock lock(&write_mutex_); - // If current write isn't finished return `Pending()`, else return write - // result. - if (!write_result_.has_value()) { - write_waker_ = Activity::current()->MakeNonOwningWaker(); - return Pending(); - } else { - const auto ret = *write_result_; - write_result_.reset(); - return ret; - } - }; + write_state_->waker = Activity::current()->MakeNonOwningWaker(); + const bool completed = endpoint_->Write( + [write_state = write_state_](absl::Status status) { + write_state->Complete(std::move(status)); + }, + &write_state_->buffer, nullptr /* uses default arguments */); + return If( + completed, + [this]() { + write_state_->waker = Waker(); + return []() { return absl::OkStatus(); }; + }, + [this]() { + return [write_state = write_state_]() -> Poll { + // If current write isn't finished return `Pending()`, else return + // write result. + if (!write_state->complete.load(std::memory_order_acquire)) { + return Pending(); + } + write_state->complete.store(false, std::memory_order_relaxed); + return std::move(write_state->result); + }; + }); } // Returns a promise that resolves to `SliceBuffer` with @@ -101,47 +107,62 @@ class PromiseEndpoint { // `Read()` before the previous read finishes. Doing that results in // undefined behavior. auto Read(size_t num_bytes) { - ReleasableMutexLock lock(&read_mutex_); // Assert previous read finishes. - GPR_ASSERT(!read_result_.has_value()); + GPR_ASSERT(!read_state_->complete.load(std::memory_order_relaxed)); // Should not have pending reads. - GPR_ASSERT(pending_read_buffer_.Count() == 0u); - if (read_buffer_.Length() < num_bytes) { - lock.Release(); + GPR_ASSERT(read_state_->pending_buffer.Count() == 0u); + bool complete = true; + while (read_state_->buffer.Length() < num_bytes) { // Set read args with hinted bytes. grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs - read_args = {static_cast(num_bytes)}; + read_args = { + static_cast(num_bytes - read_state_->buffer.Length())}; // If `Read()` returns true immediately, the callback will not be - // called. We still need to call our callback to pick up the result and - // maybe do further reads. - if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this, - std::placeholders::_1, num_bytes), - &pending_read_buffer_, &read_args)) { - ReadCallback(absl::OkStatus(), num_bytes); - } - } else { - read_result_ = absl::OkStatus(); - } - return [this, num_bytes]() -> Poll> { - MutexLock lock(&read_mutex_); - if (!read_result_.has_value()) { - // If current read isn't finished, return `Pending()`. - read_waker_ = Activity::current()->MakeNonOwningWaker(); - return Pending(); - } else if (!read_result_->ok()) { - // If read fails, return error. - const absl::Status ret = *read_result_; - read_result_.reset(); - return ret; + // called. + read_state_->waker = Activity::current()->MakeNonOwningWaker(); + if (endpoint_->Read( + [read_state = read_state_, num_bytes](absl::Status status) { + read_state->Complete(std::move(status), num_bytes); + }, + &read_state_->pending_buffer, &read_args)) { + read_state_->waker = Waker(); + read_state_->pending_buffer.MoveFirstNBytesIntoSliceBuffer( + read_state_->pending_buffer.Length(), read_state_->buffer); + GPR_DEBUG_ASSERT(read_state_->pending_buffer.Count() == 0u); } else { - // If read succeeds, return `SliceBuffer` with `num_bytes` bytes. - SliceBuffer ret; - grpc_slice_buffer_move_first(read_buffer_.c_slice_buffer(), num_bytes, - ret.c_slice_buffer()); - read_result_.reset(); - return std::move(ret); + complete = false; + break; } - }; + } + return If( + complete, + [this, num_bytes]() { + SliceBuffer ret; + grpc_slice_buffer_move_first(read_state_->buffer.c_slice_buffer(), + num_bytes, ret.c_slice_buffer()); + return [ret = std::move( + ret)]() mutable -> Poll> { + return std::move(ret); + }; + }, + [this, num_bytes]() { + return [read_state = read_state_, + num_bytes]() -> Poll> { + if (!read_state->complete.load(std::memory_order_acquire)) { + return Pending(); + } + // If read succeeds, return `SliceBuffer` with `num_bytes` bytes. + if (read_state->result.ok()) { + SliceBuffer ret; + grpc_slice_buffer_move_first(read_state->buffer.c_slice_buffer(), + num_bytes, ret.c_slice_buffer()); + read_state->complete.store(false, std::memory_order_relaxed); + return ret; + } + read_state->complete.store(false, std::memory_order_relaxed); + return std::move(read_state->result); + }; + }); } // Returns a promise that resolves to `Slice` with at least @@ -151,93 +172,20 @@ class PromiseEndpoint { // `ReadSlice()` before the previous read finishes. Doing that results in // undefined behavior. auto ReadSlice(size_t num_bytes) { - ReleasableMutexLock lock(&read_mutex_); - // Assert previous read finishes. - GPR_ASSERT(!read_result_.has_value()); - // Should not have pending reads. - GPR_ASSERT(pending_read_buffer_.Count() == 0u); - if (read_buffer_.Length() < num_bytes) { - lock.Release(); - // Set read args with num_bytes as hint. - grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs - read_args = {static_cast(num_bytes)}; - // If `Read()` returns true immediately, the callback will not be - // called. We still need to call our callback to pick up the result - // and maybe do further reads. - if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this, - std::placeholders::_1, num_bytes), - &pending_read_buffer_, &read_args)) { - ReadCallback(absl::OkStatus(), num_bytes); - } - } else { - read_result_ = absl::OkStatus(); - } - return [this, num_bytes]() -> Poll> { - MutexLock lock(&read_mutex_); - if (!read_result_.has_value()) { - // If current read isn't finished, return `Pending()`. - read_waker_ = Activity::current()->MakeNonOwningWaker(); - return Pending(); - } else if (!read_result_->ok()) { - // If read fails, return error. - const auto ret = *read_result_; - read_result_.reset(); - return ret; - } - // If read succeeds, return `Slice` with `num_bytes`. - else if (read_buffer_.RefSlice(0).size() == num_bytes) { - read_result_.reset(); - return Slice(read_buffer_.TakeFirst().TakeCSlice()); - } else { - // TODO(ladynana): avoid memcpy when read_buffer_.RefSlice(0).size() is - // different from `num_bytes`. - MutableSlice ret = MutableSlice::CreateUninitialized(num_bytes); - read_buffer_.MoveFirstNBytesIntoBuffer(num_bytes, ret.data()); - read_result_.reset(); - return Slice(std::move(ret)); - } - }; + return Map(Read(num_bytes), + [](absl::StatusOr buffer) -> absl::StatusOr { + if (!buffer.ok()) return buffer.status(); + return buffer->JoinIntoSlice(); + }); } // Returns a promise that resolves to a byte with type `uint8_t`. auto ReadByte() { - ReleasableMutexLock lock(&read_mutex_); - // Assert previous read finishes. - GPR_ASSERT(!read_result_.has_value()); - // Should not have pending reads. - GPR_ASSERT(pending_read_buffer_.Count() == 0u); - if (read_buffer_.Length() == 0u) { - lock.Release(); - // If `Read()` returns true immediately, the callback will not be called. - // We still need to call our callback to pick up the result and maybe do - // further reads. - if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadByteCallback, this, - std::placeholders::_1), - &pending_read_buffer_, nullptr)) { - ReadByteCallback(absl::OkStatus()); - } - } else { - read_result_ = absl::OkStatus(); - } - return [this]() -> Poll> { - MutexLock lock(&read_mutex_); - if (!read_result_.has_value()) { - // If current read isn't finished, return `Pending()`. - read_waker_ = Activity::current()->MakeNonOwningWaker(); - return Pending(); - } else if (!read_result_->ok()) { - // If read fails, return error. - const auto ret = *read_result_; - read_result_.reset(); - return ret; - } else { - // If read succeeds, return a byte with type `uint8_t`. - uint8_t ret = 0u; - read_buffer_.MoveFirstNBytesIntoBuffer(1, &ret); - read_result_.reset(); - return ret; - } - }; + return Map(ReadSlice(1), + [](absl::StatusOr slice) -> absl::StatusOr { + if (!slice.ok()) return slice.status(); + return (*slice)[0]; + }); } const grpc_event_engine::experimental::EventEngine::ResolvedAddress& @@ -246,49 +194,52 @@ class PromiseEndpoint { GetLocalAddress() const; private: - std::unique_ptr + std::shared_ptr endpoint_; - // Data used for writes. - // TODO(ladynana): Remove this write_mutex_ and use `atomic - // write_complete_` as write guard. - Mutex write_mutex_; - // Write buffer used for `EventEngine::Endpoint::Write()` to ensure the - // memory behind the buffer is not lost. - grpc_event_engine::experimental::SliceBuffer write_buffer_; - // Used for store the result from `EventEngine::Endpoint::Write()`. - // `write_result_.has_value() == true` means the value has not been polled - // yet. - absl::optional write_result_ ABSL_GUARDED_BY(write_mutex_); - Waker write_waker_ ABSL_GUARDED_BY(write_mutex_); - - // Callback function used for `EventEngine::Endpoint::Write()`. - void WriteCallback(absl::Status status); - - // Data used for reads - // TODO(ladynana): Remove this read_mutex_ and use `atomic - // read_complete_` as read guard. - Mutex read_mutex_; - // Read buffer used for storing successful reads given by - // `EventEngine::Endpoint` but not yet requested by the caller. - grpc_event_engine::experimental::SliceBuffer read_buffer_; - // Buffer used to accept data from `EventEngine::Endpoint`. - // Every time after a successful read from `EventEngine::Endpoint`, the data - // in this buffer should be appended to `read_buffer_`. - grpc_event_engine::experimental::SliceBuffer pending_read_buffer_; - // Used for store the result from `EventEngine::Endpoint::Read()`. - // `read_result_.has_value() == true` means the value has not been polled - // yet. - absl::optional read_result_ ABSL_GUARDED_BY(read_mutex_); - Waker read_waker_ ABSL_GUARDED_BY(read_mutex_); + struct ReadState : public RefCounted { + std::atomic complete{false}; + // Read buffer used for storing successful reads given by + // `EventEngine::Endpoint` but not yet requested by the caller. + grpc_event_engine::experimental::SliceBuffer buffer; + // Buffer used to accept data from `EventEngine::Endpoint`. + // Every time after a successful read from `EventEngine::Endpoint`, the data + // in this buffer should be appended to `buffer`. + grpc_event_engine::experimental::SliceBuffer pending_buffer; + // Used for store the result from `EventEngine::Endpoint::Read()`. + absl::Status result; + Waker waker; + // Backing endpoint: we keep this on ReadState as reads will need to + // repeatedly read until the target size is hit, and we don't want to access + // the main object during this dance (indeed the main object may be + // deleted). + std::weak_ptr + endpoint; + + void Complete(absl::Status status, size_t num_bytes_requested); + }; + + struct WriteState : public RefCounted { + std::atomic complete{false}; + // Write buffer used for `EventEngine::Endpoint::Write()` to ensure the + // memory behind the buffer is not lost. + grpc_event_engine::experimental::SliceBuffer buffer; + // Used for store the result from `EventEngine::Endpoint::Write()`. + absl::Status result; + Waker waker; + + void Complete(absl::Status status) { + result = std::move(status); + auto w = std::move(waker); + complete.store(true, std::memory_order_release); + w.Wakeup(); + } + }; - // Callback function used for `EventEngine::Endpoint::Read()` shared between - // `Read()` and `ReadSlice()`. - void ReadCallback(absl::Status status, size_t num_bytes_requested); - // Callback function used for `EventEngine::Endpoint::Read()` in `ReadByte()`. - void ReadByteCallback(absl::Status status); + RefCountedPtr write_state_ = MakeRefCounted(); + RefCountedPtr read_state_ = MakeRefCounted(); }; } // namespace grpc_core -#endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H \ No newline at end of file +#endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H diff --git a/test/core/transport/promise_endpoint_test.cc b/test/core/transport/promise_endpoint_test.cc index 09478d0dd27..760a2add6c7 100644 --- a/test/core/transport/promise_endpoint_test.cc +++ b/test/core/transport/promise_endpoint_test.cc @@ -37,6 +37,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "test/core/promise/test_wakeup_schedulers.h" +using testing::AtMost; using testing::MockFunction; using testing::Return; using testing::ReturnRef; @@ -108,18 +109,18 @@ class PromiseEndpointTest : public ::testing::Test { PromiseEndpointTest() : mock_endpoint_ptr_(new StrictMock()), mock_endpoint_(*mock_endpoint_ptr_), - promise_endpoint_( + promise_endpoint_(std::make_unique( std::unique_ptr< grpc_event_engine::experimental::EventEngine::Endpoint>( mock_endpoint_ptr_), - SliceBuffer()) {} + SliceBuffer())) {} private: MockEndpoint* mock_endpoint_ptr_; protected: MockEndpoint& mock_endpoint_; - PromiseEndpoint promise_endpoint_; + std::unique_ptr promise_endpoint_; const absl::Status kDummyErrorStatus = absl::ErrnoToStatus(5566, "just an error"); @@ -140,7 +141,7 @@ TEST_F(PromiseEndpointTest, OneReadSuccessful) { buffer->Append(std::move(slice)); return true; })); - auto promise = promise_endpoint_.Read(kBuffer.size()); + auto promise = promise_endpoint_->Read(kBuffer.size()); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -151,7 +152,7 @@ TEST_F(PromiseEndpointTest, OneReadSuccessful) { TEST_F(PromiseEndpointTest, OneReadFailed) { MockActivity activity; activity.Activate(); - EXPECT_CALL(activity, WakeupRequested).Times(0); + EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1)); EXPECT_CALL(mock_endpoint_, Read) .WillOnce(WithArgs<0>( [this](absl::AnyInvocable read_callback) { @@ -159,7 +160,7 @@ TEST_F(PromiseEndpointTest, OneReadFailed) { read_callback(this->kDummyErrorStatus); return false; })); - auto promise = promise_endpoint_.Read(kDummyRequestSize); + auto promise = promise_endpoint_->Read(kDummyRequestSize); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_FALSE(poll.value().ok()); @@ -194,14 +195,14 @@ TEST_F(PromiseEndpointTest, MutipleReadsSuccessful) { return true; })); { - auto promise = promise_endpoint_.Read(4u); + auto promise = promise_endpoint_->Read(4u); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(0, 4)); } { - auto promise = promise_endpoint_.Read(4u); + auto promise = promise_endpoint_->Read(4u); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -229,7 +230,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) { // Return false to mock EventEngine read not finish.. return false; })); - auto promise = promise_endpoint_.Read(kBuffer.size()); + auto promise = promise_endpoint_->Read(kBuffer.size()); EXPECT_TRUE(promise().pending()); // Mock EventEngine read succeeds, and promise resolves. read_callback(absl::OkStatus()); @@ -252,7 +253,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadFailed) { // Return false to mock EventEngine read not finish. return false; })); - auto promise = promise_endpoint_.Read(kDummyRequestSize); + auto promise = promise_endpoint_->Read(kDummyRequestSize); EXPECT_TRUE(promise().pending()); // Mock EventEngine read fails, and promise returns error. read_callback(kDummyErrorStatus); @@ -277,7 +278,7 @@ TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) { buffer->Append(std::move(slice)); return true; })); - auto promise = promise_endpoint_.ReadSlice(kBuffer.size()); + auto promise = promise_endpoint_->ReadSlice(kBuffer.size()); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -288,7 +289,7 @@ TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) { TEST_F(PromiseEndpointTest, OneReadSliceFailed) { MockActivity activity; activity.Activate(); - EXPECT_CALL(activity, WakeupRequested).Times(0); + EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1)); EXPECT_CALL(mock_endpoint_, Read) .WillOnce(WithArgs<0>( [this](absl::AnyInvocable read_callback) { @@ -296,7 +297,7 @@ TEST_F(PromiseEndpointTest, OneReadSliceFailed) { read_callback(this->kDummyErrorStatus); return false; })); - auto promise = promise_endpoint_.ReadSlice(kDummyRequestSize); + auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_FALSE(poll.value().ok()); @@ -331,14 +332,14 @@ TEST_F(PromiseEndpointTest, MutipleReadSlicesSuccessful) { return true; })); { - auto promise = promise_endpoint_.ReadSlice(4u); + auto promise = promise_endpoint_->ReadSlice(4u); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(0, 4)); } { - auto promise = promise_endpoint_.ReadSlice(4u); + auto promise = promise_endpoint_->ReadSlice(4u); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -366,7 +367,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) { // Return false to mock EventEngine read not finish.. return false; })); - auto promise = promise_endpoint_.ReadSlice(kBuffer.size()); + auto promise = promise_endpoint_->ReadSlice(kBuffer.size()); EXPECT_TRUE(promise().pending()); // Mock EventEngine read succeeds, and promise resolves. read_callback(absl::OkStatus()); @@ -389,7 +390,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) { // Return false to mock EventEngine read not finish. return false; })); - auto promise = promise_endpoint_.ReadSlice(kDummyRequestSize); + auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize); EXPECT_TRUE(promise().pending()); // Mock EventEngine read fails, and promise returns error. read_callback(kDummyErrorStatus); @@ -414,7 +415,7 @@ TEST_F(PromiseEndpointTest, OneReadByteSuccessful) { buffer->Append(std::move(slice)); return true; })); - auto promise = promise_endpoint_.ReadByte(); + auto promise = promise_endpoint_->ReadByte(); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -425,7 +426,7 @@ TEST_F(PromiseEndpointTest, OneReadByteSuccessful) { TEST_F(PromiseEndpointTest, OneReadByteFailed) { MockActivity activity; activity.Activate(); - EXPECT_CALL(activity, WakeupRequested).Times(0); + EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1)); EXPECT_CALL(mock_endpoint_, Read) .WillOnce(WithArgs<0>( [this](absl::AnyInvocable read_callback) { @@ -433,7 +434,7 @@ TEST_F(PromiseEndpointTest, OneReadByteFailed) { read_callback(this->kDummyErrorStatus); return false; })); - auto promise = promise_endpoint_.ReadByte(); + auto promise = promise_endpoint_->ReadByte(); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_FALSE(poll.value().ok()); @@ -456,7 +457,7 @@ TEST_F(PromiseEndpointTest, MutipleReadBytesSuccessful) { return true; })); for (size_t i = 0; i < kBuffer.size(); ++i) { - auto promise = promise_endpoint_.ReadByte(); + auto promise = promise_endpoint_->ReadByte(); auto poll = promise(); ASSERT_TRUE(poll.ready()); ASSERT_TRUE(poll.value().ok()); @@ -484,7 +485,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) { // Return false to mock EventEngine read not finish.. return false; })); - auto promise = promise_endpoint_.ReadByte(); + auto promise = promise_endpoint_->ReadByte(); ASSERT_TRUE(promise().pending()); // Mock EventEngine read succeeds, and promise resolves. read_callback(absl::OkStatus()); @@ -495,7 +496,7 @@ TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) { activity.Deactivate(); } -TEST_F(PromiseEndpointTest, OnePengingReadByteFailed) { +TEST_F(PromiseEndpointTest, OnePendingReadByteFailed) { MockActivity activity; absl::AnyInvocable read_callback; activity.Activate(); @@ -507,7 +508,7 @@ TEST_F(PromiseEndpointTest, OnePengingReadByteFailed) { // Return false to mock EventEngine read not finish. return false; })); - auto promise = promise_endpoint_.ReadByte(); + auto promise = promise_endpoint_->ReadByte(); ASSERT_TRUE(promise().pending()); // Mock EventEngine read fails, and promise returns error. read_callback(kDummyErrorStatus); @@ -523,7 +524,7 @@ TEST_F(PromiseEndpointTest, OneWriteSuccessful) { activity.Activate(); EXPECT_CALL(activity, WakeupRequested).Times(0); EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true)); - auto promise = promise_endpoint_.Write(SliceBuffer()); + auto promise = promise_endpoint_->Write(SliceBuffer()); auto poll = promise(); ASSERT_TRUE(poll.ready()); EXPECT_EQ(absl::OkStatus(), poll.value()); @@ -533,14 +534,14 @@ TEST_F(PromiseEndpointTest, OneWriteSuccessful) { TEST_F(PromiseEndpointTest, OneWriteFailed) { MockActivity activity; activity.Activate(); - EXPECT_CALL(activity, WakeupRequested).Times(0); + EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1)); EXPECT_CALL(mock_endpoint_, Write) .WillOnce( WithArgs<0>([this](absl::AnyInvocable on_write) { on_write(this->kDummyErrorStatus); return false; })); - auto promise = promise_endpoint_.Write(SliceBuffer()); + auto promise = promise_endpoint_->Write(SliceBuffer()); auto poll = promise(); ASSERT_TRUE(poll.ready()); EXPECT_EQ(kDummyErrorStatus, poll.value()); @@ -563,7 +564,7 @@ TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) { // Return false to mock EventEngine write pending.. return false; })); - auto promise = promise_endpoint_.Write(SliceBuffer()); + auto promise = promise_endpoint_->Write(SliceBuffer()); EXPECT_TRUE(promise().pending()); // Mock EventEngine write succeeds, and promise resolves. write_callback(absl::OkStatus()); @@ -585,7 +586,7 @@ TEST_F(PromiseEndpointTest, OnePendingWriteFailed) { // Return false to mock EventEngine write pending.. return false; })); - auto promise = promise_endpoint_.Write(SliceBuffer()); + auto promise = promise_endpoint_->Write(SliceBuffer()); EXPECT_TRUE(promise().pending()); write_callback(kDummyErrorStatus); auto poll = promise(); @@ -600,7 +601,7 @@ TEST_F(PromiseEndpointTest, GetPeerAddress) { reinterpret_cast(raw_test_address), sizeof(raw_test_address)); EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address)); - auto peer_address = promise_endpoint_.GetPeerAddress(); + auto peer_address = promise_endpoint_->GetPeerAddress(); EXPECT_EQ(0, std::memcmp(test_address.address(), test_address.address(), test_address.size())); EXPECT_EQ(test_address.size(), peer_address.size()); @@ -613,12 +614,43 @@ TEST_F(PromiseEndpointTest, GetLocalAddress) { sizeof(raw_test_address)); EXPECT_CALL(mock_endpoint_, GetLocalAddress) .WillOnce(ReturnRef(test_address)); - auto local_address = promise_endpoint_.GetLocalAddress(); + auto local_address = promise_endpoint_->GetLocalAddress(); EXPECT_EQ(0, std::memcmp(test_address.address(), local_address.address(), test_address.size())); EXPECT_EQ(test_address.size(), local_address.size()); } +TEST_F(PromiseEndpointTest, DestroyedBeforeReadCompletes) { + MockActivity activity; + const std::string kBuffer = {0x01}; + absl::AnyInvocable read_callback; + activity.Activate(); + EXPECT_CALL(activity, WakeupRequested).Times(1); + EXPECT_CALL(mock_endpoint_, Read) + .WillOnce(WithArgs<0, 1>( + [&read_callback, &kBuffer]( + absl::AnyInvocable on_read, + grpc_event_engine::experimental::SliceBuffer* buffer) { + read_callback = std::move(on_read); + // Schedule mock_endpoint to read buffer. + grpc_event_engine::experimental::Slice slice( + grpc_slice_from_cpp_string(kBuffer)); + buffer->Append(std::move(slice)); + // Return false to mock EventEngine read not finish.. + return false; + })); + auto promise = promise_endpoint_->ReadByte(); + ASSERT_TRUE(promise().pending()); + promise_endpoint_.reset(); + // Mock EventEngine read succeeds, and promise resolves. + read_callback(absl::OkStatus()); + auto poll = promise(); + ASSERT_TRUE(poll.ready()); + ASSERT_TRUE(poll.value().ok()); + EXPECT_EQ(*poll.value(), kBuffer[0]); + activity.Deactivate(); +} + class MultiplePromiseEndpointTest : public ::testing::Test { public: MultiplePromiseEndpointTest() diff --git a/tools/distrib/fix_build_deps.py b/tools/distrib/fix_build_deps.py index d75aabd6679..52e6eb3595b 100755 --- a/tools/distrib/fix_build_deps.py +++ b/tools/distrib/fix_build_deps.py @@ -426,7 +426,7 @@ for dirname in [ "grpc_proto_fuzzer": grpc_cc_library, "grpc_proto_library": grpc_proto_library, "select": lambda d: d["//conditions:default"], - "glob": lambda files: None, + "glob": lambda files, **kwargs: None, "grpc_end2end_tests": lambda: None, "grpc_upb_proto_library": lambda name, **kwargs: None, "grpc_upb_proto_reflection_library": lambda name, **kwargs: None,