@ -89,9 +89,9 @@
# include "src/core/lib/gprpp/manual_constructor.h"
# include "src/core/lib/gprpp/manual_constructor.h"
# include "src/core/lib/gprpp/map.h"
# include "src/core/lib/gprpp/map.h"
# include "src/core/lib/gprpp/memory.h"
# include "src/core/lib/gprpp/memory.h"
# include "src/core/lib/gprpp/mutex_lock.h"
# include "src/core/lib/gprpp/orphanable.h"
# include "src/core/lib/gprpp/orphanable.h"
# include "src/core/lib/gprpp/ref_counted_ptr.h"
# include "src/core/lib/gprpp/ref_counted_ptr.h"
# include "src/core/lib/gprpp/sync.h"
# include "src/core/lib/iomgr/combiner.h"
# include "src/core/lib/iomgr/combiner.h"
# include "src/core/lib/iomgr/sockaddr.h"
# include "src/core/lib/iomgr/sockaddr.h"
# include "src/core/lib/iomgr/sockaddr_utils.h"
# include "src/core/lib/iomgr/sockaddr_utils.h"
@ -118,6 +118,7 @@ namespace {
constexpr char kXds [ ] = " xds_experimental " ;
constexpr char kXds [ ] = " xds_experimental " ;
constexpr char kDefaultLocalityName [ ] = " xds_default_locality " ;
constexpr char kDefaultLocalityName [ ] = " xds_default_locality " ;
constexpr uint32_t kDefaultLocalityWeight = 3 ;
class XdsLb : public LoadBalancingPolicy {
class XdsLb : public LoadBalancingPolicy {
public :
public :
@ -259,29 +260,52 @@ class XdsLb : public LoadBalancingPolicy {
bool retry_timer_callback_pending_ = false ;
bool retry_timer_callback_pending_ = false ;
} ;
} ;
// Since pickers are UniquePtrs we use this RefCounted wrapper
// to control references to it by the xds picker and the locality
// entry
class PickerRef : public RefCounted < PickerRef > {
public :
explicit PickerRef ( UniquePtr < SubchannelPicker > picker )
: picker_ ( std : : move ( picker ) ) { }
PickResult Pick ( PickArgs * pick , grpc_error * * error ) {
return picker_ - > Pick ( pick , error ) ;
}
private :
UniquePtr < SubchannelPicker > picker_ ;
} ;
// The picker will use a stateless weighting algorithm to pick the locality to
// use for each request.
class Picker : public SubchannelPicker {
class Picker : public SubchannelPicker {
public :
public :
Picker ( UniquePtr < SubchannelPicker > child_picker ,
// Maintains a weighted list of pickers from each locality that is in ready
RefCountedPtr < XdsLbClientStats > client_stats )
// state. The first element in the pair represents the end of a range
: child_picker_ ( std : : move ( child_picker ) ) ,
// proportional to the locality's weight. The start of the range is the
client_stats_ ( std : : move ( client_stats ) ) { }
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector < Pair < uint32_t , RefCountedPtr < PickerRef > > , 1 > ;
Picker ( RefCountedPtr < XdsLbClientStats > client_stats , PickerList pickers )
: client_stats_ ( std : : move ( client_stats ) ) ,
pickers_ ( std : : move ( pickers ) ) { }
PickResult Pick ( PickArgs * pick , grpc_error * * error ) override ;
PickResult Pick ( PickArgs * pick , grpc_error * * error ) override ;
private :
private :
UniquePtr < SubchannelPicker > child_picker_ ;
// Calls the picker of the locality that the key falls within
PickResult PickFromLocality ( const uint32_t key , PickArgs * pick ,
grpc_error * * error ) ;
RefCountedPtr < XdsLbClientStats > client_stats_ ;
RefCountedPtr < XdsLbClientStats > client_stats_ ;
PickerList pickers_ ;
} ;
} ;
class LocalityMap {
class LocalityMap {
public :
public :
class LocalityEntry : public InternallyRefCounted < LocalityEntry > {
class LocalityEntry : public InternallyRefCounted < LocalityEntry > {
public :
public :
explicit LocalityEntry ( RefCountedPtr < XdsLb > parent )
LocalityEntry ( RefCountedPtr < XdsLb > parent , uint32_t locality_weight )
: parent_ ( std : : move ( parent ) ) {
: parent_ ( std : : move ( parent ) ) , locality_weight_ ( locality_weight ) { }
gpr_mu_init ( & child_policy_mu_ ) ;
~ LocalityEntry ( ) = default ;
}
~ LocalityEntry ( ) { gpr_mu_destroy ( & child_policy_mu_ ) ; }
void UpdateLocked ( xds_grpclb_serverlist * serverlist ,
void UpdateLocked ( xds_grpclb_serverlist * serverlist ,
LoadBalancingPolicy : : Config * child_policy_config ,
LoadBalancingPolicy : : Config * child_policy_config ,
@ -323,13 +347,13 @@ class XdsLb : public LoadBalancingPolicy {
OrphanablePtr < LoadBalancingPolicy > pending_child_policy_ ;
OrphanablePtr < LoadBalancingPolicy > pending_child_policy_ ;
// Lock held when modifying the value of child_policy_ or
// Lock held when modifying the value of child_policy_ or
// pending_child_policy_.
// pending_child_policy_.
gpr_mu child_policy_mu_ ;
Mutex child_policy_mu_ ;
RefCountedPtr < XdsLb > parent_ ;
RefCountedPtr < XdsLb > parent_ ;
RefCountedPtr < PickerRef > picker_ref_ ;
grpc_connectivity_state connectivity_state_ ;
uint32_t locality_weight_ ;
} ;
} ;
LocalityMap ( ) { gpr_mu_init ( & child_refs_mu_ ) ; }
~ LocalityMap ( ) { gpr_mu_destroy ( & child_refs_mu_ ) ; }
void UpdateLocked ( const LocalityList & locality_list ,
void UpdateLocked ( const LocalityList & locality_list ,
LoadBalancingPolicy : : Config * child_policy_config ,
LoadBalancingPolicy : : Config * child_policy_config ,
const grpc_channel_args * args , XdsLb * parent ) ;
const grpc_channel_args * args , XdsLb * parent ) ;
@ -343,7 +367,7 @@ class XdsLb : public LoadBalancingPolicy {
Map < UniquePtr < char > , OrphanablePtr < LocalityEntry > , StringLess > map_ ;
Map < UniquePtr < char > , OrphanablePtr < LocalityEntry > , StringLess > map_ ;
// Lock held while filling child refs for all localities
// Lock held while filling child refs for all localities
// inside the map
// inside the map
gpr_mu child_refs_mu_ ;
Mutex child_refs_mu_ ;
} ;
} ;
struct LocalityServerlistEntry {
struct LocalityServerlistEntry {
@ -351,7 +375,9 @@ class XdsLb : public LoadBalancingPolicy {
gpr_free ( locality_name ) ;
gpr_free ( locality_name ) ;
xds_grpclb_destroy_serverlist ( serverlist ) ;
xds_grpclb_destroy_serverlist ( serverlist ) ;
}
}
char * locality_name ;
char * locality_name ;
uint32_t locality_weight ;
// The deserialized response from the balancer. May be nullptr until one
// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
// such response has arrived.
xds_grpclb_serverlist * serverlist ;
xds_grpclb_serverlist * serverlist ;
@ -397,7 +423,7 @@ class XdsLb : public LoadBalancingPolicy {
// Mutex to protect the channel to the LB server. This is used when
// Mutex to protect the channel to the LB server. This is used when
// processing a channelz request.
// processing a channelz request.
// TODO(juanlishen): Replace this with atomic.
// TODO(juanlishen): Replace this with atomic.
gpr_mu lb_chand_mu_ ;
Mutex lb_chand_mu_ ;
// Timeout in milliseconds for the LB call. 0 means no deadline.
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0 ;
int lb_call_timeout_ms_ = 0 ;
@ -417,6 +443,8 @@ class XdsLb : public LoadBalancingPolicy {
RefCountedPtr < Config > child_policy_config_ ;
RefCountedPtr < Config > child_policy_config_ ;
// Map of policies to use in the backend
// Map of policies to use in the backend
LocalityMap locality_map_ ;
LocalityMap locality_map_ ;
// TODO(mhaidry) : Add support for multiple maps of localities
// with different priorities
LocalityList locality_serverlist_ ;
LocalityList locality_serverlist_ ;
// TODO(mhaidry) : Add a pending locality map that may be swapped with the
// TODO(mhaidry) : Add a pending locality map that may be swapped with the
// the current one when new localities in the pending map are ready
// the current one when new localities in the pending map are ready
@ -429,8 +457,12 @@ class XdsLb : public LoadBalancingPolicy {
XdsLb : : PickResult XdsLb : : Picker : : Pick ( PickArgs * pick , grpc_error * * error ) {
XdsLb : : PickResult XdsLb : : Picker : : Pick ( PickArgs * pick , grpc_error * * error ) {
// TODO(roth): Add support for drop handling.
// TODO(roth): Add support for drop handling.
// Forward pick to child policy.
// Generate a random number between 0 and the total weight
PickResult result = child_picker_ - > Pick ( pick , error ) ;
const uint32_t key =
( rand ( ) * pickers_ [ pickers_ . size ( ) - 1 ] . first ) / RAND_MAX ;
// Forward pick to whichever locality maps to the range in which the
// random number falls in.
PickResult result = PickFromLocality ( key , pick , error ) ;
// If pick succeeded, add client stats.
// If pick succeeded, add client stats.
if ( result = = PickResult : : PICK_COMPLETE & &
if ( result = = PickResult : : PICK_COMPLETE & &
pick - > connected_subchannel ! = nullptr & & client_stats_ ! = nullptr ) {
pick - > connected_subchannel ! = nullptr & & client_stats_ ! = nullptr ) {
@ -439,6 +471,29 @@ XdsLb::PickResult XdsLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
return result ;
return result ;
}
}
XdsLb : : PickResult XdsLb : : Picker : : PickFromLocality ( const uint32_t key ,
PickArgs * pick ,
grpc_error * * error ) {
size_t mid = 0 ;
size_t start_index = 0 ;
size_t end_index = pickers_ . size ( ) - 1 ;
size_t index = 0 ;
while ( end_index > start_index ) {
mid = ( start_index + end_index ) / 2 ;
if ( pickers_ [ mid ] . first > key ) {
end_index = mid ;
} else if ( pickers_ [ mid ] . first < key ) {
start_index = mid + 1 ;
} else {
index = mid + 1 ;
break ;
}
}
if ( index = = 0 ) index = start_index ;
GPR_ASSERT ( pickers_ [ index ] . first > key ) ;
return pickers_ [ index ] . second - > Pick ( pick , error ) ;
}
//
//
// serverlist parsing code
// serverlist parsing code
//
//
@ -940,6 +995,8 @@ void XdsLb::BalancerChannelState::BalancerCallState::
MakeUnique < LocalityServerlistEntry > ( ) ) ;
MakeUnique < LocalityServerlistEntry > ( ) ) ;
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_name =
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_name =
static_cast < char * > ( gpr_strdup ( kDefaultLocalityName ) ) ;
static_cast < char * > ( gpr_strdup ( kDefaultLocalityName ) ) ;
xdslb_policy - > locality_serverlist_ [ 0 ] - > locality_weight =
kDefaultLocalityWeight ;
}
}
// and update the copy in the XdsLb instance. This
// and update the copy in the XdsLb instance. This
// serverlist instance will be destroyed either upon the next
// serverlist instance will be destroyed either upon the next
@ -1090,7 +1147,6 @@ XdsLb::XdsLb(Args args)
: LoadBalancingPolicy ( std : : move ( args ) ) ,
: LoadBalancingPolicy ( std : : move ( args ) ) ,
locality_map_ ( ) ,
locality_map_ ( ) ,
locality_serverlist_ ( ) {
locality_serverlist_ ( ) {
gpr_mu_init ( & lb_chand_mu_ ) ;
// Record server name.
// Record server name.
const grpc_arg * arg = grpc_channel_args_find ( args . args , GRPC_ARG_SERVER_URI ) ;
const grpc_arg * arg = grpc_channel_args_find ( args . args , GRPC_ARG_SERVER_URI ) ;
const char * server_uri = grpc_channel_arg_get_string ( arg ) ;
const char * server_uri = grpc_channel_arg_get_string ( arg ) ;
@ -1114,7 +1170,6 @@ XdsLb::XdsLb(Args args)
}
}
XdsLb : : ~ XdsLb ( ) {
XdsLb : : ~ XdsLb ( ) {
gpr_mu_destroy ( & lb_chand_mu_ ) ;
gpr_free ( ( void * ) server_name_ ) ;
gpr_free ( ( void * ) server_name_ ) ;
grpc_channel_args_destroy ( args_ ) ;
grpc_channel_args_destroy ( args_ ) ;
locality_serverlist_ . clear ( ) ;
locality_serverlist_ . clear ( ) ;
@ -1323,8 +1378,8 @@ void XdsLb::LocalityMap::UpdateLocked(
gpr_strdup ( locality_serverlist [ i ] - > locality_name ) ) ;
gpr_strdup ( locality_serverlist [ i ] - > locality_name ) ) ;
auto iter = map_ . find ( locality_name ) ;
auto iter = map_ . find ( locality_name ) ;
if ( iter = = map_ . end ( ) ) {
if ( iter = = map_ . end ( ) ) {
OrphanablePtr < LocalityEntry > new_entry =
OrphanablePtr < LocalityEntry > new_entry = MakeOrphanable < LocalityEntry > (
MakeOrphanable < LocalityEntry > ( parent - > Ref ( ) ) ;
parent - > Ref ( ) , locality_serverlist [ i ] - > locality_weight ) ;
MutexLock lock ( & child_refs_mu_ ) ;
MutexLock lock ( & child_refs_mu_ ) ;
iter = map_ . emplace ( std : : move ( locality_name ) , std : : move ( new_entry ) ) . first ;
iter = map_ . emplace ( std : : move ( locality_name ) , std : : move ( new_entry ) ) . first ;
}
}
@ -1342,8 +1397,8 @@ void grpc_core::XdsLb::LocalityMap::ShutdownLocked() {
}
}
void grpc_core : : XdsLb : : LocalityMap : : ResetBackoffLocked ( ) {
void grpc_core : : XdsLb : : LocalityMap : : ResetBackoffLocked ( ) {
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; iter + + ) {
for ( auto & p : map_ ) {
iter - > second - > ResetBackoffLocked ( ) ;
p . second - > ResetBackoffLocked ( ) ;
}
}
}
}
@ -1351,8 +1406,8 @@ void grpc_core::XdsLb::LocalityMap::FillChildRefsForChannelz(
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_subchannels ,
channelz : : ChildRefsList * child_channels ) {
channelz : : ChildRefsList * child_channels ) {
MutexLock lock ( & child_refs_mu_ ) ;
MutexLock lock ( & child_refs_mu_ ) ;
for ( auto iter = map_ . begin ( ) ; iter ! = map_ . end ( ) ; iter + + ) {
for ( auto & p : map_ ) {
iter - > second - > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
p . second - > FillChildRefsForChannelz ( child_subchannels , child_channels ) ;
}
}
}
}
@ -1624,9 +1679,72 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::UpdateState(
entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) = = nullptr
? nullptr
? nullptr
: entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) - > client_stats ( ) ;
: entry_ - > parent_ - > lb_chand_ - > lb_calld ( ) - > client_stats ( ) ;
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
// Cache the picker and its state in the entry
state , UniquePtr < SubchannelPicker > (
entry_ - > picker_ref_ = MakeRefCounted < PickerRef > ( std : : move ( picker ) ) ;
New < Picker > ( std : : move ( picker ) , std : : move ( client_stats ) ) ) ) ;
entry_ - > connectivity_state_ = state ;
// Construct a new xds picker which maintains a map of all locality pickers
// that are ready. Each locality is represented by a portion of the range
// proportional to its weight, such that the total range is the sum of the
// weights of all localities
uint32_t end = 0 ;
size_t num_connecting = 0 ;
size_t num_idle = 0 ;
size_t num_transient_failures = 0 ;
auto & locality_map = this - > entry_ - > parent_ - > locality_map_ . map_ ;
Picker : : PickerList pickers ;
for ( auto & p : locality_map ) {
const LocalityEntry * entry = p . second . get ( ) ;
grpc_connectivity_state connectivity_state = entry - > connectivity_state_ ;
switch ( connectivity_state ) {
case GRPC_CHANNEL_READY : {
end + = entry - > locality_weight_ ;
pickers . push_back ( MakePair ( end , entry - > picker_ref_ ) ) ;
break ;
}
case GRPC_CHANNEL_CONNECTING : {
num_connecting + + ;
break ;
}
case GRPC_CHANNEL_IDLE : {
num_idle + + ;
break ;
}
case GRPC_CHANNEL_TRANSIENT_FAILURE : {
num_transient_failures + + ;
break ;
}
default : {
gpr_log ( GPR_ERROR , " Invalid locality connectivity state - %d " ,
connectivity_state ) ;
}
}
}
// Pass on the constructed xds picker if it has any ready pickers in their map
// otherwise pass a QueuePicker if any of the locality pickers are in a
// connecting or idle state, finally return a transient failure picker if all
// locality pickers are in transient failure
if ( pickers . size ( ) > 0 ) {
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
GRPC_CHANNEL_READY ,
UniquePtr < LoadBalancingPolicy : : SubchannelPicker > (
New < Picker > ( std : : move ( client_stats ) , std : : move ( pickers ) ) ) ) ;
} else if ( num_connecting > 0 ) {
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
GRPC_CHANNEL_CONNECTING ,
UniquePtr < SubchannelPicker > ( New < QueuePicker > ( this - > entry_ - > parent_ ) ) ) ;
} else if ( num_idle > 0 ) {
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
GRPC_CHANNEL_IDLE ,
UniquePtr < SubchannelPicker > ( New < QueuePicker > ( this - > entry_ - > parent_ ) ) ) ;
} else {
GPR_ASSERT ( num_transient_failures = = locality_map . size ( ) ) ;
grpc_error * error =
grpc_error_set_int ( GRPC_ERROR_CREATE_FROM_STATIC_STRING (
" connections to all localities failing " ) ,
GRPC_ERROR_INT_GRPC_STATUS , GRPC_STATUS_UNAVAILABLE ) ;
entry_ - > parent_ - > channel_control_helper ( ) - > UpdateState (
state , UniquePtr < SubchannelPicker > ( New < TransientFailurePicker > ( error ) ) ) ;
}
}
}
void XdsLb : : LocalityMap : : LocalityEntry : : Helper : : RequestReresolution ( ) {
void XdsLb : : LocalityMap : : LocalityEntry : : Helper : : RequestReresolution ( ) {