Reviewer comments

reviewable/pr21361/r6
Yash Tibrewal 5 years ago
parent b338d84aec
commit 36bd748faa
  1. 291
      src/core/ext/filters/client_channel/xds/xds_client.cc
  2. 5
      src/core/lib/transport/connectivity_state.cc
  3. 5
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  4. 6
      test/core/client_channel/resolvers/dns_resolver_test.cc
  5. 6
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc

@ -93,7 +93,7 @@ class XdsClient::ChannelState::RetryableCall
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimer(void* arg, grpc_error* error);
static void OnRetryTimerLocked(void* arg, grpc_error* error);
void OnRetryTimerLocked(grpc_error* error);
// The wrapped xds call that talks to the xds server. It's instantiated
// every time we start a new call. It's null during call retry backoff.
@ -128,8 +128,8 @@ class XdsClient::ChannelState::AdsCallState
private:
static void OnResponseReceived(void* arg, grpc_error* error);
static void OnStatusReceived(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
void OnResponseReceivedLocked();
void OnStatusReceivedLocked(grpc_error* error);
bool IsCurrentCallOnChannel() const;
@ -180,6 +180,10 @@ class XdsClient::ChannelState::LrsCallState
public:
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
grpc_schedule_on_exec_ctx);
ScheduleNextReportLocked();
}
@ -188,10 +192,10 @@ class XdsClient::ChannelState::LrsCallState
private:
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
static void OnNextReportTimerLocked(void* arg, grpc_error* error);
void OnNextReportTimerLocked(grpc_error* error);
void SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
static void OnReportDoneLocked(void* arg, grpc_error* error);
void OnReportDoneLocked(grpc_error* error);
bool IsCurrentReporterOnCall() const {
return this == parent_->reporter_.get();
@ -213,9 +217,9 @@ class XdsClient::ChannelState::LrsCallState
static void OnInitialRequestSent(void* arg, grpc_error* error);
static void OnResponseReceived(void* arg, grpc_error* error);
static void OnStatusReceived(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
void OnInitialRequestSentLocked();
void OnResponseReceivedLocked();
void OnStatusReceivedLocked(grpc_error* error);
bool IsCurrentCallOnChannel() const;
@ -425,6 +429,9 @@ XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
// Closure Initialization
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
StartNewCallLocked();
}
@ -478,8 +485,6 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
chand()->xds_client(), chand(), timeout);
}
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
retry_timer_callback_pending_ = true;
}
@ -488,28 +493,26 @@ template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error* error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
calld->chand_->xds_client()->logical_thread_->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&calld->on_retry_timer_,
OnRetryTimerLocked, calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION);
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
void* arg, grpc_error* error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
calld->retry_timer_callback_pending_ = false;
if (!calld->shutting_down_ && error == GRPC_ERROR_NONE) {
grpc_error* error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error == GRPC_ERROR_NONE) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(
GPR_INFO,
"[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
calld->chand()->xds_client(), calld->chand(), calld);
chand()->xds_client(), chand(), this);
}
calld->StartNewCallLocked();
StartNewCallLocked();
}
calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
GRPC_ERROR_UNREF(error);
}
//
@ -632,33 +635,25 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
}
void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
void* arg, grpc_error* /*error*/) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ads_calld->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_,
OnResponseReceivedLocked, ads_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
}
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
void* arg, grpc_error* /*error*/) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
XdsClient* xds_client = ads_calld->xds_client();
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!ads_calld->IsCurrentCallOnChannel() ||
ads_calld->recv_message_payload_ == nullptr) {
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
return;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_);
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(ads_calld->recv_message_payload_);
ads_calld->recv_message_payload_ = nullptr;
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// TODO(juanlishen): When we convert this to use the xds protocol, the
// balancer will send us a fallback timeout such that we should go into
// fallback mode if we have lost contact with the balancer after a certain
@ -676,7 +671,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response parsing failed. error=%s",
xds_client, grpc_error_string(parse_error));
xds_client(), grpc_error_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
return;
}
@ -686,17 +681,17 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
gpr_log(GPR_ERROR,
"[xds_client %p] ADS response '%s' doesn't contain any valid "
"locality but doesn't require to drop all calls. Ignoring.",
xds_client, response_slice_str);
xds_client(), response_slice_str);
gpr_free(response_slice_str);
return;
}
ads_calld->seen_response_ = true;
seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] ADS response with %" PRIuPTR
" priorities and %" PRIuPTR
" drop categories received (drop_all=%d)",
xds_client, update.priority_list_update.size(),
xds_client(), update.priority_list_update.size(),
update.drop_config->drop_category_list().size(), update.drop_all);
for (size_t priority = 0; priority < update.priority_list_update.size();
++priority) {
@ -705,14 +700,14 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
gpr_log(GPR_INFO,
"[xds_client %p] Priority %" PRIuPTR " contains %" PRIuPTR
" localities",
xds_client, priority, locality_map_update->size());
xds_client(), priority, locality_map_update->size());
size_t locality_count = 0;
for (const auto& p : locality_map_update->localities) {
const auto& locality = p.second;
gpr_log(GPR_INFO,
"[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
" %s contains %" PRIuPTR " server addresses",
xds_client, priority, locality_count,
xds_client(), priority, locality_count,
locality.name->AsHumanReadableString(),
locality.serverlist.size());
for (size_t i = 0; i < locality.serverlist.size(); ++i) {
@ -722,7 +717,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
gpr_log(GPR_INFO,
"[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
" %s, server address %" PRIuPTR ": %s",
xds_client, priority, locality_count,
xds_client(), priority, locality_count,
locality.name->AsHumanReadableString(), i, ipport);
gpr_free(ipport);
}
@ -735,18 +730,18 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
update.drop_config->drop_category_list()[i];
gpr_log(GPR_INFO,
"[xds_client %p] Drop category %s has drop rate %d per million",
xds_client, drop_category.name.get(),
xds_client(), drop_category.name.get(),
drop_category.parts_per_million);
}
}
// Start load reporting if needed.
auto& lrs_call = ads_calld->chand()->lrs_calld_;
auto& lrs_call = chand()->lrs_calld_;
if (lrs_call != nullptr) {
LrsCallState* lrs_calld = lrs_call->calld();
if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
}
// Ignore identical update.
const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update;
const EdsUpdate& prev_update = xds_client()->cluster_state_.eds_update;
const bool priority_list_changed =
prev_update.priority_list_update != update.priority_list_update;
const bool drop_config_changed =
@ -756,12 +751,12 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] EDS update identical to current, ignoring.",
xds_client);
xds_client());
}
return;
}
// Update the cluster state.
ClusterState& cluster_state = xds_client->cluster_state_;
ClusterState& cluster_state = xds_client()->cluster_state_;
cluster_state.eds_update = std::move(update);
// Notify all watchers.
for (const auto& p : cluster_state.endpoint_watchers) {
@ -769,61 +764,54 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
}
}();
grpc_slice_unref_internal(response_slice);
if (xds_client->shutting_down_) {
ads_calld->Unref(DEBUG_LOCATION,
"ADS+OnResponseReceivedLocked+xds_shutdown");
if (xds_client()->shutting_down_) {
Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown");
return;
}
// Keep listening for serverlist updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &ads_calld->recv_message_payload_;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(ads_calld->call_ != nullptr);
GPR_ASSERT(call_ != nullptr);
// Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, OnResponseReceived,
ads_calld, grpc_schedule_on_exec_ctx);
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
ads_calld->call_, &op, 1, &ads_calld->on_response_received_);
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
ads_calld->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&ads_calld->on_status_received_,
OnStatusReceivedLocked, ads_calld, nullptr),
GRPC_ERROR_REF(error)),
[ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); },
DEBUG_LOCATION);
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ChannelState* chand = ads_calld->chand();
XdsClient* xds_client = ads_calld->xds_client();
grpc_error* error) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(ads_calld->status_details_);
char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO,
"[xds_client %p] ADS call status received. Status = %d, details "
"= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
xds_client, ads_calld->status_code_, status_details, chand,
ads_calld, ads_calld->call_, grpc_error_string(error));
xds_client(), status_code_, status_details, chand(), this, call_,
grpc_error_string(error));
gpr_free(status_details);
}
// Ignore status from a stale call.
if (ads_calld->IsCurrentCallOnChannel()) {
if (IsCurrentCallOnChannel()) {
// Try to restart the call.
ads_calld->parent_->OnCallFinishedLocked();
parent_->OnCallFinishedLocked();
// Send error to all watchers.
xds_client->NotifyOnError(
xds_client()->NotifyOnError(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
}
ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
GRPC_ERROR_UNREF(error);
}
bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
@ -846,8 +834,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&next_report_timer_, next_report_time,
&on_next_report_timer_);
next_report_timer_callback_pending_ = true;
@ -856,23 +842,22 @@ void XdsClient::ChannelState::LrsCallState::Reporter::
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
self->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_next_report_timer_,
OnNextReportTimerLocked, self, nullptr),
GRPC_ERROR_REF(error)),
[self, error]() { self->OnNextReportTimerLocked(error); },
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->next_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
self->Unref(DEBUG_LOCATION, "Reporter+timer");
grpc_error* error) {
next_report_timer_callback_pending_ = false;
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
Unref(DEBUG_LOCATION, "Reporter+timer");
GRPC_ERROR_UNREF(error);
return;
}
self->SendReportLocked();
SendReportLocked();
GRPC_ERROR_UNREF(error);
}
void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
@ -903,8 +888,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = parent_->send_message_payload_;
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute(
parent_->call_, &op, 1, &on_report_done_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
@ -918,28 +901,27 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
self->xds_client()->logical_thread_->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&self->on_report_done_,
OnReportDoneLocked, self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
grpc_byte_buffer_destroy(self->parent_->send_message_payload_);
self->parent_->send_message_payload_ = nullptr;
if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) {
grpc_error* error) {
grpc_byte_buffer_destroy(parent_->send_message_payload_);
parent_->send_message_payload_ = nullptr;
if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
// If this reporter is no longer the current one on the call, the reason
// might be that it was orphaned for a new one due to config update.
if (!self->IsCurrentReporterOnCall()) {
self->parent_->MaybeStartReportingLocked();
if (!IsCurrentReporterOnCall()) {
parent_->MaybeStartReportingLocked();
}
self->Unref(DEBUG_LOCATION, "Reporter+report_done");
Unref(DEBUG_LOCATION, "Reporter+report_done");
GRPC_ERROR_UNREF(error);
return;
}
self->ScheduleNextReportLocked();
ScheduleNextReportLocked();
GRPC_ERROR_UNREF(error);
}
//
@ -1086,54 +1068,41 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
void* arg, grpc_error* error) {
void* arg, grpc_error* /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_,
OnInitialRequestSentLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
[lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); },
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked(
void* arg, grpc_error* /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
// Clear the send_message_payload_.
grpc_byte_buffer_destroy(lrs_calld->send_message_payload_);
lrs_calld->send_message_payload_ = nullptr;
lrs_calld->MaybeStartReportingLocked();
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
grpc_byte_buffer_destroy(send_message_payload_);
send_message_payload_ = nullptr;
MaybeStartReportingLocked();
Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
void* arg, grpc_error* /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_,
OnResponseReceivedLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
void* arg, grpc_error* /*error*/) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
XdsClient* xds_client = lrs_calld->xds_client();
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
// Empty payload means the call was cancelled.
if (!lrs_calld->IsCurrentCallOnChannel() ||
lrs_calld->recv_message_payload_ == nullptr) {
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
return;
}
// Read the response.
grpc_byte_buffer_reader bbr;
grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_);
grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_);
lrs_calld->recv_message_payload_ = nullptr;
grpc_byte_buffer_destroy(recv_message_payload_);
recv_message_payload_ = nullptr;
// This anonymous lambda is a hack to avoid the usage of goto.
[&]() {
// Parse the response.
@ -1144,16 +1113,17 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
if (parse_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"[xds_client %p] LRS response parsing failed. error=%s",
xds_client, grpc_error_string(parse_error));
xds_client(), grpc_error_string(parse_error));
GRPC_ERROR_UNREF(parse_error);
return;
}
lrs_calld->seen_response_ = true;
seen_response_ = true;
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] LRS response received, cluster_name=%s, "
"load_report_interval=%" PRId64 "ms",
xds_client, new_cluster_name.get(), new_load_reporting_interval);
xds_client(), new_cluster_name.get(),
new_load_reporting_interval);
}
if (new_load_reporting_interval <
GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
@ -1163,83 +1133,76 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
gpr_log(GPR_INFO,
"[xds_client %p] Increased load_report_interval to minimum "
"value %dms",
xds_client, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
}
}
// Ignore identical update.
if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval &&
strcmp(lrs_calld->cluster_name_.get(), new_cluster_name.get()) == 0) {
if (load_reporting_interval_ == new_load_reporting_interval &&
strcmp(cluster_name_.get(), new_cluster_name.get()) == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
"[xds_client %p] Incoming LRS response identical to current, "
"ignoring.",
xds_client);
xds_client());
}
return;
}
// Stop current load reporting (if any) to adopt the new config.
lrs_calld->reporter_.reset();
reporter_.reset();
// Record the new config.
lrs_calld->cluster_name_ = std::move(new_cluster_name);
lrs_calld->load_reporting_interval_ = new_load_reporting_interval;
cluster_name_ = std::move(new_cluster_name);
load_reporting_interval_ = new_load_reporting_interval;
// Try starting sending load report.
lrs_calld->MaybeStartReportingLocked();
MaybeStartReportingLocked();
}();
grpc_slice_unref_internal(response_slice);
if (xds_client->shutting_down_) {
lrs_calld->Unref(DEBUG_LOCATION,
"LRS+OnResponseReceivedLocked+xds_shutdown");
if (xds_client()->shutting_down_) {
Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown");
return;
}
// Keep listening for LRS config updates.
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_;
op.data.recv_message.recv_message = &recv_message_payload_;
op.flags = 0;
op.reserved = nullptr;
GPR_ASSERT(lrs_calld->call_ != nullptr);
GPR_ASSERT(call_ != nullptr);
// Reuse the "OnResponseReceivedLocked" ref taken in ctor.
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_, OnResponseReceived,
lrs_calld, grpc_schedule_on_exec_ctx);
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
lrs_calld->call_, &op, 1, &lrs_calld->on_response_received_);
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
lrs_calld->xds_client()->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_,
OnStatusReceivedLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
[lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); },
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
XdsClient* xds_client = lrs_calld->xds_client();
ChannelState* chand = lrs_calld->chand();
GPR_ASSERT(lrs_calld->call_ != nullptr);
grpc_error* error) {
GPR_ASSERT(call_ != nullptr);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_);
char* status_details = grpc_slice_to_c_string(status_details_);
gpr_log(GPR_INFO,
"[xds_client %p] LRS call status received. Status = %d, details "
"= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
xds_client, lrs_calld->status_code_, status_details, chand,
lrs_calld, lrs_calld->call_, grpc_error_string(error));
xds_client(), status_code_, status_details, chand(), this, call_,
grpc_error_string(error));
gpr_free(status_details);
}
// Ignore status from a stale call.
if (lrs_calld->IsCurrentCallOnChannel()) {
GPR_ASSERT(!xds_client->shutting_down_);
if (IsCurrentCallOnChannel()) {
GPR_ASSERT(!xds_client()->shutting_down_);
// Try to restart the call.
lrs_calld->parent_->OnCallFinishedLocked();
parent_->OnCallFinishedLocked();
}
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
GRPC_ERROR_UNREF(error);
}
bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {

@ -62,10 +62,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
const RefCountedPtr<LogicalThread>& logical_thread)
: watcher_(std::move(watcher)), state_(state) {
if (logical_thread != nullptr) {
logical_thread->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr),
GRPC_ERROR_NONE),
logical_thread->Run([this]() { SendNotification(this, GRPC_ERROR_NONE); },
DEBUG_LOCATION);
} else {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,

@ -320,8 +320,6 @@ int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
auto logical_thread = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_logical_thread = &logical_thread;
@ -331,8 +329,7 @@ int main(int argc, char** argv) {
grpc_set_resolver_impl(&test_resolver);
test_cooldown();
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown_blocking();
GPR_ASSERT(g_all_callbacks_invoked);
return 0;

@ -73,10 +73,7 @@ int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
{
auto logical_thread =
grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
auto logical_thread = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_logical_thread = &logical_thread;
grpc_core::ResolverFactory* dns =
@ -94,7 +91,6 @@ int main(int argc, char** argv) {
test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");
}
}
}
grpc_shutdown();
return 0;

@ -78,8 +78,7 @@ static void test_fails(grpc_core::ResolverFactory* factory,
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
auto logical_thread = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_logical_thread = &logical_thread;
@ -101,8 +100,7 @@ int main(int argc, char** argv) {
test_succeeds(ipv6, "ipv6:[::]:1234");
test_fails(ipv6, "ipv6:[::]:123456");
test_fails(ipv6, "ipv6:www.google.com");
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown();
return 0;

Loading…
Cancel
Save