@ -34,6 +34,8 @@
# include <sys/types.h>
# include <unistd.h>
# include <gtest/gtest.h>
# include <grpc/grpc.h>
# include <grpc/support/alloc.h>
# include <grpc/support/log.h>
@ -58,6 +60,8 @@
static gpr_mu * g_mu ;
static grpc_pollset * g_pollset ;
static constexpr int64_t kDeadlineMillis = 20000 ;
//
// General test notes:
@ -83,44 +87,6 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT ( fcntl ( sv [ 1 ] , F_SETFL , flags | O_NONBLOCK ) = = 0 ) ;
}
static void create_inet_sockets ( int sv [ 2 ] ) {
// Prepare listening socket
struct sockaddr_in addr ;
memset ( & addr , 0 , sizeof ( struct sockaddr_in ) ) ;
addr . sin_family = AF_INET ;
int sock = socket ( AF_INET , SOCK_STREAM , 0 ) ;
GPR_ASSERT ( sock ) ;
GPR_ASSERT ( bind ( sock , ( sockaddr * ) & addr , sizeof ( sockaddr_in ) ) = = 0 ) ;
listen ( sock , 1 ) ;
// Prepare client socket and connect to server
socklen_t len = sizeof ( sockaddr_in ) ;
GPR_ASSERT ( getsockname ( sock , ( sockaddr * ) & addr , & len ) = = 0 ) ;
int client = socket ( AF_INET , SOCK_STREAM , 0 ) ;
GPR_ASSERT ( client ) ;
int ret ;
do {
ret = connect ( client , reinterpret_cast < sockaddr * > ( & addr ) ,
sizeof ( sockaddr_in ) ) ;
} while ( ret = = - 1 & & errno = = EINTR ) ;
// Accept client connection
len = sizeof ( socklen_t ) ;
int server ;
do {
server = accept ( sock , reinterpret_cast < sockaddr * > ( & addr ) , & len ) ;
} while ( server = = - 1 & & errno = = EINTR ) ;
GPR_ASSERT ( server ! = - 1 ) ;
sv [ 0 ] = server ;
sv [ 1 ] = client ;
int flags = fcntl ( sv [ 0 ] , F_GETFL , 0 ) ;
GPR_ASSERT ( fcntl ( sv [ 0 ] , F_SETFL , flags | O_NONBLOCK ) = = 0 ) ;
flags = fcntl ( sv [ 1 ] , F_GETFL , 0 ) ;
GPR_ASSERT ( fcntl ( sv [ 1 ] , F_SETFL , flags | O_NONBLOCK ) = = 0 ) ;
}
static ssize_t fill_socket ( int fd ) {
ssize_t write_bytes ;
ssize_t total_bytes = 0 ;
@ -225,7 +191,7 @@ static void read_test(size_t num_bytes, size_t slice_size,
struct read_socket_state state ;
size_t written_bytes ;
grpc_core : : Timestamp deadline = grpc_core : : Timestamp : : FromTimespecRoundUp (
grpc_timeout_seconds_to_deadline ( 20 ) ) ;
grpc_timeout_milli seconds_to_deadline ( kDeadlineMillis ) ) ;
grpc_core : : ExecCtx exec_ctx ;
gpr_log ( GPR_INFO , " Read test of size % " PRIuPTR " , slice size % " PRIuPTR ,
@ -263,14 +229,14 @@ static void read_test(size_t num_bytes, size_t slice_size,
grpc_endpoint_read ( ep , & state . incoming , & state . read_cb , /*urgent=*/ false ,
/*min_progress_size=*/ state . min_progress_size ) ;
grpc_core : : ExecCtx : : Get ( ) - > Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
while ( state . read_bytes < state . target_read_bytes ) {
grpc_pollset_worker * worker = nullptr ;
GPR_ASSERT ( GRPC_LOG_IF_ERROR (
" pollset_work " , grpc_pollset_work ( g_pollset , & worker , deadline ) ) ) ;
gpr_mu_unlock ( g_mu ) ;
grpc_core : : ExecCtx : : Get ( ) - > Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
}
GPR_ASSERT ( state . read_bytes = = state . target_read_bytes ) ;
@ -290,7 +256,7 @@ static void large_read_test(size_t slice_size, int min_progress_size) {
struct read_socket_state state ;
ssize_t written_bytes ;
grpc_core : : Timestamp deadline = grpc_core : : Timestamp : : FromTimespecRoundUp (
grpc_timeout_seconds_to_deadline ( 20 ) ) ;
grpc_timeout_milli seconds_to_deadline ( kDeadlineMillis ) ) ;
grpc_core : : ExecCtx exec_ctx ;
gpr_log ( GPR_INFO , " Start large read test, slice size % " PRIuPTR , slice_size ) ;
@ -327,14 +293,14 @@ static void large_read_test(size_t slice_size, int min_progress_size) {
grpc_endpoint_read ( ep , & state . incoming , & state . read_cb , /*urgent=*/ false ,
/*min_progress_size=*/ state . min_progress_size ) ;
grpc_core : : ExecCtx : : Get ( ) - > Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
while ( state . read_bytes < state . target_read_bytes ) {
grpc_pollset_worker * worker = nullptr ;
GPR_ASSERT ( GRPC_LOG_IF_ERROR (
" pollset_work " , grpc_pollset_work ( g_pollset , & worker , deadline ) ) ) ;
gpr_mu_unlock ( g_mu ) ;
grpc_core : : ExecCtx : : Get ( ) - > Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
}
GPR_ASSERT ( state . read_bytes = = state . target_read_bytes ) ;
@ -427,24 +393,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free ( buf ) ;
}
// Verifier for timestamps callback for write_test
void timestamps_verifier ( void * arg , grpc_core : : Timestamps * ts ,
grpc_error_handle error ) {
GPR_ASSERT ( error . ok ( ) ) ;
GPR_ASSERT ( arg ! = nullptr ) ;
GPR_ASSERT ( ts - > sendmsg_time . time . clock_type = = GPR_CLOCK_REALTIME ) ;
GPR_ASSERT ( ts - > scheduled_time . time . clock_type = = GPR_CLOCK_REALTIME ) ;
GPR_ASSERT ( ts - > acked_time . time . clock_type = = GPR_CLOCK_REALTIME ) ;
gpr_atm * done_timestamps = static_cast < gpr_atm * > ( arg ) ;
gpr_atm_rel_store ( done_timestamps , gpr_atm { 1 } ) ;
}
// Write to a socket using the grpc_tcp API, then drain it directly.
// Note that if the write does not complete immediately we need to drain the
// socket in parallel with the read. If collect_timestamps is true, it will
// try to get timestamps for the write.
static void write_test ( size_t num_bytes , size_t slice_size ,
bool collect_timestamps ) {
static void write_test ( size_t num_bytes , size_t slice_size ) {
int sv [ 2 ] ;
grpc_endpoint * ep ;
struct write_socket_state state ;
@ -454,22 +407,14 @@ static void write_test(size_t num_bytes, size_t slice_size,
grpc_slice_buffer outgoing ;
grpc_closure write_done_closure ;
grpc_core : : Timestamp deadline = grpc_core : : Timestamp : : FromTimespecRoundUp (
grpc_timeout_seconds_to_deadline ( 20 ) ) ;
grpc_timeout_milli seconds_to_deadline ( kDeadlineMillis ) ) ;
grpc_core : : ExecCtx exec_ctx ;
if ( collect_timestamps & & ! grpc_event_engine_can_track_errors ( ) ) {
return ;
}
gpr_log ( GPR_INFO ,
" Start write test with % " PRIuPTR " bytes, slice size % " PRIuPTR ,
num_bytes , slice_size ) ;
if ( collect_timestamps ) {
create_inet_sockets ( sv ) ;
} else {
create_sockets ( sv ) ;
}
create_sockets ( sv ) ;
grpc_arg a [ 2 ] ;
a [ 0 ] . key = const_cast < char * > ( GRPC_ARG_TCP_READ_CHUNK_SIZE ) ;
@ -481,7 +426,7 @@ static void write_test(size_t num_bytes, size_t slice_size,
a [ 1 ] . value . pointer . vtable = grpc_resource_quota_arg_vtable ( ) ;
grpc_channel_args args = { GPR_ARRAY_SIZE ( a ) , a } ;
ep = grpc_tcp_create (
grpc_fd_create ( sv [ 1 ] , " write_test " , collect_timestamps ) ,
grpc_fd_create ( sv [ 1 ] , " write_test " , false ) ,
TcpOptionsFromEndpointConfig (
grpc_event_engine : : experimental : : ChannelArgsEndpointConfig (
grpc_core : : ChannelArgs : : FromC ( & args ) ) ) ,
@ -498,21 +443,14 @@ static void write_test(size_t num_bytes, size_t slice_size,
GRPC_CLOSURE_INIT ( & write_done_closure , write_done , & state ,
grpc_schedule_on_exec_ctx ) ;
gpr_atm done_timestamps ;
gpr_atm_rel_store ( & done_timestamps , gpr_atm { 0 } ) ;
grpc_endpoint_write ( ep , & outgoing , & write_done_closure ,
grpc_event_engine_can_track_errors ( ) & & collect_timestamps
? & done_timestamps
: nullptr ,
grpc_endpoint_write ( ep , & outgoing , & write_done_closure , nullptr ,
/*max_frame_size=*/ INT_MAX ) ;
drain_socket_blocking ( sv [ 0 ] , num_bytes , num_bytes ) ;
exec_ctx . Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
for ( ; ; ) {
grpc_pollset_worker * worker = nullptr ;
if ( state . write_done & &
( ! ( grpc_event_engine_can_track_errors ( ) & & collect_timestamps ) | |
gpr_atm_acq_load ( & done_timestamps ) = = gpr_atm { 1 } ) ) {
if ( state . write_done ) {
break ;
}
GPR_ASSERT ( GRPC_LOG_IF_ERROR (
@ -550,7 +488,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
size_t written_bytes ;
int fd ;
grpc_core : : Timestamp deadline = grpc_core : : Timestamp : : FromTimespecRoundUp (
grpc_timeout_seconds_to_deadline ( 20 ) ) ;
grpc_timeout_milli seconds_to_deadline ( kDeadlineMillis ) ) ;
grpc_core : : ExecCtx exec_ctx ;
grpc_closure fd_released_cb ;
release_fd_arg rel_fd ;
@ -609,7 +547,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_endpoint_read ( ep , & state . incoming , & state . read_cb , /*urgent=*/ false ,
/*min_progress_size=*/ state . min_progress_size ) ;
grpc_core : : ExecCtx : : Get ( ) - > Flush ( ) ;
gpr_mu_lock ( g_mu ) ;
while ( state . read_bytes < state . target_read_bytes ) {
grpc_pollset_worker * worker = nullptr ;
@ -649,21 +587,14 @@ void run_tests(void) {
large_read_test ( 8192 , i ) ;
large_read_test ( 1 , i ) ;
}
write_test ( 100 , 8192 , false ) ;
write_test ( 100 , 1 , false ) ;
write_test ( 100000 , 8192 , false ) ;
write_test ( 100000 , 1 , false ) ;
write_test ( 100000 , 137 , false ) ;
write_test ( 100 , 8192 , true ) ;
write_test ( 100 , 1 , true ) ;
write_test ( 100000 , 8192 , true ) ;
write_test ( 100000 , 1 , true ) ;
write_test ( 100 , 137 , true ) ;
write_test ( 100 , 8192 ) ;
write_test ( 100 , 1 ) ;
write_test ( 100000 , 8192 ) ;
write_test ( 100000 , 1 ) ;
write_test ( 100000 , 137 ) ;
for ( i = 1 ; i < 1000 ; i = std : : max ( i + 1 , i * 5 / 4 ) ) {
write_test ( 40320 , i , false ) ;
write_test ( 40320 , i , true ) ;
write_test ( 40320 , i ) ;
}
release_fd_test ( 100 , 8192 ) ;
@ -717,9 +648,9 @@ static void destroy_pollset(void* p, grpc_error_handle /*error*/) {
int main ( int argc , char * * argv ) {
grpc_closure destroyed ;
: : testing : : InitGoogleTest ( & argc , argv ) ;
grpc : : testing : : TestEnvironment env ( & argc , argv ) ;
grpc_init ( ) ;
grpc_core : : grpc_tcp_set_write_timestamps_callback ( timestamps_verifier ) ;
{
grpc_core : : ExecCtx exec_ctx ;
g_pollset = static_cast < grpc_pollset * > ( gpr_zalloc ( grpc_pollset_size ( ) ) ) ;