@ -32,6 +32,7 @@
# include "absl/base/thread_annotations.h"
# include "absl/status/status.h"
# include "absl/status/statusor.h"
# include "absl/strings/str_format.h"
# include "absl/strings/string_view.h"
# include "absl/synchronization/notification.h"
# include "absl/types/optional.h"
@ -50,6 +51,7 @@
# include "src/core/lib/config/core_configuration.h"
# include "src/core/lib/event_engine/default_event_engine.h"
# include "src/core/lib/gprpp/debug_location.h"
# include "src/core/lib/gprpp/match.h"
# include "src/core/lib/gprpp/orphanable.h"
# include "src/core/lib/gprpp/ref_counted_ptr.h"
# include "src/core/lib/gprpp/sync.h"
@ -80,15 +82,10 @@ class LoadBalancingPolicyTest : public ::testing::Test {
class FakeSubchannel : public SubchannelInterface {
public :
FakeSubchannel ( SubchannelState * state ,
const grpc_resolved_address & address ,
const ChannelArgs & args ,
std : : shared_ptr < WorkSerializer > work_serializer )
: state_ ( state ) ,
address_ ( address ) ,
args_ ( args ) ,
work_serializer_ ( std : : move ( work_serializer ) ) { }
: state_ ( state ) , work_serializer_ ( std : : move ( work_serializer ) ) { }
const grpc_resolved_address & address ( ) const { return address _; }
SubchannelState * state ( ) const { return state_ ; }
private :
// Converts between
@ -146,20 +143,29 @@ class LoadBalancingPolicyTest : public ::testing::Test {
void AddDataWatcher ( std : : unique_ptr < DataWatcherInterface > ) override { }
SubchannelState * state_ ;
grpc_resolved_address address_ ;
ChannelArgs args_ ;
std : : shared_ptr < WorkSerializer > work_serializer_ ;
std : : map < SubchannelInterface : : ConnectivityStateWatcherInterface * ,
WatcherWrapper * >
watcher_map_ ;
} ;
SubchannelState ( ) : state_tracker_ ( " LoadBalancingPolicyTest " ) { }
explicit SubchannelState ( absl : : string_view address )
: address_ ( address ) , state_tracker_ ( " LoadBalancingPolicyTest " ) { }
const std : : string & address ( ) const { return address_ ; }
// Sets the connectivity state for this subchannel. The updated state
// will be reported to all associated SubchannelInterface objects.
void SetConnectivityState ( grpc_connectivity_state state ,
const absl : : Status & status ) {
const absl : : Status & status = absl : : OkStatus ( ) ) {
if ( state = = GRPC_CHANNEL_TRANSIENT_FAILURE ) {
EXPECT_FALSE ( status . ok ( ) )
< < " bug in test: TRANSIENT_FAILURE must have non-OK status " ;
} else {
EXPECT_TRUE ( status . ok ( ) )
< < " bug in test: " < < ConnectivityStateName ( state )
< < " must have OK status: " < < status ;
}
MutexLock lock ( & mu_ ) ;
state_tracker_ . SetState ( state , status , " set from test " ) ;
}
@ -174,13 +180,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// To be invoked by FakeHelper.
RefCountedPtr < SubchannelInterface > CreateSubchannel (
const grpc_resolved_address & address , const ChannelArgs & args ,
std : : shared_ptr < WorkSerializer > work_serializer ) {
return MakeRefCounted < FakeSubchannel > ( this , address , args ,
std : : move ( work_serializer ) ) ;
return MakeRefCounted < FakeSubchannel > ( this , std : : move ( work_serializer ) ) ;
}
private :
const std : : string address_ ;
Mutex mu_ ;
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY ( & mu_ ) ;
Mutex requested_connection_mu_ ;
@ -196,35 +201,100 @@ class LoadBalancingPolicyTest : public ::testing::Test {
grpc_connectivity_state state ;
absl : : Status status ;
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > picker ;
std : : string ToString ( ) const {
return absl : : StrFormat ( " UPDATE{state=%s, status=%s, picker=%p} " ,
ConnectivityStateName ( state ) , status . ToString ( ) ,
picker . get ( ) ) ;
}
} ;
// Represents a re-resolution request from the LB policy.
struct ReresolutionRequested { } ;
// Represents an event reported by the LB policy.
using Event = absl : : variant < StateUpdate , ReresolutionRequested > ;
struct ReresolutionRequested {
std : : string ToString ( ) const { return " RERESOLUTION " ; }
} ;
FakeHelper ( LoadBalancingPolicyTest * test ,
std : : shared_ptr < WorkSerializer > work_serializer )
: test_ ( test ) , work_serializer_ ( std : : move ( work_serializer ) ) { }
// Returns the most recent event from the LB policy, or nullopt if
// there have been no events .
absl : : optional < Event > GetEvent ( ) {
// Called at test tear-down time to ensure that we have not left any
// unexpected events in the queue .
void ExpectQueueEmpty ( ) {
MutexLock lock ( & mu_ ) ;
EXPECT_TRUE ( queue_ . empty ( ) ) ;
for ( const Event & event : queue_ ) {
gpr_log ( GPR_ERROR , " UNEXPECTED EVENT LEFT IN QUEUE: %s " ,
EventString ( event ) . c_str ( ) ) ;
}
}
// Returns the next event in the queue if it is a state update.
// If the queue is empty or the next event is not a state update,
// fails the test and returns nullopt without removing anything from
// the queue.
absl : : optional < StateUpdate > GetNextStateUpdate (
SourceLocation location = SourceLocation ( ) ) {
MutexLock lock ( & mu_ ) ;
EXPECT_FALSE ( queue_ . empty ( ) ) < < location . file ( ) < < " : " < < location . line ( ) ;
if ( queue_ . empty ( ) ) return absl : : nullopt ;
Event event = std : : move ( queue_ . front ( ) ) ;
Event & event = queue_ . front ( ) ;
auto * update = absl : : get_if < StateUpdate > ( & event ) ;
EXPECT_NE ( update , nullptr )
< < " unexpected event " < < EventString ( event ) < < " at "
< < location . file ( ) < < " : " < < location . line ( ) ;
if ( update = = nullptr ) return absl : : nullopt ;
StateUpdate result = std : : move ( * update ) ;
queue_ . pop_front ( ) ;
return std : : move ( result ) ;
}
// Returns the next event in the queue if it is a re-resolution.
// If the queue is empty or the next event is not a re-resolution,
// fails the test and returns nullopt without removing anything
// from the queue.
absl : : optional < ReresolutionRequested > GetNextReresolution (
SourceLocation location = SourceLocation ( ) ) {
MutexLock lock ( & mu_ ) ;
EXPECT_FALSE ( queue_ . empty ( ) ) < < location . file ( ) < < " : " < < location . line ( ) ;
if ( queue_ . empty ( ) ) return absl : : nullopt ;
Event & event = queue_ . front ( ) ;
auto * reresolution = absl : : get_if < ReresolutionRequested > ( & event ) ;
EXPECT_NE ( reresolution , nullptr )
< < " unexpected event " < < EventString ( event ) < < " at "
< < location . file ( ) < < " : " < < location . line ( ) ;
if ( reresolution = = nullptr ) return absl : : nullopt ;
ReresolutionRequested result = * reresolution ;
queue_ . pop_front ( ) ;
return std : : move ( event ) ;
return result ;
}
private :
// Represents an event reported by the LB policy.
using Event = absl : : variant < StateUpdate , ReresolutionRequested > ;
// Returns a human-readable representation of an event.
static std : : string EventString ( const Event & event ) {
return Match (
event , [ ] ( const StateUpdate & update ) { return update . ToString ( ) ; } ,
[ ] ( const ReresolutionRequested & reresolution ) {
return reresolution . ToString ( ) ;
} ) ;
}
RefCountedPtr < SubchannelInterface > CreateSubchannel (
ServerAddress address , const ChannelArgs & args ) override {
SubchannelKey key ( address . address ( ) , args ) ;
auto & subchannel_state = test_ - > subchannel_pool_ [ key ] ;
return subchannel_state . CreateSubchannel ( address . address ( ) , args ,
work_serializer_ ) ;
auto it = test_ - > subchannel_pool_ . find ( key ) ;
if ( it = = test_ - > subchannel_pool_ . end ( ) ) {
auto address_uri = grpc_sockaddr_to_uri ( & address . address ( ) ) ;
GPR_ASSERT ( address_uri . ok ( ) ) ;
it = test_ - > subchannel_pool_
. emplace ( std : : piecewise_construct , std : : forward_as_tuple ( key ) ,
std : : forward_as_tuple ( std : : move ( * address_uri ) ) )
. first ;
}
return it - > second . CreateSubchannel ( work_serializer_ ) ;
}
void UpdateState (
@ -305,6 +375,16 @@ class LoadBalancingPolicyTest : public ::testing::Test {
LoadBalancingPolicyTest ( )
: work_serializer_ ( std : : make_shared < WorkSerializer > ( ) ) { }
~ LoadBalancingPolicyTest ( ) override {
// Note: Can't safely trigger this from inside the FakeHelper dtor,
// because if there is a picker in the queue that is holding a ref
// to the LB policy, that will prevent the LB policy from being
// destroyed, and therefore the FakeHelper will not be destroyed.
// (This will cause an ASAN failure, but it will not display the
// queued events, so the failure will be harder to diagnose.)
helper_ - > ExpectQueueEmpty ( ) ;
}
// Creates an LB policy of the specified name.
// Creates a new FakeHelper for the new LB policy, and sets helper_ to
// point to the FakeHelper.
@ -336,6 +416,19 @@ class LoadBalancingPolicyTest : public ::testing::Test {
return address ;
}
// Constructs an update containing a list of addresses.
LoadBalancingPolicy : : UpdateArgs BuildUpdate (
absl : : Span < const absl : : string_view > addresses ,
RefCountedPtr < LoadBalancingPolicy : : Config > config = nullptr ) {
LoadBalancingPolicy : : UpdateArgs update ;
update . addresses . emplace ( ) ;
for ( const absl : : string_view & address : addresses ) {
update . addresses - > emplace_back ( MakeAddress ( address ) , ChannelArgs ( ) ) ;
}
update . config = std : : move ( config ) ;
return update ;
}
// Applies the update on the LB policy.
absl : : Status ApplyUpdate ( LoadBalancingPolicy : : UpdateArgs update_args ,
LoadBalancingPolicy * lb_policy ) {
@ -356,22 +449,12 @@ class LoadBalancingPolicyTest : public ::testing::Test {
// not a state update; otherwise (if continue_predicate() tells us to
// stop) returns true.
bool WaitForStateUpdate (
std : : function < bool ( grpc_connectivity_state , absl : : Status ,
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > ) >
continue_predicate ,
std : : function < bool ( FakeHelper : : StateUpdate update ) > continue_predicate ,
SourceLocation location = SourceLocation ( ) ) {
while ( true ) {
auto event = helper_ - > GetEvent ( ) ;
EXPECT_TRUE ( event . has_value ( ) )
< < location . file ( ) < < " : " < < location . line ( ) ;
if ( ! event . has_value ( ) ) return false ;
auto * update = absl : : get_if < FakeHelper : : StateUpdate > ( & * event ) ;
EXPECT_NE ( update , nullptr ) < < location . file ( ) < < " : " < < location . line ( ) ;
if ( update = = nullptr ) return false ;
if ( ! continue_predicate ( update - > state , std : : move ( update - > status ) ,
std : : move ( update - > picker ) ) ) {
return true ;
}
auto update = helper_ - > GetNextStateUpdate ( location ) ;
if ( ! update . has_value ( ) ) return false ;
if ( ! continue_predicate ( std : : move ( * update ) ) ) return true ;
}
}
@ -381,76 +464,184 @@ class LoadBalancingPolicyTest : public ::testing::Test {
grpc_connectivity_state expected_state ,
absl : : Status expected_status = absl : : OkStatus ( ) ,
SourceLocation location = SourceLocation ( ) ) {
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > final_picker ;
WaitForStateUpdate (
[ & ] ( grpc_connectivity_state state , absl : : Status status ,
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > picker ) {
EXPECT_EQ ( state , expected_state )
< < " got " < < ConnectivityStateName ( state ) < < " , expected "
< < ConnectivityStateName ( expected_state ) < < " \n "
< < " at " < < location . file ( ) < < " : " < < location . line ( ) ;
EXPECT_EQ ( status , expected_status )
< < status < < " \n "
< < location . file ( ) < < " : " < < location . line ( ) ;
EXPECT_NE ( picker , nullptr )
< < location . file ( ) < < " : " < < location . line ( ) ;
final_picker = std : : move ( picker ) ;
return false ;
} ) ;
return final_picker ;
auto update = helper_ - > GetNextStateUpdate ( location ) ;
if ( ! update . has_value ( ) ) return nullptr ;
EXPECT_EQ ( update - > state , expected_state )
< < " got " < < ConnectivityStateName ( update - > state ) < < " , expected "
< < ConnectivityStateName ( expected_state ) < < " \n "
< < " at " < < location . file ( ) < < " : " < < location . line ( ) ;
EXPECT_EQ ( update - > status , expected_status )
< < update - > status < < " \n "
< < location . file ( ) < < " : " < < location . line ( ) ;
EXPECT_NE ( update - > picker , nullptr )
< < location . file ( ) < < " : " < < location . line ( ) ;
return std : : move ( update - > picker ) ;
}
// Waits for the LB policy to get connected.
// Waits for the LB policy to get connected, then returns the final
// picker. There can be any number of CONNECTING updates, each of
// which must return a picker that queues picks, followed by one
// update for state READY, whose picker is returned.
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > WaitForConnected (
SourceLocation location = SourceLocation ( ) ) {
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > final_picker ;
WaitForStateUpdate (
[ & ] ( grpc_connectivity_state state , absl : : Status status ,
RefCountedPtr < LoadBalancingPolicy : : SubchannelPicker > picker ) {
if ( state = = GRPC_CHANNEL_CONNECTING ) {
EXPECT_TRUE ( status . ok ( ) ) < < status ;
ExpectPickQueued ( picker . get ( ) , location ) ;
return true ;
}
EXPECT_EQ ( state , GRPC_CHANNEL_READY ) < < ConnectivityStateName ( state ) ;
final_picker = std : : move ( picker ) ;
return false ;
} ) ;
WaitForStateUpdate ( [ & ] ( FakeHelper : : StateUpdate update ) {
if ( update . state = = GRPC_CHANNEL_CONNECTING ) {
EXPECT_TRUE ( update . status . ok ( ) )
< < update . status < < " at " < < location . file ( ) < < " : "
< < location . line ( ) ;
ExpectPickQueued ( update . picker . get ( ) , location ) ;
return true ; // Keep going.
}
EXPECT_EQ ( update . state , GRPC_CHANNEL_READY )
< < ConnectivityStateName ( update . state ) < < " at " < < location . file ( )
< < " : " < < location . line ( ) ;
final_picker = std : : move ( update . picker ) ;
return false ; // Stop.
} ) ;
return final_picker ;
}
// Requests a pick on picker and expects a Queue result.
void ExpectPickQueued ( LoadBalancingPolicy : : SubchannelPicker * picker ,
SourceLocation location = SourceLocation ( ) ) {
// Waits for the LB policy to fail a connection attempt. There can be
// any number of CONNECTING updates, each of which must return a picker
// that queues picks, followed by one update for state TRANSIENT_FAILURE,
// whose status is passed to check_status() and whose picker must fail
// picks with a status that is passed to check_status().
// Returns true if the reported states match expectations.
bool WaitForConnectionFailed (
std : : function < void ( const absl : : Status & ) > check_status ,
SourceLocation location = SourceLocation ( ) ) {
bool retval = false ;
WaitForStateUpdate ( [ & ] ( FakeHelper : : StateUpdate update ) {
if ( update . state = = GRPC_CHANNEL_CONNECTING ) {
EXPECT_TRUE ( update . status . ok ( ) )
< < update . status < < " at " < < location . file ( ) < < " : "
< < location . line ( ) ;
ExpectPickQueued ( update . picker . get ( ) , location ) ;
return true ; // Keep going.
}
EXPECT_EQ ( update . state , GRPC_CHANNEL_TRANSIENT_FAILURE )
< < ConnectivityStateName ( update . state ) < < " at " < < location . file ( )
< < " : " < < location . line ( ) ;
check_status ( update . status ) ;
ExpectPickFail ( update . picker . get ( ) , check_status , location ) ;
retval = update . state = = GRPC_CHANNEL_TRANSIENT_FAILURE ;
return false ; // Stop.
} ) ;
return retval ;
}
// Expects a state update for the specified state and status, and then
// expects the resulting picker to queue picks.
void ExpectStateAndQueuingPicker (
grpc_connectivity_state expected_state ,
absl : : Status expected_status = absl : : OkStatus ( ) ,
SourceLocation location = SourceLocation ( ) ) {
auto picker = ExpectState ( expected_state , expected_status , location ) ;
ExpectPickQueued ( picker . get ( ) , location ) ;
}
// Convenient frontend to ExpectStateAndQueuingPicker() for CONNECTING.
void ExpectConnectingUpdate ( SourceLocation location = SourceLocation ( ) ) {
ExpectStateAndQueuingPicker ( GRPC_CHANNEL_CONNECTING , absl : : OkStatus ( ) ,
location ) ;
}
// Does a pick and returns the result.
LoadBalancingPolicy : : PickResult DoPick (
LoadBalancingPolicy : : SubchannelPicker * picker ) {
ExecCtx exec_ctx ;
FakeMetadata metadata ( { } ) ;
FakeCallState call_state ;
auto pick_result =
picker - > Pick ( { " /service/method " , & metadata , & call_state } ) ;
return picker - > Pick ( { " /service/method " , & metadata , & call_state } ) ;
}
// Requests a pick on picker and expects a Queue result.
void ExpectPickQueued ( LoadBalancingPolicy : : SubchannelPicker * picker ,
SourceLocation location = SourceLocation ( ) ) {
auto pick_result = DoPick ( picker ) ;
ASSERT_TRUE ( absl : : holds_alternative < LoadBalancingPolicy : : PickResult : : Queue > (
pick_result . result ) )
< < location . file ( ) < < " : " < < location . line ( ) ;
< < PickResultString ( pick_result ) < < " at " < < location . file ( ) < < " : "
< < location . line ( ) ;
}
// Requests a pick on picker and expects a Complete result whose
// subchannel has the specified address.
void ExpectPickComplete ( LoadBalancingPolicy : : SubchannelPicker * picker ,
absl : : string_view address_uri ,
SourceLocation location = SourceLocation ( ) ) {
ExecCtx exec_ctx ;
FakeMetadata metadata ( { } ) ;
FakeCallState call_state ;
auto pick_result =
picker - > Pick ( { " /service/method " , & metadata , & call_state } ) ;
// Requests a pick on picker and expects a Complete result.
// The address of the resulting subchannel is returned, or nullopt if
// the result was something other than Complete.
absl : : optional < std : : string > ExpectPickComplete (
LoadBalancingPolicy : : SubchannelPicker * picker ,
SourceLocation location = SourceLocation ( ) ) {
auto pick_result = DoPick ( picker ) ;
auto * complete = absl : : get_if < LoadBalancingPolicy : : PickResult : : Complete > (
& pick_result . result ) ;
ASSERT_NE ( complete , nullptr ) < < location . file ( ) < < " : " < < location . line ( ) ;
EXPECT_NE ( complete , nullptr ) < < PickResultString ( pick_result ) < < " at "
< < location . file ( ) < < " : " < < location . line ( ) ;
if ( complete = = nullptr ) return absl : : nullopt ;
auto * subchannel = static_cast < SubchannelState : : FakeSubchannel * > (
complete - > subchannel . get ( ) ) ;
auto uri = grpc_sockaddr_to_uri ( & subchannel - > address ( ) ) ;
ASSERT_TRUE ( uri . ok ( ) ) < < uri . status ( ) < < " at " < < location . file ( ) < < " : "
< < location . line ( ) ;
EXPECT_EQ ( * uri , address_uri ) < < location . file ( ) < < " : " < < location . line ( ) ;
return subchannel - > state ( ) - > address ( ) ;
}
// Requests a picker on picker and expects a Fail result.
// The failing status is passed to check_status.
void ExpectPickFail ( LoadBalancingPolicy : : SubchannelPicker * picker ,
std : : function < void ( const absl : : Status & ) > check_status ,
SourceLocation location = SourceLocation ( ) ) {
auto pick_result = DoPick ( picker ) ;
auto * fail = absl : : get_if < LoadBalancingPolicy : : PickResult : : Fail > (
& pick_result . result ) ;
ASSERT_NE ( fail , nullptr ) < < PickResultString ( pick_result ) < < " at "
< < location . file ( ) < < " : " < < location . line ( ) ;
check_status ( fail - > status ) ;
}
// Returns a human-readable string for a pick result.
static std : : string PickResultString (
const LoadBalancingPolicy : : PickResult & result ) {
return Match (
result . result ,
[ ] ( const LoadBalancingPolicy : : PickResult : : Complete & complete ) {
auto * subchannel = static_cast < SubchannelState : : FakeSubchannel * > (
complete . subchannel . get ( ) ) ;
return absl : : StrFormat (
" COMPLETE{subchannel=%s, subchannel_call_tracker=%p} " ,
subchannel - > state ( ) - > address ( ) ,
complete . subchannel_call_tracker . get ( ) ) ;
} ,
[ ] ( const LoadBalancingPolicy : : PickResult : : Queue & ) - > std : : string {
return " QUEUE{} " ;
} ,
[ ] ( const LoadBalancingPolicy : : PickResult : : Fail & fail ) - > std : : string {
return absl : : StrFormat ( " FAIL{%s} " , fail . status . ToString ( ) ) ;
} ,
[ ] ( const LoadBalancingPolicy : : PickResult : : Drop & drop ) - > std : : string {
return absl : : StrFormat ( " FAIL{%s} " , drop . status . ToString ( ) ) ;
} ) ;
}
// Returns the entry in the subchannel pool, or null if not present.
SubchannelState * FindSubchannel ( absl : : string_view address ,
const ChannelArgs & args = ChannelArgs ( ) ) {
SubchannelKey key ( MakeAddress ( address ) , args ) ;
auto it = subchannel_pool_ . find ( key ) ;
if ( it = = subchannel_pool_ . end ( ) ) return nullptr ;
return & it - > second ;
}
// Creates and returns an entry in the subchannel pool.
// This can be used in cases where we want to test that a subchannel
// already exists when the LB policy creates it (e.g., due to it being
// created by another channel and shared via the global subchannel
// pool, or by being created by another LB policy in this channel).
SubchannelState * CreateSubchannel ( absl : : string_view address ,
const ChannelArgs & args = ChannelArgs ( ) ) {
SubchannelKey key ( MakeAddress ( address ) , args ) ;
auto it = subchannel_pool_
. emplace ( std : : piecewise_construct , std : : forward_as_tuple ( key ) ,
std : : forward_as_tuple ( address ) )
. first ;
return & it - > second ;
}
std : : shared_ptr < WorkSerializer > work_serializer_ ;