Eliminate PingOneLocked() from LB policy API.

pull/16167/head
Mark D. Roth 6 years ago
parent 334d47ee0d
commit f22dc13c5b
  1. 33
      src/core/ext/filters/client_channel/client_channel.cc
  2. 20
      src/core/ext/filters/client_channel/lb_policy.h
  3. 124
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  4. 26
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  5. 31
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc

@ -571,15 +571,27 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
if (chand->lb_policy == nullptr) { if (chand->lb_policy == nullptr) {
GRPC_CLOSURE_SCHED( grpc_error* error =
op->send_ping.on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
} else { } else {
chand->lb_policy->PingOneLocked(op->send_ping.on_initiate, grpc_error* error = GRPC_ERROR_NONE;
op->send_ping.on_ack); grpc_core::LoadBalancingPolicy::PickState pick_state;
memset(&pick_state, 0, sizeof(pick_state));
// Pick must return synchronously, because pick_state.on_complete is null.
GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
if (pick_state.connected_subchannel != nullptr) {
pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
} else {
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LB policy dropped call on ping");
}
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
}
op->bind_pollset = nullptr; op->bind_pollset = nullptr;
} }
op->send_ping.on_initiate = nullptr; op->send_ping.on_initiate = nullptr;
@ -2684,14 +2696,15 @@ class LbPicker {
grpc_combiner_scheduler(chand->combiner)); grpc_combiner_scheduler(chand->combiner));
calld->pick.on_complete = &calld->pick_closure; calld->pick.on_complete = &calld->pick_closure;
GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); grpc_error* error = GRPC_ERROR_NONE;
const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
if (GPR_LIKELY(pick_done)) { if (GPR_LIKELY(pick_done)) {
// Pick completed synchronously. // Pick completed synchronously.
if (grpc_client_channel_trace.enabled()) { if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
chand, calld); chand, calld);
} }
pick_done_locked(elem, GRPC_ERROR_NONE); pick_done_locked(elem, error);
GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
} else { } else {
// Pick will be returned asynchronously. // Pick will be returned asynchronously.

@ -71,6 +71,7 @@ class LoadBalancingPolicy
/// Storage for LB token in \a initial_metadata, or nullptr if not used. /// Storage for LB token in \a initial_metadata, or nullptr if not used.
grpc_linked_mdelem lb_token_mdelem_storage; grpc_linked_mdelem lb_token_mdelem_storage;
/// Closure to run when pick is complete, if not completed synchronously. /// Closure to run when pick is complete, if not completed synchronously.
/// If null, pick will fail if a result is not available synchronously.
grpc_closure* on_complete; grpc_closure* on_complete;
/// Will be set to the selected subchannel, or nullptr on failure or when /// Will be set to the selected subchannel, or nullptr on failure or when
/// the LB policy decides to drop the call. /// the LB policy decides to drop the call.
@ -99,10 +100,15 @@ class LoadBalancingPolicy
/// Finds an appropriate subchannel for a call, based on data in \a pick. /// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete. /// \a pick must remain alive until the pick is complete.
/// ///
/// If the pick succeeds and a result is known immediately, returns true. /// If a result is known immediately, returns true, setting \a *error
/// Otherwise, \a pick->on_complete will be invoked once the pick is /// upon failure. Otherwise, \a pick->on_complete will be invoked once
/// complete with its error argument set to indicate success or failure. /// the pick is complete with its error argument set to indicate success
virtual bool PickLocked(PickState* pick) GRPC_ABSTRACT; /// or failure.
///
/// If \a pick->on_complete is null and no result is known immediately,
/// a synchronous failure will be returned (i.e., \a *error will be
/// set and true will be returned).
virtual bool PickLocked(PickState* pick, grpc_error** error) GRPC_ABSTRACT;
/// Cancels \a pick. /// Cancels \a pick.
/// The \a on_complete callback of the pending pick will be invoked with /// The \a on_complete callback of the pending pick will be invoked with
@ -133,12 +139,6 @@ class LoadBalancingPolicy
virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) virtual void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy)
GRPC_ABSTRACT; GRPC_ABSTRACT;
/// Performs a connected subchannel ping via \a ConnectedSubchannel::Ping()
/// against one of the connected subchannels managed by the policy.
/// Note: This is intended only for use in tests.
virtual void PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) GRPC_ABSTRACT;
/// Tries to enter a READY connectivity state. /// Tries to enter a READY connectivity state.
/// TODO(roth): As part of restructuring how we handle IDLE state, /// TODO(roth): As part of restructuring how we handle IDLE state,
/// consider whether this method is still needed. /// consider whether this method is still needed.

