diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index c1954c03ee1..3cdacd79121 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -97,6 +97,7 @@ class XdsClient::ChannelState : public InternallyRefCounted { private: void StartNewCallLocked(); void StartRetryTimerLocked(); + static void OnRetryTimer(void* arg, grpc_error* error); static void OnRetryTimerLocked(void* arg, grpc_error* error); // The wrapped call that talks to the xds server. It's instantiated @@ -129,6 +130,8 @@ class XdsClient::ChannelState : public InternallyRefCounted { bool seen_response() const { return seen_response_; } private: + static void OnResponseReceived(void* arg, grpc_error* error); + static void OnStatusReceived(void* arg, grpc_error* error); static void OnResponseReceivedLocked(void* arg, grpc_error* error); static void OnStatusReceivedLocked(void* arg, grpc_error* error); @@ -180,10 +183,6 @@ class XdsClient::ChannelState : public InternallyRefCounted { public: Reporter(RefCountedPtr parent, grpc_millis report_interval) : parent_(std::move(parent)), report_interval_(report_interval) { - GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); ScheduleNextReportLocked(); } @@ -191,8 +190,10 @@ class XdsClient::ChannelState : public InternallyRefCounted { private: void ScheduleNextReportLocked(); + static void OnNextReportTimer(void* arg, grpc_error* error); static void OnNextReportTimerLocked(void* arg, grpc_error* error); void SendReportLocked(); + static void OnReportDone(void* arg, grpc_error* error); static void OnReportDoneLocked(void* arg, grpc_error* error); bool IsCurrentReporterOnCall() const { @@ -212,6 +213,9 @@ class XdsClient::ChannelState : public InternallyRefCounted { grpc_closure on_report_done_; }; + static void OnInitialRequestSent(void* arg, grpc_error* error); + static void OnResponseReceived(void* arg, grpc_error* error); + static void OnStatusReceived(void* arg, grpc_error* error); static void OnInitialRequestSentLocked(void* arg, grpc_error* error); static void OnResponseReceivedLocked(void* arg, grpc_error* error); static void OnStatusReceivedLocked(void* arg, grpc_error* error); @@ -449,8 +453,6 @@ XdsClient::ChannelState::RetryableCall::RetryableCall( .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_XDS_RECONNECT_JITTER) .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { - GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimerLocked, this, - grpc_combiner_scheduler(chand_->xds_client()->combiner_)); StartNewCallLocked(); } @@ -504,10 +506,21 @@ void XdsClient::ChannelState::RetryableCall::StartRetryTimerLocked() { chand()->xds_client(), chand(), timeout); } this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release(); + GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this, + grpc_schedule_on_exec_ctx); grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_); retry_timer_callback_pending_ = true; } +template +void XdsClient::ChannelState::RetryableCall::OnRetryTimer( + void* arg, grpc_error* error) { + RetryableCall* calld = static_cast(arg); + calld->chand_->xds_client()->combiner()->Run( + GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimerLocked, this, nullptr), + GRPC_ERROR_REF(error)); +} + template void XdsClient::ChannelState::RetryableCall::OnRetryTimerLocked( void* arg, grpc_error* error) { @@ -555,10 +568,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // Init other data associated with the call. grpc_metadata_array_init(&initial_metadata_recv_); grpc_metadata_array_init(&trailing_metadata_recv_); - GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, @@ -602,6 +611,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( op->reserved = nullptr; op++; Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release(); + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -617,6 +628,9 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // This callback signals the end of the call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. + + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -643,6 +657,16 @@ void XdsClient::ChannelState::AdsCallState::Orphan() { // corresponding unref happens in on_status_received_ instead of here. } +void XdsClient::ChannelState::AdsCallState::OnResponseReceived( + void* arg, grpc_error* error) { + AdsCallState* ads_calld = static_cast(arg); + ads_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, + nullptr), + GRPC_ERROR_REF(error)); + ; +} + void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); @@ -786,6 +810,15 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked( GPR_ASSERT(GRPC_CALL_OK == call_error); } +void XdsClient::ChannelState::AdsCallState::OnStatusReceived( + void* arg, grpc_error* error) { + AdsCallState* ads_calld = static_cast(arg); + ads_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, + nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked( void* arg, grpc_error* error) { AdsCallState* ads_calld = static_cast(arg); @@ -831,11 +864,22 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() { void XdsClient::ChannelState::LrsCallState::Reporter:: ScheduleNextReportLocked() { const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_; + GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this, + grpc_schedule_on_exec_ctx); grpc_timer_init(&next_report_timer_, next_report_time, &on_next_report_timer_); next_report_timer_callback_pending_ = true; } +void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( + void* arg, grpc_error* error) { + Reporter* self = static_cast(arg); + self->xds_client()->combiner()->Run( + GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this, + nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); @@ -875,6 +919,8 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { memset(&op, 0, sizeof(op)); op.op = GRPC_OP_SEND_MESSAGE; op.data.send_message.send_message = parent_->send_message_payload_; + GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this, + grpc_schedule_on_exec_ctx); grpc_call_error call_error = grpc_call_start_batch_and_execute( parent_->call_, &op, 1, &on_report_done_); if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { @@ -885,6 +931,14 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() { } } +void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone( + void* arg, grpc_error* error) { + Reporter* self = static_cast(arg); + self->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); @@ -931,12 +985,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( // Init other data associated with the LRS call. grpc_metadata_array_init(&initial_metadata_recv_); grpc_metadata_array_init(&trailing_metadata_recv_); - GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); - GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, - grpc_combiner_scheduler(xds_client()->combiner_)); + // Start the call. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { gpr_log(GPR_INFO, @@ -963,6 +1012,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( op->reserved = nullptr; op++; Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release(); + GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), &on_initial_request_sent_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -981,6 +1032,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( op->reserved = nullptr; op++; Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release(); + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), &on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -996,6 +1049,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState( // This callback signals the end of the call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, + grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); @@ -1044,6 +1099,15 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() { Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_); } +void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent( + void* arg, grpc_error* error) { + LrsCallState* lrs_calld = static_cast(arg); + lrs_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, + this, nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); @@ -1054,6 +1118,15 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked( lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked"); } +void XdsClient::ChannelState::LrsCallState::OnResponseReceived( + void* arg, grpc_error* error) { + LrsCallState* lrs_calld = static_cast(arg); + lrs_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this, + nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); @@ -1137,11 +1210,22 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked( op.reserved = nullptr; GPR_ASSERT(lrs_calld->call_ != nullptr); // Reuse the "OnResponseReceivedLocked" ref taken in ctor. + GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, + grpc_schedule_on_exec_ctx); const grpc_call_error call_error = grpc_call_start_batch_and_execute( lrs_calld->call_, &op, 1, &lrs_calld->on_response_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); } +void XdsClient::ChannelState::LrsCallState::OnStatusReceived( + void* arg, grpc_error* error) { + LrsCallState* lrs_calld = static_cast(arg); + lrs_calld->xds_client()->combiner_->Run( + GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this, + nullptr), + GRPC_ERROR_REF(error)); +} + void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked( void* arg, grpc_error* error) { LrsCallState* lrs_calld = static_cast(arg); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index ef4210acb3d..5de6c37ad18 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -108,6 +108,8 @@ class XdsClient : public InternallyRefCounted { static RefCountedPtr GetFromChannelArgs( const grpc_channel_args& args); + grpc_combiner* combiner() { return combiner_; } + private: class ChannelState;