diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 299976cb5e2..b3fde2b9954 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -400,7 +400,7 @@ namespace testing { ApiFuzzer::ApiFuzzer(const fuzzing_event_engine::Actions& actions) : BasicFuzzer(actions) { - ResetDNSResolver(std::make_unique(engine())); + ResetDNSResolver(std::make_unique(engine().get())); grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_cancel_ares_request = my_cancel_ares_request; diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index 18079e5f4e8..defc274c1f3 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -60,7 +60,7 @@ class ClientFuzzer final : public BasicFuzzer { : BasicFuzzer(msg.event_engine_actions()) { ExecCtx exec_ctx; UpdateMinimumRunTime( - ScheduleReads(msg.network_input()[0], mock_endpoint_, engine())); + ScheduleReads(msg.network_input()[0], mock_endpoint_, engine().get())); ChannelArgs args = CoreConfiguration::Get() .channel_args_preconditioning() @@ -92,7 +92,7 @@ class ClientFuzzer final : public BasicFuzzer { grpc_server* server() override { return nullptr; } grpc_channel* channel() override { return channel_; } - grpc_endpoint* mock_endpoint_ = grpc_mock_endpoint_create(discard_write); + grpc_endpoint* mock_endpoint_ = grpc_mock_endpoint_create(engine()); grpc_channel* channel_ = nullptr; }; diff --git a/test/core/end2end/fuzzers/fuzzing_common.h b/test/core/end2end/fuzzers/fuzzing_common.h index bfeb776c35e..51d63abb3d3 100644 --- a/test/core/end2end/fuzzers/fuzzing_common.h +++ b/test/core/end2end/fuzzers/fuzzing_common.h @@ -111,8 +111,9 @@ class BasicFuzzer { RefCountedPtr resource_quota() { return resource_quota_; } - grpc_event_engine::experimental::FuzzingEventEngine* engine() { - return engine_.get(); + std::shared_ptr + engine() { + return engine_; } grpc_completion_queue* cq() { return cq_; } diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index da17e436685..c151c8b30b8 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -63,7 +63,7 @@ class ServerFuzzer final : public BasicFuzzer { grpc_server_start(server_); for (const auto& input : msg.network_input()) { UpdateMinimumRunTime(ScheduleConnection( - input, engine(), FuzzingEnvironment{resource_quota()}, 1234)); + input, engine().get(), FuzzingEnvironment{resource_quota()}, 1234)); } } diff --git a/test/core/security/ssl_server_fuzzer.cc b/test/core/security/ssl_server_fuzzer.cc index 295a6a6d1f8..429911d2ccb 100644 --- a/test/core/security/ssl_server_fuzzer.cc +++ b/test/core/security/ssl_server_fuzzer.cc @@ -25,6 +25,7 @@ #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/security_connector/security_connector.h" #include "test/core/test_util/mock_endpoint.h" @@ -47,7 +48,7 @@ static void discard_write(grpc_slice /*slice*/) {} static void dont_log(gpr_log_func_args* /*args*/) {} struct handshake_state { - bool done_callback_called; + grpc_core::Notification done_signal; }; static void on_handshake_done(void* arg, grpc_error_handle error) { @@ -55,10 +56,9 @@ static void on_handshake_done(void* arg, grpc_error_handle error) { static_cast(arg); struct handshake_state* state = static_cast(args->user_data); - CHECK(state->done_callback_called == false); - state->done_callback_called = true; // The fuzzer should not pass the handshake. CHECK(!error.ok()); + state->done_signal.Notify(); } extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { @@ -67,7 +67,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { { grpc_core::ExecCtx exec_ctx; - grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write); + auto engine = GetDefaultEventEngine(); + grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(engine); grpc_mock_endpoint_put_read( mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size)); @@ -92,11 +93,10 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(); struct handshake_state state; - state.done_callback_called = false; auto handshake_mgr = grpc_core::MakeRefCounted(); - auto channel_args = grpc_core::ChannelArgs().SetObject( - GetDefaultEventEngine()); + auto channel_args = + grpc_core::ChannelArgs().SetObject(std::move(engine)); sc->add_handshakers(channel_args, nullptr, handshake_mgr.get()); handshake_mgr->DoHandshake(mock_endpoint, channel_args, deadline, nullptr /* acceptor */, on_handshake_done, @@ -105,14 +105,11 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { // If the given string happens to be part of the correct client hello, the // server will wait for more data. Explicitly fail the server by shutting - // down the endpoint. - if (!state.done_callback_called) { - grpc_endpoint_shutdown(mock_endpoint, - GRPC_ERROR_CREATE("Explicit close")); - grpc_core::ExecCtx::Get()->Flush(); + // down the handshake manager. + if (!state.done_signal.WaitForNotificationWithTimeout(absl::Seconds(3))) { + handshake_mgr->Shutdown( + absl::DeadlineExceededError("handshake did not fail as expected")); } - CHECK(state.done_callback_called); - sc.reset(DEBUG_LOCATION, "test"); grpc_server_credentials_release(creds); grpc_core::ExecCtx::Get()->Flush(); diff --git a/test/core/test_util/mock_endpoint.cc b/test/core/test_util/mock_endpoint.cc index f940ceb911f..8da1f802e3e 100644 --- a/test/core/test_util/mock_endpoint.cc +++ b/test/core/test_util/mock_endpoint.cc @@ -18,6 +18,8 @@ #include "test/core/test_util/mock_endpoint.h" +#include + #include "absl/log/check.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" @@ -27,140 +29,105 @@ #include #include -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr_fwd.h" -#include "src/core/lib/iomgr/sockaddr.h" - -typedef struct mock_endpoint { - grpc_endpoint base; - gpr_mu mu; - void (*on_write)(grpc_slice slice); - grpc_slice_buffer read_buffer; - grpc_slice_buffer* on_read_out; - grpc_closure* on_read; - bool put_reads_done; - bool destroyed; -} mock_endpoint; - -static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, bool /*urgent*/, - int /*min_progress_size*/) { - mock_endpoint* m = reinterpret_cast(ep); - gpr_mu_lock(&m->mu); - if (m->read_buffer.count > 0) { - grpc_slice_buffer_swap(&m->read_buffer, slices); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus()); - } else if (m->put_reads_done) { - grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, - absl::UnavailableError("reads done")); - } else { - m->on_read = cb; - m->on_read_out = slices; - } - gpr_mu_unlock(&m->mu); -} - -static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) { - mock_endpoint* m = reinterpret_cast(ep); - for (size_t i = 0; i < slices->count; i++) { - m->on_write(slices->slices[i]); - } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus()); -} +#include "src/core/lib/event_engine/tcp_socket_utils.h" +#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" -static void me_add_to_pollset(grpc_endpoint* /*ep*/, - grpc_pollset* /*pollset*/) {} +namespace grpc_event_engine { +namespace experimental { -static void me_add_to_pollset_set(grpc_endpoint* /*ep*/, - grpc_pollset_set* /*pollset*/) {} +MockEndpoint::MockEndpoint(std::shared_ptr engine) + : engine_(std::move(engine)), + peer_addr_(URIToResolvedAddress("ipv4:127.0.0.1:12345").value()), + local_addr_(URIToResolvedAddress("ipv4:127.0.0.1:6789").value()) {} -static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/, - grpc_pollset_set* /*pollset*/) {} +MockEndpoint::~MockEndpoint() { + grpc_core::MutexLock lock(&mu_); + if (on_read_) { + engine_->Run([cb = std::move(on_read_)]() mutable { + cb(absl::InternalError("Endpoint Shutdown")); + }); + on_read_ = nullptr; + } +} -static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { - mock_endpoint* m = reinterpret_cast(ep); - gpr_mu_lock(&m->mu); - if (m->on_read) { - grpc_core::ExecCtx::Run( - DEBUG_LOCATION, m->on_read, - GRPC_ERROR_CREATE_REFERENCING("Endpoint Shutdown", &why, 1)); - m->on_read = nullptr; +void MockEndpoint::TriggerReadEvent(Slice read_data) { + grpc_core::MutexLock lock(&mu_); + CHECK(!reads_done_) + << "Cannot trigger a read event after NoMoreReads has been called."; + if (on_read_) { + on_read_slice_buffer_->Append(std::move(read_data)); + engine_->Run( + [cb = std::move(on_read_)]() mutable { cb(absl::OkStatus()); }); + on_read_ = nullptr; + on_read_slice_buffer_ = nullptr; + } else { + read_buffer_.Append(std::move(read_data)); } - gpr_mu_unlock(&m->mu); } -static void destroy(mock_endpoint* m) { - grpc_slice_buffer_destroy(&m->read_buffer); - gpr_mu_destroy(&m->mu); - gpr_free(m); +void MockEndpoint::NoMoreReads() { + grpc_core::MutexLock lock(&mu_); + CHECK(!std::exchange(reads_done_, true)) + << "NoMoreReads() can only be called once"; } -static void me_destroy(grpc_endpoint* ep) { - mock_endpoint* m = reinterpret_cast(ep); - m->destroyed = true; - if (m->put_reads_done) { - destroy(m); +bool MockEndpoint::Read(absl::AnyInvocable on_read, + SliceBuffer* buffer, const ReadArgs* /* args */) { + grpc_core::MutexLock lock(&mu_); + if (read_buffer_.Count() > 0) { + CHECK(buffer->Count() == 0); + CHECK(!on_read_); + read_buffer_.Swap(*buffer); + engine_->Run([cb = std::move(on_read)]() mutable { cb(absl::OkStatus()); }); + } else if (reads_done_) { + engine_->Run([cb = std::move(on_read)]() mutable { + cb(absl::UnavailableError("reads done")); + }); + } else { + on_read_ = std::move(on_read); + on_read_slice_buffer_ = buffer; } + return false; } -void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep) { - mock_endpoint* m = reinterpret_cast(ep); - m->put_reads_done = true; - if (m->destroyed) { - destroy(m); - } +bool MockEndpoint::Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* /* args */) { + // No-op implementation. Nothing was using it. + data->Clear(); + engine_->Run( + [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); }); + return false; } -static absl::string_view me_get_peer(grpc_endpoint* /*ep*/) { - return "fake:mock_endpoint"; +const EventEngine::ResolvedAddress& MockEndpoint::GetPeerAddress() const { + return peer_addr_; } -static absl::string_view me_get_local_address(grpc_endpoint* /*ep*/) { - return "fake:mock_endpoint"; +const EventEngine::ResolvedAddress& MockEndpoint::GetLocalAddress() const { + return local_addr_; } -static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; } - -static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; } - -static const grpc_endpoint_vtable vtable = {me_read, - me_write, - me_add_to_pollset, - me_add_to_pollset_set, - me_delete_from_pollset_set, - me_shutdown, - me_destroy, - me_get_peer, - me_get_local_address, - me_get_fd, - me_can_track_err}; - -grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice)) { - mock_endpoint* m = static_cast(gpr_malloc(sizeof(*m))); - m->base.vtable = &vtable; - grpc_slice_buffer_init(&m->read_buffer); - gpr_mu_init(&m->mu); - m->on_write = on_write; - m->on_read = nullptr; - m->put_reads_done = false; - m->destroyed = false; - return &m->base; +} // namespace experimental +} // namespace grpc_event_engine + +grpc_endpoint* grpc_mock_endpoint_create( + std::shared_ptr engine) { + return grpc_event_engine_endpoint_create( + std::make_unique( + std::move(engine))); } void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) { - mock_endpoint* m = reinterpret_cast(ep); - gpr_mu_lock(&m->mu); - CHECK(!m->put_reads_done); - if (m->on_read != nullptr) { - grpc_slice_buffer_add(m->on_read_out, slice); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, absl::OkStatus()); - m->on_read = nullptr; - } else { - grpc_slice_buffer_add(&m->read_buffer, slice); - } - gpr_mu_unlock(&m->mu); + grpc_event_engine::experimental::Slice s(slice); + static_cast( + grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint( + ep)) + ->TriggerReadEvent(std::move(s)); +} + +void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep) { + static_cast( + grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint( + ep)) + ->NoMoreReads(); } diff --git a/test/core/test_util/mock_endpoint.h b/test/core/test_util/mock_endpoint.h index 5f2dbe5e5d6..fe31627894d 100644 --- a/test/core/test_util/mock_endpoint.h +++ b/test/core/test_util/mock_endpoint.h @@ -19,12 +19,51 @@ #ifndef GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H #define GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H +#include + +#include #include #include "src/core/lib/iomgr/endpoint.h" -grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice)); +grpc_endpoint* grpc_mock_endpoint_create( + std::shared_ptr engine); void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice); void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep); +namespace grpc_event_engine { +namespace experimental { + +class MockEndpoint : public EventEngine::Endpoint { + public: + explicit MockEndpoint(std::shared_ptr engine); + + ~MockEndpoint() override; + + // ---- mock methods ---- + void TriggerReadEvent(Slice read_data); + void NoMoreReads(); + + // ---- overrides ---- + bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer, + const ReadArgs* args) override; + bool Write(absl::AnyInvocable on_writable, + SliceBuffer* data, const WriteArgs* args) override; + const EventEngine::ResolvedAddress& GetPeerAddress() const override; + const EventEngine::ResolvedAddress& GetLocalAddress() const override; + + private: + std::shared_ptr engine_; + grpc_core::Mutex mu_; + SliceBuffer read_buffer_ ABSL_GUARDED_BY(mu_); + bool reads_done_ ABSL_GUARDED_BY(mu_) = false; + absl::AnyInvocable on_read_ ABSL_GUARDED_BY(mu_); + SliceBuffer* on_read_slice_buffer_ ABSL_GUARDED_BY(mu_) = nullptr; + EventEngine::ResolvedAddress peer_addr_; + EventEngine::ResolvedAddress local_addr_; +}; + +} // namespace experimental +} // namespace grpc_event_engine + #endif // GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H diff --git a/test/core/transport/chttp2/ping_configuration_test.cc b/test/core/transport/chttp2/ping_configuration_test.cc index b05b6b6285f..83acda3b566 100644 --- a/test/core/transport/chttp2/ping_configuration_test.cc +++ b/test/core/transport/chttp2/ping_configuration_test.cc @@ -41,18 +41,15 @@ namespace { class ConfigurationTest : public ::testing::Test { protected: ConfigurationTest() { - mock_endpoint_ = grpc_mock_endpoint_create(DiscardWrite); + auto engine = grpc_event_engine::experimental::GetDefaultEventEngine(); + mock_endpoint_ = grpc_mock_endpoint_create(engine); grpc_mock_endpoint_finish_put_reads(mock_endpoint_); args_ = args_.SetObject(ResourceQuota::Default()); - args_ = args_.SetObject( - grpc_event_engine::experimental::GetDefaultEventEngine()); + args_ = args_.SetObject(std::move(engine)); } grpc_endpoint* mock_endpoint_ = nullptr; ChannelArgs args_; - - private: - static void DiscardWrite(grpc_slice /*slice*/) {} }; TEST_F(ConfigurationTest, ClientKeepaliveDefaults) {