@ -123,7 +123,7 @@ class GrpcLb : public LoadBalancingPolicy {
GrpcLb(const grpc_lb_addresses* addresses, const Args& args); GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
void UpdateLocked(const grpc_channel_args& args) override; void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick) override; bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
@ -133,7 +133,6 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked( grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override; grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels, void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) override; ChildRefsList* child_channels) override;
@ -167,13 +166,6 @@ class GrpcLb : public LoadBalancingPolicy {
PendingPick* next = nullptr; PendingPick* next = nullptr;
}; };
/// A linked list of pending pings waiting for the RR policy to be created.
struct PendingPing {
grpc_closure* on_initiate;
grpc_closure* on_ack;
PendingPing* next = nullptr;
};
/// Contains a call to the LB server and all the data related to the call. /// Contains a call to the LB server and all the data related to the call.
class BalancerCallState class BalancerCallState
: public InternallyRefCountedWithTracing<BalancerCallState> { : public InternallyRefCountedWithTracing<BalancerCallState> {
@ -272,14 +264,12 @@ class GrpcLb : public LoadBalancingPolicy {
void AddPendingPick(PendingPick* pp); void AddPendingPick(PendingPick* pp);
static void OnPendingPickComplete(void* arg, grpc_error* error); static void OnPendingPickComplete(void* arg, grpc_error* error);
// Pending ping methods.
void AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack);
// Methods for dealing with the RR policy. // Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked(); void CreateOrUpdateRoundRobinPolicyLocked();
grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
void CreateRoundRobinPolicyLocked(const Args& args); void CreateRoundRobinPolicyLocked(const Args& args);
bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp); bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error);
void UpdateConnectivityStateFromRoundRobinPolicyLocked( void UpdateConnectivityStateFromRoundRobinPolicyLocked(
grpc_error* rr_state_error); grpc_error* rr_state_error);
static void OnRoundRobinConnectivityChangedLocked(void* arg, static void OnRoundRobinConnectivityChangedLocked(void* arg,
@ -342,9 +332,8 @@ class GrpcLb : public LoadBalancingPolicy {
grpc_timer lb_fallback_timer_; grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_; grpc_closure lb_on_fallback_;
// Pending picks and pings that are waiting on the RR policy's connectivity. // Pending picks that are waiting on the RR policy's connectivity.
PendingPick* pending_picks_ = nullptr; PendingPick* pending_picks_ = nullptr;
PendingPing* pending_pings_ = nullptr;
// The RR policy to use for the backends. // The RR policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> rr_policy_; OrphanablePtr<LoadBalancingPolicy> rr_policy_;
@ -1080,7 +1069,6 @@ GrpcLb::GrpcLb(const grpc_lb_addresses* addresses,
GrpcLb::~GrpcLb() { GrpcLb::~GrpcLb() {
GPR_ASSERT(pending_picks_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr);
GPR_ASSERT(pending_pings_ == nullptr);
gpr_mu_destroy(&lb_channel_mu_); gpr_mu_destroy(&lb_channel_mu_);
gpr_free((void*)server_name_); gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_); grpc_channel_args_destroy(args_);
@ -1126,14 +1114,6 @@ void GrpcLb::ShutdownLocked() {
// Note: pp is deleted in this callback. // Note: pp is deleted in this callback.
GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error));
} }
// Clear pending pings.
PendingPing* pping;
while ((pping = pending_pings_) != nullptr) {
pending_pings_ = pping->next;
GRPC_CLOSURE_SCHED(pping->on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(pping->on_ack, GRPC_ERROR_REF(error));
Delete(pping);
}
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
@ -1147,9 +1127,10 @@ void GrpcLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
pending_picks_ = pp->next; pending_picks_ = pp->next;
pp->pick->on_complete = pp->original_on_complete; pp->pick->on_complete = pp->original_on_complete;
pp->pick->user_data = nullptr; pp->pick->user_data = nullptr;
if (new_policy->PickLocked(pp->pick)) { grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pp->pick, &error)) {
// Synchronous return; schedule closure. // Synchronous return; schedule closure.
GRPC_CLOSURE_SCHED(pp->pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pp->pick->on_complete, error);
} }
Delete(pp); Delete(pp);
} }
@ -1233,58 +1214,37 @@ void GrpcLb::ExitIdleLocked() {
} }
} }
bool GrpcLb::PickLocked(PickState* pick) { bool GrpcLb::PickLocked(PickState* pick, grpc_error** error) {
PendingPick* pp = PendingPickCreate(pick); PendingPick* pp = PendingPickCreate(pick);
bool pick_done = false; bool pick_done = false;
if (rr_policy_ != nullptr) { if (rr_policy_ != nullptr) {
const grpc_connectivity_state rr_connectivity_state = if (grpc_lb_glb_trace.enabled()) {
rr_policy_->CheckConnectivityLocked(nullptr); gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
// The RR policy may have transitioned to SHUTDOWN but the callback rr_policy_.get());
// registered to capture this event (on_rr_connectivity_changed_) may not }
// have been invoked yet. We need to make sure we aren't trying to pick pick_done =
// from an RR policy instance that's in shutdown. PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error);
if (rr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { } else { // rr_policy_ == NULL
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
pick_done = true;
} else {
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,
"[grpclb %p] NOT picking from from RR %p: RR conn state=%s", "[grpclb %p] No RR policy. Adding to grpclb's pending picks",
this, rr_policy_.get(), this);
grpc_connectivity_state_name(rr_connectivity_state));
} }
AddPendingPick(pp); AddPendingPick(pp);
pick_done = false; if (!started_picking_) {
} else { // RR not in shutdown StartPickingLocked();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", this,
rr_policy_.get());
} }
pick_done = PickFromRoundRobinPolicyLocked(false /* force_async */, pp); pick_done = false;
}
} else { // rr_policy_ == NULL
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO,
"[grpclb %p] No RR policy. Adding to grpclb's pending picks",
this);
}
AddPendingPick(pp);
if (!started_picking_) {
StartPickingLocked();
} }
pick_done = false;
} }
return pick_done; return pick_done;
} }
void GrpcLb::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (rr_policy_ != nullptr) {
rr_policy_->PingOneLocked(on_initiate, on_ack);
} else {
AddPendingPing(on_initiate, on_ack);
if (!started_picking_) {
StartPickingLocked();
}
}
}
void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels, void GrpcLb::FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* child_channels) { ChildRefsList* child_channels) {
// delegate to the RoundRobin to fill the children subchannels. // delegate to the RoundRobin to fill the children subchannels.
@ -1598,18 +1558,6 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
pending_picks_ = pp; pending_picks_ = pp;
} }
//
// PendingPing
//
void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
PendingPing* pping = New<PendingPing>();
pping->on_initiate = on_initiate;
pping->on_ack = on_ack;
pping->next = pending_pings_;
pending_pings_ = pping;
}
// //
// code for interacting with the RR policy // code for interacting with the RR policy
// //
@ -1619,7 +1567,8 @@ void GrpcLb::AddPendingPing(grpc_closure* on_initiate, grpc_closure* on_ack) {
// cleanups this callback would otherwise be responsible for. // cleanups this callback would otherwise be responsible for.
// If \a force_async is true, then we will manually schedule the // If \a force_async is true, then we will manually schedule the
// completion callback even if the pick is available immediately. // completion callback even if the pick is available immediately.
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) { bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error) {
// Check for drops if we are not using fallback backend addresses. // Check for drops if we are not using fallback backend addresses.
if (serverlist_ != nullptr) { if (serverlist_ != nullptr) {
// Look at the index into the serverlist to see if we should drop this call. // Look at the index into the serverlist to see if we should drop this call.
@ -1653,11 +1602,12 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp) {
GPR_ASSERT(pp->pick->user_data == nullptr); GPR_ASSERT(pp->pick->user_data == nullptr);
pp->pick->user_data = (void**)&pp->lb_token; pp->pick->user_data = (void**)&pp->lb_token;
// Pick via the RR policy. // Pick via the RR policy.
bool pick_done = rr_policy_->PickLocked(pp->pick); bool pick_done = rr_policy_->PickLocked(pp->pick, error);
if (pick_done) { if (pick_done) {
PendingPickSetMetadataAndContext(pp); PendingPickSetMetadataAndContext(pp);
if (force_async) { if (force_async) {
GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pp->original_on_complete, *error);
*error = GRPC_ERROR_NONE;
pick_done = false; pick_done = false;
} }
Delete(pp); Delete(pp);
@ -1709,18 +1659,8 @@ void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
"[grpclb %p] Pending pick about to (async) PICK from RR %p", this, "[grpclb %p] Pending pick about to (async) PICK from RR %p", this,
rr_policy_.get()); rr_policy_.get());
} }
PickFromRoundRobinPolicyLocked(true /* force_async */, pp); grpc_error* error = GRPC_ERROR_NONE;
} PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error);
// Send pending pings to RR policy.
PendingPing* pping;
while ((pping = pending_pings_)) {
pending_pings_ = pping->next;
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
this, rr_policy_.get());
}
rr_policy_->PingOneLocked(pping->on_initiate, pping->on_ack);
Delete(pping);
} }
} }

