One left from xds_client.cc

reviewable/pr20542/r1
Yash Tibrewal 6 years ago
parent 4b1223f875
commit 5f22055d0d
  1. 116
      src/core/ext/filters/client_channel/xds/xds_client.cc
  2. 2
      src/core/ext/filters/client_channel/xds/xds_client.h

@ -97,6 +97,7 @@ class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
private:
void StartNewCallLocked();
void StartRetryTimerLocked();
static void OnRetryTimer(void* arg, grpc_error* error);
static void OnRetryTimerLocked(void* arg, grpc_error* error);
// The wrapped call that talks to the xds server. It's instantiated
@ -129,6 +130,8 @@ class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
bool seen_response() const { return seen_response_; }
private:
static void OnResponseReceived(void* arg, grpc_error* error);
static void OnStatusReceived(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
@ -180,10 +183,6 @@ class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
public:
Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
: parent_(std::move(parent)), report_interval_(report_interval) {
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
ScheduleNextReportLocked();
}
@ -191,8 +190,10 @@ class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
private:
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
static void OnNextReportTimerLocked(void* arg, grpc_error* error);
void SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
static void OnReportDoneLocked(void* arg, grpc_error* error);
bool IsCurrentReporterOnCall() const {
@ -212,6 +213,9 @@ class XdsClient::ChannelState : public InternallyRefCounted<ChannelState> {
grpc_closure on_report_done_;
};
static void OnInitialRequestSent(void* arg, grpc_error* error);
static void OnResponseReceived(void* arg, grpc_error* error);
static void OnStatusReceived(void* arg, grpc_error* error);
static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
static void OnResponseReceivedLocked(void* arg, grpc_error* error);
static void OnStatusReceivedLocked(void* arg, grpc_error* error);
@ -449,8 +453,6 @@ XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
.set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_XDS_RECONNECT_JITTER)
.set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimerLocked, this,
grpc_combiner_scheduler(chand_->xds_client()->combiner_));
StartNewCallLocked();
}
@ -504,10 +506,21 @@ void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
chand()->xds_client(), chand(), timeout);
}
this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
retry_timer_callback_pending_ = true;
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error* error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
calld->chand_->xds_client()->combiner()->Run(
GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimerLocked, this, nullptr),
GRPC_ERROR_REF(error));
}
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
void* arg, grpc_error* error) {
@ -555,10 +568,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// Init other data associated with the call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
@ -602,6 +611,8 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -617,6 +628,9 @@ XdsClient::ChannelState::AdsCallState::AdsCallState(
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -643,6 +657,16 @@ void XdsClient::ChannelState::AdsCallState::Orphan() {
// corresponding unref happens in on_status_received_ instead of here.
}
void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ads_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
nullptr),
GRPC_ERROR_REF(error));
;
}
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
@ -786,6 +810,15 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ads_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
@ -831,11 +864,22 @@ void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked() {
const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&next_report_timer_, next_report_time,
&on_next_report_timer_);
next_report_timer_callback_pending_ = true;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->xds_client()->combiner()->Run(
GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimerLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
@ -875,6 +919,8 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message.send_message = parent_->send_message_payload_;
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
grpc_schedule_on_exec_ctx);
grpc_call_error call_error = grpc_call_start_batch_and_execute(
parent_->call_, &op, 1, &on_report_done_);
if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
@ -885,6 +931,14 @@ void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
}
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_report_done_, OnReportDoneLocked, this, nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
@ -931,12 +985,7 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
// Init other data associated with the LRS call.
grpc_metadata_array_init(&initial_metadata_recv_);
grpc_metadata_array_init(&trailing_metadata_recv_);
GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
grpc_combiner_scheduler(xds_client()->combiner_));
// Start the call.
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO,
@ -963,6 +1012,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_initial_request_sent_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -981,6 +1032,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -996,6 +1049,8 @@ XdsClient::ChannelState::LrsCallState::LrsCallState(
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
grpc_schedule_on_exec_ctx);
call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
&on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
@ -1044,6 +1099,15 @@ void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSentLocked,
this, nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
@ -1054,6 +1118,15 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked(
lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceivedLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
@ -1137,11 +1210,22 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
op.reserved = nullptr;
GPR_ASSERT(lrs_calld->call_ != nullptr);
// Reuse the "OnResponseReceivedLocked" ref taken in ctor.
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
grpc_schedule_on_exec_ctx);
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
lrs_calld->call_, &op, 1, &lrs_calld->on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceivedLocked, this,
nullptr),
GRPC_ERROR_REF(error));
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);

@ -108,6 +108,8 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static RefCountedPtr<XdsClient> GetFromChannelArgs(
const grpc_channel_args& args);
grpc_combiner* combiner() { return combiner_; }
private:
class ChannelState;

Loading…
Cancel
Save