diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 12f8f8605ff..65a3e2d6779 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -93,7 +93,7 @@ class XdsClient::ChannelState::RetryableCall void StartNewCallLocked(); void StartRetryTimerLocked(); static void OnRetryTimer(void* arg, grpc_error* error); - static void OnRetryTimerLocked(void* arg, grpc_error* error); + void OnRetryTimerLocked(grpc_error* error); // The wrapped xds call that talks to the xds server. It's instantiated // every time we start a new call. It's null during call retry backoff. @@ -128,8 +128,8 @@ class XdsClient::ChannelState::AdsCallState private: static void OnResponseReceived(void* arg, grpc_error* error); static void OnStatusReceived(void* arg, grpc_error* error); - static void OnResponseReceivedLocked(void* arg, grpc_error* error); - static void OnStatusReceivedLocked(void* arg, grpc_error* error); + void OnResponseReceivedLocked(); + void OnStatusReceivedLocked(grpc_error* error); bool IsCurrentCallOnChannel() const; @@ -180,6 +180,10 @@ class XdsClient::ChannelState::LrsCallState public: Reporter(RefCountedPtr parent, grpc_millis report_interval) : parent_(std::move(parent)), report_interval_(report_interval) { + GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, + grpc_schedule_on_exec_ctx); ScheduleNextReportLocked(); } @@ -188,10 +192,10 @@ class XdsClient::ChannelState::LrsCallState private: void ScheduleNextReportLocked(); static void OnNextReportTimer(void* arg, grpc_error* error); - static void OnNextReportTimerLocked(void* arg, grpc_error* error); + void OnNextReportTimerLocked(grpc_error* error); void SendReportLocked(); static void OnReportDone(void* arg, grpc_error* error); - static void OnReportDoneLocked(void* arg, grpc_error* error); + void OnReportDoneLocked(grpc_error* error); bool IsCurrentReporterOnCall() const { return this == parent_->reporter_.get(); @@ -213,9 +217,9 @@ class XdsClient::ChannelState::LrsCallState static void OnInitialRequestSent(void* arg, grpc_error* error); static void OnResponseReceived(void* arg, grpc_error* error); static void OnStatusReceived(void* arg, grpc_error* error); - static void OnInitialRequestSentLocked(void* arg, grpc_error* error); - static void OnResponseReceivedLocked(void* arg, grpc_error* error); - static void OnStatusReceivedLocked(void* arg, grpc_error* error); + void OnInitialRequestSentLocked(); + void OnResponseReceivedLocked(); + void OnStatusReceivedLocked(grpc_error* error); bool IsCurrentCallOnChannel() const; @@ -425,6 +429,9 @@ XdsClient::ChannelState::RetryableCall::RetryableCall( .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_XDS_RECONNECT_JITTER) .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { + // Closure Initialization + GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, + grpc_schedule_on_exec_ctx); StartNewCallLocked(); } @@ -478,8 +485,6 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { chand()->xds_client(), chand(), timeout); } this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release(); - GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, - grpc_schedule_on_exec_ctx); grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); retry_timer_callback_pending_ = true; } @@ -488,28 +493,26 @@ template void XdsClient::ChannelState::RetryableCall::OnRetryTimer( void* arg, grpc_error* error) { RetryableCall* calld = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda calld->chand_->xds_client()->logical_thread_->Run( - Closure::ToFunction(GRPC_CLOSURE_INIT(&calld->on_retry_timer_, - OnRetryTimerLocked, calld, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION); } template void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( - void* arg, grpc_error* error) { - RetryableCall* calld = static_cast(arg); - calld->retry_timer_callback_pending_ = false; - if (!calld->shutting_down_ && error == GRPC_ERROR_NONE) { + grpc_error* error) { + retry_timer_callback_pending_ = false; + if (!shutting_down_ && error == GRPC_ERROR_NONE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log( GPR_INFO, "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)", - calld->chand()->xds_client(), calld->chand(), calld); + chand()->xds_client(), chand(), this); } - calld->StartNewCallLocked(); + StartNewCallLocked(); } - calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); + this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done"); + GRPC_ERROR_UNREF(error); } // @@ -632,33 +635,25 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { } void XdsClient::ChannelState::AdsCallState::OnResponseReceived( - void* arg, grpc_error* error) { + void* arg, grpc_error* /*error*/) { AdsCallState* ads_calld = static_cast(arg); ads_calld->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, - OnResponseReceivedLocked, ads_calld, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); } -void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( - void* arg, grpc_error* /*error*/) { - AdsCallState* ads_calld = static_cast(arg); - XdsClient* xds_client = ads_calld->xds_client(); +void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() { // Empty payload means the call was cancelled. - if (!ads_calld->IsCurrentCallOnChannel() || - ads_calld->recv_message_payload_ == nullptr) { - ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); + if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { + Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked"); return; } // Read the response. grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, ads_calld->recv_message_payload_); + grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(ads_calld->recv_message_payload_); - ads_calld->recv_message_payload_ = nullptr; + grpc_byte_buffer_destroy(recv_message_payload_); + recv_message_payload_ = nullptr; // TODO(juanlishen): When we convert this to use the xds protocol, the // balancer will send us a fallback timeout such that we should go into // fallback mode if we have lost contact with the balancer after a certain @@ -676,7 +671,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( if (parse_error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "[xds_client %p] ADS response parsing failed. error=%s", - xds_client, grpc_error_string(parse_error)); + xds_client(), grpc_error_string(parse_error)); GRPC_ERROR_UNREF(parse_error); return; } @@ -686,17 +681,17 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( gpr_log(GPR_ERROR, "[xds_client %p] ADS response '%s' doesn't contain any valid " "locality but doesn't require to drop all calls. Ignoring.", - xds_client, response_slice_str); + xds_client(), response_slice_str); gpr_free(response_slice_str); return; } - ads_calld->seen_response_ = true; + seen_response_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] ADS response with %" PRIuPTR " priorities and %" PRIuPTR " drop categories received (drop_all=%d)", - xds_client, update.priority_list_update.size(), + xds_client(), update.priority_list_update.size(), update.drop_config->drop_category_list().size(), update.drop_all); for (size_t priority = 0; priority < update.priority_list_update.size(); ++priority) { @@ -705,14 +700,14 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR " contains %" PRIuPTR " localities", - xds_client, priority, locality_map_update->size()); + xds_client(), priority, locality_map_update->size()); size_t locality_count = 0; for (const auto& p : locality_map_update->localities) { const auto& locality = p.second; gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR " %s contains %" PRIuPTR " server addresses", - xds_client, priority, locality_count, + xds_client(), priority, locality_count, locality.name->AsHumanReadableString(), locality.serverlist.size()); for (size_t i = 0; i < locality.serverlist.size(); ++i) { @@ -722,7 +717,7 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( gpr_log(GPR_INFO, "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR " %s, server address %" PRIuPTR ": %s", - xds_client, priority, locality_count, + xds_client(), priority, locality_count, locality.name->AsHumanReadableString(), i, ipport); gpr_free(ipport); } @@ -735,18 +730,18 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( update.drop_config->drop_category_list()[i]; gpr_log(GPR_INFO, "[xds_client %p] Drop category %s has drop rate %d per million", - xds_client, drop_category.name.get(), + xds_client(), drop_category.name.get(), drop_category.parts_per_million); } } // Start load reporting if needed. - auto& lrs_call = ads_calld->chand()->lrs_calld_; + auto& lrs_call = chand()->lrs_calld_; if (lrs_call != nullptr) { LrsCallState* lrs_calld = lrs_call->calld(); if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked(); } // Ignore identical update. - const EdsUpdate& prev_update = xds_client->cluster_state_.eds_update; + const EdsUpdate& prev_update = xds_client()->cluster_state_.eds_update; const bool priority_list_changed = prev_update.priority_list_update != update.priority_list_update; const bool drop_config_changed = @@ -756,12 +751,12 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] EDS update identical to current, ignoring.", - xds_client); + xds_client()); } return; } // Update the cluster state. - ClusterState& cluster_state = xds_client->cluster_state_; + ClusterState& cluster_state = xds_client()->cluster_state_; cluster_state.eds_update = std::move(update); // Notify all watchers. for (const auto& p : cluster_state.endpoint_watchers) { @@ -769,61 +764,54 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( } }(); grpc_slice_unref_internal(response_slice); - if (xds_client->shutting_down_) { - ads_calld->Unref(DEBUG_LOCATION, - "ADS+OnResponseReceivedLocked+xds_shutdown"); + if (xds_client()->shutting_down_) { + Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown"); return; } // Keep listening for serverlist updates. grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; - op.data.recv_message.recv_message = &ads_calld->recv_message_payload_; + op.data.recv_message.recv_message = &recv_message_payload_; op.flags = 0; op.reserved = nullptr; - GPR_ASSERT(ads_calld->call_ != nullptr); + GPR_ASSERT(call_ != nullptr); // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor. - GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, OnResponseReceived, - ads_calld, grpc_schedule_on_exec_ctx); - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - ads_calld->call_, &op, 1, &ads_calld->on_response_received_); + const grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); } void XdsClient::ChannelState::AdsCallState::OnStatusReceived( void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda ads_calld->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&ads_calld->on_status_received_, - OnStatusReceivedLocked, ads_calld, nullptr), - GRPC_ERROR_REF(error)), + [ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); }, DEBUG_LOCATION); } void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( - void* arg, grpc_error* error) { - AdsCallState* ads_calld = static_cast(arg); - ChannelState* chand = ads_calld->chand(); - XdsClient* xds_client = ads_calld->xds_client(); + grpc_error* error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - char* status_details = grpc_slice_to_c_string(ads_calld->status_details_); + char* status_details = grpc_slice_to_c_string(status_details_); gpr_log(GPR_INFO, "[xds_client %p] ADS call status received. Status = %d, details " "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'", - xds_client, ads_calld->status_code_, status_details, chand, - ads_calld, ads_calld->call_, grpc_error_string(error)); + xds_client(), status_code_, status_details, chand(), this, call_, + grpc_error_string(error)); gpr_free(status_details); } // Ignore status from a stale call. - if (ads_calld->IsCurrentCallOnChannel()) { + if (IsCurrentCallOnChannel()) { // Try to restart the call. - ads_calld->parent_->OnCallFinishedLocked(); + parent_->OnCallFinishedLocked(); // Send error to all watchers. - xds_client->NotifyOnError( + xds_client()->NotifyOnError( GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed")); } - ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); + Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked"); + GRPC_ERROR_UNREF(error); } bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const { @@ -846,8 +834,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { void XdsClient::ChannelState::LrsCallState::Reporter:: ScheduleNextReportLocked() { const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_; - GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, - grpc_schedule_on_exec_ctx); grpc_timer_init(&next_report_timer_, next_report_time, &on_next_report_timer_); next_report_timer_callback_pending_ = true; @@ -856,23 +842,22 @@ void XdsClient::ChannelState::LrsCallState::Reporter:: void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda self->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&self->on_next_report_timer_, - OnNextReportTimerLocked, self, nullptr), - GRPC_ERROR_REF(error)), + [self, error]() { self->OnNextReportTimerLocked(error); }, DEBUG_LOCATION); } void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( - void* arg, grpc_error* error) { - Reporter* self = static_cast(arg); - self->next_report_timer_callback_pending_ = false; - if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { - self->Unref(DEBUG_LOCATION, "Reporter+timer"); + grpc_error* error) { + next_report_timer_callback_pending_ = false; + if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { + Unref(DEBUG_LOCATION, "Reporter+timer"); + GRPC_ERROR_UNREF(error); return; } - self->SendReportLocked(); + SendReportLocked(); + GRPC_ERROR_UNREF(error); } void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { @@ -903,8 +888,6 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = parent_->send_message_payload_; - GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, - grpc_schedule_on_exec_ctx); grpc_call_error call_error = grpc_call_start_batch_and_execute( parent_->call_, &op, 1, &on_report_done_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { @@ -918,28 +901,27 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda self->xds_client()->logical_thread_->Run( - Closure::ToFunction(GRPC_CLOSURE_INIT(&self->on_report_done_, - OnReportDoneLocked, self, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION); } void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( - void* arg, grpc_error* error) { - Reporter* self = static_cast(arg); - grpc_byte_buffer_destroy(self->parent_->send_message_payload_); - self->parent_->send_message_payload_ = nullptr; - if (error != GRPC_ERROR_NONE || !self->IsCurrentReporterOnCall()) { + grpc_error* error) { + grpc_byte_buffer_destroy(parent_->send_message_payload_); + parent_->send_message_payload_ = nullptr; + if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) { // If this reporter is no longer the current one on the call, the reason // might be that it was orphaned for a new one due to config update. - if (!self->IsCurrentReporterOnCall()) { - self->parent_->MaybeStartReportingLocked(); + if (!IsCurrentReporterOnCall()) { + parent_->MaybeStartReportingLocked(); } - self->Unref(DEBUG_LOCATION, "Reporter+report_done"); + Unref(DEBUG_LOCATION, "Reporter+report_done"); + GRPC_ERROR_UNREF(error); return; } - self->ScheduleNextReportLocked(); + ScheduleNextReportLocked(); + GRPC_ERROR_UNREF(error); } // @@ -1086,54 +1068,41 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { } void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( - void* arg, grpc_error* error) { + void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); lrs_calld->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_, - OnInitialRequestSentLocked, lrs_calld, nullptr), - GRPC_ERROR_REF(error)), + [lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); }, DEBUG_LOCATION); } -void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked( - void* arg, grpc_error* /*error*/) { - LrsCallState* lrs_calld = static_cast(arg); +void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() { // Clear the send_message_payload_. - grpc_byte_buffer_destroy(lrs_calld->send_message_payload_); - lrs_calld->send_message_payload_ = nullptr; - lrs_calld->MaybeStartReportingLocked(); - lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); + grpc_byte_buffer_destroy(send_message_payload_); + send_message_payload_ = nullptr; + MaybeStartReportingLocked(); + Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); } void XdsClient::ChannelState::LrsCallState::OnResponseReceived( - void* arg, grpc_error* error) { + void* arg, grpc_error* /*error*/) { LrsCallState* lrs_calld = static_cast(arg); lrs_calld->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_, - OnResponseReceivedLocked, lrs_calld, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION); } -void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( - void* arg, grpc_error* /*error*/) { - LrsCallState* lrs_calld = static_cast(arg); - XdsClient* xds_client = lrs_calld->xds_client(); +void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() { // Empty payload means the call was cancelled. - if (!lrs_calld->IsCurrentCallOnChannel() || - lrs_calld->recv_message_payload_ == nullptr) { - lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); + if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) { + Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked"); return; } // Read the response. grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, lrs_calld->recv_message_payload_); + grpc_byte_buffer_reader_init(&bbr, recv_message_payload_); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_reader_destroy(&bbr); - grpc_byte_buffer_destroy(lrs_calld->recv_message_payload_); - lrs_calld->recv_message_payload_ = nullptr; + grpc_byte_buffer_destroy(recv_message_payload_); + recv_message_payload_ = nullptr; // This anonymous lambda is a hack to avoid the usage of goto. [&]() { // Parse the response. @@ -1144,16 +1113,17 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( if (parse_error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "[xds_client %p] LRS response parsing failed. error=%s", - xds_client, grpc_error_string(parse_error)); + xds_client(), grpc_error_string(parse_error)); GRPC_ERROR_UNREF(parse_error); return; } - lrs_calld->seen_response_ = true; + seen_response_ = true; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] LRS response received, cluster_name=%s, " "load_report_interval=%" PRId64 "ms", - xds_client, new_cluster_name.get(), new_load_reporting_interval); + xds_client(), new_cluster_name.get(), + new_load_reporting_interval); } if (new_load_reporting_interval < GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) { @@ -1163,83 +1133,76 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( gpr_log(GPR_INFO, "[xds_client %p] Increased load_report_interval to minimum " "value %dms", - xds_client, GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); + xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS); } } // Ignore identical update. - if (lrs_calld->load_reporting_interval_ == new_load_reporting_interval && - strcmp(lrs_calld->cluster_name_.get(), new_cluster_name.get()) == 0) { + if (load_reporting_interval_ == new_load_reporting_interval && + strcmp(cluster_name_.get(), new_cluster_name.get()) == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, "[xds_client %p] Incoming LRS response identical to current, " "ignoring.", - xds_client); + xds_client()); } return; } // Stop current load reporting (if any) to adopt the new config. - lrs_calld->reporter_.reset(); + reporter_.reset(); // Record the new config. - lrs_calld->cluster_name_ = std::move(new_cluster_name); - lrs_calld->load_reporting_interval_ = new_load_reporting_interval; + cluster_name_ = std::move(new_cluster_name); + load_reporting_interval_ = new_load_reporting_interval; // Try starting sending load report. - lrs_calld->MaybeStartReportingLocked(); + MaybeStartReportingLocked(); }(); grpc_slice_unref_internal(response_slice); - if (xds_client->shutting_down_) { - lrs_calld->Unref(DEBUG_LOCATION, - "LRS+OnResponseReceivedLocked+xds_shutdown"); + if (xds_client()->shutting_down_) { + Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown"); return; } // Keep listening for LRS config updates. grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; - op.data.recv_message.recv_message = &lrs_calld->recv_message_payload_; + op.data.recv_message.recv_message = &recv_message_payload_; op.flags = 0; op.reserved = nullptr; - GPR_ASSERT(lrs_calld->call_ != nullptr); + GPR_ASSERT(call_ != nullptr); // Reuse the "OnResponseReceivedLocked" ref taken in ctor. - GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_, OnResponseReceived, - lrs_calld, grpc_schedule_on_exec_ctx); - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - lrs_calld->call_, &op, 1, &lrs_calld->on_response_received_); + const grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); } void XdsClient::ChannelState::LrsCallState::OnStatusReceived( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda lrs_calld->xds_client()->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_, - OnStatusReceivedLocked, lrs_calld, nullptr), - GRPC_ERROR_REF(error)), + [lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); }, DEBUG_LOCATION); } void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( - void* arg, grpc_error* error) { - LrsCallState* lrs_calld = static_cast(arg); - XdsClient* xds_client = lrs_calld->xds_client(); - ChannelState* chand = lrs_calld->chand(); - GPR_ASSERT(lrs_calld->call_ != nullptr); + grpc_error* error) { + GPR_ASSERT(call_ != nullptr); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { - char* status_details = grpc_slice_to_c_string(lrs_calld->status_details_); + char* status_details = grpc_slice_to_c_string(status_details_); gpr_log(GPR_INFO, "[xds_client %p] LRS call status received. Status = %d, details " "= '%s', (chand: %p, calld: %p, call: %p), error '%s'", - xds_client, lrs_calld->status_code_, status_details, chand, - lrs_calld, lrs_calld->call_, grpc_error_string(error)); + xds_client(), status_code_, status_details, chand(), this, call_, + grpc_error_string(error)); gpr_free(status_details); } // Ignore status from a stale call. - if (lrs_calld->IsCurrentCallOnChannel()) { - GPR_ASSERT(!xds_client->shutting_down_); + if (IsCurrentCallOnChannel()) { + GPR_ASSERT(!xds_client()->shutting_down_); // Try to restart the call. - lrs_calld->parent_->OnCallFinishedLocked(); + parent_->OnCallFinishedLocked(); } - lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); + Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked"); + GRPC_ERROR_UNREF(error); } bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const { diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index a370a645a55..b1507391a06 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -62,11 +62,8 @@ class AsyncConnectivityStateWatcherInterface::Notifier { const RefCountedPtr& logical_thread) : watcher_(std::move(watcher)), state_(state) { if (logical_thread != nullptr) { - logical_thread->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr), - GRPC_ERROR_NONE), - DEBUG_LOCATION); + logical_thread->Run([this]() { SendNotification(this, GRPC_ERROR_NONE); }, + DEBUG_LOCATION); } else { GRPC_CLOSURE_INIT(&closure_, SendNotification, this, grpc_schedule_on_exec_ctx); diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index cac30472760..9bf218c8ad8 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -320,19 +320,16 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto logical_thread = grpc_core::MakeRefCounted(); + g_logical_thread = &logical_thread; - g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; - grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; - default_resolve_address = grpc_resolve_address_impl; - grpc_set_resolver_impl(&test_resolver); + g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked; + grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked; + default_resolve_address = grpc_resolve_address_impl; + grpc_set_resolver_impl(&test_resolver); + + test_cooldown(); - test_cooldown(); - grpc_core::ExecCtx::Get()->Flush(); - } grpc_shutdown_blocking(); GPR_ASSERT(g_all_callbacks_invoked); return 0; diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index 8ee63f303dc..d2b01bf435e 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -73,26 +73,22 @@ int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); { - grpc_core::ExecCtx exec_ctx; - { - auto logical_thread = - grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; + auto logical_thread = grpc_core::MakeRefCounted(); + g_logical_thread = &logical_thread; - grpc_core::ResolverFactory* dns = - grpc_core::ResolverRegistry::LookupResolverFactory("dns"); + grpc_core::ResolverFactory* dns = + grpc_core::ResolverRegistry::LookupResolverFactory("dns"); - test_succeeds(dns, "dns:10.2.1.1"); - test_succeeds(dns, "dns:10.2.1.1:1234"); - test_succeeds(dns, "dns:www.google.com"); - test_succeeds(dns, "dns:///www.google.com"); - grpc_core::UniquePtr resolver = - GPR_GLOBAL_CONFIG_GET(grpc_dns_resolver); - if (gpr_stricmp(resolver.get(), "native") == 0) { - test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888"); - } else { - test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888"); - } + test_succeeds(dns, "dns:10.2.1.1"); + test_succeeds(dns, "dns:10.2.1.1:1234"); + test_succeeds(dns, "dns:www.google.com"); + test_succeeds(dns, "dns:///www.google.com"); + grpc_core::UniquePtr resolver = + GPR_GLOBAL_CONFIG_GET(grpc_dns_resolver); + if (gpr_stricmp(resolver.get(), "native") == 0) { + test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888"); + } else { + test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888"); } } grpc_shutdown(); diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index be17de306b9..7ce2ad9c247 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -78,31 +78,29 @@ static void test_fails(grpc_core::ResolverFactory* factory, int main(int argc, char** argv) { grpc::testing::TestEnvironment env(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - auto logical_thread = grpc_core::MakeRefCounted(); - g_logical_thread = &logical_thread; - - grpc_core::ResolverFactory* ipv4 = - grpc_core::ResolverRegistry::LookupResolverFactory("ipv4"); - grpc_core::ResolverFactory* ipv6 = - grpc_core::ResolverRegistry::LookupResolverFactory("ipv6"); - - test_fails(ipv4, "ipv4:10.2.1.1"); - test_succeeds(ipv4, "ipv4:10.2.1.1:1234"); - test_succeeds(ipv4, "ipv4:10.2.1.1:1234,127.0.0.1:4321"); - test_fails(ipv4, "ipv4:10.2.1.1:123456"); - test_fails(ipv4, "ipv4:www.google.com"); - test_fails(ipv4, "ipv4:["); - test_fails(ipv4, "ipv4://8.8.8.8/8.8.8.8:8888"); - - test_fails(ipv6, "ipv6:["); - test_fails(ipv6, "ipv6:[::]"); - test_succeeds(ipv6, "ipv6:[::]:1234"); - test_fails(ipv6, "ipv6:[::]:123456"); - test_fails(ipv6, "ipv6:www.google.com"); - grpc_core::ExecCtx::Get()->Flush(); - } + + auto logical_thread = grpc_core::MakeRefCounted(); + g_logical_thread = &logical_thread; + + grpc_core::ResolverFactory* ipv4 = + grpc_core::ResolverRegistry::LookupResolverFactory("ipv4"); + grpc_core::ResolverFactory* ipv6 = + grpc_core::ResolverRegistry::LookupResolverFactory("ipv6"); + + test_fails(ipv4, "ipv4:10.2.1.1"); + test_succeeds(ipv4, "ipv4:10.2.1.1:1234"); + test_succeeds(ipv4, "ipv4:10.2.1.1:1234,127.0.0.1:4321"); + test_fails(ipv4, "ipv4:10.2.1.1:123456"); + test_fails(ipv4, "ipv4:www.google.com"); + test_fails(ipv4, "ipv4:["); + test_fails(ipv4, "ipv4://8.8.8.8/8.8.8.8:8888"); + + test_fails(ipv6, "ipv6:["); + test_fails(ipv6, "ipv6:[::]"); + test_succeeds(ipv6, "ipv6:[::]:1234"); + test_fails(ipv6, "ipv6:[::]:123456"); + test_fails(ipv6, "ipv6:www.google.com"); + grpc_shutdown(); return 0;