@@ -40,34 +40,94 @@
|
|
|
|
#include "src/core/lib/transport/connectivity_state.h" |
|
|
|
|
#include "src/core/lib/transport/static_metadata.h" |
|
|
|
|
|
|
|
|
|
grpc_core::TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); |
|
|
|
|
|
|
|
|
|
typedef struct round_robin_lb_policy { |
|
|
|
|
/** base policy: must be first */ |
|
|
|
|
grpc_lb_policy base; |
|
|
|
|
|
|
|
|
|
grpc_lb_subchannel_list* subchannel_list; |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
|
|
|
|
|
TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// round_robin LB policy
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
class RoundRobin : public LoadBalancingPolicy { |
|
|
|
|
public: |
|
|
|
|
explicit RoundRobin(const Args& args); |
|
|
|
|
|
|
|
|
|
void UpdateLocked(const grpc_channel_args& args) override; |
|
|
|
|
bool PickLocked(PickState* pick) override; |
|
|
|
|
void CancelPickLocked(PickState* pick, grpc_error* error) override; |
|
|
|
|
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, |
|
|
|
|
uint32_t initial_metadata_flags_eq, |
|
|
|
|
grpc_error* error) override; |
|
|
|
|
void NotifyOnStateChangeLocked(grpc_connectivity_state* state, |
|
|
|
|
grpc_closure* closure) override; |
|
|
|
|
grpc_connectivity_state CheckConnectivityLocked( |
|
|
|
|
grpc_error** connectivity_error) override; |
|
|
|
|
void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; |
|
|
|
|
void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; |
|
|
|
|
void ExitIdleLocked() override; |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
~RoundRobin(); |
|
|
|
|
|
|
|
|
|
void ShutdownLocked() override; |
|
|
|
|
|
|
|
|
|
void StartPickingLocked(); |
|
|
|
|
size_t GetNextReadySubchannelIndexLocked(); |
|
|
|
|
void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); |
|
|
|
|
void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, |
|
|
|
|
grpc_error* error); |
|
|
|
|
|
|
|
|
|
static void OnConnectivityChangedLocked(void* arg, grpc_error* error); |
|
|
|
|
|
|
|
|
|
void SubchannelListRefForConnectivityWatch( |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list, const char* reason); |
|
|
|
|
void SubchannelListUnrefForConnectivityWatch( |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list, const char* reason); |
|
|
|
|
|
|
|
|
|
/** list of subchannels */ |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list_ = nullptr; |
|
|
|
|
/** have we started picking? */ |
|
|
|
|
bool started_picking; |
|
|
|
|
bool started_picking_ = false; |
|
|
|
|
/** are we shutting down? */ |
|
|
|
|
bool shutdown; |
|
|
|
|
bool shutdown_ = false; |
|
|
|
|
/** List of picks that are waiting on connectivity */ |
|
|
|
|
grpc_lb_policy_pick_state* pending_picks; |
|
|
|
|
|
|
|
|
|
PickState* pending_picks_ = nullptr; |
|
|
|
|
/** our connectivity state tracker */ |
|
|
|
|
grpc_connectivity_state_tracker state_tracker; |
|
|
|
|
|
|
|
|
|
grpc_connectivity_state_tracker state_tracker_; |
|
|
|
|
/** Index into subchannels for last pick. */ |
|
|
|
|
size_t last_ready_subchannel_index; |
|
|
|
|
|
|
|
|
|
size_t last_ready_subchannel_index_ = 0; |
|
|
|
|
/** Latest version of the subchannel list.
|
|
|
|
|
* Subchannel connectivity callbacks will only promote updated subchannel |
|
|
|
|
* lists if they equal \a latest_pending_subchannel_list. In other words, |
|
|
|
|
* racing callbacks that reference outdated subchannel lists won't perform any |
|
|
|
|
* update. */ |
|
|
|
|
grpc_lb_subchannel_list* latest_pending_subchannel_list; |
|
|
|
|
} round_robin_lb_policy; |
|
|
|
|
grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { |
|
|
|
|
GPR_ASSERT(args.client_channel_factory != nullptr); |
|
|
|
|
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, |
|
|
|
|
"round_robin"); |
|
|
|
|
UpdateLocked(*args.args); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, |
|
|
|
|
subchannel_list_->num_subchannels); |
|
|
|
|
} |
|
|
|
|
grpc_subchannel_index_ref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
RoundRobin::~RoundRobin() { |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(subchannel_list_ == nullptr); |
|
|
|
|
GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); |
|
|
|
|
GPR_ASSERT(pending_picks_ == nullptr); |
|
|
|
|
grpc_connectivity_state_destroy(&state_tracker_); |
|
|
|
|
grpc_subchannel_index_unref(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Returns the index into p->subchannel_list->subchannels of the next
|
|
|
|
|
* subchannel in READY state, or p->subchannel_list->num_subchannels if no |
|
|
|
@@ -75,195 +135,190 @@ typedef struct round_robin_lb_policy {
|
|
|
|
* |
|
|
|
|
* Note that this function does *not* update p->last_ready_subchannel_index. |
|
|
|
|
* The caller must do that if it returns a pick. */ |
|
|
|
|
static size_t get_next_ready_subchannel_index_locked( |
|
|
|
|
const round_robin_lb_policy* p) { |
|
|
|
|
GPR_ASSERT(p->subchannel_list != nullptr); |
|
|
|
|
size_t RoundRobin::GetNextReadySubchannelIndexLocked() { |
|
|
|
|
GPR_ASSERT(subchannel_list_ != nullptr); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"[RR %p] getting next ready subchannel (out of %lu), " |
|
|
|
|
"last_ready_subchannel_index=%lu", |
|
|
|
|
(void*)p, |
|
|
|
|
static_cast<unsigned long>(p->subchannel_list->num_subchannels), |
|
|
|
|
static_cast<unsigned long>(p->last_ready_subchannel_index)); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) { |
|
|
|
|
const size_t index = (i + p->last_ready_subchannel_index + 1) % |
|
|
|
|
p->subchannel_list->num_subchannels; |
|
|
|
|
"[RR %p] getting next ready subchannel (out of %" PRIuPTR |
|
|
|
|
"), " |
|
|
|
|
"last_ready_subchannel_index=%" PRIuPTR, |
|
|
|
|
this, subchannel_list_->num_subchannels, |
|
|
|
|
last_ready_subchannel_index_); |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { |
|
|
|
|
const size_t index = (i + last_ready_subchannel_index_ + 1) % |
|
|
|
|
subchannel_list_->num_subchannels; |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " |
|
|
|
|
"state=%s", |
|
|
|
|
(void*)p, (void*)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
(void*)p->subchannel_list, static_cast<unsigned long>(index), |
|
|
|
|
"[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR |
|
|
|
|
": state=%s", |
|
|
|
|
this, subchannel_list_->subchannels[index].subchannel, |
|
|
|
|
subchannel_list_, index, |
|
|
|
|
grpc_connectivity_state_name( |
|
|
|
|
p->subchannel_list->subchannels[index].curr_connectivity_state)); |
|
|
|
|
subchannel_list_->subchannels[index].curr_connectivity_state)); |
|
|
|
|
} |
|
|
|
|
if (p->subchannel_list->subchannels[index].curr_connectivity_state == |
|
|
|
|
if (subchannel_list_->subchannels[index].curr_connectivity_state == |
|
|
|
|
GRPC_CHANNEL_READY) { |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] found next ready subchannel (%p) at index %lu of " |
|
|
|
|
"subchannel_list %p", |
|
|
|
|
(void*)p, |
|
|
|
|
(void*)p->subchannel_list->subchannels[index].subchannel, |
|
|
|
|
static_cast<unsigned long>(index), (void*)p->subchannel_list); |
|
|
|
|
"[RR %p] found next ready subchannel (%p) at index %" PRIuPTR |
|
|
|
|
" of subchannel_list %p", |
|
|
|
|
this, subchannel_list_->subchannels[index].subchannel, index, |
|
|
|
|
subchannel_list_); |
|
|
|
|
} |
|
|
|
|
return index; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", (void*)p); |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); |
|
|
|
|
} |
|
|
|
|
return p->subchannel_list->num_subchannels; |
|
|
|
|
return subchannel_list_->num_subchannels; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sets p->last_ready_subchannel_index to last_ready_index.
|
|
|
|
|
static void update_last_ready_subchannel_index_locked(round_robin_lb_policy* p, |
|
|
|
|
size_t last_ready_index) { |
|
|
|
|
GPR_ASSERT(last_ready_index < p->subchannel_list->num_subchannels); |
|
|
|
|
p->last_ready_subchannel_index = last_ready_index; |
|
|
|
|
// Sets last_ready_subchannel_index_ to last_ready_index.
|
|
|
|
|
void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { |
|
|
|
|
GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); |
|
|
|
|
last_ready_subchannel_index_ = last_ready_index; |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] setting last_ready_subchannel_index=%lu (SC %p, CSC %p)", |
|
|
|
|
(void*)p, static_cast<unsigned long>(last_ready_index), |
|
|
|
|
(void*)p->subchannel_list->subchannels[last_ready_index].subchannel, |
|
|
|
|
(void*)p->subchannel_list->subchannels[last_ready_index] |
|
|
|
|
"[RR %p] setting last_ready_subchannel_index=%" PRIuPTR |
|
|
|
|
" (SC %p, CSC %p)", |
|
|
|
|
this, last_ready_index, |
|
|
|
|
subchannel_list_->subchannels[last_ready_index].subchannel, |
|
|
|
|
subchannel_list_->subchannels[last_ready_index] |
|
|
|
|
.connected_subchannel.get()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_destroy(grpc_lb_policy* pol) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", |
|
|
|
|
(void*)pol, (void*)pol); |
|
|
|
|
void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { |
|
|
|
|
PickState* pick; |
|
|
|
|
while ((pick = pending_picks_) != nullptr) { |
|
|
|
|
pending_picks_ = pick->next; |
|
|
|
|
if (new_policy->PickLocked(pick)) { |
|
|
|
|
// Synchronous return, schedule closure.
|
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(p->subchannel_list == nullptr); |
|
|
|
|
GPR_ASSERT(p->latest_pending_subchannel_list == nullptr); |
|
|
|
|
grpc_connectivity_state_destroy(&p->state_tracker); |
|
|
|
|
grpc_subchannel_index_unref(); |
|
|
|
|
gpr_free(p); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_shutdown_locked(grpc_lb_policy* pol, |
|
|
|
|
grpc_lb_policy* new_policy) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
void RoundRobin::ShutdownLocked() { |
|
|
|
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p); |
|
|
|
|
} |
|
|
|
|
p->shutdown = true; |
|
|
|
|
grpc_lb_policy_pick_state* pick; |
|
|
|
|
while ((pick = p->pending_picks) != nullptr) { |
|
|
|
|
p->pending_picks = pick->next; |
|
|
|
|
if (new_policy != nullptr) { |
|
|
|
|
// Hand off to new LB policy.
|
|
|
|
|
if (grpc_lb_policy_pick_locked(new_policy, pick)) { |
|
|
|
|
// Synchronous return; schedule callback.
|
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
pick->connected_subchannel.reset(); |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); |
|
|
|
|
} |
|
|
|
|
shutdown_ = true; |
|
|
|
|
PickState* pick; |
|
|
|
|
while ((pick = pending_picks_) != nullptr) { |
|
|
|
|
pending_picks_ = pick->next; |
|
|
|
|
pick->connected_subchannel.reset(); |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_SHUTDOWN, |
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, |
|
|
|
|
GRPC_ERROR_REF(error), "rr_shutdown"); |
|
|
|
|
if (p->subchannel_list != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, |
|
|
|
|
if (subchannel_list_ != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, |
|
|
|
|
"sl_shutdown_rr_shutdown"); |
|
|
|
|
p->subchannel_list = nullptr; |
|
|
|
|
subchannel_list_ = nullptr; |
|
|
|
|
} |
|
|
|
|
if (p->latest_pending_subchannel_list != nullptr) { |
|
|
|
|
if (latest_pending_subchannel_list_ != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref( |
|
|
|
|
p->latest_pending_subchannel_list, "sl_shutdown_pending_rr_shutdown"); |
|
|
|
|
p->latest_pending_subchannel_list = nullptr; |
|
|
|
|
latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); |
|
|
|
|
latest_pending_subchannel_list_ = nullptr; |
|
|
|
|
} |
|
|
|
|
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, |
|
|
|
|
GRPC_ERROR_CANCELLED); |
|
|
|
|
TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_cancel_pick_locked(grpc_lb_policy* pol, |
|
|
|
|
grpc_lb_policy_pick_state* pick, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
grpc_lb_policy_pick_state* pp = p->pending_picks; |
|
|
|
|
p->pending_picks = nullptr; |
|
|
|
|
void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) { |
|
|
|
|
PickState* pp = pending_picks_; |
|
|
|
|
pending_picks_ = nullptr; |
|
|
|
|
while (pp != nullptr) { |
|
|
|
|
grpc_lb_policy_pick_state* next = pp->next; |
|
|
|
|
PickState* next = pp->next; |
|
|
|
|
if (pp == pick) { |
|
|
|
|
pick->connected_subchannel.reset(); |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
"Pick cancelled", &error, 1)); |
|
|
|
|
"Pick Cancelled", &error, 1)); |
|
|
|
|
} else { |
|
|
|
|
pp->next = p->pending_picks; |
|
|
|
|
p->pending_picks = pp; |
|
|
|
|
pp->next = pending_picks_; |
|
|
|
|
pending_picks_ = pp; |
|
|
|
|
} |
|
|
|
|
pp = next; |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_cancel_picks_locked(grpc_lb_policy* pol, |
|
|
|
|
uint32_t initial_metadata_flags_mask, |
|
|
|
|
uint32_t initial_metadata_flags_eq, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
grpc_lb_policy_pick_state* pick = p->pending_picks; |
|
|
|
|
p->pending_picks = nullptr; |
|
|
|
|
void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, |
|
|
|
|
uint32_t initial_metadata_flags_eq, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
PickState* pick = pending_picks_; |
|
|
|
|
pending_picks_ = nullptr; |
|
|
|
|
while (pick != nullptr) { |
|
|
|
|
grpc_lb_policy_pick_state* next = pick->next; |
|
|
|
|
PickState* next = pick->next; |
|
|
|
|
if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == |
|
|
|
|
initial_metadata_flags_eq) { |
|
|
|
|
pick->connected_subchannel.reset(); |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( |
|
|
|
|
"Pick cancelled", &error, 1)); |
|
|
|
|
"Pick Cancelled", &error, 1)); |
|
|
|
|
} else { |
|
|
|
|
pick->next = p->pending_picks; |
|
|
|
|
p->pending_picks = pick; |
|
|
|
|
pick->next = pending_picks_; |
|
|
|
|
pending_picks_ = pick; |
|
|
|
|
} |
|
|
|
|
pick = next; |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void start_picking_locked(round_robin_lb_policy* p) { |
|
|
|
|
p->started_picking = true; |
|
|
|
|
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { |
|
|
|
|
if (p->subchannel_list->subchannels[i].subchannel != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list, |
|
|
|
|
"connectivity_watch"); |
|
|
|
|
void RoundRobin::SubchannelListRefForConnectivityWatch( |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list, const char* reason) { |
|
|
|
|
// TODO(roth): We currently track this ref manually. Once the new
|
|
|
|
|
// ClosureRef API is ready and the subchannel_list code has been
|
|
|
|
|
// converted to a C++ API, find a way to hold the RefCountedPtr<>
|
|
|
|
|
// somewhere (maybe in the subchannel_data object) instead of doing
|
|
|
|
|
// this manually.
|
|
|
|
|
auto self = Ref(DEBUG_LOCATION, reason); |
|
|
|
|
self.release(); |
|
|
|
|
grpc_lb_subchannel_list_ref(subchannel_list, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RoundRobin::SubchannelListUnrefForConnectivityWatch( |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list, const char* reason) { |
|
|
|
|
Unref(DEBUG_LOCATION, reason); |
|
|
|
|
grpc_lb_subchannel_list_unref(subchannel_list, reason); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RoundRobin::StartPickingLocked() { |
|
|
|
|
started_picking_ = true; |
|
|
|
|
for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { |
|
|
|
|
if (subchannel_list_->subchannels[i].subchannel != nullptr) { |
|
|
|
|
SubchannelListRefForConnectivityWatch(subchannel_list_, |
|
|
|
|
"connectivity_watch"); |
|
|
|
|
grpc_lb_subchannel_data_start_connectivity_watch( |
|
|
|
|
&p->subchannel_list->subchannels[i]); |
|
|
|
|
&subchannel_list_->subchannels[i]); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_exit_idle_locked(grpc_lb_policy* pol) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
if (!p->started_picking) { |
|
|
|
|
start_picking_locked(p); |
|
|
|
|
void RoundRobin::ExitIdleLocked() { |
|
|
|
|
if (!started_picking_) { |
|
|
|
|
StartPickingLocked(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static int rr_pick_locked(grpc_lb_policy* pol, |
|
|
|
|
grpc_lb_policy_pick_state* pick) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
bool RoundRobin::PickLocked(PickState* pick) { |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "[RR %p] Trying to pick (shutdown: %d)", pol, |
|
|
|
|
p->shutdown); |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, |
|
|
|
|
shutdown_); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(!p->shutdown); |
|
|
|
|
if (p->subchannel_list != nullptr) { |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
if (next_ready_index < p->subchannel_list->num_subchannels) { |
|
|
|
|
GPR_ASSERT(!shutdown_); |
|
|
|
|
if (subchannel_list_ != nullptr) { |
|
|
|
|
const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); |
|
|
|
|
if (next_ready_index < subchannel_list_->num_subchannels) { |
|
|
|
|
/* readily available, report right away */ |
|
|
|
|
grpc_lb_subchannel_data* sd = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
&subchannel_list_->subchannels[next_ready_index]; |
|
|
|
|
pick->connected_subchannel = sd->connected_subchannel; |
|
|
|
|
if (pick->user_data != nullptr) { |
|
|
|
|
*pick->user_data = sd->user_data; |
|
|
|
@@ -273,24 +328,24 @@ static int rr_pick_locked(grpc_lb_policy* pol,
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " |
|
|
|
|
"index %" PRIuPTR ")", |
|
|
|
|
p, sd->subchannel, pick->connected_subchannel.get(), |
|
|
|
|
this, sd->subchannel, pick->connected_subchannel.get(), |
|
|
|
|
sd->subchannel_list, next_ready_index); |
|
|
|
|
} |
|
|
|
|
/* only advance the last picked pointer if the selection was used */ |
|
|
|
|
update_last_ready_subchannel_index_locked(p, next_ready_index); |
|
|
|
|
return 1; |
|
|
|
|
UpdateLastReadySubchannelIndexLocked(next_ready_index); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
/* no pick currently available. Save for later in list of pending picks */ |
|
|
|
|
if (!p->started_picking) { |
|
|
|
|
start_picking_locked(p); |
|
|
|
|
if (!started_picking_) { |
|
|
|
|
StartPickingLocked(); |
|
|
|
|
} |
|
|
|
|
pick->next = p->pending_picks; |
|
|
|
|
p->pending_picks = pick; |
|
|
|
|
return 0; |
|
|
|
|
pick->next = pending_picks_; |
|
|
|
|
pending_picks_ = pick; |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void update_state_counters_locked(grpc_lb_subchannel_data* sd) { |
|
|
|
|
void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; |
|
|
|
|
GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); |
|
|
|
|
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); |
|
|
|
@@ -318,8 +373,8 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
|
|
|
|
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the |
|
|
|
|
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used |
|
|
|
|
* only if the policy transitions to state TRANSIENT_FAILURE. */ |
|
|
|
|
static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, |
|
|
|
|
grpc_error* error) { |
|
|
|
|
/* In priority order. The first rule to match terminates the search (ie, if we
|
|
|
|
|
* are on rule n, all previous rules were unfulfilled). |
|
|
|
|
* |
|
|
|
@@ -335,64 +390,61 @@ static void update_lb_connectivity_status_locked(grpc_lb_subchannel_data* sd,
|
|
|
|
* subchannel_list->num_subchannels. |
|
|
|
|
*/ |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; |
|
|
|
|
round_robin_lb_policy* p = |
|
|
|
|
reinterpret_cast<round_robin_lb_policy*>(subchannel_list->policy); |
|
|
|
|
GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); |
|
|
|
|
if (subchannel_list->num_ready > 0) { |
|
|
|
|
/* 1) READY */ |
|
|
|
|
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, |
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, |
|
|
|
|
GRPC_ERROR_NONE, "rr_ready"); |
|
|
|
|
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { |
|
|
|
|
/* 2) CONNECTING */ |
|
|
|
|
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_CONNECTING, |
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, |
|
|
|
|
GRPC_ERROR_NONE, "rr_connecting"); |
|
|
|
|
} else if (subchannel_list->num_transient_failures == |
|
|
|
|
subchannel_list->num_subchannels) { |
|
|
|
|
/* 3) TRANSIENT_FAILURE */ |
|
|
|
|
grpc_connectivity_state_set(&p->state_tracker, |
|
|
|
|
GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
GRPC_ERROR_REF(error), "rr_transient_failure"); |
|
|
|
|
grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
GRPC_ERROR_REF(error), |
|
|
|
|
"rr_exhausted_subchannels"); |
|
|
|
|
} |
|
|
|
|
GRPC_ERROR_UNREF(error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_connectivity_changed_locked(void* arg, grpc_error* error) { |
|
|
|
|
grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); |
|
|
|
|
round_robin_lb_policy* p = |
|
|
|
|
reinterpret_cast<round_robin_lb_policy*>(sd->subchannel_list->policy); |
|
|
|
|
void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { |
|
|
|
|
grpc_lb_subchannel_data* sd = reinterpret_cast<grpc_lb_subchannel_data*>(arg); |
|
|
|
|
RoundRobin* p = reinterpret_cast<RoundRobin*>(sd->subchannel_list->policy); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_DEBUG, |
|
|
|
|
"[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " |
|
|
|
|
"prev_state=%s new_state=%s p->shutdown=%d " |
|
|
|
|
"sd->subchannel_list->shutting_down=%d error=%s", |
|
|
|
|
(void*)p, (void*)sd->subchannel, (void*)sd->subchannel_list, |
|
|
|
|
p, sd->subchannel, sd->subchannel_list, |
|
|
|
|
grpc_connectivity_state_name(sd->prev_connectivity_state), |
|
|
|
|
grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), |
|
|
|
|
p->shutdown, sd->subchannel_list->shutting_down, |
|
|
|
|
p->shutdown_, sd->subchannel_list->shutting_down, |
|
|
|
|
grpc_error_string(error)); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(sd->subchannel != nullptr); |
|
|
|
|
// If the policy is shutting down, unref and return.
|
|
|
|
|
if (p->shutdown) { |
|
|
|
|
if (p->shutdown_) { |
|
|
|
|
grpc_lb_subchannel_data_stop_connectivity_watch(sd); |
|
|
|
|
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); |
|
|
|
|
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, |
|
|
|
|
"rr_shutdown"); |
|
|
|
|
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, |
|
|
|
|
"rr_shutdown"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If the subchannel list is shutting down, stop watching.
|
|
|
|
|
if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { |
|
|
|
|
grpc_lb_subchannel_data_stop_connectivity_watch(sd); |
|
|
|
|
grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); |
|
|
|
|
grpc_lb_subchannel_list_unref_for_connectivity_watch(sd->subchannel_list, |
|
|
|
|
"rr_sl_shutdown"); |
|
|
|
|
p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, |
|
|
|
|
"rr_sl_shutdown"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
// If we're still here, the notification must be for a subchannel in
|
|
|
|
|
// either the current or latest pending subchannel lists.
|
|
|
|
|
GPR_ASSERT(sd->subchannel_list == p->subchannel_list || |
|
|
|
|
sd->subchannel_list == p->latest_pending_subchannel_list); |
|
|
|
|
GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || |
|
|
|
|
sd->subchannel_list == p->latest_pending_subchannel_list_); |
|
|
|
|
GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); |
|
|
|
|
// Now that we're inside the combiner, copy the pending connectivity
|
|
|
|
|
// state (which was set by the connectivity state watcher) to
|
|
|
|
@@ -409,8 +461,7 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
|
|
|
|
"Requesting re-resolution", |
|
|
|
|
p, sd->subchannel); |
|
|
|
|
} |
|
|
|
|
grpc_lb_policy_try_reresolve(&p->base, &grpc_lb_round_robin_trace, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case GRPC_CHANNEL_READY: { |
|
|
|
@@ -418,49 +469,47 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
|
|
|
|
sd->connected_subchannel = |
|
|
|
|
grpc_subchannel_get_connected_subchannel(sd->subchannel); |
|
|
|
|
} |
|
|
|
|
if (sd->subchannel_list != p->subchannel_list) { |
|
|
|
|
// promote sd->subchannel_list to p->subchannel_list.
|
|
|
|
|
if (sd->subchannel_list != p->subchannel_list_) { |
|
|
|
|
// promote sd->subchannel_list to p->subchannel_list_.
|
|
|
|
|
// sd->subchannel_list must be equal to
|
|
|
|
|
// p->latest_pending_subchannel_list because we have already filtered
|
|
|
|
|
// p->latest_pending_subchannel_list_ because we have already filtered
|
|
|
|
|
// for sds belonging to outdated subchannel lists.
|
|
|
|
|
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list); |
|
|
|
|
GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); |
|
|
|
|
GPR_ASSERT(!sd->subchannel_list->shutting_down); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
const unsigned long num_subchannels = |
|
|
|
|
p->subchannel_list != nullptr |
|
|
|
|
? static_cast<unsigned long>( |
|
|
|
|
p->subchannel_list->num_subchannels) |
|
|
|
|
const size_t num_subchannels = |
|
|
|
|
p->subchannel_list_ != nullptr |
|
|
|
|
? p->subchannel_list_->num_subchannels |
|
|
|
|
: 0; |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] phasing out subchannel list %p (size %lu) in favor " |
|
|
|
|
"of %p (size %lu)", |
|
|
|
|
p, p->subchannel_list, num_subchannels, sd->subchannel_list, |
|
|
|
|
"[RR %p] phasing out subchannel list %p (size %" PRIuPTR |
|
|
|
|
") in favor of %p (size %" PRIuPTR ")", |
|
|
|
|
p, p->subchannel_list_, num_subchannels, sd->subchannel_list, |
|
|
|
|
num_subchannels); |
|
|
|
|
} |
|
|
|
|
if (p->subchannel_list != nullptr) { |
|
|
|
|
if (p->subchannel_list_ != nullptr) { |
|
|
|
|
// dispose of the current subchannel_list
|
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, |
|
|
|
|
"sl_phase_out_shutdown"); |
|
|
|
|
} |
|
|
|
|
p->subchannel_list = p->latest_pending_subchannel_list; |
|
|
|
|
p->latest_pending_subchannel_list = nullptr; |
|
|
|
|
p->subchannel_list_ = p->latest_pending_subchannel_list_; |
|
|
|
|
p->latest_pending_subchannel_list_ = nullptr; |
|
|
|
|
} |
|
|
|
|
/* at this point we know there's at least one suitable subchannel. Go
|
|
|
|
|
* ahead and pick one and notify the pending suitors in |
|
|
|
|
* p->pending_picks. This preemptively replicates rr_pick()'s actions. |
|
|
|
|
*/ |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels); |
|
|
|
|
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */ |
|
|
|
|
const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); |
|
|
|
|
GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); |
|
|
|
|
grpc_lb_subchannel_data* selected = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
if (p->pending_picks != nullptr) { |
|
|
|
|
&p->subchannel_list_->subchannels[next_ready_index]; |
|
|
|
|
if (p->pending_picks_ != nullptr) { |
|
|
|
|
// if the selected subchannel is going to be used for the pending
|
|
|
|
|
// picks, update the last picked pointer
|
|
|
|
|
update_last_ready_subchannel_index_locked(p, next_ready_index); |
|
|
|
|
p->UpdateLastReadySubchannelIndexLocked(next_ready_index); |
|
|
|
|
} |
|
|
|
|
grpc_lb_policy_pick_state* pick; |
|
|
|
|
while ((pick = p->pending_picks)) { |
|
|
|
|
p->pending_picks = pick->next; |
|
|
|
|
PickState* pick; |
|
|
|
|
while ((pick = p->pending_picks_)) { |
|
|
|
|
p->pending_picks_ = pick->next; |
|
|
|
|
pick->connected_subchannel = selected->connected_subchannel; |
|
|
|
|
if (pick->user_data != nullptr) { |
|
|
|
|
*pick->user_data = selected->user_data; |
|
|
|
@@ -468,10 +517,9 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] Fulfilling pending pick. Target <-- subchannel %p " |
|
|
|
|
"(subchannel_list %p, index %lu)", |
|
|
|
|
(void*)p, (void*)selected->subchannel, |
|
|
|
|
(void*)p->subchannel_list, |
|
|
|
|
static_cast<unsigned long>(next_ready_index)); |
|
|
|
|
"(subchannel_list %p, index %" PRIuPTR ")", |
|
|
|
|
p, selected->subchannel, p->subchannel_list_, |
|
|
|
|
next_ready_index); |
|
|
|
|
} |
|
|
|
|
GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
@@ -482,40 +530,34 @@ static void rr_connectivity_changed_locked(void* arg, grpc_error* error) {
|
|
|
|
case GRPC_CHANNEL_CONNECTING: |
|
|
|
|
case GRPC_CHANNEL_IDLE:; // fallthrough
|
|
|
|
|
} |
|
|
|
|
// Update state counters and new overall state.
|
|
|
|
|
update_state_counters_locked(sd); |
|
|
|
|
// Update state counters.
|
|
|
|
|
UpdateStateCountersLocked(sd); |
|
|
|
|
// Only update connectivity based on the selected subchannel list.
|
|
|
|
|
if (sd->subchannel_list == p->subchannel_list) { |
|
|
|
|
update_lb_connectivity_status_locked(sd, GRPC_ERROR_REF(error)); |
|
|
|
|
if (sd->subchannel_list == p->subchannel_list_) { |
|
|
|
|
p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); |
|
|
|
|
} |
|
|
|
|
// Renew notification.
|
|
|
|
|
grpc_lb_subchannel_data_start_connectivity_watch(sd); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static grpc_connectivity_state rr_check_connectivity_locked( |
|
|
|
|
grpc_lb_policy* pol, grpc_error** error) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
return grpc_connectivity_state_get(&p->state_tracker, error); |
|
|
|
|
grpc_connectivity_state RoundRobin::CheckConnectivityLocked( |
|
|
|
|
grpc_error** error) { |
|
|
|
|
return grpc_connectivity_state_get(&state_tracker_, error); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_notify_on_state_change_locked(grpc_lb_policy* pol, |
|
|
|
|
grpc_connectivity_state* current, |
|
|
|
|
grpc_closure* notify) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, |
|
|
|
|
void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, |
|
|
|
|
grpc_closure* notify) { |
|
|
|
|
grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, |
|
|
|
|
notify); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate, |
|
|
|
|
void RoundRobin::PingOneLocked(grpc_closure* on_initiate, |
|
|
|
|
grpc_closure* on_ack) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(pol); |
|
|
|
|
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); |
|
|
|
|
if (next_ready_index < p->subchannel_list->num_subchannels) { |
|
|
|
|
const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); |
|
|
|
|
if (next_ready_index < subchannel_list_->num_subchannels) { |
|
|
|
|
grpc_lb_subchannel_data* selected = |
|
|
|
|
&p->subchannel_list->subchannels[next_ready_index]; |
|
|
|
|
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> target = |
|
|
|
|
selected->connected_subchannel; |
|
|
|
|
target->Ping(on_initiate, on_ack); |
|
|
|
|
&subchannel_list_->subchannels[next_ready_index]; |
|
|
|
|
selected->connected_subchannel->Ping(on_initiate, on_ack); |
|
|
|
|
} else { |
|
|
|
|
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
|
|
|
"Round Robin not connected")); |
|
|
|
@@ -524,45 +566,41 @@ static void rr_ping_one_locked(grpc_lb_policy* pol, grpc_closure* on_initiate,
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_update_locked(grpc_lb_policy* policy, |
|
|
|
|
const grpc_lb_policy_args* args) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy); |
|
|
|
|
const grpc_arg* arg = |
|
|
|
|
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
void RoundRobin::UpdateLocked(const grpc_channel_args& args) { |
|
|
|
|
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); |
|
|
|
|
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { |
|
|
|
|
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", p); |
|
|
|
|
gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); |
|
|
|
|
// If we don't have a current subchannel list, go into TRANSIENT_FAILURE.
|
|
|
|
|
// Otherwise, keep using the current subchannel list (ignore this update).
|
|
|
|
|
if (p->subchannel_list == nullptr) { |
|
|
|
|
if (subchannel_list_ == nullptr) { |
|
|
|
|
grpc_connectivity_state_set( |
|
|
|
|
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), |
|
|
|
|
"rr_update_missing"); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
grpc_lb_addresses* addresses = |
|
|
|
|
static_cast<grpc_lb_addresses*>(arg->value.pointer.p); |
|
|
|
|
grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", p, |
|
|
|
|
addresses->num_addresses); |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", |
|
|
|
|
this, addresses->num_addresses); |
|
|
|
|
} |
|
|
|
|
grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( |
|
|
|
|
&p->base, &grpc_lb_round_robin_trace, addresses, args, |
|
|
|
|
rr_connectivity_changed_locked); |
|
|
|
|
this, &grpc_lb_round_robin_trace, addresses, combiner(), |
|
|
|
|
client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); |
|
|
|
|
if (subchannel_list->num_subchannels == 0) { |
|
|
|
|
grpc_connectivity_state_set( |
|
|
|
|
&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, |
|
|
|
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), |
|
|
|
|
"rr_update_empty"); |
|
|
|
|
if (p->subchannel_list != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list, |
|
|
|
|
if (subchannel_list_ != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, |
|
|
|
|
"sl_shutdown_empty_update"); |
|
|
|
|
} |
|
|
|
|
p->subchannel_list = subchannel_list; // empty list
|
|
|
|
|
subchannel_list_ = subchannel_list; // empty list
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (p->started_picking) { |
|
|
|
|
if (started_picking_) { |
|
|
|
|
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { |
|
|
|
|
const grpc_connectivity_state subchannel_state = |
|
|
|
|
grpc_subchannel_check_connectivity( |
|
|
|
@@ -587,96 +625,61 @@ static void rr_update_locked(grpc_lb_policy* policy,
|
|
|
|
++subchannel_list->num_transient_failures; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (p->latest_pending_subchannel_list != nullptr) { |
|
|
|
|
if (latest_pending_subchannel_list_ != nullptr) { |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"[RR %p] Shutting down latest pending subchannel list %p, " |
|
|
|
|
"about to be replaced by newer latest %p", |
|
|
|
|
(void*)p, (void*)p->latest_pending_subchannel_list, |
|
|
|
|
(void*)subchannel_list); |
|
|
|
|
this, latest_pending_subchannel_list_, subchannel_list); |
|
|
|
|
} |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref( |
|
|
|
|
p->latest_pending_subchannel_list, "sl_outdated"); |
|
|
|
|
latest_pending_subchannel_list_, "sl_outdated"); |
|
|
|
|
} |
|
|
|
|
p->latest_pending_subchannel_list = subchannel_list; |
|
|
|
|
latest_pending_subchannel_list_ = subchannel_list; |
|
|
|
|
for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { |
|
|
|
|
/* Watch every new subchannel. A subchannel list becomes active the
|
|
|
|
|
* moment one of its subchannels is READY. At that moment, we swap |
|
|
|
|
* p->subchannel_list for sd->subchannel_list, provided the subchannel |
|
|
|
|
* list is still valid (ie, isn't shutting down) */ |
|
|
|
|
grpc_lb_subchannel_list_ref_for_connectivity_watch(subchannel_list, |
|
|
|
|
"connectivity_watch"); |
|
|
|
|
SubchannelListRefForConnectivityWatch(subchannel_list, |
|
|
|
|
"connectivity_watch"); |
|
|
|
|
grpc_lb_subchannel_data_start_connectivity_watch( |
|
|
|
|
&subchannel_list->subchannels[i]); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
// The policy isn't picking yet. Save the update for later, disposing of
|
|
|
|
|
// previous version if any.
|
|
|
|
|
if (p->subchannel_list != nullptr) { |
|
|
|
|
if (subchannel_list_ != nullptr) { |
|
|
|
|
grpc_lb_subchannel_list_shutdown_and_unref( |
|
|
|
|
p->subchannel_list, "rr_update_before_started_picking"); |
|
|
|
|
subchannel_list_, "rr_update_before_started_picking"); |
|
|
|
|
} |
|
|
|
|
p->subchannel_list = subchannel_list; |
|
|
|
|
subchannel_list_ = subchannel_list; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void rr_set_reresolve_closure_locked( |
|
|
|
|
grpc_lb_policy* policy, grpc_closure* request_reresolution) { |
|
|
|
|
round_robin_lb_policy* p = reinterpret_cast<round_robin_lb_policy*>(policy); |
|
|
|
|
GPR_ASSERT(!p->shutdown); |
|
|
|
|
GPR_ASSERT(policy->request_reresolution == nullptr); |
|
|
|
|
policy->request_reresolution = request_reresolution; |
|
|
|
|
} |
|
|
|
|
//
|
|
|
|
|
// factory
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { |
|
|
|
|
rr_destroy, |
|
|
|
|
rr_shutdown_locked, |
|
|
|
|
rr_pick_locked, |
|
|
|
|
rr_cancel_pick_locked, |
|
|
|
|
rr_cancel_picks_locked, |
|
|
|
|
rr_ping_one_locked, |
|
|
|
|
rr_exit_idle_locked, |
|
|
|
|
rr_check_connectivity_locked, |
|
|
|
|
rr_notify_on_state_change_locked, |
|
|
|
|
rr_update_locked, |
|
|
|
|
rr_set_reresolve_closure_locked}; |
|
|
|
|
|
|
|
|
|
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {} |
|
|
|
|
|
|
|
|
|
static void round_robin_factory_unref(grpc_lb_policy_factory* factory) {} |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy* round_robin_create(grpc_lb_policy_factory* factory, |
|
|
|
|
grpc_lb_policy_args* args) { |
|
|
|
|
GPR_ASSERT(args->client_channel_factory != nullptr); |
|
|
|
|
round_robin_lb_policy* p = |
|
|
|
|
static_cast<round_robin_lb_policy*>(gpr_zalloc(sizeof(*p))); |
|
|
|
|
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); |
|
|
|
|
grpc_subchannel_index_ref(); |
|
|
|
|
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, |
|
|
|
|
"round_robin"); |
|
|
|
|
rr_update_locked(&p->base, args); |
|
|
|
|
if (grpc_lb_round_robin_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void*)p, |
|
|
|
|
static_cast<unsigned long>(p->subchannel_list->num_subchannels)); |
|
|
|
|
class RoundRobinFactory : public LoadBalancingPolicyFactory { |
|
|
|
|
public: |
|
|
|
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
|
|
|
|
const LoadBalancingPolicy::Args& args) const override { |
|
|
|
|
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); |
|
|
|
|
} |
|
|
|
|
return &p->base; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { |
|
|
|
|
round_robin_factory_ref, round_robin_factory_unref, round_robin_create, |
|
|
|
|
"round_robin"}; |
|
|
|
|
const char* name() const override { return "round_robin"; } |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy_factory round_robin_lb_policy_factory = { |
|
|
|
|
&round_robin_factory_vtable}; |
|
|
|
|
|
|
|
|
|
static grpc_lb_policy_factory* round_robin_lb_factory_create() { |
|
|
|
|
return &round_robin_lb_policy_factory; |
|
|
|
|
} |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
/* Plugin registration */ |
|
|
|
|
} // namespace grpc_core
|
|
|
|
|
|
|
|
|
|
void grpc_lb_policy_round_robin_init() { |
|
|
|
|
grpc_register_lb_policy(round_robin_lb_factory_create()); |
|
|
|
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
|
|
|
|
RegisterLoadBalancingPolicyFactory( |
|
|
|
|
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( |
|
|
|
|
grpc_core::New<grpc_core::RoundRobinFactory>())); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_lb_policy_round_robin_shutdown() {} |
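
The factory above registers this policy under the name "round_robin" (via
RoundRobinFactory::name()). As a minimal usage sketch, not part of this change
and using a placeholder target address, a client channel can request the policy
through the standard GRPC_ARG_LB_POLICY_NAME channel argument:

    #include <grpc/grpc.h>

    int main() {
      grpc_init();
      // Ask the client channel to use the LB policy registered as "round_robin".
      grpc_arg lb_arg;
      lb_arg.type = GRPC_ARG_STRING;
      lb_arg.key = const_cast<char*>(GRPC_ARG_LB_POLICY_NAME);
      lb_arg.value.string = const_cast<char*>("round_robin");
      grpc_channel_args channel_args = {1, &lb_arg};
      // Placeholder target; any resolver that returns multiple addresses will
      // exercise the round-robin subchannel picking implemented above.
      grpc_channel* channel = grpc_insecure_channel_create(
          "dns:///backend.example.com:50051", &channel_args, nullptr);
      grpc_channel_destroy(channel);
      grpc_shutdown();
      return 0;
    }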