[EventEngine] Rewrite mock_endpoint to EventEngine::Endpoint (#36513)

Notes:
* The special `on_write` callback was never used, all slices were discarded. I removed that functionality.

Closes #36513

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36513 from drfloob:rewrite-mock-endpoint-to-ee e45a964633
PiperOrigin-RevId: 631187792
pull/36530/head
AJ Heller 12 months ago committed by Copybara-Service
parent ec8df8955f
commit 9b4478740d
  1. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  2. 4
      test/core/end2end/fuzzers/client_fuzzer.cc
  3. 5
      test/core/end2end/fuzzers/fuzzing_common.h
  4. 2
      test/core/end2end/fuzzers/server_fuzzer.cc
  5. 25
      test/core/security/ssl_server_fuzzer.cc
  6. 197
      test/core/test_util/mock_endpoint.cc
  7. 41
      test/core/test_util/mock_endpoint.h
  8. 9
      test/core/transport/chttp2/ping_configuration_test.cc

@ -400,7 +400,7 @@ namespace testing {
ApiFuzzer::ApiFuzzer(const fuzzing_event_engine::Actions& actions) ApiFuzzer::ApiFuzzer(const fuzzing_event_engine::Actions& actions)
: BasicFuzzer(actions) { : BasicFuzzer(actions) {
ResetDNSResolver(std::make_unique<FuzzerDNSResolver>(engine())); ResetDNSResolver(std::make_unique<FuzzerDNSResolver>(engine().get()));
grpc_dns_lookup_hostname_ares = my_dns_lookup_ares; grpc_dns_lookup_hostname_ares = my_dns_lookup_ares;
grpc_cancel_ares_request = my_cancel_ares_request; grpc_cancel_ares_request = my_cancel_ares_request;

@ -60,7 +60,7 @@ class ClientFuzzer final : public BasicFuzzer {
: BasicFuzzer(msg.event_engine_actions()) { : BasicFuzzer(msg.event_engine_actions()) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
UpdateMinimumRunTime( UpdateMinimumRunTime(
ScheduleReads(msg.network_input()[0], mock_endpoint_, engine())); ScheduleReads(msg.network_input()[0], mock_endpoint_, engine().get()));
ChannelArgs args = ChannelArgs args =
CoreConfiguration::Get() CoreConfiguration::Get()
.channel_args_preconditioning() .channel_args_preconditioning()
@ -92,7 +92,7 @@ class ClientFuzzer final : public BasicFuzzer {
grpc_server* server() override { return nullptr; } grpc_server* server() override { return nullptr; }
grpc_channel* channel() override { return channel_; } grpc_channel* channel() override { return channel_; }
grpc_endpoint* mock_endpoint_ = grpc_mock_endpoint_create(discard_write); grpc_endpoint* mock_endpoint_ = grpc_mock_endpoint_create(engine());
grpc_channel* channel_ = nullptr; grpc_channel* channel_ = nullptr;
}; };

@ -111,8 +111,9 @@ class BasicFuzzer {
RefCountedPtr<ResourceQuota> resource_quota() { return resource_quota_; } RefCountedPtr<ResourceQuota> resource_quota() { return resource_quota_; }
grpc_event_engine::experimental::FuzzingEventEngine* engine() { std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
return engine_.get(); engine() {
return engine_;
} }
grpc_completion_queue* cq() { return cq_; } grpc_completion_queue* cq() { return cq_; }

@ -63,7 +63,7 @@ class ServerFuzzer final : public BasicFuzzer {
grpc_server_start(server_); grpc_server_start(server_);
for (const auto& input : msg.network_input()) { for (const auto& input : msg.network_input()) {
UpdateMinimumRunTime(ScheduleConnection( UpdateMinimumRunTime(ScheduleConnection(
input, engine(), FuzzingEnvironment{resource_quota()}, 1234)); input, engine().get(), FuzzingEnvironment{resource_quota()}, 1234));
} }
} }

@ -25,6 +25,7 @@
#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h" #include "src/core/lib/security/security_connector/security_connector.h"
#include "test/core/test_util/mock_endpoint.h" #include "test/core/test_util/mock_endpoint.h"
@ -47,7 +48,7 @@ static void discard_write(grpc_slice /*slice*/) {}
static void dont_log(gpr_log_func_args* /*args*/) {} static void dont_log(gpr_log_func_args* /*args*/) {}
struct handshake_state { struct handshake_state {
bool done_callback_called; grpc_core::Notification done_signal;
}; };
static void on_handshake_done(void* arg, grpc_error_handle error) { static void on_handshake_done(void* arg, grpc_error_handle error) {
@ -55,10 +56,9 @@ static void on_handshake_done(void* arg, grpc_error_handle error) {
static_cast<grpc_core::HandshakerArgs*>(arg); static_cast<grpc_core::HandshakerArgs*>(arg);
struct handshake_state* state = struct handshake_state* state =
static_cast<struct handshake_state*>(args->user_data); static_cast<struct handshake_state*>(args->user_data);
CHECK(state->done_callback_called == false);
state->done_callback_called = true;
// The fuzzer should not pass the handshake. // The fuzzer should not pass the handshake.
CHECK(!error.ok()); CHECK(!error.ok());
state->done_signal.Notify();
} }
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
@ -67,7 +67,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
{ {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write); auto engine = GetDefaultEventEngine();
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(engine);
grpc_mock_endpoint_put_read( grpc_mock_endpoint_put_read(
mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size)); mock_endpoint, grpc_slice_from_copied_buffer((const char*)data, size));
@ -92,11 +93,10 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now(); grpc_core::Duration::Seconds(1) + grpc_core::Timestamp::Now();
struct handshake_state state; struct handshake_state state;
state.done_callback_called = false;
auto handshake_mgr = auto handshake_mgr =
grpc_core::MakeRefCounted<grpc_core::HandshakeManager>(); grpc_core::MakeRefCounted<grpc_core::HandshakeManager>();
auto channel_args = grpc_core::ChannelArgs().SetObject<EventEngine>( auto channel_args =
GetDefaultEventEngine()); grpc_core::ChannelArgs().SetObject<EventEngine>(std::move(engine));
sc->add_handshakers(channel_args, nullptr, handshake_mgr.get()); sc->add_handshakers(channel_args, nullptr, handshake_mgr.get());
handshake_mgr->DoHandshake(mock_endpoint, channel_args, deadline, handshake_mgr->DoHandshake(mock_endpoint, channel_args, deadline,
nullptr /* acceptor */, on_handshake_done, nullptr /* acceptor */, on_handshake_done,
@ -105,14 +105,11 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// If the given string happens to be part of the correct client hello, the // If the given string happens to be part of the correct client hello, the
// server will wait for more data. Explicitly fail the server by shutting // server will wait for more data. Explicitly fail the server by shutting
// down the endpoint. // down the handshake manager.
if (!state.done_callback_called) { if (!state.done_signal.WaitForNotificationWithTimeout(absl::Seconds(3))) {
grpc_endpoint_shutdown(mock_endpoint, handshake_mgr->Shutdown(
GRPC_ERROR_CREATE("Explicit close")); absl::DeadlineExceededError("handshake did not fail as expected"));
grpc_core::ExecCtx::Get()->Flush();
} }
CHECK(state.done_callback_called);
sc.reset(DEBUG_LOCATION, "test"); sc.reset(DEBUG_LOCATION, "test");
grpc_server_credentials_release(creds); grpc_server_credentials_release(creds);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();

@ -18,6 +18,8 @@
#include "test/core/test_util/mock_endpoint.h" #include "test/core/test_util/mock_endpoint.h"
#include <memory>
#include "absl/log/check.h" #include "absl/log/check.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
@ -27,140 +29,105 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/sockaddr.h"
typedef struct mock_endpoint {
grpc_endpoint base;
gpr_mu mu;
void (*on_write)(grpc_slice slice);
grpc_slice_buffer read_buffer;
grpc_slice_buffer* on_read_out;
grpc_closure* on_read;
bool put_reads_done;
bool destroyed;
} mock_endpoint;
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
gpr_mu_lock(&m->mu);
if (m->read_buffer.count > 0) {
grpc_slice_buffer_swap(&m->read_buffer, slices);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus());
} else if (m->put_reads_done) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
absl::UnavailableError("reads done"));
} else {
m->on_read = cb;
m->on_read_out = slices;
}
gpr_mu_unlock(&m->mu);
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* /*arg*/, int /*max_frame_size*/) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, absl::OkStatus());
}
static void me_add_to_pollset(grpc_endpoint* /*ep*/, namespace grpc_event_engine {
grpc_pollset* /*pollset*/) {} namespace experimental {
static void me_add_to_pollset_set(grpc_endpoint* /*ep*/, MockEndpoint::MockEndpoint(std::shared_ptr<EventEngine> engine)
grpc_pollset_set* /*pollset*/) {} : engine_(std::move(engine)),
peer_addr_(URIToResolvedAddress("ipv4:127.0.0.1:12345").value()),
local_addr_(URIToResolvedAddress("ipv4:127.0.0.1:6789").value()) {}
static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/, MockEndpoint::~MockEndpoint() {
grpc_pollset_set* /*pollset*/) {} grpc_core::MutexLock lock(&mu_);
if (on_read_) {
engine_->Run([cb = std::move(on_read_)]() mutable {
cb(absl::InternalError("Endpoint Shutdown"));
});
on_read_ = nullptr;
}
}
static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { void MockEndpoint::TriggerReadEvent(Slice read_data) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); grpc_core::MutexLock lock(&mu_);
gpr_mu_lock(&m->mu); CHECK(!reads_done_)
if (m->on_read) { << "Cannot trigger a read event after NoMoreReads has been called.";
grpc_core::ExecCtx::Run( if (on_read_) {
DEBUG_LOCATION, m->on_read, on_read_slice_buffer_->Append(std::move(read_data));
GRPC_ERROR_CREATE_REFERENCING("Endpoint Shutdown", &why, 1)); engine_->Run(
m->on_read = nullptr; [cb = std::move(on_read_)]() mutable { cb(absl::OkStatus()); });
on_read_ = nullptr;
on_read_slice_buffer_ = nullptr;
} else {
read_buffer_.Append(std::move(read_data));
} }
gpr_mu_unlock(&m->mu);
} }
static void destroy(mock_endpoint* m) { void MockEndpoint::NoMoreReads() {
grpc_slice_buffer_destroy(&m->read_buffer); grpc_core::MutexLock lock(&mu_);
gpr_mu_destroy(&m->mu); CHECK(!std::exchange(reads_done_, true))
gpr_free(m); << "NoMoreReads() can only be called once";
} }
static void me_destroy(grpc_endpoint* ep) { bool MockEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); SliceBuffer* buffer, const ReadArgs* /* args */) {
m->destroyed = true; grpc_core::MutexLock lock(&mu_);
if (m->put_reads_done) { if (read_buffer_.Count() > 0) {
destroy(m); CHECK(buffer->Count() == 0);
CHECK(!on_read_);
read_buffer_.Swap(*buffer);
engine_->Run([cb = std::move(on_read)]() mutable { cb(absl::OkStatus()); });
} else if (reads_done_) {
engine_->Run([cb = std::move(on_read)]() mutable {
cb(absl::UnavailableError("reads done"));
});
} else {
on_read_ = std::move(on_read);
on_read_slice_buffer_ = buffer;
} }
return false;
} }
void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep) { bool MockEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); SliceBuffer* data, const WriteArgs* /* args */) {
m->put_reads_done = true; // No-op implementation. Nothing was using it.
if (m->destroyed) { data->Clear();
destroy(m); engine_->Run(
} [cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
return false;
} }
static absl::string_view me_get_peer(grpc_endpoint* /*ep*/) { const EventEngine::ResolvedAddress& MockEndpoint::GetPeerAddress() const {
return "fake:mock_endpoint"; return peer_addr_;
} }
static absl::string_view me_get_local_address(grpc_endpoint* /*ep*/) { const EventEngine::ResolvedAddress& MockEndpoint::GetLocalAddress() const {
return "fake:mock_endpoint"; return local_addr_;
} }
static int me_get_fd(grpc_endpoint* /*ep*/) { return -1; } } // namespace experimental
} // namespace grpc_event_engine
static bool me_can_track_err(grpc_endpoint* /*ep*/) { return false; }
grpc_endpoint* grpc_mock_endpoint_create(
static const grpc_endpoint_vtable vtable = {me_read, std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine) {
me_write, return grpc_event_engine_endpoint_create(
me_add_to_pollset, std::make_unique<grpc_event_engine::experimental::MockEndpoint>(
me_add_to_pollset_set, std::move(engine)));
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_peer,
me_get_local_address,
me_get_fd,
me_can_track_err};
grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice)) {
mock_endpoint* m = static_cast<mock_endpoint*>(gpr_malloc(sizeof(*m)));
m->base.vtable = &vtable;
grpc_slice_buffer_init(&m->read_buffer);
gpr_mu_init(&m->mu);
m->on_write = on_write;
m->on_read = nullptr;
m->put_reads_done = false;
m->destroyed = false;
return &m->base;
} }
void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) { void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep); grpc_event_engine::experimental::Slice s(slice);
gpr_mu_lock(&m->mu); static_cast<grpc_event_engine::experimental::MockEndpoint*>(
CHECK(!m->put_reads_done); grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
if (m->on_read != nullptr) { ep))
grpc_slice_buffer_add(m->on_read_out, slice); ->TriggerReadEvent(std::move(s));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, absl::OkStatus()); }
m->on_read = nullptr;
} else { void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep) {
grpc_slice_buffer_add(&m->read_buffer, slice); static_cast<grpc_event_engine::experimental::MockEndpoint*>(
} grpc_event_engine::experimental::grpc_get_wrapped_event_engine_endpoint(
gpr_mu_unlock(&m->mu); ep))
->NoMoreReads();
} }

