@ -16,12 +16,6 @@
*
*/
# include <algorithm>
# include <memory>
# include <mutex>
# include <random>
# include <thread>
# include <grpc/grpc.h>
# include <grpc/support/alloc.h>
# include <grpc/support/atm.h>
@ -35,16 +29,22 @@
# include <grpcpp/health_check_service_interface.h>
# include <grpcpp/server.h>
# include <grpcpp/server_builder.h>
# include <gtest/gtest.h>
# include <algorithm>
# include <memory>
# include <mutex>
# include <random>
# include <thread>
# include "src/core/lib/backoff/backoff.h"
# include "src/core/lib/gpr/env.h"
# include "src/proto/grpc/testing/echo.grpc.pb.h"
# include "test/core/util/debugger_macros.h"
# include "test/core/util/port.h"
# include "test/core/util/test_config.h"
# include "test/cpp/end2end/test_service_impl.h"
# include <gtest/gtest.h>
# include "test/cpp/util/test_credentials_provider.h"
# ifdef GPR_LINUX
using grpc : : testing : : EchoRequest ;
@ -54,14 +54,20 @@ namespace grpc {
namespace testing {
namespace {
class FlakyNetworkTest : public : : testing : : Test {
struct TestScenario {
TestScenario ( const grpc : : string & creds_type , const grpc : : string & content )
: credentials_type ( creds_type ) , message_content ( content ) { }
const grpc : : string credentials_type ;
const grpc : : string message_content ;
} ;
class FlakyNetworkTest : public : : testing : : TestWithParam < TestScenario > {
protected :
FlakyNetworkTest ( )
: server_host_ ( " grpctest " ) ,
interface_ ( " lo:1 " ) ,
ipv4_address_ ( " 10.0.0.1 " ) ,
netmask_ ( " /32 " ) ,
kRequestMessage_ ( " 🖖 " ) { }
netmask_ ( " /32 " ) { }
void InterfaceUp ( ) {
std : : ostringstream cmd ;
@ -129,10 +135,11 @@ class FlakyNetworkTest : public ::testing::Test {
void FlakeNetwork ( ) {
std : : ostringstream cmd ;
// Emulate a flaky network connection over interface_. Add a delay of 100ms
// +/- 590ms, 3% packet loss, 1% duplicates and 0. 1% corrupt packets.
// +/- 20ms, 0.1% packet loss, 1% duplicates and 0.0 1% corrupt packets.
cmd < < " tc qdisc replace dev " < < interface_
< < " root netem delay 100ms 50ms distribution normal loss 3% duplicate "
" 1% corrupt 0.1% " ;
< < " root netem delay 100ms 20ms distribution normal loss 0.1% "
" duplicate "
" 0.1% corrupt 0.01% " ;
std : : system ( cmd . str ( ) . c_str ( ) ) ;
}
@ -172,7 +179,7 @@ class FlakyNetworkTest : public ::testing::Test {
// ip6-looopback, but ipv6 support is not enabled by default in docker.
port_ = SERVER_PORT ;
server_ . reset ( new ServerData ( port_ ) ) ;
server_ . reset ( new ServerData ( port_ , GetParam ( ) . credentials_type ) ) ;
server_ - > Start ( server_host_ ) ;
}
void StopServer ( ) { server_ - > Shutdown ( ) ; }
@ -188,10 +195,11 @@ class FlakyNetworkTest : public ::testing::Test {
if ( lb_policy_name . size ( ) > 0 ) {
args . SetLoadBalancingPolicyName ( lb_policy_name ) ;
} // else, default to pick first
auto channel_creds = GetCredentialsProvider ( ) - > GetChannelCredentials (
GetParam ( ) . credentials_type , & args ) ;
std : : ostringstream server_address ;
server_address < < server_host_ < < " : " < < port_ ;
return CreateCustomChannel ( server_address . str ( ) ,
InsecureChannelCredentials ( ) , args ) ;
return CreateCustomChannel ( server_address . str ( ) , channel_creds , args ) ;
}
bool SendRpc (
@ -199,7 +207,8 @@ class FlakyNetworkTest : public ::testing::Test {
int timeout_ms = 0 , bool wait_for_ready = false ) {
auto response = std : : unique_ptr < EchoResponse > ( new EchoResponse ( ) ) ;
EchoRequest request ;
request . set_message ( kRequestMessage_ ) ;
auto & msg = GetParam ( ) . message_content ;
request . set_message ( msg ) ;
ClientContext context ;
if ( timeout_ms > 0 ) {
context . set_deadline ( grpc_timeout_milliseconds_to_deadline ( timeout_ms ) ) ;
@ -211,22 +220,33 @@ class FlakyNetworkTest : public ::testing::Test {
}
Status status = stub - > Echo ( & context , request , response . get ( ) ) ;
auto ok = status . ok ( ) ;
int stream_id = 0 ;
grpc_call * call = context . c_call ( ) ;
if ( call ) {
grpc_chttp2_stream * stream = grpc_chttp2_stream_from_call ( call ) ;
if ( stream ) {
stream_id = stream - > id ;
}
}
if ( ok ) {
gpr_log ( GPR_DEBUG , " RPC returned %s \n " , response - > message ( ) . c_str ( ) ) ;
gpr_log ( GPR_DEBUG , " RPC with stream_id %d succeeded " , stream_id ) ;
} else {
gpr_log ( GPR_DEBUG , " RPC failed: %s " , status . error_message ( ) . c_str ( ) ) ;
gpr_log ( GPR_DEBUG , " RPC with stream_id %d failed: %s " , stream_id ,
status . error_message ( ) . c_str ( ) ) ;
}
return ok ;
}
struct ServerData {
int port_ ;
const grpc : : string creds_ ;
std : : unique_ptr < Server > server_ ;
TestServiceImpl service_ ;
std : : unique_ptr < std : : thread > thread_ ;
bool server_ready_ = false ;
explicit ServerData ( int port ) { port_ = port ; }
ServerData ( int port , const grpc : : string & creds )
: port_ ( port ) , creds_ ( creds ) { }
void Start ( const grpc : : string & server_host ) {
gpr_log ( GPR_INFO , " starting server on port %d " , port_ ) ;
@ -245,8 +265,9 @@ class FlakyNetworkTest : public ::testing::Test {
std : : ostringstream server_address ;
server_address < < server_host < < " : " < < port_ ;
ServerBuilder builder ;
builder . AddListeningPort ( server_address . str ( ) ,
InsecureServerCredentials ( ) ) ;
auto server_creds =
GetCredentialsProvider ( ) - > GetServerCredentials ( creds_ ) ;
builder . AddListeningPort ( server_address . str ( ) , server_creds ) ;
builder . RegisterService ( & service_ ) ;
server_ = builder . BuildAndStart ( ) ;
std : : lock_guard < std : : mutex > lock ( * mu ) ;
@ -291,11 +312,43 @@ class FlakyNetworkTest : public ::testing::Test {
std : : unique_ptr < ServerData > server_ ;
const int SERVER_PORT = 32750 ;
int port_ ;
const grpc : : string kRequestMessage_ ;
} ;
std : : vector < TestScenario > CreateTestScenarios ( ) {
std : : vector < TestScenario > scenarios ;
std : : vector < grpc : : string > credentials_types ;
std : : vector < grpc : : string > messages ;
credentials_types . push_back ( kInsecureCredentialsType ) ;
auto sec_list = GetCredentialsProvider ( ) - > GetSecureCredentialsTypeList ( ) ;
for ( auto sec = sec_list . begin ( ) ; sec ! = sec_list . end ( ) ; sec + + ) {
credentials_types . push_back ( * sec ) ;
}
messages . push_back ( " 🖖 " ) ;
for ( size_t k = 1 ; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024 ; k * = 32 ) {
grpc : : string big_msg ;
for ( size_t i = 0 ; i < k * 1024 ; + + i ) {
char c = ' a ' + ( i % 26 ) ;
big_msg + = c ;
}
messages . push_back ( big_msg ) ;
}
for ( auto cred = credentials_types . begin ( ) ; cred ! = credentials_types . end ( ) ;
+ + cred ) {
for ( auto msg = messages . begin ( ) ; msg ! = messages . end ( ) ; msg + + ) {
scenarios . emplace_back ( * cred , * msg ) ;
}
}
return scenarios ;
}
INSTANTIATE_TEST_CASE_P ( FlakyNetworkTest , FlakyNetworkTest ,
: : testing : : ValuesIn ( CreateTestScenarios ( ) ) ) ;
// Network interface connected to server flaps
TEST_F ( FlakyNetworkTest , NetworkTransition ) {
TEST_P ( FlakyNetworkTest , NetworkTransition ) {
const int kKeepAliveTimeMs = 1000 ;
const int kKeepAliveTimeoutMs = 1000 ;
ChannelArguments args ;
@ -336,7 +389,7 @@ TEST_F(FlakyNetworkTest, NetworkTransition) {
}
// Traffic to server server is blackholed temporarily with keepalives enabled
TEST_F ( FlakyNetworkTest , ServerUnreachableWithKeepalive ) {
TEST_P ( FlakyNetworkTest , ServerUnreachableWithKeepalive ) {
const int kKeepAliveTimeMs = 1000 ;
const int kKeepAliveTimeoutMs = 1000 ;
const int kReconnectBackoffMs = 1000 ;
@ -385,7 +438,7 @@ TEST_F(FlakyNetworkTest, ServerUnreachableWithKeepalive) {
//
// Traffic to server server is blackholed temporarily with keepalives disabled
TEST_F ( FlakyNetworkTest , ServerUnreachableNoKeepalive ) {
TEST_P ( FlakyNetworkTest , ServerUnreachableNoKeepalive ) {
auto channel = BuildChannel ( " pick_first " , ChannelArguments ( ) ) ;
auto stub = BuildStub ( channel ) ;
// Channel should be in READY state after we send an RPC
@ -411,7 +464,7 @@ TEST_F(FlakyNetworkTest, ServerUnreachableNoKeepalive) {
}
// Send RPCs over a flaky network connection
TEST_F ( FlakyNetworkTest , FlakyNetwork ) {
TEST_P ( FlakyNetworkTest , FlakyNetwork ) {
const int kKeepAliveTimeMs = 1000 ;
const int kKeepAliveTimeoutMs = 1000 ;
const int kMessageCount = 100 ;
@ -438,7 +491,7 @@ TEST_F(FlakyNetworkTest, FlakyNetwork) {
}
// Server is shutdown gracefully and restarted. Client keepalives are enabled
TEST_F ( FlakyNetworkTest , ServerRestartKeepaliveEnabled ) {
TEST_P ( FlakyNetworkTest , ServerRestartKeepaliveEnabled ) {
const int kKeepAliveTimeMs = 1000 ;
const int kKeepAliveTimeoutMs = 1000 ;
ChannelArguments args ;
@ -468,7 +521,7 @@ TEST_F(FlakyNetworkTest, ServerRestartKeepaliveEnabled) {
}
// Server is shutdown gracefully and restarted. Client keepalives are enabled
TEST_F ( FlakyNetworkTest , ServerRestartKeepaliveDisabled ) {
TEST_P ( FlakyNetworkTest , ServerRestartKeepaliveDisabled ) {
auto channel = BuildChannel ( " pick_first " , ChannelArguments ( ) ) ;
auto stub = BuildStub ( channel ) ;
// Channel should be in READY state after we send an RPC