@ -30,6 +30,11 @@
# include "src/core/ext/transport/chttp2/transport/frame.h"
# include "src/core/ext/transport/chttp2/transport/varint.h"
# include "src/core/lib/channel/channel_args.h"
# include "src/core/lib/channel/channel_args_preconditioning.h"
# include "src/core/lib/config/core_configuration.h"
# include "src/core/lib/event_engine/channel_args_endpoint_config.h"
# include "src/core/lib/event_engine/tcp_socket_utils.h"
# include "src/core/lib/gpr/useful.h"
# include "src/core/lib/iomgr/exec_ctx.h"
# include "src/core/lib/slice/slice.h"
@ -37,6 +42,8 @@
# include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
# include "test/core/util/mock_endpoint.h"
using grpc_event_engine : : experimental : : EventEngine ;
namespace grpc_core {
namespace {
@ -307,4 +314,142 @@ Duration ScheduleReads(
} ) ;
return Duration : : Milliseconds ( delay + 2 ) ;
}
namespace {
void ReadForever ( std : : shared_ptr < EventEngine : : Endpoint > ep ) {
bool finished ;
do {
auto buffer =
std : : make_unique < grpc_event_engine : : experimental : : SliceBuffer > ( ) ;
auto buffer_ptr = buffer . get ( ) ;
finished = ep - > Read (
[ ep , buffer = std : : move ( buffer ) ] ( absl : : Status status ) mutable {
ExecCtx exec_ctx ;
if ( ! status . ok ( ) ) return ;
ReadForever ( std : : move ( ep ) ) ;
} ,
buffer_ptr , nullptr ) ;
} while ( finished ) ;
}
void ScheduleWritesForReads (
std : : shared_ptr < EventEngine : : Endpoint > ep ,
grpc_event_engine : : experimental : : FuzzingEventEngine * event_engine ,
std : : vector < QueuedRead > schedule ) {
class Scheduler {
public :
Scheduler ( std : : shared_ptr < EventEngine : : Endpoint > ep ,
grpc_event_engine : : experimental : : FuzzingEventEngine * event_engine ,
std : : vector < QueuedRead > schedule )
: ep_ ( std : : move ( ep ) ) ,
event_engine_ ( event_engine ) ,
schedule_ ( std : : move ( schedule ) ) ,
it_ ( schedule_ . begin ( ) ) {
ScheduleNext ( ) ;
}
private :
void ScheduleNext ( ) {
if ( it_ = = schedule_ . end ( ) ) {
delete this ;
return ;
}
event_engine_ - > RunAfterExactly (
Duration : : Milliseconds ( it_ - > delay_ms - delay_consumed_ ) ,
[ this ] ( ) mutable {
ExecCtx exec_ctx ;
delay_consumed_ = it_ - > delay_ms ;
writing_ . Clear ( ) ;
writing_ . Append (
grpc_event_engine : : experimental : : internal : : SliceCast <
grpc_event_engine : : experimental : : Slice > (
it_ - > slices . JoinIntoSlice ( ) ) ) ;
if ( ep_ - > Write (
[ this ] ( absl : : Status status ) {
ExecCtx exec_ctx ;
FinishWrite ( std : : move ( status ) ) ;
} ,
& writing_ , nullptr ) ) {
FinishWrite ( absl : : OkStatus ( ) ) ;
}
} ) ;
}
void FinishWrite ( absl : : Status status ) {
if ( ! status . ok ( ) ) {
it_ = schedule_ . end ( ) ;
} else {
+ + it_ ;
}
ScheduleNext ( ) ;
}
std : : shared_ptr < EventEngine : : Endpoint > ep_ ;
grpc_event_engine : : experimental : : FuzzingEventEngine * event_engine_ ;
std : : vector < QueuedRead > schedule_ ;
std : : vector < QueuedRead > : : iterator it_ ;
grpc_event_engine : : experimental : : SliceBuffer writing_ ;
int delay_consumed_ = 0 ;
} ;
new Scheduler ( std : : move ( ep ) , event_engine , std : : move ( schedule ) ) ;
}
} // namespace
Duration ScheduleConnection (
const fuzzer_input : : NetworkInput & network_input ,
grpc_event_engine : : experimental : : FuzzingEventEngine * event_engine ,
testing : : FuzzingEnvironment environment , int port ) {
ChannelArgs channel_args =
CoreConfiguration : : Get ( )
. channel_args_preconditioning ( )
. PreconditionChannelArgs (
CreateChannelArgsFromFuzzingConfiguration (
network_input . endpoint_config ( ) , environment )
. ToC ( )
. get ( ) ) ;
auto schedule = MakeSchedule ( network_input ) ;
Duration delay = Duration : : Zero ( ) ;
for ( const auto & q : schedule ) {
delay = std : : max (
delay ,
Duration : : Milliseconds ( q . delay_ms ) +
Duration : : NanosecondsRoundUp (
( q . slices . Length ( ) * event_engine - > max_delay_write ( ) ) . count ( ) ) ) ;
}
delay + = Duration : : Milliseconds ( network_input . connect_delay_ms ( ) +
network_input . connect_timeout_ms ( ) ) ;
event_engine - > RunAfterExactly (
Duration : : Milliseconds ( network_input . connect_delay_ms ( ) ) ,
[ event_engine , channel_args ,
connect_timeout_ms = network_input . connect_timeout_ms ( ) ,
schedule = std : : move ( schedule ) , port ] ( ) mutable {
event_engine - > Connect (
[ event_engine , schedule = std : : move ( schedule ) ] (
absl : : StatusOr < std : : unique_ptr < EventEngine : : Endpoint > >
endpoint ) mutable {
ExecCtx exec_ctx ;
if ( ! endpoint . ok ( ) ) {
gpr_log ( GPR_ERROR , " Failed to connect: %s " ,
endpoint . status ( ) . ToString ( ) . c_str ( ) ) ;
return ;
}
std : : shared_ptr < EventEngine : : Endpoint > ep =
std : : move ( endpoint . value ( ) ) ;
ReadForever ( ep ) ;
ScheduleWritesForReads ( std : : move ( ep ) , event_engine ,
std : : move ( schedule ) ) ;
} ,
grpc_event_engine : : experimental : : ResolvedAddressMakeWild4 ( port ) ,
grpc_event_engine : : experimental : : ChannelArgsEndpointConfig (
channel_args ) ,
channel_args . GetObject < ResourceQuota > ( )
- > memory_quota ( )
- > CreateMemoryAllocator ( " fuzzer " ) ,
Duration : : Milliseconds ( connect_timeout_ms ) ) ;
} ) ;
return delay ;
}
} // namespace grpc_core