[chaotic-good] Implement a promise-based endpoint for chaotic-good transport to read & write to EventEngine::Endpoint. (#33257)

This PR is continuing the work of prototyping in
https://github.com/grpc/grpc/pull/31592, and the design doc is at
[link](https://docs.google.com/document/d/1vRy0yse-d1heLQRmLPo_0figsTPXJAnNN84tBCAne_s/edit?pli=1&resourcekey=0-JvUPdq0LaZq8gMkgT9Pzlw#heading=h.qgvc5vr55ytg).

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33630/head
nanahpang 2 years ago committed by GitHub
parent 9984f1bd5b
commit 0cc9d16e9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      CMakeLists.txt
  2. 13
      build_autogenerated.yaml
  3. 26
      src/core/BUILD
  4. 126
      src/core/lib/transport/promise_endpoint.cc
  5. 297
      src/core/lib/transport/promise_endpoint.h
  6. 17
      test/core/transport/BUILD
  7. 857
      test/core/transport/promise_endpoint_test.cc
  8. 24
      tools/run_tests/generated/tests.json

39
CMakeLists.txt generated

@ -1160,6 +1160,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx posix_event_engine_test)
endif()
add_dependencies(buildtests_cxx prioritized_race_test)
add_dependencies(buildtests_cxx promise_endpoint_test)
add_dependencies(buildtests_cxx promise_factory_test)
add_dependencies(buildtests_cxx promise_map_test)
add_dependencies(buildtests_cxx promise_test)
@ -18766,6 +18767,44 @@ target_link_libraries(prioritized_race_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(promise_endpoint_test
src/core/lib/transport/promise_endpoint.cc
test/core/transport/promise_endpoint_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_compile_features(promise_endpoint_test PUBLIC cxx_std_14)
target_include_directories(promise_endpoint_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(promise_endpoint_test
${_gRPC_BASELIB_LIBRARIES}
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc
)
endif()
if(gRPC_BUILD_TESTS)

@ -11513,6 +11513,19 @@ targets:
deps:
- gpr
uses_polling: false
- name: promise_endpoint_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/promise/join.h
- src/core/lib/transport/promise_endpoint.h
- test/core/promise/test_wakeup_schedulers.h
src:
- src/core/lib/transport/promise_endpoint.cc
- test/core/transport/promise_endpoint_test.cc
deps:
- grpc
- name: promise_factory_test
gtest: true
build: test

@ -5768,6 +5768,32 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_promise_endpoint",
srcs = [
"lib/transport/promise_endpoint.cc",
],
external_deps = [
"absl/base:core_headers",
"absl/status",
"absl/status:statusor",
"absl/types:optional",
],
language = "c++",
public_hdrs = [
"lib/transport/promise_endpoint.h",
],
deps = [
"activity",
"event_engine_common",
"poll",
"slice",
"slice_buffer",
"//:event_engine_base_hdrs",
"//:gpr",
],
)
### UPB Targets
grpc_upb_proto_library(

@ -0,0 +1,126 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/transport/promise_endpoint.h"
#include <functional>
#include <memory>
#include <utility>
#include "absl/status/status.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_buffer.h"
namespace grpc_core {
PromiseEndpoint::PromiseEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint,
SliceBuffer already_received)
: endpoint_(std::move(endpoint)) {
GPR_ASSERT(endpoint_ != nullptr);
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap(read_buffer_.c_slice_buffer(),
already_received.c_slice_buffer());
}
PromiseEndpoint::~PromiseEndpoint() {
// Last write result has not been polled.
GPR_ASSERT(!write_result_.has_value());
// Last read result has not been polled.
GPR_ASSERT(!read_result_.has_value());
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
PromiseEndpoint::GetPeerAddress() const {
return endpoint_->GetPeerAddress();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
PromiseEndpoint::GetLocalAddress() const {
return endpoint_->GetLocalAddress();
}
void PromiseEndpoint::WriteCallback(absl::Status status) {
MutexLock lock(&write_mutex_);
write_result_ = status;
write_waker_.Wakeup();
}
void PromiseEndpoint::ReadCallback(
absl::Status status, size_t num_bytes_requested,
absl::optional<
struct grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs>
requested_read_args) {
if (!status.ok()) {
// Invalidates all previous reads.
pending_read_buffer_.Clear();
read_buffer_.Clear();
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
} else {
// Appends `pending_read_buffer_` to `read_buffer_`.
pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer(
pending_read_buffer_.Length(), read_buffer_);
GPR_DEBUG_ASSERT(pending_read_buffer_.Count() == 0u);
if (read_buffer_.Length() < num_bytes_requested) {
// A further read is needed.
// Set read args with number of bytes needed as hint.
requested_read_args = {
static_cast<int64_t>(num_bytes_requested - read_buffer_.Length())};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this,
std::placeholders::_1, num_bytes_requested,
requested_read_args),
&pending_read_buffer_,
&(requested_read_args.value()))) {
ReadCallback(absl::OkStatus(), num_bytes_requested,
requested_read_args);
}
} else {
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
}
}
}
void PromiseEndpoint::ReadByteCallback(absl::Status status) {
if (!status.ok()) {
// invalidates all previous reads
pending_read_buffer_.Clear();
read_buffer_.Clear();
} else {
pending_read_buffer_.MoveFirstNBytesIntoSliceBuffer(
pending_read_buffer_.Length(), read_buffer_);
}
MutexLock lock(&read_mutex_);
read_result_ = status;
read_waker_.Wakeup();
}
} // namespace grpc_core

@ -0,0 +1,297 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <memory>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
namespace grpc_core {
// Wrapper around event engine endpoint that provides a promise like API.
class PromiseEndpoint {
public:
PromiseEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint,
SliceBuffer already_received);
~PromiseEndpoint();
// Returns a promise that resolves to a `absl::Status` indicating the result
// of the write operation.
//
// Concurrent writes are not supported, which means callers should not call
// `Write()` before the previous write finishes. Doing that results in
// undefined behavior.
auto Write(SliceBuffer data) {
{
MutexLock lock(&write_mutex_);
// Assert previous write finishes.
GPR_ASSERT(!write_result_.has_value());
// TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
// available.
grpc_slice_buffer_swap(write_buffer_.c_slice_buffer(),
data.c_slice_buffer());
}
// If `Write()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result.
if (endpoint_->Write(std::bind(&PromiseEndpoint::WriteCallback, this,
std::placeholders::_1),
&write_buffer_,
nullptr /* uses default arguments */)) {
WriteCallback(absl::OkStatus());
}
return [this]() -> Poll<absl::Status> {
MutexLock lock(&write_mutex_);
// If current write isn't finished return `Pending()`, else return write
// result.
if (!write_result_.has_value()) {
write_waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
} else {
const auto ret = *write_result_;
write_result_.reset();
return ret;
}
};
}
// Returns a promise that resolves to `SliceBuffer` with
// `num_bytes` bytes.
//
// Concurrent reads are not supported, which means callers should not call
// `Read()` before the previous read finishes. Doing that results in
// undefined behavior.
auto Read(size_t num_bytes) {
ReleasableMutexLock lock(&read_mutex_);
// Assert previous read finishes.
GPR_ASSERT(!read_result_.has_value());
// Should not have pending reads.
GPR_ASSERT(pending_read_buffer_.Count() == 0u);
if (read_buffer_.Length() < num_bytes) {
lock.Release();
// Set read args with hinted bytes.
grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
read_args;
read_args.read_hint_bytes = num_bytes;
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result and
// maybe do further reads.
if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadCallback, this,
std::placeholders::_1, num_bytes,
absl::nullopt /* uses default arguments */),
&pending_read_buffer_, &read_args)) {
ReadCallback(absl::OkStatus(), num_bytes, read_args);
}
} else {
read_result_ = absl::OkStatus();
}
return [this, num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
MutexLock lock(&read_mutex_);
if (!read_result_.has_value()) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
} else if (!read_result_->ok()) {
// If read fails, return error.
const absl::Status ret = *read_result_;
read_result_.reset();
return ret;
} else {
// If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
SliceBuffer ret;
grpc_slice_buffer_move_first(read_buffer_.c_slice_buffer(), num_bytes,
ret.c_slice_buffer());
read_result_.reset();
return std::move(ret);
}
};
}
// Returns a promise that resolves to `Slice` with at least
// `num_bytes` bytes which should be less than INT64_MAX bytes.
//
// Concurrent reads are not supported, which means callers should not call
// `ReadSlice()` before the previous read finishes. Doing that results in
// undefined behavior.
auto ReadSlice(size_t num_bytes) {
ReleasableMutexLock lock(&read_mutex_);
// Assert previous read finishes.
GPR_ASSERT(!read_result_.has_value());
// Should not have pending reads.
GPR_ASSERT(pending_read_buffer_.Count() == 0u);
if (read_buffer_.Length() < num_bytes) {
lock.Release();
// Set read args with num_bytes as hint.
const struct grpc_event_engine::experimental::EventEngine::Endpoint::
ReadArgs read_args = {static_cast<int64_t>(num_bytes)};
// If `Read()` returns true immediately, the callback will not be
// called. We still need to call our callback to pick up the result
// and maybe do further reads.
if (endpoint_->Read(
std::bind(&PromiseEndpoint::ReadCallback, this,
std::placeholders::_1, num_bytes, read_args),
&pending_read_buffer_, &read_args)) {
ReadCallback(absl::OkStatus(), num_bytes, read_args);
}
} else {
read_result_ = absl::OkStatus();
}
return [this, num_bytes]() -> Poll<absl::StatusOr<Slice>> {
MutexLock lock(&read_mutex_);
if (!read_result_.has_value()) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
} else if (!read_result_->ok()) {
// If read fails, return error.
const auto ret = *read_result_;
read_result_.reset();
return ret;
}
// If read succeeds, return `Slice` with `num_bytes`.
else if (read_buffer_.RefSlice(0).size() == num_bytes) {
read_result_.reset();
return Slice(read_buffer_.TakeFirst().TakeCSlice());
} else {
// TODO(ladynana): avoid memcpy when read_buffer_.RefSlice(0).size() is
// different from `num_bytes`.
MutableSlice ret = MutableSlice::CreateUninitialized(num_bytes);
read_buffer_.MoveFirstNBytesIntoBuffer(num_bytes, ret.data());
read_result_.reset();
return Slice(std::move(ret));
}
};
}
// Returns a promise that resolves to a byte with type `uint8_t`.
auto ReadByte() {
ReleasableMutexLock lock(&read_mutex_);
// Assert previous read finishes.
GPR_ASSERT(!read_result_.has_value());
// Should not have pending reads.
GPR_ASSERT(pending_read_buffer_.Count() == 0u);
if (read_buffer_.Length() == 0u) {
lock.Release();
// If `Read()` returns true immediately, the callback will not be called.
// We still need to call our callback to pick up the result and maybe do
// further reads.
if (endpoint_->Read(std::bind(&PromiseEndpoint::ReadByteCallback, this,
std::placeholders::_1),
&pending_read_buffer_, nullptr)) {
ReadByteCallback(absl::OkStatus());
}
} else {
read_result_ = absl::OkStatus();
}
return [this]() -> Poll<absl::StatusOr<uint8_t>> {
MutexLock lock(&read_mutex_);
if (!read_result_.has_value()) {
// If current read isn't finished, return `Pending()`.
read_waker_ = Activity::current()->MakeNonOwningWaker();
return Pending();
} else if (!read_result_->ok()) {
// If read fails, return error.
const auto ret = *read_result_;
read_result_.reset();
return ret;
} else {
// If read succeeds, return a byte with type `uint8_t`.
uint8_t ret = 0u;
read_buffer_.MoveFirstNBytesIntoBuffer(1, &ret);
read_result_.reset();
return ret;
}
};
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const;
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const;
private:
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint_;
// Data used for writes.
// TODO(ladynana): Remove this write_mutex_ and use `atomic<bool>
// write_complete_` as write guard.
Mutex write_mutex_;
// Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
// memory behind the buffer is not lost.
grpc_event_engine::experimental::SliceBuffer write_buffer_;
// Used for store the result from `EventEngine::Endpoint::Write()`.
// `write_result_.has_value() == true` means the value has not been polled
// yet.
absl::optional<absl::Status> write_result_ ABSL_GUARDED_BY(write_mutex_);
Waker write_waker_ ABSL_GUARDED_BY(write_mutex_);
// Callback function used for `EventEngine::Endpoint::Write()`.
void WriteCallback(absl::Status status);
// Data used for reads
// TODO(ladynana): Remove this read_mutex_ and use `atomic<bool>
// read_complete_` as read guard.
Mutex read_mutex_;
// Read buffer used for storing successful reads given by
// `EventEngine::Endpoint` but not yet requested by the caller.
grpc_event_engine::experimental::SliceBuffer read_buffer_;
// Buffer used to accept data from `EventEngine::Endpoint`.
// Every time after a successful read from `EventEngine::Endpoint`, the data
// in this buffer should be appended to `read_buffer_`.
grpc_event_engine::experimental::SliceBuffer pending_read_buffer_;
// Used for store the result from `EventEngine::Endpoint::Read()`.
// `read_result_.has_value() == true` means the value has not been polled
// yet.
absl::optional<absl::Status> read_result_ ABSL_GUARDED_BY(read_mutex_);
Waker read_waker_ ABSL_GUARDED_BY(read_mutex_);
// Callback function used for `EventEngine::Endpoint::Read()` shared between
// `Read()` and `ReadSlice()`.
void ReadCallback(absl::Status status, size_t num_bytes_requested,
absl::optional<struct grpc_event_engine::experimental::
EventEngine::Endpoint::ReadArgs>
requested_read_arg = absl::nullopt);
// Callback function used for `EventEngine::Endpoint::Read()` in `ReadByte()`.
void ReadByteCallback(absl::Status status);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H

@ -109,6 +109,23 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "promise_endpoint_test",
srcs = ["promise_endpoint_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:grpc",
"//src/core:activity",
"//src/core:grpc_promise_endpoint",
"//src/core:join",
"//src/core:seq",
"//test/core/promise:test_wakeup_schedulers",
],
)
grpc_cc_test(
name = "status_conversion_test",
srcs = ["status_conversion_test.cc"],

@ -0,0 +1,857 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/transport/promise_endpoint.h"
// IWYU pragma: no_include <sys/socket.h>
#include <cstring>
#include <memory>
#include <string>
#include <tuple>
#include "absl/functional/any_invocable.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/port.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
using testing::Return;
using testing::ReturnRef;
using testing::Sequence;
using testing::StrictMock;
using testing::WithArg;
using testing::WithArgs;
namespace grpc_core {
namespace testing {
class MockEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
MOCK_METHOD(
bool, Read,
(absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args),
(override));
MOCK_METHOD(
bool, Write,
(absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args),
(override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetPeerAddress, (), (const, override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetLocalAddress, (), (const, override));
};
class MockActivity : public Activity, public Wakeable {
public:
MOCK_METHOD(void, WakeupRequested, ());
void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
void Orphan() override {}
Waker MakeOwningWaker() override { return Waker(this, 0); }
Waker MakeNonOwningWaker() override { return Waker(this, 0); }
void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
void Drop(WakeupMask /*mask*/) override {}
std::string DebugTag() const override { return "MockActivity"; }
std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
return DebugTag();
}
void Activate() {
if (scoped_activity_ == nullptr) {
scoped_activity_ = std::make_unique<ScopedActivity>(this);
}
}
void Deactivate() { scoped_activity_.reset(); }
private:
std::unique_ptr<ScopedActivity> scoped_activity_;
};
class PromiseEndpointTest : public ::testing::Test {
public:
PromiseEndpointTest()
: mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
mock_endpoint_(*mock_endpoint_ptr_),
promise_endpoint_(
std::unique_ptr<
grpc_event_engine::experimental::EventEngine::Endpoint>(
mock_endpoint_ptr_),
SliceBuffer()) {}
private:
MockEndpoint* mock_endpoint_ptr_;
protected:
MockEndpoint& mock_endpoint_;
PromiseEndpoint promise_endpoint_;
const absl::Status kDummyErrorStatus =
absl::ErrnoToStatus(5566, "just an error");
static constexpr size_t kDummyRequestSize = 5566u;
};
TEST_F(PromiseEndpointTest, OneReadSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
auto promise = promise_endpoint_.Read(kBuffer.size());
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneReadFailed) {
MockActivity activity;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
auto promise = promise_endpoint_.Read(kDummyRequestSize);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, MutipleReadsSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
Sequence s;
EXPECT_CALL(mock_endpoint_, Read)
.InSequence(s)
.WillOnce(WithArg<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(mock_endpoint_, Read)
.InSequence(s)
.WillOnce(WithArg<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer.substr(4)));
buffer->Append(std::move(slice));
return true;
}));
{
auto promise = promise_endpoint_.Read(4u);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(0, 4));
}
{
auto promise = promise_endpoint_.Read(4u);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(4));
}
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0, 1>(
[&read_callback, &kBuffer](
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer) {
read_callback = std::move(on_read);
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
// Return false to mock EventEngine read not finish..
return false;
}));
auto promise = promise_endpoint_.Read(kBuffer.size());
EXPECT_TRUE(promise().pending());
// Mock EventEngine read succeeds, and promise resolves.
read_callback(absl::OkStatus());
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingReadFailed) {
MockActivity activity;
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
read_callback = std::move(on_read);
// Return false to mock EventEngine read not finish.
return false;
}));
auto promise = promise_endpoint_.Read(kDummyRequestSize);
EXPECT_TRUE(promise().pending());
// Mock EventEngine read fails, and promise returns error.
read_callback(kDummyErrorStatus);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
auto promise = promise_endpoint_.ReadSlice(kBuffer.size());
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneReadSliceFailed) {
MockActivity activity;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
auto promise = promise_endpoint_.ReadSlice(kDummyRequestSize);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, MutipleReadSlicesSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
Sequence s;
EXPECT_CALL(mock_endpoint_, Read)
.InSequence(s)
.WillOnce(WithArg<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(mock_endpoint_, Read)
.InSequence(s)
.WillOnce(WithArg<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer.substr(4)));
buffer->Append(std::move(slice));
return true;
}));
{
auto promise = promise_endpoint_.ReadSlice(4u);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(0, 4));
}
{
auto promise = promise_endpoint_.ReadSlice(4u);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(4));
}
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0, 1>(
[&read_callback, &kBuffer](
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer) {
read_callback = std::move(on_read);
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
// Return false to mock EventEngine read not finish..
return false;
}));
auto promise = promise_endpoint_.ReadSlice(kBuffer.size());
EXPECT_TRUE(promise().pending());
// Mock EventEngine read succeeds, and promise resolves.
read_callback(absl::OkStatus());
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) {
MockActivity activity;
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
read_callback = std::move(on_read);
// Return false to mock EventEngine read not finish.
return false;
}));
auto promise = promise_endpoint_.ReadSlice(kDummyRequestSize);
EXPECT_TRUE(promise().pending());
// Mock EventEngine read fails, and promise returns error.
read_callback(kDummyErrorStatus);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneReadByteSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
auto promise = promise_endpoint_.ReadByte();
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(*poll.value(), kBuffer[0]);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneReadByteFailed) {
MockActivity activity;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
auto promise = promise_endpoint_.ReadByte();
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, MutipleReadBytesSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArg<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
for (size_t i = 0; i < kBuffer.size(); ++i) {
auto promise = promise_endpoint_.ReadByte();
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(*poll.value(), kBuffer[i]);
}
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) {
MockActivity activity;
const std::string kBuffer = {0x01};
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0, 1>(
[&read_callback, &kBuffer](
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer) {
read_callback = std::move(on_read);
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
// Return false to mock EventEngine read not finish..
return false;
}));
auto promise = promise_endpoint_.ReadByte();
ASSERT_TRUE(promise().pending());
// Mock EventEngine read succeeds, and promise resolves.
read_callback(absl::OkStatus());
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_TRUE(poll.value().ok());
EXPECT_EQ(*poll.value(), kBuffer[0]);
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePengingReadByteFailed) {
MockActivity activity;
absl::AnyInvocable<void(absl::Status)> read_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
read_callback = std::move(on_read);
// Return false to mock EventEngine read not finish.
return false;
}));
auto promise = promise_endpoint_.ReadByte();
ASSERT_TRUE(promise().pending());
// Mock EventEngine read fails, and promise returns error.
read_callback(kDummyErrorStatus);
auto poll = promise();
ASSERT_TRUE(poll.ready());
ASSERT_FALSE(poll.value().ok());
EXPECT_EQ(kDummyErrorStatus, poll.value().status());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneWriteSuccessful) {
MockActivity activity;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true));
auto promise = promise_endpoint_.Write(SliceBuffer());
auto poll = promise();
ASSERT_TRUE(poll.ready());
EXPECT_EQ(absl::OkStatus(), poll.value());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OneWriteFailed) {
MockActivity activity;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(0);
EXPECT_CALL(mock_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
auto promise = promise_endpoint_.Write(SliceBuffer());
auto poll = promise();
ASSERT_TRUE(poll.ready());
EXPECT_EQ(kDummyErrorStatus, poll.value());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) {
MockActivity activity;
absl::AnyInvocable<void(absl::Status)> write_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Write)
.WillOnce(WithArgs<0, 1>(
[&write_callback](
absl::AnyInvocable<void(absl::Status)> on_write,
grpc_event_engine::experimental::SliceBuffer* buffer) {
write_callback = std::move(on_write);
// Schedule mock_endpoint to write buffer.
buffer->Append(grpc_event_engine::experimental::Slice());
// Return false to mock EventEngine write pending..
return false;
}));
auto promise = promise_endpoint_.Write(SliceBuffer());
EXPECT_TRUE(promise().pending());
// Mock EventEngine write succeeds, and promise resolves.
write_callback(absl::OkStatus());
auto poll = promise();
ASSERT_TRUE(poll.ready());
EXPECT_EQ(absl::OkStatus(), poll.value());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, OnePendingWriteFailed) {
MockActivity activity;
absl::AnyInvocable<void(absl::Status)> write_callback;
activity.Activate();
EXPECT_CALL(activity, WakeupRequested).Times(1);
EXPECT_CALL(mock_endpoint_, Write)
.WillOnce(WithArg<0>(
[&write_callback](absl::AnyInvocable<void(absl::Status)> on_write) {
write_callback = std::move(on_write);
// Return false to mock EventEngine write pending..
return false;
}));
auto promise = promise_endpoint_.Write(SliceBuffer());
EXPECT_TRUE(promise().pending());
write_callback(kDummyErrorStatus);
auto poll = promise();
ASSERT_TRUE(poll.ready());
EXPECT_EQ(kDummyErrorStatus, poll.value());
activity.Deactivate();
}
TEST_F(PromiseEndpointTest, GetPeerAddress) {
const char raw_test_address[] = {0x55, 0x66, 0x01, 0x55, 0x66, 0x01};
grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
reinterpret_cast<const sockaddr*>(raw_test_address),
sizeof(raw_test_address));
EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address));
auto peer_address = promise_endpoint_.GetPeerAddress();
EXPECT_EQ(0, std::memcmp(test_address.address(), test_address.address(),
test_address.size()));
EXPECT_EQ(test_address.size(), peer_address.size());
}
TEST_F(PromiseEndpointTest, GetLocalAddress) {
const char raw_test_address[] = {0x52, 0x55, 0x66, 0x52, 0x55, 0x66};
grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
reinterpret_cast<const sockaddr*>(raw_test_address),
sizeof(raw_test_address));
EXPECT_CALL(mock_endpoint_, GetLocalAddress)
.WillOnce(ReturnRef(test_address));
auto local_address = promise_endpoint_.GetLocalAddress();
EXPECT_EQ(0, std::memcmp(test_address.address(), local_address.address(),
test_address.size()));
EXPECT_EQ(test_address.size(), local_address.size());
}
class MultiplePromiseEndpointTest : public ::testing::Test {
public:
MultiplePromiseEndpointTest()
: first_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
second_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
first_mock_endpoint_(*first_mock_endpoint_ptr_),
second_mock_endpoint_(*second_mock_endpoint_ptr_),
first_promise_endpoint_(
std::unique_ptr<
grpc_event_engine::experimental::EventEngine::Endpoint>(
first_mock_endpoint_ptr_),
SliceBuffer()),
second_promise_endpoint_(
std::unique_ptr<
grpc_event_engine::experimental::EventEngine::Endpoint>(
second_mock_endpoint_ptr_),
SliceBuffer()) {}
private:
MockEndpoint* first_mock_endpoint_ptr_;
MockEndpoint* second_mock_endpoint_ptr_;
protected:
MockEndpoint& first_mock_endpoint_;
MockEndpoint& second_mock_endpoint_;
PromiseEndpoint first_promise_endpoint_;
PromiseEndpoint second_promise_endpoint_;
const absl::Status kDummyErrorStatus =
absl::ErrnoToStatus(5566, "just an error");
static constexpr size_t kDummyRequestSize = 5566u;
};
TEST_F(MultiplePromiseEndpointTest, JoinReadsSuccessful) {
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
EXPECT_CALL(first_mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(second_mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
[this, &kBuffer] {
return Seq(Join(this->first_promise_endpoint_.Read(kBuffer.size()),
this->second_promise_endpoint_.Read(kBuffer.size())),
[](std::tuple<absl::StatusOr<SliceBuffer>,
absl::StatusOr<SliceBuffer>>
ret) {
// Both reads finish with `absl::OkStatus`.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST_F(MultiplePromiseEndpointTest, JoinOneReadSuccessfulOneReadFailed) {
const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
EXPECT_CALL(first_mock_endpoint_, Read)
.WillOnce(WithArgs<1>(
[&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
// Schedule mock_endpoint to read buffer.
grpc_event_engine::experimental::Slice slice(
grpc_slice_from_cpp_string(kBuffer));
buffer->Append(std::move(slice));
return true;
}));
EXPECT_CALL(second_mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(kDummyErrorStatus));
auto activity = MakeActivity(
[this, &kBuffer] {
return Seq(
Join(this->first_promise_endpoint_.Read(kBuffer.size()),
this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
[this](std::tuple<absl::StatusOr<SliceBuffer>,
absl::StatusOr<SliceBuffer>>
ret) {
// One read finishes with `absl::OkStatus` and the other read
// fails.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_FALSE(std::get<1>(ret).ok());
EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
return this->kDummyErrorStatus;
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST_F(MultiplePromiseEndpointTest, JoinReadsFailed) {
EXPECT_CALL(first_mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
EXPECT_CALL(second_mock_endpoint_, Read)
.WillOnce(WithArgs<0>(
[this](absl::AnyInvocable<void(absl::Status)> read_callback) {
// Mock EventEngine enpoint read fails.
read_callback(this->kDummyErrorStatus);
return false;
}));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(kDummyErrorStatus));
auto activity = MakeActivity(
[this] {
return Seq(
Join(this->first_promise_endpoint_.Read(this->kDummyRequestSize),
this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
[this](std::tuple<absl::StatusOr<SliceBuffer>,
absl::StatusOr<SliceBuffer>>
ret) {
// Both reads finish with errors.
EXPECT_FALSE(std::get<0>(ret).ok());
EXPECT_FALSE(std::get<1>(ret).ok());
EXPECT_EQ(std::get<0>(ret).status(), this->kDummyErrorStatus);
EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
return this->kDummyErrorStatus;
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST_F(MultiplePromiseEndpointTest, JoinWritesSuccessful) {
EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(Return(true));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
[this] {
return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer()),
this->second_promise_endpoint_.Write(SliceBuffer())),
[](std::tuple<absl::Status, absl::Status> ret) {
// Both writes finish with `absl::OkStatus`.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST_F(MultiplePromiseEndpointTest, JoinOneWriteSuccessfulOneWriteFailed) {
EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(second_mock_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(kDummyErrorStatus));
auto activity = MakeActivity(
[this] {
return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer()),
this->second_promise_endpoint_.Write(SliceBuffer())),
[this](std::tuple<absl::Status, absl::Status> ret) {
// One write finish with `absl::OkStatus` and the other
// write fails.
EXPECT_TRUE(std::get<0>(ret).ok());
EXPECT_FALSE(std::get<1>(ret).ok());
EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
return this->kDummyErrorStatus;
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
TEST_F(MultiplePromiseEndpointTest, JoinWritesFailed) {
EXPECT_CALL(first_mock_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
EXPECT_CALL(second_mock_endpoint_, Write)
.WillOnce(
WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(this->kDummyErrorStatus);
return false;
}));
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(kDummyErrorStatus));
auto activity = MakeActivity(
[this] {
return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer()),
this->second_promise_endpoint_.Write(SliceBuffer())),
[this](std::tuple<absl::Status, absl::Status> ret) {
// Both writes fail with errors.
EXPECT_FALSE(std::get<0>(ret).ok());
EXPECT_FALSE(std::get<1>(ret).ok());
EXPECT_EQ(std::get<0>(ret), this->kDummyErrorStatus);
EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
return this->kDummyErrorStatus;
});
},
InlineWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
}
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -6587,6 +6587,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "promise_endpoint_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save