|
|
|
@ -49,10 +49,10 @@ |
|
|
|
|
#include "src/core/lib/gprpp/orphanable.h" |
|
|
|
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
|
#include "src/core/lib/iomgr/combiner.h" |
|
|
|
|
#include "src/core/lib/iomgr/sockaddr.h" |
|
|
|
|
#include "src/core/lib/iomgr/sockaddr_utils.h" |
|
|
|
|
#include "src/core/lib/iomgr/timer.h" |
|
|
|
|
#include "src/core/lib/iomgr/work_serializer.h" |
|
|
|
|
#include "src/core/lib/slice/slice_hash_table.h" |
|
|
|
|
#include "src/core/lib/slice/slice_internal.h" |
|
|
|
|
#include "src/core/lib/slice/slice_string_helpers.h" |
|
|
|
@ -96,7 +96,7 @@ class XdsClient::ChannelState::RetryableCall |
|
|
|
|
void StartNewCallLocked(); |
|
|
|
|
void StartRetryTimerLocked(); |
|
|
|
|
static void OnRetryTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnRetryTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnRetryTimerLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
// The wrapped xds call that talks to the xds server. It's instantiated
|
|
|
|
|
// every time we start a new call. It's null during call retry backoff.
|
|
|
|
@ -170,51 +170,48 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
|
private: |
|
|
|
|
static void OnTimer(void* arg, grpc_error* error) { |
|
|
|
|
ResourceState* self = static_cast<ResourceState*>(arg); |
|
|
|
|
self->ads_calld_->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->timer_callback_, OnTimerLocked, self, |
|
|
|
|
nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->ads_calld_->xds_client()->work_serializer_->Run( |
|
|
|
|
[self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void OnTimerLocked(void* arg, grpc_error* error) { |
|
|
|
|
ResourceState* self = static_cast<ResourceState*>(arg); |
|
|
|
|
if (error == GRPC_ERROR_NONE && self->timer_pending_) { |
|
|
|
|
self->timer_pending_ = false; |
|
|
|
|
void OnTimerLocked(grpc_error* error) { |
|
|
|
|
if (error == GRPC_ERROR_NONE && timer_pending_) { |
|
|
|
|
timer_pending_ = false; |
|
|
|
|
char* msg; |
|
|
|
|
gpr_asprintf( |
|
|
|
|
&msg, |
|
|
|
|
"timeout obtaining resource {type=%s name=%s} from xds server", |
|
|
|
|
self->type_url_.c_str(), self->name_.c_str()); |
|
|
|
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); |
|
|
|
|
type_url_.c_str(), name_.c_str()); |
|
|
|
|
grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); |
|
|
|
|
gpr_free(msg); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] %s", |
|
|
|
|
self->ads_calld_->xds_client(), grpc_error_string(error)); |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(), |
|
|
|
|
grpc_error_string(watcher_error)); |
|
|
|
|
} |
|
|
|
|
if (self->type_url_ == XdsApi::kLdsTypeUrl || |
|
|
|
|
self->type_url_ == XdsApi::kRdsTypeUrl) { |
|
|
|
|
self->ads_calld_->xds_client()->service_config_watcher_->OnError( |
|
|
|
|
error); |
|
|
|
|
} else if (self->type_url_ == XdsApi::kCdsTypeUrl) { |
|
|
|
|
ClusterState& state = |
|
|
|
|
self->ads_calld_->xds_client()->cluster_map_[self->name_]; |
|
|
|
|
if (type_url_ == XdsApi::kLdsTypeUrl || |
|
|
|
|
type_url_ == XdsApi::kRdsTypeUrl) { |
|
|
|
|
ads_calld_->xds_client()->service_config_watcher_->OnError( |
|
|
|
|
watcher_error); |
|
|
|
|
} else if (type_url_ == XdsApi::kCdsTypeUrl) { |
|
|
|
|
ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_]; |
|
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
|
p.first->OnError(GRPC_ERROR_REF(error)); |
|
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} else if (self->type_url_ == XdsApi::kEdsTypeUrl) { |
|
|
|
|
EndpointState& state = |
|
|
|
|
self->ads_calld_->xds_client()->endpoint_map_[self->name_]; |
|
|
|
|
GRPC_ERROR_UNREF(watcher_error); |
|
|
|
|
} else if (type_url_ == XdsApi::kEdsTypeUrl) { |
|
|
|
|
EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_]; |
|
|
|
|
for (const auto& p : state.watchers) { |
|
|
|
|
p.first->OnError(GRPC_ERROR_REF(error)); |
|
|
|
|
p.first->OnError(GRPC_ERROR_REF(watcher_error)); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
GRPC_ERROR_UNREF(watcher_error); |
|
|
|
|
} else { |
|
|
|
|
GPR_UNREACHABLE_CODE(return ); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self->ads_calld_.reset(); |
|
|
|
|
self->Unref(); |
|
|
|
|
ads_calld_.reset(); |
|
|
|
|
Unref(); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const std::string type_url_; |
|
|
|
@ -248,11 +245,11 @@ class XdsClient::ChannelState::AdsCallState |
|
|
|
|
void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map); |
|
|
|
|
|
|
|
|
|
static void OnRequestSent(void* arg, grpc_error* error); |
|
|
|
|
static void OnRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnRequestSentLocked(grpc_error* error); |
|
|
|
|
static void OnResponseReceived(void* arg, grpc_error* error); |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnResponseReceivedLocked(); |
|
|
|
|
static void OnStatusReceived(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnStatusReceivedLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
@ -315,6 +312,10 @@ class XdsClient::ChannelState::LrsCallState |
|
|
|
|
public: |
|
|
|
|
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval) |
|
|
|
|
: parent_(std::move(parent)), report_interval_(report_interval) { |
|
|
|
|
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
ScheduleNextReportLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -323,10 +324,10 @@ class XdsClient::ChannelState::LrsCallState |
|
|
|
|
private: |
|
|
|
|
void ScheduleNextReportLocked(); |
|
|
|
|
static void OnNextReportTimer(void* arg, grpc_error* error); |
|
|
|
|
static void OnNextReportTimerLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnNextReportTimerLocked(grpc_error* error); |
|
|
|
|
void SendReportLocked(); |
|
|
|
|
static void OnReportDone(void* arg, grpc_error* error); |
|
|
|
|
static void OnReportDoneLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnReportDoneLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
bool IsCurrentReporterOnCall() const { |
|
|
|
|
return this == parent_->reporter_.get(); |
|
|
|
@ -346,11 +347,11 @@ class XdsClient::ChannelState::LrsCallState |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void OnInitialRequestSent(void* arg, grpc_error* error); |
|
|
|
|
void OnInitialRequestSentLocked(); |
|
|
|
|
static void OnResponseReceived(void* arg, grpc_error* error); |
|
|
|
|
void OnResponseReceivedLocked(); |
|
|
|
|
static void OnStatusReceived(void* arg, grpc_error* error); |
|
|
|
|
static void OnInitialRequestSentLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnResponseReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
static void OnStatusReceivedLocked(void* arg, grpc_error* error); |
|
|
|
|
void OnStatusReceivedLocked(grpc_error* error); |
|
|
|
|
|
|
|
|
|
bool IsCurrentCallOnChannel() const; |
|
|
|
|
|
|
|
|
@ -392,7 +393,8 @@ class XdsClient::ChannelState::StateWatcher |
|
|
|
|
: public AsyncConnectivityStateWatcherInterface { |
|
|
|
|
public: |
|
|
|
|
explicit StateWatcher(RefCountedPtr<ChannelState> parent) |
|
|
|
|
: AsyncConnectivityStateWatcherInterface(parent->xds_client()->combiner_), |
|
|
|
|
: AsyncConnectivityStateWatcherInterface( |
|
|
|
|
parent->xds_client()->work_serializer_), |
|
|
|
|
parent_(std::move(parent)) {} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
@ -581,6 +583,9 @@ XdsClient::ChannelState::RetryableCall<T>::RetryableCall( |
|
|
|
|
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) |
|
|
|
|
.set_jitter(GRPC_XDS_RECONNECT_JITTER) |
|
|
|
|
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { |
|
|
|
|
// Closure Initialization
|
|
|
|
|
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
StartNewCallLocked(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -634,8 +639,6 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() { |
|
|
|
|
chand()->xds_client(), chand(), timeout); |
|
|
|
|
} |
|
|
|
|
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release(); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); |
|
|
|
|
retry_timer_callback_pending_ = true; |
|
|
|
|
} |
|
|
|
@ -644,27 +647,26 @@ template <typename T> |
|
|
|
|
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
RetryableCall* calld = static_cast<RetryableCall*>(arg); |
|
|
|
|
calld->chand_->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&calld->on_retry_timer_, OnRetryTimerLocked, calld, |
|
|
|
|
nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
calld->chand_->xds_client()->work_serializer_->Run( |
|
|
|
|
[calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
template <typename T> |
|
|
|
|
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
RetryableCall* calld = static_cast<RetryableCall*>(arg); |
|
|
|
|
calld->retry_timer_callback_pending_ = false; |
|
|
|
|
if (!calld->shutting_down_ && error == GRPC_ERROR_NONE) { |
|
|
|
|
grpc_error* error) { |
|
|
|
|
retry_timer_callback_pending_ = false; |
|
|
|
|
if (!shutting_down_ && error == GRPC_ERROR_NONE) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_INFO, |
|
|
|
|
"[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)", |
|
|
|
|
calld->chand()->xds_client(), calld->chand(), calld); |
|
|
|
|
chand()->xds_client(), chand(), this); |
|
|
|
|
} |
|
|
|
|
calld->StartNewCallLocked(); |
|
|
|
|
StartNewCallLocked(); |
|
|
|
|
} |
|
|
|
|
calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); |
|
|
|
|
this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1164,19 +1166,18 @@ void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate( |
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg); |
|
|
|
|
ads_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&ads_calld->on_request_sent_, OnRequestSentLocked, |
|
|
|
|
ads_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
ads_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[ads_calld, error]() { ads_calld->OnRequestSentLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
AdsCallState* self = static_cast<AdsCallState*>(arg); |
|
|
|
|
if (self->IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) { |
|
|
|
|
grpc_error* error) { |
|
|
|
|
if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) { |
|
|
|
|
// Clean up the sent message.
|
|
|
|
|
grpc_byte_buffer_destroy(self->send_message_payload_); |
|
|
|
|
self->send_message_payload_ = nullptr; |
|
|
|
|
grpc_byte_buffer_destroy(send_message_payload_); |
|
|
|
|
send_message_payload_ = nullptr; |
|
|
|
|
// Continue to send another pending message if any.
|
|
|
|
|
// TODO(roth): The current code to handle buffered messages has the
|
|
|
|
|
// advantage of sending only the most recent list of resource names for
|
|
|
|
@ -1186,41 +1187,36 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked( |
|
|
|
|
// order of resource types. We need to fix this if we are seeing some
|
|
|
|
|
// resource type(s) starved due to frequent requests of other resource
|
|
|
|
|
// type(s).
|
|
|
|
|
auto it = self->buffered_requests_.begin(); |
|
|
|
|
if (it != self->buffered_requests_.end()) { |
|
|
|
|
self->SendMessageLocked(*it); |
|
|
|
|
self->buffered_requests_.erase(it); |
|
|
|
|
auto it = buffered_requests_.begin(); |
|
|
|
|
if (it != buffered_requests_.end()) { |
|
|
|
|
SendMessageLocked(*it); |
|
|
|
|
buffered_requests_.erase(it); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); |
|
|
|
|
Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnResponseReceived( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void* arg, grpc_error* /* error */) { |
|
|
|
|
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg); |
|
|
|
|
ads_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, |
|
|
|
|
OnResponseReceivedLocked, ads_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
ads_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( |
|
|
|
|
void* arg, grpc_error* /*error*/) { |
|
|
|
|
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg); |
|
|
|
|
XdsClient* xds_client = ads_calld->xds_client(); |
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { |
|
|
|
|
// Empty payload means the call was cancelled.
|
|
|
|
|
if (!ads_calld->IsCurrentCallOnChannel() || |
|
|
|
|
ads_calld->recv_message_payload_ == nullptr) { |
|
|
|
|
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); |
|
|
|
|
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { |
|
|
|
|
Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Read the response.
|
|
|
|
|
grpc_byte_buffer_reader bbr; |
|
|
|
|
grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_); |
|
|
|
|
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); |
|
|
|
|
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
|
|
|
|
grpc_byte_buffer_reader_destroy(&bbr); |
|
|
|
|
grpc_byte_buffer_destroy(ads_calld->recv_message_payload_); |
|
|
|
|
ads_calld->recv_message_payload_ = nullptr; |
|
|
|
|
grpc_byte_buffer_destroy(recv_message_payload_); |
|
|
|
|
recv_message_payload_ = nullptr; |
|
|
|
|
// TODO(juanlishen): When we convert this to use the xds protocol, the
|
|
|
|
|
// balancer will send us a fallback timeout such that we should go into
|
|
|
|
|
// fallback mode if we have lost contact with the balancer after a certain
|
|
|
|
@ -1238,24 +1234,24 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( |
|
|
|
|
std::string nonce; |
|
|
|
|
std::string type_url; |
|
|
|
|
// Note that ParseAdsResponse() also validates the response.
|
|
|
|
|
grpc_error* parse_error = xds_client->api_.ParseAdsResponse( |
|
|
|
|
response_slice, xds_client->server_name_, |
|
|
|
|
(xds_client->lds_result_.has_value() |
|
|
|
|
? xds_client->lds_result_->route_config_name |
|
|
|
|
grpc_error* parse_error = xds_client()->api_.ParseAdsResponse( |
|
|
|
|
response_slice, xds_client()->server_name_, |
|
|
|
|
(xds_client()->lds_result_.has_value() |
|
|
|
|
? xds_client()->lds_result_->route_config_name |
|
|
|
|
: ""), |
|
|
|
|
xds_client->xds_routing_enabled_, ads_calld->ClusterNamesForRequest(), |
|
|
|
|
ads_calld->EdsServiceNamesForRequest(), &lds_update, &rds_update, |
|
|
|
|
&cds_update_map, &eds_update_map, &version, &nonce, &type_url); |
|
|
|
|
xds_client()->xds_routing_enabled_, ClusterNamesForRequest(), |
|
|
|
|
EdsServiceNamesForRequest(), &lds_update, &rds_update, &cds_update_map, |
|
|
|
|
&eds_update_map, &version, &nonce, &type_url); |
|
|
|
|
grpc_slice_unref_internal(response_slice); |
|
|
|
|
if (type_url.empty()) { |
|
|
|
|
// Ignore unparsable response.
|
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xds_client %p] Error parsing ADS response (%s) -- ignoring", |
|
|
|
|
xds_client, grpc_error_string(parse_error)); |
|
|
|
|
xds_client(), grpc_error_string(parse_error)); |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
} else { |
|
|
|
|
// Update nonce.
|
|
|
|
|
auto& state = ads_calld->state_map_[type_url]; |
|
|
|
|
auto& state = state_map_[type_url]; |
|
|
|
|
state.nonce = std::move(nonce); |
|
|
|
|
// NACK or ACK the response.
|
|
|
|
|
if (parse_error != GRPC_ERROR_NONE) { |
|
|
|
@ -1265,85 +1261,80 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xds_client %p] ADS response invalid for resource type %s " |
|
|
|
|
"version %s, will NACK: nonce=%s error=%s", |
|
|
|
|
xds_client, type_url.c_str(), version.c_str(), |
|
|
|
|
xds_client(), type_url.c_str(), version.c_str(), |
|
|
|
|
state.nonce.c_str(), grpc_error_string(parse_error)); |
|
|
|
|
ads_calld->SendMessageLocked(type_url); |
|
|
|
|
SendMessageLocked(type_url); |
|
|
|
|
} else { |
|
|
|
|
ads_calld->seen_response_ = true; |
|
|
|
|
seen_response_ = true; |
|
|
|
|
// Accept the ADS response according to the type_url.
|
|
|
|
|
if (type_url == XdsApi::kLdsTypeUrl) { |
|
|
|
|
ads_calld->AcceptLdsUpdate(std::move(lds_update)); |
|
|
|
|
AcceptLdsUpdate(std::move(lds_update)); |
|
|
|
|
} else if (type_url == XdsApi::kRdsTypeUrl) { |
|
|
|
|
ads_calld->AcceptRdsUpdate(std::move(rds_update)); |
|
|
|
|
AcceptRdsUpdate(std::move(rds_update)); |
|
|
|
|
} else if (type_url == XdsApi::kCdsTypeUrl) { |
|
|
|
|
ads_calld->AcceptCdsUpdate(std::move(cds_update_map)); |
|
|
|
|
AcceptCdsUpdate(std::move(cds_update_map)); |
|
|
|
|
} else if (type_url == XdsApi::kEdsTypeUrl) { |
|
|
|
|
ads_calld->AcceptEdsUpdate(std::move(eds_update_map)); |
|
|
|
|
AcceptEdsUpdate(std::move(eds_update_map)); |
|
|
|
|
} |
|
|
|
|
state.version = std::move(version); |
|
|
|
|
// ACK the update.
|
|
|
|
|
ads_calld->SendMessageLocked(type_url); |
|
|
|
|
SendMessageLocked(type_url); |
|
|
|
|
// Start load reporting if needed.
|
|
|
|
|
auto& lrs_call = ads_calld->chand()->lrs_calld_; |
|
|
|
|
auto& lrs_call = chand()->lrs_calld_; |
|
|
|
|
if (lrs_call != nullptr) { |
|
|
|
|
LrsCallState* lrs_calld = lrs_call->calld(); |
|
|
|
|
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (xds_client->shutting_down_) { |
|
|
|
|
ads_calld->Unref(DEBUG_LOCATION, |
|
|
|
|
"ADS+OnResponseReceivedLocked+xds_shutdown"); |
|
|
|
|
if (xds_client()->shutting_down_) { |
|
|
|
|
Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Keep listening for updates.
|
|
|
|
|
grpc_op op; |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message.recv_message = &ads_calld->recv_message_payload_; |
|
|
|
|
op.data.recv_message.recv_message = &recv_message_payload_; |
|
|
|
|
op.flags = 0; |
|
|
|
|
op.reserved = nullptr; |
|
|
|
|
GPR_ASSERT(ads_calld->call_ != nullptr); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
// Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
|
|
|
|
|
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, OnResponseReceived, |
|
|
|
|
ads_calld, grpc_schedule_on_exec_ctx); |
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
ads_calld->call_, &op, 1, &ads_calld->on_response_received_); |
|
|
|
|
const grpc_call_error call_error = |
|
|
|
|
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnStatusReceived( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg); |
|
|
|
|
ads_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&ads_calld->on_status_received_, OnStatusReceivedLocked, |
|
|
|
|
ads_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
ads_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg); |
|
|
|
|
ChannelState* chand = ads_calld->chand(); |
|
|
|
|
XdsClient* xds_client = ads_calld->xds_client(); |
|
|
|
|
grpc_error* error) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
char* status_details = grpc_slice_to_c_string(ads_calld->status_details_); |
|
|
|
|
char* status_details = grpc_slice_to_c_string(status_details_); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_client %p] ADS call status received. Status = %d, details " |
|
|
|
|
"= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'", |
|
|
|
|
xds_client, ads_calld->status_code_, status_details, chand, |
|
|
|
|
ads_calld, ads_calld->call_, grpc_error_string(error)); |
|
|
|
|
xds_client(), status_code_, status_details, chand(), this, call_, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
gpr_free(status_details); |
|
|
|
|
} |
|
|
|
|
// Ignore status from a stale call.
|
|
|
|
|
if (ads_calld->IsCurrentCallOnChannel()) { |
|
|
|
|
if (IsCurrentCallOnChannel()) { |
|
|
|
|
// Try to restart the call.
|
|
|
|
|
ads_calld->parent_->OnCallFinishedLocked(); |
|
|
|
|
parent_->OnCallFinishedLocked(); |
|
|
|
|
// Send error to all watchers.
|
|
|
|
|
xds_client->NotifyOnError( |
|
|
|
|
xds_client()->NotifyOnError( |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed")); |
|
|
|
|
} |
|
|
|
|
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); |
|
|
|
|
Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { |
|
|
|
@ -1388,8 +1379,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { |
|
|
|
|
void XdsClient::ChannelState::LrsCallState::Reporter:: |
|
|
|
|
ScheduleNextReportLocked() { |
|
|
|
|
const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_; |
|
|
|
|
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_timer_init(&next_report_timer_, next_report_time, |
|
|
|
|
&on_next_report_timer_); |
|
|
|
|
next_report_timer_callback_pending_ = true; |
|
|
|
@ -1398,21 +1387,21 @@ void XdsClient::ChannelState::LrsCallState::Reporter:: |
|
|
|
|
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Reporter* self = static_cast<Reporter*>(arg); |
|
|
|
|
self->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_next_report_timer_, OnNextReportTimerLocked, |
|
|
|
|
self, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->xds_client()->work_serializer_->Run( |
|
|
|
|
[self, error]() { self->OnNextReportTimerLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Reporter* self = static_cast<Reporter*>(arg); |
|
|
|
|
self->next_report_timer_callback_pending_ = false; |
|
|
|
|
if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { |
|
|
|
|
self->Unref(DEBUG_LOCATION, "Reporter+timer"); |
|
|
|
|
return; |
|
|
|
|
grpc_error* error) { |
|
|
|
|
next_report_timer_callback_pending_ = false; |
|
|
|
|
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { |
|
|
|
|
Unref(DEBUG_LOCATION, "Reporter+timer"); |
|
|
|
|
} else { |
|
|
|
|
SendReportLocked(); |
|
|
|
|
} |
|
|
|
|
self->SendReportLocked(); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
@ -1456,8 +1445,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_SEND_MESSAGE; |
|
|
|
|
op.data.send_message.send_message = parent_->send_message_payload_; |
|
|
|
|
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, |
|
|
|
|
grpc_schedule_on_exec_ctx); |
|
|
|
|
grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
parent_->call_, &op, 1, &on_report_done_); |
|
|
|
|
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { |
|
|
|
@ -1471,33 +1458,32 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { |
|
|
|
|
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Reporter* self = static_cast<Reporter*>(arg); |
|
|
|
|
self->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&self->on_report_done_, OnReportDoneLocked, self, |
|
|
|
|
nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
self->xds_client()->work_serializer_->Run( |
|
|
|
|
[self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
Reporter* self = static_cast<Reporter*>(arg); |
|
|
|
|
grpc_byte_buffer_destroy(self->parent_->send_message_payload_); |
|
|
|
|
self->parent_->send_message_payload_ = nullptr; |
|
|
|
|
grpc_error* error) { |
|
|
|
|
grpc_byte_buffer_destroy(parent_->send_message_payload_); |
|
|
|
|
parent_->send_message_payload_ = nullptr; |
|
|
|
|
// If there are no more registered stats to report, cancel the call.
|
|
|
|
|
if (self->xds_client()->load_report_map_.empty()) { |
|
|
|
|
self->parent_->chand()->StopLrsCall(); |
|
|
|
|
self->Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters"); |
|
|
|
|
if (xds_client()->load_report_map_.empty()) { |
|
|
|
|
parent_->chand()->StopLrsCall(); |
|
|
|
|
Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { |
|
|
|
|
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { |
|
|
|
|
// If this reporter is no longer the current one on the call, the reason
|
|
|
|
|
// might be that it was orphaned for a new one due to config update.
|
|
|
|
|
if (!self->IsCurrentReporterOnCall()) { |
|
|
|
|
self->parent_->MaybeStartReportingLocked(); |
|
|
|
|
if (!IsCurrentReporterOnCall()) { |
|
|
|
|
parent_->MaybeStartReportingLocked(); |
|
|
|
|
} |
|
|
|
|
self->Unref(DEBUG_LOCATION, "Reporter+report_done"); |
|
|
|
|
return; |
|
|
|
|
Unref(DEBUG_LOCATION, "Reporter+report_done"); |
|
|
|
|
} else { |
|
|
|
|
ScheduleNextReportLocked(); |
|
|
|
|
} |
|
|
|
|
self->ScheduleNextReportLocked(); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
@ -1643,75 +1629,66 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void* arg, grpc_error* /*error*/) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
lrs_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_, |
|
|
|
|
OnInitialRequestSentLocked, lrs_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
lrs_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked( |
|
|
|
|
void* arg, grpc_error* /*error*/) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { |
|
|
|
|
// Clear the send_message_payload_.
|
|
|
|
|
grpc_byte_buffer_destroy(lrs_calld->send_message_payload_); |
|
|
|
|
lrs_calld->send_message_payload_ = nullptr; |
|
|
|
|
lrs_calld->MaybeStartReportingLocked(); |
|
|
|
|
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); |
|
|
|
|
grpc_byte_buffer_destroy(send_message_payload_); |
|
|
|
|
send_message_payload_ = nullptr; |
|
|
|
|
MaybeStartReportingLocked(); |
|
|
|
|
Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnResponseReceived( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
void* arg, grpc_error* /*error*/) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
lrs_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_, |
|
|
|
|
OnResponseReceivedLocked, lrs_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
lrs_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( |
|
|
|
|
void* arg, grpc_error* /*error*/) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
XdsClient* xds_client = lrs_calld->xds_client(); |
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { |
|
|
|
|
// Empty payload means the call was cancelled.
|
|
|
|
|
if (!lrs_calld->IsCurrentCallOnChannel() || |
|
|
|
|
lrs_calld->recv_message_payload_ == nullptr) { |
|
|
|
|
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); |
|
|
|
|
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { |
|
|
|
|
Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Read the response.
|
|
|
|
|
grpc_byte_buffer_reader bbr; |
|
|
|
|
grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_); |
|
|
|
|
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); |
|
|
|
|
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
|
|
|
|
grpc_byte_buffer_reader_destroy(&bbr); |
|
|
|
|
grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_); |
|
|
|
|
lrs_calld->recv_message_payload_ = nullptr; |
|
|
|
|
grpc_byte_buffer_destroy(recv_message_payload_); |
|
|
|
|
recv_message_payload_ = nullptr; |
|
|
|
|
// This anonymous lambda is a hack to avoid the usage of goto.
|
|
|
|
|
[&]() { |
|
|
|
|
// Parse the response.
|
|
|
|
|
std::set<std::string> new_cluster_names; |
|
|
|
|
grpc_millis new_load_reporting_interval; |
|
|
|
|
grpc_error* parse_error = xds_client->api_.ParseLrsResponse( |
|
|
|
|
grpc_error* parse_error = xds_client()->api_.ParseLrsResponse( |
|
|
|
|
response_slice, &new_cluster_names, &new_load_reporting_interval); |
|
|
|
|
if (parse_error != GRPC_ERROR_NONE) { |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"[xds_client %p] LRS response parsing failed. error=%s", |
|
|
|
|
xds_client, grpc_error_string(parse_error)); |
|
|
|
|
xds_client(), grpc_error_string(parse_error)); |
|
|
|
|
GRPC_ERROR_UNREF(parse_error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
lrs_calld->seen_response_ = true; |
|
|
|
|
seen_response_ = true; |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_client %p] LRS response received, %" PRIuPTR |
|
|
|
|
" cluster names, load_report_interval=%" PRId64 "ms", |
|
|
|
|
xds_client, new_cluster_names.size(), |
|
|
|
|
xds_client(), new_cluster_names.size(), |
|
|
|
|
new_load_reporting_interval); |
|
|
|
|
size_t i = 0; |
|
|
|
|
for (const auto& name : new_cluster_names) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s", |
|
|
|
|
xds_client, i++, name.c_str()); |
|
|
|
|
xds_client(), i++, name.c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (new_load_reporting_interval < |
|
|
|
@ -1722,81 +1699,76 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_client %p] Increased load_report_interval to minimum " |
|
|
|
|
"value %dms", |
|
|
|
|
xds_client, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); |
|
|
|
|
xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Ignore identical update.
|
|
|
|
|
if (lrs_calld->cluster_names_ == new_cluster_names && |
|
|
|
|
lrs_calld->load_reporting_interval_ == new_load_reporting_interval) { |
|
|
|
|
if (cluster_names_ == new_cluster_names && |
|
|
|
|
load_reporting_interval_ == new_load_reporting_interval) { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_client %p] Incoming LRS response identical to current, " |
|
|
|
|
"ignoring.", |
|
|
|
|
xds_client); |
|
|
|
|
xds_client()); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Stop current load reporting (if any) to adopt the new config.
|
|
|
|
|
lrs_calld->reporter_.reset(); |
|
|
|
|
reporter_.reset(); |
|
|
|
|
// Record the new config.
|
|
|
|
|
lrs_calld->cluster_names_ = std::move(new_cluster_names); |
|
|
|
|
lrs_calld->load_reporting_interval_ = new_load_reporting_interval; |
|
|
|
|
cluster_names_ = std::move(new_cluster_names); |
|
|
|
|
load_reporting_interval_ = new_load_reporting_interval; |
|
|
|
|
// Try starting sending load report.
|
|
|
|
|
lrs_calld->MaybeStartReportingLocked(); |
|
|
|
|
MaybeStartReportingLocked(); |
|
|
|
|
}(); |
|
|
|
|
grpc_slice_unref_internal(response_slice); |
|
|
|
|
if (xds_client->shutting_down_) { |
|
|
|
|
lrs_calld->Unref(DEBUG_LOCATION, |
|
|
|
|
"LRS+OnResponseReceivedLocked+xds_shutdown"); |
|
|
|
|
if (xds_client()->shutting_down_) { |
|
|
|
|
Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// Keep listening for LRS config updates.
|
|
|
|
|
grpc_op op; |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_; |
|
|
|
|
op.data.recv_message.recv_message = &recv_message_payload_; |
|
|
|
|
op.flags = 0; |
|
|
|
|
op.reserved = nullptr; |
|
|
|
|
GPR_ASSERT(lrs_calld->call_ != nullptr); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
// Reuse the "OnResponseReceivedLocked" ref taken in ctor.
|
|
|
|
|
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_, OnResponseReceived, |
|
|
|
|
lrs_calld, grpc_schedule_on_exec_ctx); |
|
|
|
|
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
lrs_calld->call_, &op, 1, &lrs_calld->on_response_received_); |
|
|
|
|
const grpc_call_error call_error = |
|
|
|
|
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnStatusReceived( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
lrs_calld->xds_client()->combiner_->Run( |
|
|
|
|
GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_, OnStatusReceivedLocked, |
|
|
|
|
lrs_calld, nullptr), |
|
|
|
|
GRPC_ERROR_REF(error)); |
|
|
|
|
GRPC_ERROR_REF(error); // ref owned by lambda
|
|
|
|
|
lrs_calld->xds_client()->work_serializer_->Run( |
|
|
|
|
[lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); }, |
|
|
|
|
DEBUG_LOCATION); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( |
|
|
|
|
void* arg, grpc_error* error) { |
|
|
|
|
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg); |
|
|
|
|
XdsClient* xds_client = lrs_calld->xds_client(); |
|
|
|
|
ChannelState* chand = lrs_calld->chand(); |
|
|
|
|
GPR_ASSERT(lrs_calld->call_ != nullptr); |
|
|
|
|
grpc_error* error) { |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_); |
|
|
|
|
char* status_details = grpc_slice_to_c_string(status_details_); |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[xds_client %p] LRS call status received. Status = %d, details " |
|
|
|
|
"= '%s', (chand: %p, calld: %p, call: %p), error '%s'", |
|
|
|
|
xds_client, lrs_calld->status_code_, status_details, chand, |
|
|
|
|
lrs_calld, lrs_calld->call_, grpc_error_string(error)); |
|
|
|
|
xds_client(), status_code_, status_details, chand(), this, call_, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
gpr_free(status_details); |
|
|
|
|
} |
|
|
|
|
// Ignore status from a stale call.
|
|
|
|
|
if (lrs_calld->IsCurrentCallOnChannel()) { |
|
|
|
|
GPR_ASSERT(!xds_client->shutting_down_); |
|
|
|
|
if (IsCurrentCallOnChannel()) { |
|
|
|
|
GPR_ASSERT(!xds_client()->shutting_down_); |
|
|
|
|
// Try to restart the call.
|
|
|
|
|
lrs_calld->parent_->OnCallFinishedLocked(); |
|
|
|
|
parent_->OnCallFinishedLocked(); |
|
|
|
|
} |
|
|
|
|
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); |
|
|
|
|
Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { |
|
|
|
@ -1825,14 +1797,15 @@ bool GetXdsRoutingEnabled(const grpc_channel_args& args) { |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties, |
|
|
|
|
XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer, |
|
|
|
|
grpc_pollset_set* interested_parties, |
|
|
|
|
StringView server_name, |
|
|
|
|
std::unique_ptr<ServiceConfigWatcherInterface> watcher, |
|
|
|
|
const grpc_channel_args& channel_args, grpc_error** error) |
|
|
|
|
: InternallyRefCounted<XdsClient>(&grpc_xds_client_trace), |
|
|
|
|
request_timeout_(GetRequestTimeout(channel_args)), |
|
|
|
|
xds_routing_enabled_(GetXdsRoutingEnabled(channel_args)), |
|
|
|
|
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")), |
|
|
|
|
work_serializer_(std::move(work_serializer)), |
|
|
|
|
interested_parties_(interested_parties), |
|
|
|
|
bootstrap_( |
|
|
|
|
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)), |
|
|
|
@ -1871,7 +1844,6 @@ XdsClient::~XdsClient() { |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this); |
|
|
|
|
} |
|
|
|
|
GRPC_COMBINER_UNREF(combiner_, "xds_client"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void XdsClient::Orphan() { |
|
|
|
|