|
|
|
@ -116,10 +116,6 @@ TraceFlag grpc_lb_xds_trace(false, "xds"); |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
constexpr char kXds[] = "xds_experimental"; |
|
|
|
|
constexpr char kDefaultLocalityRegion[] = "xds_default_locality_region"; |
|
|
|
|
constexpr char kDefaultLocalityZone[] = "xds_default_locality_zone"; |
|
|
|
|
constexpr char kDefaultLocalitySubzone[] = "xds_default_locality_subzone"; |
|
|
|
|
constexpr uint32_t kDefaultLocalityWeight = 3; |
|
|
|
|
|
|
|
|
|
class ParsedXdsConfig : public LoadBalancingPolicy::Config { |
|
|
|
|
public: |
|
|
|
@ -158,9 +154,6 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
void ResetBackoffLocked() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
struct LocalityServerlistEntry; |
|
|
|
|
using LocalityList = InlinedVector<UniquePtr<LocalityServerlistEntry>, 1>; |
|
|
|
|
|
|
|
|
|
/// Contains a channel to the LB server and all the data related to the
|
|
|
|
|
/// channel.
|
|
|
|
|
class BalancerChannelState |
|
|
|
@ -181,7 +174,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
return client_stats_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool seen_initial_response() const { return seen_initial_response_; } |
|
|
|
|
bool seen_response() const { return seen_response_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE |
|
|
|
@ -220,7 +213,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
// recv_message
|
|
|
|
|
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
|
|
|
grpc_closure lb_on_balancer_message_received_; |
|
|
|
|
bool seen_initial_response_ = false; |
|
|
|
|
bool seen_response_ = false; |
|
|
|
|
|
|
|
|
|
// recv_trailing_metadata
|
|
|
|
|
grpc_closure lb_on_balancer_status_received_; |
|
|
|
@ -351,57 +344,16 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
LoadBalancingPolicy* child_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class LocalityName : public RefCounted<LocalityName> { |
|
|
|
|
public: |
|
|
|
|
struct Less { |
|
|
|
|
bool operator()(const RefCountedPtr<LocalityName>& lhs, |
|
|
|
|
const RefCountedPtr<LocalityName>& rhs) { |
|
|
|
|
int cmp_result = strcmp(lhs->region_.get(), rhs->region_.get()); |
|
|
|
|
if (cmp_result != 0) return cmp_result < 0; |
|
|
|
|
cmp_result = strcmp(lhs->zone_.get(), rhs->zone_.get()); |
|
|
|
|
if (cmp_result != 0) return cmp_result < 0; |
|
|
|
|
return strcmp(lhs->subzone_.get(), rhs->subzone_.get()) < 0; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
LocalityName(UniquePtr<char> region, UniquePtr<char> zone, |
|
|
|
|
UniquePtr<char> subzone) |
|
|
|
|
: region_(std::move(region)), |
|
|
|
|
zone_(std::move(zone)), |
|
|
|
|
subzone_(std::move(subzone)) {} |
|
|
|
|
|
|
|
|
|
bool operator==(const LocalityName& other) const { |
|
|
|
|
return strcmp(region_.get(), other.region_.get()) == 0 && |
|
|
|
|
strcmp(zone_.get(), other.zone_.get()) == 0 && |
|
|
|
|
strcmp(subzone_.get(), other.subzone_.get()) == 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const char* AsHumanReadableString() { |
|
|
|
|
if (human_readable_string_ == nullptr) { |
|
|
|
|
char* tmp; |
|
|
|
|
gpr_asprintf(&tmp, "{region=\"%s\", zone=\"%s\", subzone=\"%s\"}", |
|
|
|
|
region_.get(), zone_.get(), subzone_.get()); |
|
|
|
|
human_readable_string_.reset(tmp); |
|
|
|
|
} |
|
|
|
|
return human_readable_string_.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
UniquePtr<char> region_; |
|
|
|
|
UniquePtr<char> zone_; |
|
|
|
|
UniquePtr<char> subzone_; |
|
|
|
|
UniquePtr<char> human_readable_string_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class LocalityMap { |
|
|
|
|
public: |
|
|
|
|
class LocalityEntry : public InternallyRefCounted<LocalityEntry> { |
|
|
|
|
public: |
|
|
|
|
LocalityEntry(RefCountedPtr<XdsLb> parent, |
|
|
|
|
RefCountedPtr<LocalityName> name, uint32_t locality_weight); |
|
|
|
|
RefCountedPtr<XdsLocalityName> name, |
|
|
|
|
uint32_t locality_weight); |
|
|
|
|
~LocalityEntry(); |
|
|
|
|
|
|
|
|
|
void UpdateLocked(xds_grpclb_serverlist* serverlist, |
|
|
|
|
void UpdateLocked(ServerAddressList serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
@ -441,7 +393,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
const grpc_channel_args* args); |
|
|
|
|
|
|
|
|
|
RefCountedPtr<XdsLb> parent_; |
|
|
|
|
RefCountedPtr<LocalityName> name_; |
|
|
|
|
RefCountedPtr<XdsLocalityName> name_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_; |
|
|
|
|
RefCountedPtr<PickerRef> picker_ref_; |
|
|
|
@ -449,35 +401,25 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
uint32_t locality_weight_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const LocalityList& locality_list, |
|
|
|
|
void UpdateLocked(const XdsLocalityList& locality_list, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent); |
|
|
|
|
void ShutdownLocked(); |
|
|
|
|
void ResetBackoffLocked(); |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void PruneLocalities(const LocalityList& locality_list); |
|
|
|
|
Map<RefCountedPtr<LocalityName>, OrphanablePtr<LocalityEntry>, |
|
|
|
|
LocalityName::Less> |
|
|
|
|
void PruneLocalities(const XdsLocalityList& locality_list); |
|
|
|
|
Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<LocalityEntry>, |
|
|
|
|
XdsLocalityName::Less> |
|
|
|
|
map_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
struct LocalityServerlistEntry { |
|
|
|
|
~LocalityServerlistEntry() { xds_grpclb_destroy_serverlist(serverlist); } |
|
|
|
|
|
|
|
|
|
RefCountedPtr<LocalityName> locality_name; |
|
|
|
|
uint32_t locality_weight; |
|
|
|
|
// The deserialized response from the balancer. May be nullptr until one
|
|
|
|
|
// such response has arrived.
|
|
|
|
|
xds_grpclb_serverlist* serverlist; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
~XdsLb(); |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
|
|
|
|
|
|
// Helper function used in UpdateLocked().
|
|
|
|
|
void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses, |
|
|
|
|
void ProcessAddressesAndChannelArgsLocked(ServerAddressList addresses, |
|
|
|
|
const grpc_channel_args& args); |
|
|
|
|
|
|
|
|
|
// Parses the xds config given the JSON node of the first child of XdsConfig.
|
|
|
|
@ -499,7 +441,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
const char* name, const grpc_channel_args* args); |
|
|
|
|
void MaybeExitFallbackMode(); |
|
|
|
|
|
|
|
|
|
// Who the client is trying to communicate with.
|
|
|
|
|
// Name of the backend server to connect to.
|
|
|
|
|
const char* server_name_ = nullptr; |
|
|
|
|
|
|
|
|
|
// Name of the balancer to connect to.
|
|
|
|
@ -547,7 +489,7 @@ class XdsLb : public LoadBalancingPolicy { |
|
|
|
|
LocalityMap locality_map_; |
|
|
|
|
// TODO(mhaidry) : Add support for multiple maps of localities
|
|
|
|
|
// with different priorities
|
|
|
|
|
LocalityList locality_serverlist_; |
|
|
|
|
XdsLocalityList locality_list_; |
|
|
|
|
// TODO(mhaidry) : Add a pending locality map that may be swapped with the
|
|
|
|
|
// the current one when new localities in the pending map are ready
|
|
|
|
|
// to accept connections
|
|
|
|
@ -677,79 +619,6 @@ void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity, |
|
|
|
|
parent_->channel_control_helper()->AddTraceEvent(severity, message); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// serverlist parsing code
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// Returns the backend addresses extracted from the given addresses.
|
|
|
|
|
ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) { |
|
|
|
|
ServerAddressList backend_addresses; |
|
|
|
|
for (size_t i = 0; i < addresses.size(); ++i) { |
|
|
|
|
if (!addresses[i].IsBalancer()) { |
|
|
|
|
backend_addresses.emplace_back(addresses[i]); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return backend_addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool IsServerValid(const xds_grpclb_server* server, size_t idx, bool log) { |
|
|
|
|
if (server->drop) return false; |
|
|
|
|
const xds_grpclb_ip_address* ip = &server->ip_address; |
|
|
|
|
if (GPR_UNLIKELY(server->port >> 16 != 0)) { |
|
|
|
|
if (log) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Invalid port '%d' at index %lu of serverlist. Ignoring.", |
|
|
|
|
server->port, (unsigned long)idx); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) { |
|
|
|
|
if (log) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Expected IP to be 4 or 16 bytes, got %d at index %lu of " |
|
|
|
|
"serverlist. Ignoring", |
|
|
|
|
ip->size, (unsigned long)idx); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) { |
|
|
|
|
memset(addr, 0, sizeof(*addr)); |
|
|
|
|
if (server->drop) return; |
|
|
|
|
const uint16_t netorder_port = grpc_htons((uint16_t)server->port); |
|
|
|
|
/* the addresses are given in binary format (a in(6)_addr struct) in
|
|
|
|
|
* server->ip_address.bytes. */ |
|
|
|
|
const xds_grpclb_ip_address* ip = &server->ip_address; |
|
|
|
|
if (ip->size == 4) { |
|
|
|
|
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in)); |
|
|
|
|
grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr); |
|
|
|
|
addr4->sin_family = GRPC_AF_INET; |
|
|
|
|
memcpy(&addr4->sin_addr, ip->bytes, ip->size); |
|
|
|
|
addr4->sin_port = netorder_port; |
|
|
|
|
} else if (ip->size == 16) { |
|
|
|
|
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6)); |
|
|
|
|
grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr; |
|
|
|
|
addr6->sin6_family = GRPC_AF_INET6; |
|
|
|
|
memcpy(&addr6->sin6_addr, ip->bytes, ip->size); |
|
|
|
|
addr6->sin6_port = netorder_port; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Returns addresses extracted from \a serverlist.
|
|
|
|
|
ServerAddressList ProcessServerlist(const xds_grpclb_serverlist* serverlist) { |
|
|
|
|
ServerAddressList addresses; |
|
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
|
const xds_grpclb_server* server = serverlist->servers[i]; |
|
|
|
|
if (!IsServerValid(serverlist->servers[i], i, false)) continue; |
|
|
|
|
grpc_resolved_address addr; |
|
|
|
|
ParseServer(server, &addr); |
|
|
|
|
addresses.emplace_back(addr, nullptr); |
|
|
|
|
} |
|
|
|
|
return addresses; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// XdsLb::BalancerChannelState
|
|
|
|
|
//
|
|
|
|
@ -913,19 +782,18 @@ XdsLb::BalancerChannelState::BalancerCallState::BalancerCallState( |
|
|
|
|
xdslb_policy()->lb_call_timeout_ms_ == 0 |
|
|
|
|
? GRPC_MILLIS_INF_FUTURE |
|
|
|
|
: ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; |
|
|
|
|
// Create an LB call with the specified method name.
|
|
|
|
|
lb_call_ = grpc_channel_create_pollset_set_call( |
|
|
|
|
lb_chand_->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, |
|
|
|
|
xdslb_policy()->interested_parties(), |
|
|
|
|
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, |
|
|
|
|
GRPC_MDSTR_SLASH_ENVOY_DOT_API_DOT_V2_DOT_ENDPOINTDISCOVERYSERVICE_SLASH_STREAMENDPOINTS, |
|
|
|
|
nullptr, deadline, nullptr); |
|
|
|
|
// Init the LB call request payload.
|
|
|
|
|
xds_grpclb_request* request = |
|
|
|
|
xds_grpclb_request_create(xdslb_policy()->server_name_); |
|
|
|
|
grpc_slice request_payload_slice = xds_grpclb_request_encode(request); |
|
|
|
|
grpc_slice request_payload_slice = |
|
|
|
|
XdsEdsRequestCreateAndEncode(xdslb_policy()->server_name_); |
|
|
|
|
send_message_payload_ = |
|
|
|
|
grpc_raw_byte_buffer_create(&request_payload_slice, 1); |
|
|
|
|
grpc_slice_unref_internal(request_payload_slice); |
|
|
|
|
xds_grpclb_request_destroy(request); |
|
|
|
|
// Init other data associated with the LB call.
|
|
|
|
|
grpc_metadata_array_init(&lb_initial_metadata_recv_); |
|
|
|
|
grpc_metadata_array_init(&lb_trailing_metadata_recv_); |
|
|
|
@ -1068,15 +936,20 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
|
|
|
|
|
bool XdsLb::BalancerChannelState::BalancerCallState::LoadReportCountersAreZero( |
|
|
|
|
xds_grpclb_request* request) { |
|
|
|
|
XdsLbClientStats::DroppedCallCounts* drop_entries = |
|
|
|
|
static_cast<XdsLbClientStats::DroppedCallCounts*>( |
|
|
|
|
request->client_stats.calls_finished_with_drop.arg); |
|
|
|
|
return request->client_stats.num_calls_started == 0 && |
|
|
|
|
request->client_stats.num_calls_finished == 0 && |
|
|
|
|
request->client_stats.num_calls_finished_with_client_failed_to_send == |
|
|
|
|
const grpc_lb_v1_ClientStats* cstats = |
|
|
|
|
grpc_lb_v1_LoadBalanceRequest_client_stats(request); |
|
|
|
|
if (cstats == nullptr) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
size_t drop_count; |
|
|
|
|
grpc_lb_v1_ClientStats_calls_finished_with_drop(cstats, &drop_count); |
|
|
|
|
return grpc_lb_v1_ClientStats_num_calls_started(cstats) == 0 && |
|
|
|
|
grpc_lb_v1_ClientStats_num_calls_finished(cstats) == 0 && |
|
|
|
|
grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send( |
|
|
|
|
cstats) == 0 && |
|
|
|
|
grpc_lb_v1_ClientStats_num_calls_finished_known_received(cstats) == |
|
|
|
|
0 && |
|
|
|
|
request->client_stats.num_calls_finished_known_received == 0 && |
|
|
|
|
(drop_entries == nullptr || drop_entries->empty()); |
|
|
|
|
drop_count == 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO(vpowar): Use LRS to send the client Load Report.
|
|
|
|
@ -1084,13 +957,13 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
SendClientLoadReportLocked() { |
|
|
|
|
// Construct message payload.
|
|
|
|
|
GPR_ASSERT(send_message_payload_ == nullptr); |
|
|
|
|
xds_grpclb_request* request = |
|
|
|
|
xds_grpclb_load_report_request_create_locked(client_stats_.get()); |
|
|
|
|
upb::Arena arena; |
|
|
|
|
xds_grpclb_request* request = xds_grpclb_load_report_request_create_locked( |
|
|
|
|
client_stats_.get(), arena.ptr()); |
|
|
|
|
// Skip client load report if the counters were all zero in the last
|
|
|
|
|
// report and they are still zero in this one.
|
|
|
|
|
if (LoadReportCountersAreZero(request)) { |
|
|
|
|
if (last_client_load_report_counters_were_zero_) { |
|
|
|
|
xds_grpclb_request_destroy(request); |
|
|
|
|
ScheduleNextClientLoadReportLocked(); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
@ -1099,7 +972,6 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
last_client_load_report_counters_were_zero_ = false; |
|
|
|
|
} |
|
|
|
|
// TODO(vpowar): Send the report on LRS stream.
|
|
|
|
|
xds_grpclb_request_destroy(request); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState::OnInitialRequestSentLocked( |
|
|
|
@ -1127,65 +999,67 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
lb_calld->seen_response_ = true; |
|
|
|
|
// Read the response.
|
|
|
|
|
grpc_byte_buffer_reader bbr; |
|
|
|
|
grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_); |
|
|
|
|
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
|
|
|
|
grpc_byte_buffer_reader_destroy(&bbr); |
|
|
|
|
grpc_byte_buffer_destroy(lb_calld->recv_message_payload_); |
|
|
|
|
lb_calld->recv_message_payload_ = nullptr; |
|
|
|
|
xds_grpclb_initial_response* initial_response; |
|
|
|
|
xds_grpclb_serverlist* serverlist; |
|
|
|
|
if (!lb_calld->seen_initial_response_ && |
|
|
|
|
(initial_response = xds_grpclb_initial_response_parse(response_slice)) != |
|
|
|
|
nullptr) { |
|
|
|
|
// Have NOT seen initial response, look for initial response.
|
|
|
|
|
// TODO(juanlishen): When we convert this to use the xds protocol, the
|
|
|
|
|
// balancer will send us a fallback timeout such that we should go into
|
|
|
|
|
// fallback mode if we have lost contact with the balancer after a certain
|
|
|
|
|
// period of time. We will need to save the timeout value here, and then
|
|
|
|
|
// when the balancer call ends, we will need to start a timer for the
|
|
|
|
|
// specified period of time, and if the timer fires, we go into fallback
|
|
|
|
|
// mode. We will also need to cancel the timer when we receive a serverlist
|
|
|
|
|
// from the balancer.
|
|
|
|
|
if (initial_response->has_client_stats_report_interval) { |
|
|
|
|
const grpc_millis interval = xds_grpclb_duration_to_millis( |
|
|
|
|
&initial_response->client_stats_report_interval); |
|
|
|
|
if (interval > 0) { |
|
|
|
|
lb_calld->client_stats_report_interval_ = |
|
|
|
|
GPR_MAX(GPR_MS_PER_SEC, interval); |
|
|
|
|
} |
|
|
|
|
// TODO(juanlishen): When we convert this to use the xds protocol, the
|
|
|
|
|
// balancer will send us a fallback timeout such that we should go into
|
|
|
|
|
// fallback mode if we have lost contact with the balancer after a certain
|
|
|
|
|
// period of time. We will need to save the timeout value here, and then
|
|
|
|
|
// when the balancer call ends, we will need to start a timer for the
|
|
|
|
|
// specified period of time, and if the timer fires, we go into fallback
|
|
|
|
|
// mode. We will also need to cancel the timer when we receive a serverlist
|
|
|
|
|
// from the balancer.
|
|
|
|
|
// This anonymous lambda is a hack to avoid the usage of goto.
|
|
|
|
|
[&]() { |
|
|
|
|
// Parse the response.
|
|
|
|
|
XdsUpdate update; |
|
|
|
|
grpc_error* parse_error = |
|
|
|
|
XdsEdsResponseDecodeAndParse(response_slice, &update); |
|
|
|
|
if (parse_error != GRPC_ERROR_NONE) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] EDS response parsing failed. error=%s", |
|
|
|
|
xdslb_policy, grpc_error_string(parse_error)); |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
if (lb_calld->client_stats_report_interval_ != 0) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Received initial LB response message; " |
|
|
|
|
"client load reporting interval = %" PRId64 " milliseconds", |
|
|
|
|
xdslb_policy, lb_calld->client_stats_report_interval_); |
|
|
|
|
} else { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Received initial LB response message; client load " |
|
|
|
|
"reporting NOT enabled", |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
if (update.locality_list.empty()) { |
|
|
|
|
char* response_slice_str = |
|
|
|
|
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xdslb %p] EDS response '%s' doesn't contain any valid locality " |
|
|
|
|
"update. Ignoring.", |
|
|
|
|
xdslb_policy, response_slice_str); |
|
|
|
|
gpr_free(response_slice_str); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
xds_grpclb_initial_response_destroy(initial_response); |
|
|
|
|
lb_calld->seen_initial_response_ = true; |
|
|
|
|
} else if ((serverlist = xds_grpclb_response_parse_serverlist( |
|
|
|
|
response_slice)) != nullptr) { |
|
|
|
|
// Have seen initial response, look for serverlist.
|
|
|
|
|
GPR_ASSERT(lb_calld->lb_call_ != nullptr); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Serverlist with %" PRIuPTR " servers received", |
|
|
|
|
xdslb_policy, serverlist->num_servers); |
|
|
|
|
for (size_t i = 0; i < serverlist->num_servers; ++i) { |
|
|
|
|
grpc_resolved_address addr; |
|
|
|
|
ParseServer(serverlist->servers[i], &addr); |
|
|
|
|
char* ipport; |
|
|
|
|
grpc_sockaddr_to_string(&ipport, &addr, false); |
|
|
|
|
gpr_log(GPR_INFO, "[xdslb %p] Serverlist[%" PRIuPTR "]: %s", |
|
|
|
|
xdslb_policy, i, ipport); |
|
|
|
|
gpr_free(ipport); |
|
|
|
|
"[xdslb %p] EDS response with %" PRIuPTR " localities received", |
|
|
|
|
xdslb_policy, update.locality_list.size()); |
|
|
|
|
for (size_t i = 0; i < update.locality_list.size(); ++i) { |
|
|
|
|
const XdsLocalityInfo& locality = update.locality_list[i]; |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Locality %" PRIuPTR " %s contains %" PRIuPTR |
|
|
|
|
" server addresses", |
|
|
|
|
xdslb_policy, i, |
|
|
|
|
locality.locality_name->AsHumanReadableString(), |
|
|
|
|
locality.serverlist.size()); |
|
|
|
|
for (size_t j = 0; j < locality.serverlist.size(); ++j) { |
|
|
|
|
char* ipport; |
|
|
|
|
grpc_sockaddr_to_string(&ipport, &locality.serverlist[j].address(), |
|
|
|
|
false); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Locality %" PRIuPTR |
|
|
|
|
" %s, server address %" PRIuPTR ": %s", |
|
|
|
|
xdslb_policy, i, |
|
|
|
|
locality.locality_name->AsHumanReadableString(), j, ipport); |
|
|
|
|
gpr_free(ipport); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Pending LB channel receives a serverlist; promote it.
|
|
|
|
@ -1211,73 +1085,47 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release(); |
|
|
|
|
lb_calld->ScheduleNextClientLoadReportLocked(); |
|
|
|
|
} |
|
|
|
|
if (!xdslb_policy->locality_serverlist_.empty() && |
|
|
|
|
xds_grpclb_serverlist_equals( |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist, serverlist)) { |
|
|
|
|
// Ignore identical update.
|
|
|
|
|
if (xdslb_policy->locality_list_ == update.locality_list) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xdslb %p] Incoming server list identical to current, " |
|
|
|
|
"ignoring.", |
|
|
|
|
xdslb_policy); |
|
|
|
|
} |
|
|
|
|
xds_grpclb_destroy_serverlist(serverlist); |
|
|
|
|
} else { // New serverlist.
|
|
|
|
|
// If the balancer tells us to drop all the calls, we should exit fallback
|
|
|
|
|
// mode immediately.
|
|
|
|
|
// TODO(juanlishen): When we add EDS drop, we should change to check
|
|
|
|
|
// drop_percentage.
|
|
|
|
|
if (serverlist->num_servers == 0) xdslb_policy->MaybeExitFallbackMode(); |
|
|
|
|
if (!xdslb_policy->locality_serverlist_.empty()) { |
|
|
|
|
xds_grpclb_destroy_serverlist( |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist); |
|
|
|
|
} else { |
|
|
|
|
// Initialize locality serverlist, currently the list only handles
|
|
|
|
|
// one child.
|
|
|
|
|
xdslb_policy->locality_serverlist_.emplace_back( |
|
|
|
|
MakeUnique<LocalityServerlistEntry>()); |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->locality_name = |
|
|
|
|
MakeRefCounted<LocalityName>( |
|
|
|
|
UniquePtr<char>(gpr_strdup(kDefaultLocalityRegion)), |
|
|
|
|
UniquePtr<char>(gpr_strdup(kDefaultLocalityZone)), |
|
|
|
|
UniquePtr<char>(gpr_strdup(kDefaultLocalitySubzone))); |
|
|
|
|
xdslb_policy->locality_serverlist_[0]->locality_weight = |
|
|
|
|
kDefaultLocalityWeight; |
|
|
|
|
} |
|
|
|
|
// Update the serverlist in the XdsLb instance. This serverlist
|
|
|
|
|
// instance will be destroyed either upon the next update or when the
|
|
|
|
|
// XdsLb instance is destroyed.
|
|
|
|
|
xdslb_policy->locality_serverlist_[0]->serverlist = serverlist; |
|
|
|
|
xdslb_policy->locality_map_.UpdateLocked( |
|
|
|
|
xdslb_policy->locality_serverlist_, |
|
|
|
|
xdslb_policy->child_policy_config_.get(), xdslb_policy->args_, |
|
|
|
|
xdslb_policy); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// No valid initial response or serverlist found.
|
|
|
|
|
char* response_slice_str = |
|
|
|
|
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xdslb %p] Invalid LB response received: '%s'. Ignoring.", |
|
|
|
|
xdslb_policy, response_slice_str); |
|
|
|
|
gpr_free(response_slice_str); |
|
|
|
|
} |
|
|
|
|
// If the balancer tells us to drop all the calls, we should exit fallback
|
|
|
|
|
// mode immediately.
|
|
|
|
|
// TODO(juanlishen): When we add EDS drop, we should change to check
|
|
|
|
|
// drop_percentage.
|
|
|
|
|
if (update.locality_list[0].serverlist.empty()) { |
|
|
|
|
xdslb_policy->MaybeExitFallbackMode(); |
|
|
|
|
} |
|
|
|
|
// Update the locality list.
|
|
|
|
|
xdslb_policy->locality_list_ = std::move(update.locality_list); |
|
|
|
|
// Update the locality map.
|
|
|
|
|
xdslb_policy->locality_map_.UpdateLocked( |
|
|
|
|
xdslb_policy->locality_list_, xdslb_policy->child_policy_config_.get(), |
|
|
|
|
xdslb_policy->args_, xdslb_policy); |
|
|
|
|
}(); |
|
|
|
|
grpc_slice_unref_internal(response_slice); |
|
|
|
|
if (!xdslb_policy->shutting_down_) { |
|
|
|
|
// Keep listening for serverlist updates.
|
|
|
|
|
grpc_op op; |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message.recv_message = &lb_calld->recv_message_payload_; |
|
|
|
|
op.flags = 0; |
|
|
|
|
op.reserved = nullptr; |
|
|
|
|
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
|
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
lb_calld->lb_call_, &op, 1, |
|
|
|
|
&lb_calld->lb_on_balancer_message_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} else { |
|
|
|
|
if (xdslb_policy->shutting_down_) { |
|
|
|
|
lb_calld->Unref(DEBUG_LOCATION, "on_message_received+xds_shutdown"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Keep listening for serverlist updates.
|
|
|
|
|
grpc_op op; |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message.recv_message = &lb_calld->recv_message_payload_; |
|
|
|
|
op.flags = 0; |
|
|
|
|
op.reserved = nullptr; |
|
|
|
|
GPR_ASSERT(lb_calld->lb_call_ != nullptr); |
|
|
|
|
// Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
|
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
lb_calld->lb_call_, &op, 1, &lb_calld->lb_on_balancer_message_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
@ -1317,7 +1165,7 @@ void XdsLb::BalancerChannelState::BalancerCallState:: |
|
|
|
|
// This channel is the most recently created one. Try to restart the call
|
|
|
|
|
// and reresolve.
|
|
|
|
|
lb_chand->lb_calld_.reset(); |
|
|
|
|
if (lb_calld->seen_initial_response_) { |
|
|
|
|
if (lb_calld->seen_response_) { |
|
|
|
|
// If we lost connection to the LB server, reset the backoff and restart
|
|
|
|
|
// the LB call immediately.
|
|
|
|
|
lb_chand->lb_call_backoff_.Reset(); |
|
|
|
@ -1402,9 +1250,7 @@ grpc_channel_args* BuildBalancerChannelArgs(const grpc_channel_args* args) { |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::XdsLb(Args args) |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), |
|
|
|
|
locality_map_(), |
|
|
|
|
locality_serverlist_() { |
|
|
|
|
: LoadBalancingPolicy(std::move(args)), locality_map_(), locality_list_() { |
|
|
|
|
// Record server name.
|
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); |
|
|
|
|
const char* server_uri = grpc_channel_arg_get_string(arg); |
|
|
|
@ -1433,7 +1279,7 @@ XdsLb::~XdsLb() { |
|
|
|
|
} |
|
|
|
|
gpr_free((void*)server_name_); |
|
|
|
|
grpc_channel_args_destroy(args_); |
|
|
|
|
locality_serverlist_.clear(); |
|
|
|
|
locality_list_.clear(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::ShutdownLocked() { |
|
|
|
@ -1482,9 +1328,9 @@ void XdsLb::ResetBackoffLocked() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::ProcessAddressesAndChannelArgsLocked( |
|
|
|
|
const ServerAddressList& addresses, const grpc_channel_args& args) { |
|
|
|
|
ServerAddressList addresses, const grpc_channel_args& args) { |
|
|
|
|
// Update fallback address list.
|
|
|
|
|
fallback_backend_addresses_ = ExtractBackendAddresses(addresses); |
|
|
|
|
fallback_backend_addresses_ = std::move(addresses); |
|
|
|
|
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
|
|
|
|
|
// since we use this to trigger the client_load_reporting filter.
|
|
|
|
|
static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; |
|
|
|
@ -1536,9 +1382,9 @@ void XdsLb::UpdateLocked(UpdateArgs args) { |
|
|
|
|
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args); |
|
|
|
|
locality_map_.UpdateLocked(locality_serverlist_, child_policy_config_.get(), |
|
|
|
|
args_, this); |
|
|
|
|
ProcessAddressesAndChannelArgsLocked(std::move(args.addresses), *args.args); |
|
|
|
|
locality_map_.UpdateLocked(locality_list_, child_policy_config_.get(), args_, |
|
|
|
|
this); |
|
|
|
|
// Update the existing fallback policy. The fallback policy config and/or the
|
|
|
|
|
// fallback addresses may be new.
|
|
|
|
|
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked(); |
|
|
|
@ -1736,16 +1582,16 @@ void XdsLb::MaybeExitFallbackMode() { |
|
|
|
|
// XdsLb::LocalityMap
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) { |
|
|
|
|
void XdsLb::LocalityMap::PruneLocalities(const XdsLocalityList& locality_list) { |
|
|
|
|
for (auto iter = map_.begin(); iter != map_.end();) { |
|
|
|
|
bool found = false; |
|
|
|
|
for (size_t i = 0; i < locality_list.size(); i++) { |
|
|
|
|
if (*locality_list[i]->locality_name == *iter->first) { |
|
|
|
|
if (*locality_list[i].locality_name == *iter->first) { |
|
|
|
|
found = true; |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (!found) { // Remove entries not present in the locality list
|
|
|
|
|
if (!found) { // Remove entries not present in the locality list.
|
|
|
|
|
iter = map_.erase(iter); |
|
|
|
|
} else |
|
|
|
|
iter++; |
|
|
|
@ -1753,27 +1599,27 @@ void XdsLb::LocalityMap::PruneLocalities(const LocalityList& locality_list) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::UpdateLocked( |
|
|
|
|
const LocalityList& locality_serverlist, |
|
|
|
|
const XdsLocalityList& locality_list, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args, XdsLb* parent) { |
|
|
|
|
if (parent->shutting_down_) return; |
|
|
|
|
for (size_t i = 0; i < locality_serverlist.size(); i++) { |
|
|
|
|
auto iter = map_.find(locality_serverlist[i]->locality_name); |
|
|
|
|
for (size_t i = 0; i < locality_list.size(); i++) { |
|
|
|
|
auto iter = map_.find(locality_list[i].locality_name); |
|
|
|
|
// Add a new entry in the locality map if a new locality is received in the
|
|
|
|
|
// locality list.
|
|
|
|
|
if (iter == map_.end()) { |
|
|
|
|
OrphanablePtr<LocalityEntry> new_entry = MakeOrphanable<LocalityEntry>( |
|
|
|
|
parent->Ref(DEBUG_LOCATION, "LocalityEntry"), |
|
|
|
|
locality_serverlist[i]->locality_name, |
|
|
|
|
locality_serverlist[i]->locality_weight); |
|
|
|
|
iter = map_.emplace(locality_serverlist[i]->locality_name, |
|
|
|
|
std::move(new_entry)) |
|
|
|
|
locality_list[i].locality_name, locality_list[i].lb_weight); |
|
|
|
|
iter = map_.emplace(locality_list[i].locality_name, std::move(new_entry)) |
|
|
|
|
.first; |
|
|
|
|
} |
|
|
|
|
// Don't create new child policies if not directed to
|
|
|
|
|
xds_grpclb_serverlist* serverlist = |
|
|
|
|
parent->locality_serverlist_[i]->serverlist; |
|
|
|
|
iter->second->UpdateLocked(serverlist, child_policy_config, args); |
|
|
|
|
// Keep a copy of serverlist in locality_list_ so that we can compare it
|
|
|
|
|
// with the future ones.
|
|
|
|
|
iter->second->UpdateLocked(locality_list[i].serverlist, child_policy_config, |
|
|
|
|
args); |
|
|
|
|
} |
|
|
|
|
PruneLocalities(locality_serverlist); |
|
|
|
|
PruneLocalities(locality_list); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::ShutdownLocked() { map_.clear(); } |
|
|
|
@ -1789,7 +1635,7 @@ void XdsLb::LocalityMap::ResetBackoffLocked() { |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
XdsLb::LocalityMap::LocalityEntry::LocalityEntry( |
|
|
|
|
RefCountedPtr<XdsLb> parent, RefCountedPtr<LocalityName> name, |
|
|
|
|
RefCountedPtr<XdsLb> parent, RefCountedPtr<XdsLocalityName> name, |
|
|
|
|
uint32_t locality_weight) |
|
|
|
|
: parent_(std::move(parent)), |
|
|
|
|
name_(std::move(name)), |
|
|
|
@ -1861,13 +1707,13 @@ XdsLb::LocalityMap::LocalityEntry::CreateChildPolicyLocked( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsLb::LocalityMap::LocalityEntry::UpdateLocked( |
|
|
|
|
xds_grpclb_serverlist* serverlist, |
|
|
|
|
ServerAddressList serverlist, |
|
|
|
|
LoadBalancingPolicy::Config* child_policy_config, |
|
|
|
|
const grpc_channel_args* args_in) { |
|
|
|
|
if (parent_->shutting_down_) return; |
|
|
|
|
// Construct update args.
|
|
|
|
|
UpdateArgs update_args; |
|
|
|
|
update_args.addresses = ProcessServerlist(serverlist); |
|
|
|
|
update_args.addresses = std::move(serverlist); |
|
|
|
|
update_args.config = |
|
|
|
|
child_policy_config == nullptr ? nullptr : child_policy_config->Ref(); |
|
|
|
|
update_args.args = CreateChildPolicyArgsLocked(args_in); |
|
|
|
@ -2158,7 +2004,7 @@ void XdsLb::LocalityMap::LocalityEntry::Helper::RequestReresolution() { |
|
|
|
|
// the child policy. Otherwise, pass the re-resolution request up to the
|
|
|
|
|
// channel.
|
|
|
|
|
if (entry_->parent_->lb_chand_->lb_calld() == nullptr || |
|
|
|
|
!entry_->parent_->lb_chand_->lb_calld()->seen_initial_response()) { |
|
|
|
|
!entry_->parent_->lb_chand_->lb_calld()->seen_response()) { |
|
|
|
|
entry_->parent_->channel_control_helper()->RequestReresolution(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|