Replace Closure::ToFunction with lambda

reviewable/pr21361/r5
Yash Tibrewal 5 years ago
parent ff7364ef37
commit ad3ca5b390
  1. 99
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc

@ -315,20 +315,19 @@ class GrpcLb : public LoadBalancingPolicy {
const grpc_channel_args& args); const grpc_channel_args& args);
static void OnBalancerChannelConnectivityChanged(void* arg, static void OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error); grpc_error* error);
static void OnBalancerChannelConnectivityChangedLocked(void* arg, void OnBalancerChannelConnectivityChangedLocked();
grpc_error* error);
void CancelBalancerChannelConnectivityWatchLocked(); void CancelBalancerChannelConnectivityWatchLocked();
// Methods for dealing with fallback state. // Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup(); void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimer(void* arg, grpc_error* error); static void OnFallbackTimer(void* arg, grpc_error* error);
static void OnFallbackTimerLocked(void* arg, grpc_error* error); void OnFallbackTimerLocked(grpc_error* error);
// Methods for dealing with the balancer call. // Methods for dealing with the balancer call.
void StartBalancerCallLocked(); void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked(); void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error* error); static void OnBalancerCallRetryTimer(void* arg, grpc_error* error);
static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); void OnBalancerCallRetryTimerLocked(grpc_error* error);
// Methods for dealing with the child policy. // Methods for dealing with the child policy.
grpc_channel_args* CreateChildPolicyArgsLocked( grpc_channel_args* CreateChildPolicyArgsLocked(
@ -1523,36 +1522,29 @@ void GrpcLb::ProcessAddressesAndChannelArgsLocked(
} }
void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg, void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error) { grpc_error* /*error*/) {
GrpcLb* self = static_cast<GrpcLb*>(arg); GrpcLb* self = static_cast<GrpcLb*>(arg);
self->logical_thread()->Run( self->logical_thread()->Run(
Closure::ToFunction( [self]() { self->OnBalancerChannelConnectivityChangedLocked(); },
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked,
self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg, void GrpcLb::OnBalancerChannelConnectivityChangedLocked() {
grpc_error* /*error*/) { if (!shutting_down_ && fallback_at_startup_checks_pending_) {
GrpcLb* self = static_cast<GrpcLb*>(arg); if (lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch. // Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem = grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element( grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->lb_channel_)); grpc_channel_get_channel_stack(lb_channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_, GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChanged, self, &GrpcLb::OnBalancerChannelConnectivityChanged, this,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_client_channel_watch_connectivity_state( grpc_client_channel_watch_connectivity_state(
client_channel_elem, client_channel_elem,
grpc_polling_entity_create_from_pollset_set( grpc_polling_entity_create_from_pollset_set(interested_parties()),
self->interested_parties()), &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
&self->lb_channel_connectivity_, nullptr);
&self->lb_channel_on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below. return; // Early out so we don't drop the ref below.
} }
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
@ -1560,14 +1552,14 @@ void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p] balancer channel in state TRANSIENT_FAILURE; " "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode", "entering fallback mode",
self); this);
self->fallback_at_startup_checks_pending_ = false; fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->lb_fallback_timer_); grpc_timer_cancel(&lb_fallback_timer_);
self->fallback_mode_ = true; fallback_mode_ = true;
self->CreateOrUpdateChildPolicyLocked(); CreateOrUpdateChildPolicyLocked();
} }
// Done watching connectivity state, so drop ref. // Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity"); Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
} }
void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() { void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
@ -1624,27 +1616,24 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) { void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->logical_thread()->Run( grpclb_policy->logical_thread()->Run(
Closure::ToFunction( [grpclb_policy, error]() {
GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_, grpclb_policy->OnBalancerCallRetryTimerLocked(error);
&GrpcLb::OnBalancerCallRetryTimerLocked, },
grpclb_policy, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); retry_timer_callback_pending_ = false;
grpclb_policy->retry_timer_callback_pending_ = false; if (!shutting_down_ && error == GRPC_ERROR_NONE && lb_calld_ == nullptr) {
if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
grpclb_policy->lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
grpclb_policy);
} }
grpclb_policy->StartBalancerCallLocked(); StartBalancerCallLocked();
} }
grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
GRPC_ERROR_UNREF(error);
} }
// //
@ -1671,30 +1660,28 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) { void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg); GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
grpclb_policy->logical_thread()->Run( grpclb_policy->logical_thread()->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_, [grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
&GrpcLb::OnFallbackTimerLocked,
grpclb_policy, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION); DEBUG_LOCATION);
} }
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { void GrpcLb::OnFallbackTimerLocked(grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
// If we receive a serverlist after the timer fires but before this callback // If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back. // actually runs, don't fall back.
if (grpclb_policy->fallback_at_startup_checks_pending_ && if (fallback_at_startup_checks_pending_ && !shutting_down_ &&
!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) { error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; " "[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode", "entering fallback mode",
grpclb_policy); this);
grpclb_policy->fallback_at_startup_checks_pending_ = false; fallback_at_startup_checks_pending_ = false;
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked(); CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy->fallback_mode_ = true; fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked(); CreateOrUpdateChildPolicyLocked();
} }
grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); Unref(DEBUG_LOCATION, "on_fallback_timer");
GRPC_ERROR_UNREF(error);
} }
// //

Loading…
Cancel
Save