reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent 3c3a7d0e9b
commit 9f933903b9
  1. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 4
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  3. 18
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  4. 2
      src/core/ext/filters/client_channel/xds/xds_client.cc
  5. 6
      src/core/lib/iomgr/combiner.cc

@ -999,7 +999,7 @@ void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
ClientLoadReportDone, lb_calld, nullptr),
ClientLoadReportDoneLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
}

@ -1652,8 +1652,8 @@ void XdsLb::PriorityList::LocalityMap::Locality::DeactivateLocked() {
weight_ = 0;
// Start a timer to delete the locality.
Ref(DEBUG_LOCATION, "Locality+timer").release();
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimerLocked,
this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(
&delayed_removal_timer_,
ExecCtx::Get()->Now() + xds_policy()->locality_retention_interval_ms_,

@ -304,13 +304,6 @@ static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver_unref(driver);
}
static void on_readable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->combiner->Run(
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable, fdn, nullptr),
GRPC_ERROR_REF(error));
}
static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
GPR_ASSERT(fdn->readable_registered);
@ -336,10 +329,10 @@ static void on_readable_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver_unref(ev_driver);
}
static void on_writable(void* arg, grpc_error* error) {
static void on_readable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->combiner->Run(
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable, fdn, nullptr),
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, nullptr),
GRPC_ERROR_REF(error));
}
@ -366,6 +359,13 @@ static void on_writable_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver_unref(ev_driver);
}
static void on_writable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->combiner->Run(
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, nullptr),
GRPC_ERROR_REF(error));
}
ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel;

@ -805,6 +805,8 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
op.reserved = nullptr;
GPR_ASSERT(ads_calld->call_ != nullptr);
// Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_, OnResponseReceived,
ads_calld, grpc_schedule_on_exec_ctx);
const grpc_call_error call_error = grpc_call_start_batch_and_execute(
ads_calld->call_, &op, 1, &ads_calld->on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);

@ -331,6 +331,9 @@ static void enqueue_finally(void* closure, grpc_error* error) {
namespace grpc_core {
void Combiner::Run(grpc_closure* closure, grpc_error* error) {
GPR_ASSERT(closure->scheduler == nullptr ||
closure->scheduler ==
reinterpret_cast<grpc_closure_scheduler*>(this));
combiner_exec(this, closure, error);
}
@ -345,6 +348,9 @@ void Combiner::Run(grpc_closure_list* list) {
}
void Combiner::FinallyRun(grpc_closure* closure, grpc_error* error) {
GPR_ASSERT(closure->scheduler == nullptr ||
closure->scheduler ==
reinterpret_cast<grpc_closure_scheduler*>(this));
combiner_finally_exec(this, closure, error);
}
} // namespace grpc_core

Loading…
Cancel
Save