@ -19,12 +19,51 @@
#ifndef GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H #ifndef GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H
#define GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H #define GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H
#include <memory>
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h> #include <grpc/slice.h>
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
grpc_endpoint* grpc_mock_endpoint_create(void (*on_write)(grpc_slice slice)); grpc_endpoint* grpc_mock_endpoint_create(
std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice); void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice);
void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep); void grpc_mock_endpoint_finish_put_reads(grpc_endpoint* ep);
namespace grpc_event_engine {
namespace experimental {
class MockEndpoint : public EventEngine::Endpoint {
public:
explicit MockEndpoint(std::shared_ptr<EventEngine> engine);
~MockEndpoint() override;
// ---- mock methods ----
void TriggerReadEvent(Slice read_data);
void NoMoreReads();
// ---- overrides ----
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
const EventEngine::ResolvedAddress& GetPeerAddress() const override;
const EventEngine::ResolvedAddress& GetLocalAddress() const override;
private:
std::shared_ptr<EventEngine> engine_;
grpc_core::Mutex mu_;
SliceBuffer read_buffer_ ABSL_GUARDED_BY(mu_);
bool reads_done_ ABSL_GUARDED_BY(mu_) = false;
absl::AnyInvocable<void(absl::Status)> on_read_ ABSL_GUARDED_BY(mu_);
SliceBuffer* on_read_slice_buffer_ ABSL_GUARDED_BY(mu_) = nullptr;
EventEngine::ResolvedAddress peer_addr_;
EventEngine::ResolvedAddress local_addr_;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H #endif // GRPC_TEST_CORE_TEST_UTIL_MOCK_ENDPOINT_H

@ -41,18 +41,15 @@ namespace {
class ConfigurationTest : public ::testing::Test { class ConfigurationTest : public ::testing::Test {
protected: protected:
ConfigurationTest() { ConfigurationTest() {
mock_endpoint_ = grpc_mock_endpoint_create(DiscardWrite); auto engine = grpc_event_engine::experimental::GetDefaultEventEngine();
mock_endpoint_ = grpc_mock_endpoint_create(engine);
grpc_mock_endpoint_finish_put_reads(mock_endpoint_); grpc_mock_endpoint_finish_put_reads(mock_endpoint_);
args_ = args_.SetObject(ResourceQuota::Default()); args_ = args_.SetObject(ResourceQuota::Default());
args_ = args_.SetObject( args_ = args_.SetObject(std::move(engine));
grpc_event_engine::experimental::GetDefaultEventEngine());
} }
grpc_endpoint* mock_endpoint_ = nullptr; grpc_endpoint* mock_endpoint_ = nullptr;
ChannelArgs args_; ChannelArgs args_;
private:
static void DiscardWrite(grpc_slice /*slice*/) {}
}; };
TEST_F(ConfigurationTest, ClientKeepaliveDefaults) { TEST_F(ConfigurationTest, ClientKeepaliveDefaults) {

Loading…
Cancel
Save