@ -13,6 +13,8 @@
// limitations under the License.
# include <grpc/support/port_platform.h>
# include <stdint.h>
# include <algorithm>
# include <memory>
# include <string>
@ -21,12 +23,13 @@
# include <vector>
# include "absl/base/thread_annotations.h"
# include "absl/container/flat_hash_set.h"
# include "absl/functional/any_invocable.h"
# include "absl/hash/hash.h"
# include "absl/status/status.h"
# include "absl/status/statusor.h"
# include "absl/strings/str_cat.h"
# include "absl/strings/string_view.h"
# include "absl/time/time.h"
# include <grpc/event_engine/event_engine.h>
# include <grpc/grpc.h>
@ -34,9 +37,9 @@
# include "src/core/ext/filters/client_channel/resolver/dns/event_engine/event_engine_client_channel_resolver.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/event_engine/default_event_engine.h"
# include "src/core/lib/event_engine/tcp_socket_utils.h"
# include "src/core/lib/gprpp/debug_location.h"
# include "src/core/lib/gprpp/notification.h"
# include "src/core/lib/gprpp/orphanable.h"
# include "src/core/lib/gprpp/work_serializer.h"
# include "src/core/lib/resolver/resolver.h"
@ -48,14 +51,12 @@
# include "test/core/event_engine/util/aborting_event_engine.h"
# include "test/core/ext/filters/event_engine_client_channel_resolver/resolver_fuzzer.pb.h"
// TODO(hork): exercise Orphan on the client channel resolver, which will
// exercise the resolution cancellation path. Currently, all requests will get
// responses.
bool squelch = true ;
static void dont_log ( gpr_log_func_args * /*args*/ ) { }
namespace {
using event_engine_client_channel_resolver : : ExecutionStep ;
using event_engine_client_channel_resolver : : TXTRecordType ;
using grpc_core : : EventEngineClientChannelDNSResolverFactory ;
using grpc_event_engine : : experimental : : EventEngine ;
@ -75,9 +76,11 @@ class FuzzingResolverEventEngine
: public grpc_event_engine : : experimental : : AbortingEventEngine {
public :
explicit FuzzingResolverEventEngine (
const event_engine_client_channel_resolver : : Msg & msg )
: runner_ ( FuzzingEventEngine : : Options ( ) ,
fuzzing_event_engine : : Actions ( ) ) {
const event_engine_client_channel_resolver : : Msg & msg ,
bool * done_resolving )
: runner_ ( FuzzingEventEngine : : Options ( ) , fuzzing_event_engine : : Actions ( ) ) ,
done_resolving_ ( done_resolving ) ,
should_orphan_at_step_ ( msg . should_orphan_at_step ( ) ) {
// Set hostname responses
if ( msg . has_hostname_error ( ) ) {
hostname_responses_ = ErrorToAbslStatus ( msg . hostname_error ( ) ) ;
@ -130,6 +133,7 @@ class FuzzingResolverEventEngine
}
}
}
std : : unique_ptr < DNSResolver > GetDNSResolver (
const DNSResolver : : ResolverOptions & /* options */ ) override {
return std : : make_unique < FuzzingDNSResolver > ( this ) ;
@ -142,57 +146,92 @@ class FuzzingResolverEventEngine
public :
explicit FuzzingDNSResolver ( FuzzingResolverEventEngine * engine )
: engine_ ( engine ) { }
~ FuzzingDNSResolver ( ) override { GPR_ASSERT ( known_handles_ . empty ( ) ) ; }
LookupTaskHandle LookupHostname ( LookupHostnameCallback on_resolve ,
absl : : string_view /* name */ ,
absl : : string_view /* default_port */ ,
Duration /* timeout */ ) override {
auto finish =
[ cb = std : : move ( on_resolve ) , runner = & engine_ - > runner_ ] (
absl : : StatusOr < std : : vector < ResolvedAddress > > response ) mutable {
runner - > Run (
[ cb = std : : move ( cb ) , response = std : : move ( response ) ] ( ) mutable {
cb ( response ) ;
} ) ;
return EventEngine : : DNSResolver : : LookupTaskHandle : : kInvalid ;
} ;
return finish ( engine_ - > hostname_responses_ ) ;
int handle = NextHandle ( ) ;
CheckAndSetOrphan ( ExecutionStep : : DURING_LOOKUP_HOSTNAME ) ;
if ( ! engine_ - > has_been_orphaned_ ) {
engine_ - > runner_ . Run (
[ this , cb = std : : move ( on_resolve ) , handle ] ( ) mutable {
if ( ! HandleExists ( handle ) ) return ;
DeleteHandle ( handle ) ;
cb ( engine_ - > hostname_responses_ ) ;
CheckAndSetOrphan ( ExecutionStep : : AFTER_LOOKUP_HOSTNAME_CALLBACK ) ;
} ) ;
}
return { handle , 0 } ;
}
LookupTaskHandle LookupSRV ( LookupSRVCallback on_resolve ,
absl : : string_view /* name */ ,
Duration /* timeout */ ) override {
auto finish =
[ cb = std : : move ( on_resolve ) , runner = & engine_ - > runner_ ] (
absl : : StatusOr < std : : vector < SRVRecord > > response ) mutable {
runner - > Run (
[ cb = std : : move ( cb ) , response = std : : move ( response ) ] ( ) mutable {
cb ( response ) ;
} ) ;
return EventEngine : : DNSResolver : : LookupTaskHandle : : kInvalid ;
} ;
return finish ( engine_ - > srv_responses_ ) ;
int handle = NextHandle ( ) ;
CheckAndSetOrphan ( ExecutionStep : : DURING_LOOKUP_SRV ) ;
if ( ! engine_ - > has_been_orphaned_ ) {
engine_ - > runner_ . Run (
[ this , cb = std : : move ( on_resolve ) , handle ] ( ) mutable {
if ( ! HandleExists ( handle ) ) return ;
DeleteHandle ( handle ) ;
cb ( engine_ - > srv_responses_ ) ;
CheckAndSetOrphan ( ExecutionStep : : AFTER_LOOKUP_SRV_CALLBACK ) ;
} ) ;
}
return { handle , 0 } ;
}
LookupTaskHandle LookupTXT ( LookupTXTCallback on_resolve ,
absl : : string_view /* name */ ,
Duration /* timeout */ ) override {
auto finish =
[ cb = std : : move ( on_resolve ) , runner = & engine_ - > runner_ ] (
absl : : StatusOr < std : : vector < std : : string > > response ) mutable {
runner - > Run (
[ cb = std : : move ( cb ) , response = std : : move ( response ) ] ( ) mutable {
cb ( response ) ;
} ) ;
return EventEngine : : DNSResolver : : LookupTaskHandle : : kInvalid ;
} ;
return finish ( engine_ - > txt_responses_ ) ;
int handle = NextHandle ( ) ;
CheckAndSetOrphan ( ExecutionStep : : DURING_LOOKUP_TXT ) ;
if ( ! engine_ - > has_been_orphaned_ ) {
engine_ - > runner_ . Run (
[ this , cb = std : : move ( on_resolve ) , handle ] ( ) mutable {
if ( ! HandleExists ( handle ) ) return ;
DeleteHandle ( handle ) ;
cb ( engine_ - > txt_responses_ ) ;
CheckAndSetOrphan ( ExecutionStep : : AFTER_LOOKUP_TXT_CALLBACK ) ;
} ) ;
}
return { handle , 0 } ;
}
bool CancelLookup ( LookupTaskHandle handle ) override {
int bit_handle = handle . keys [ 0 ] ;
if ( ! HandleExists ( bit_handle ) ) return false ;
DeleteHandle ( bit_handle ) ;
return true ;
}
bool CancelLookup ( LookupTaskHandle /* handle */ ) override { return false ; }
private :
int NextHandle ( ) {
static uint64_t next_handle = 0 ;
known_handles_ . insert ( + + next_handle ) ;
return next_handle ;
}
bool HandleExists ( int handle ) { return known_handles_ . contains ( handle ) ; }
void DeleteHandle ( int handle ) { known_handles_ . erase ( handle ) ; }
void CheckAndSetOrphan ( ExecutionStep current_execution_step ) {
if ( engine_ - > should_orphan_at_step_ = = current_execution_step ) {
* engine_ - > done_resolving_ = true ;
engine_ - > has_been_orphaned_ = true ;
}
}
FuzzingResolverEventEngine * engine_ ;
// The set of outstanding LookupTaskHandles.
absl : : flat_hash_set < uint64_t > known_handles_ ;
} ;
// members
FuzzingEventEngine runner_ ;
bool * done_resolving_ ;
ExecutionStep should_orphan_at_step_ ;
bool has_been_orphaned_ = false ;
// responses
absl : : StatusOr < std : : vector < EventEngine : : ResolvedAddress > > hostname_responses_ ;
@ -221,19 +260,18 @@ grpc_core::ChannelArgs ConstructChannelArgs(
class FuzzingResultHandler : public grpc_core : : Resolver : : ResultHandler {
public :
explicit FuzzingResultHandler ( grpc_core : : Notification * signal )
: signal _ ( signal ) { }
explicit FuzzingResultHandler ( bool * done_resolving )
: done_re solv in g_( done_re solv in g) { }
void ReportResult ( grpc_core : : Resolver : : Result /* result */ ) override {
signal_ - > Notify ( ) ;
* done_resolving_ = true ;
}
private :
grpc_core : : Notification * signal _;
bool * done_resolving _;
} ;
grpc_core : : ResolverArgs ConstructResolverArgs (
const grpc_core : : ChannelArgs & channel_args ,
grpc_core : : Notification * result_handler_notification ,
const grpc_core : : ChannelArgs & channel_args , bool * done_resolving ,
std : : shared_ptr < grpc_core : : WorkSerializer > work_serializer ) {
grpc_core : : ResolverArgs resolver_args ;
auto uri = grpc_core : : URI : : Parse ( " dns:localhost " ) ;
@ -242,8 +280,7 @@ grpc_core::ResolverArgs ConstructResolverArgs(
resolver_args . args = channel_args ;
resolver_args . pollset_set = nullptr ;
resolver_args . work_serializer = std : : move ( work_serializer ) ;
auto result_handler =
std : : make_unique < FuzzingResultHandler > ( result_handler_notification ) ;
auto result_handler = std : : make_unique < FuzzingResultHandler > ( done_resolving ) ;
resolver_args . result_handler = std : : move ( result_handler ) ;
return resolver_args ;
}
@ -251,21 +288,31 @@ grpc_core::ResolverArgs ConstructResolverArgs(
} // namespace
DEFINE_PROTO_FUZZER ( const event_engine_client_channel_resolver : : Msg & msg ) {
auto engine = std : : make_shared < FuzzingResolverEventEngine > ( msg ) ;
auto channel_args = ConstructChannelArgs ( msg , engine ) ;
grpc_core : : Notification result_handler_notification ;
auto work_serializer = std : : make_shared < grpc_core : : WorkSerializer > ( ) ;
auto resolver_args = ConstructResolverArgs (
channel_args , & result_handler_notification , work_serializer ) ;
EventEngineClientChannelDNSResolverFactory resolver_factory ;
auto resolver = resolver_factory . CreateResolver ( std : : move ( resolver_args ) ) ;
work_serializer - > Run ( [ resolver = resolver . get ( ) ] ( )
ABSL_EXCLUSIVE_LOCKS_REQUIRED (
* work_serializer ) { resolver - > StartLocked ( ) ; } ,
DEBUG_LOCATION ) ;
// wait for result (no need to check validity)
do {
engine - > Tick ( ) ;
} while ( ! result_handler_notification . WaitForNotificationWithTimeout (
absl : : Milliseconds ( 33 ) ) ) ;
if ( squelch ) gpr_set_log_function ( dont_log ) ;
bool done_resolving = false ;
grpc_event_engine : : experimental : : SetEventEngineFactory ( [ msg ,
& done_resolving ] ( ) {
return std : : make_unique < FuzzingResolverEventEngine > ( msg , & done_resolving ) ;
} ) ;
auto engine = std : : static_pointer_cast < FuzzingResolverEventEngine > (
grpc_event_engine : : experimental : : GetDefaultEventEngine ( ) ) ;
{
// scoped to ensure the resolver is orphaned when done resolving.
auto work_serializer = std : : make_shared < grpc_core : : WorkSerializer > ( ) ;
EventEngineClientChannelDNSResolverFactory resolver_factory ;
auto resolver_args = ConstructResolverArgs (
ConstructChannelArgs ( msg , engine ) , & done_resolving , work_serializer ) ;
auto resolver = resolver_factory . CreateResolver ( std : : move ( resolver_args ) ) ;
work_serializer - > Run (
[ resolver_ptr = resolver . get ( ) ] ( ) ABSL_EXCLUSIVE_LOCKS_REQUIRED (
* work_serializer ) { resolver_ptr - > StartLocked ( ) ; } ,
DEBUG_LOCATION ) ;
// wait for result (no need to check validity)
while ( ! done_resolving ) {
engine - > Tick ( ) ;
}
}
// If orphaned early, callbacks may still need to run, which may keep the
// resolver alive.
while ( engine . use_count ( ) > 1 ) engine - > Tick ( ) ;
}