@ -46,7 +46,7 @@ class PickFirst : public LoadBalancingPolicy {
explicit PickFirst(const Args& args); explicit PickFirst(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override; void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick) override; bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
@ -56,7 +56,6 @@ class PickFirst : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked( grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override; grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels, void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override; ChildRefsList* ignored) override;
@ -173,9 +172,10 @@ void PickFirst::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick; PickState* pick;
while ((pick = pending_picks_) != nullptr) { while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next; pending_picks_ = pick->next;
if (new_policy->PickLocked(pick)) { grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure. // Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, error);
} }
} }
} }
@ -259,13 +259,18 @@ void PickFirst::ExitIdleLocked() {
} }
} }
bool PickFirst::PickLocked(PickState* pick) { bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
// If we have a selected subchannel already, return synchronously. // If we have a selected subchannel already, return synchronously.
if (selected_ != nullptr) { if (selected_ != nullptr) {
pick->connected_subchannel = selected_->connected_subchannel()->Ref(); pick->connected_subchannel = selected_->connected_subchannel()->Ref();
return true; return true;
} }
// No subchannel selected yet, so handle asynchronously. // No subchannel selected yet, so handle asynchronously.
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
return true;
}
if (!started_picking_) { if (!started_picking_) {
StartPickingLocked(); StartPickingLocked();
} }
@ -293,17 +298,6 @@ void PickFirst::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify); notify);
} }
void PickFirst::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) {
if (selected_ != nullptr) {
selected_->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
GRPC_CLOSURE_SCHED(on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
}
}
void PickFirst::FillChildRefsForChannelz( void PickFirst::FillChildRefsForChannelz(
ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) { ChildRefsList* child_subchannels_to_fill, ChildRefsList* ignored) {
mu_guard guard(&child_refs_mu_); mu_guard guard(&child_refs_mu_);

@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy {
explicit RoundRobin(const Args& args); explicit RoundRobin(const Args& args);
void UpdateLocked(const grpc_channel_args& args) override; void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick) override; bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq, uint32_t initial_metadata_flags_eq,
@ -67,7 +67,6 @@ class RoundRobin : public LoadBalancingPolicy {
grpc_connectivity_state CheckConnectivityLocked( grpc_connectivity_state CheckConnectivityLocked(
grpc_error** connectivity_error) override; grpc_error** connectivity_error) override;
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override;
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override;
void ExitIdleLocked() override; void ExitIdleLocked() override;
void FillChildRefsForChannelz(ChildRefsList* child_subchannels, void FillChildRefsForChannelz(ChildRefsList* child_subchannels,
ChildRefsList* ignored) override; ChildRefsList* ignored) override;
@ -253,9 +252,10 @@ void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) {
PickState* pick; PickState* pick;
while ((pick = pending_picks_) != nullptr) { while ((pick = pending_picks_) != nullptr) {
pending_picks_ = pick->next; pending_picks_ = pick->next;
if (new_policy->PickLocked(pick)) { grpc_error* error = GRPC_ERROR_NONE;
if (new_policy->PickLocked(pick, &error)) {
// Synchronous return, schedule closure. // Synchronous return, schedule closure.
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); GRPC_CLOSURE_SCHED(pick->on_complete, error);
} }
} }
} }
@ -368,7 +368,7 @@ void RoundRobin::DrainPendingPicksLocked() {
} }
} }
bool RoundRobin::PickLocked(PickState* pick) { bool RoundRobin::PickLocked(PickState* pick, grpc_error** error) {
if (grpc_lb_round_robin_trace.enabled()) { if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_);
} }
@ -376,6 +376,11 @@ bool RoundRobin::PickLocked(PickState* pick) {
if (subchannel_list_ != nullptr) { if (subchannel_list_ != nullptr) {
if (DoPickLocked(pick)) return true; if (DoPickLocked(pick)) return true;
} }
if (pick->on_complete == nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"No pick result available but synchronous result required.");
return true;
}
/* no pick currently available. Save for later in list of pending picks */ /* no pick currently available. Save for later in list of pending picks */
pick->next = pending_picks_; pick->next = pending_picks_;
pending_picks_ = pick; pending_picks_ = pick;
@ -647,22 +652,6 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify); notify);
} }
void RoundRobin::PingOneLocked(grpc_closure* on_initiate,
grpc_closure* on_ack) {
const size_t next_ready_index =
subchannel_list_->GetNextReadySubchannelIndexLocked();
if (next_ready_index < subchannel_list_->num_subchannels()) {
RoundRobinSubchannelData* selected =
subchannel_list_->subchannel(next_ready_index);
selected->connected_subchannel()->Ping(on_initiate, on_ack);
} else {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Round Robin not connected"));
}
}
void RoundRobin::UpdateLocked(const grpc_channel_args& args) { void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this); AutoChildRefsUpdater guard(this);

Loading…
Cancel
Save