mirror of https://github.com/grpc/grpc.git
parent
5c4543d9f5
commit
4fb049b647
20 changed files with 1595 additions and 48 deletions
@ -0,0 +1,530 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/client_config/lb_policies/round_robin.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include "src/core/transport/connectivity_state.h" |
||||
|
||||
int grpc_lb_round_robin_trace = 0; |
||||
|
||||
/** List of entities waiting for a pick.
|
||||
* |
||||
* Once a pick is available, \a target is updated and \a on_complete called. */ |
||||
typedef struct pending_pick { |
||||
struct pending_pick *next; |
||||
grpc_pollset *pollset; |
||||
grpc_subchannel **target; |
||||
grpc_iomgr_closure *on_complete; |
||||
} pending_pick; |
||||
|
||||
/** List of subchannels in a connectivity READY state */ |
||||
typedef struct ready_list { |
||||
grpc_subchannel *subchannel; |
||||
struct ready_list *next; |
||||
struct ready_list *prev; |
||||
} ready_list; |
||||
|
||||
typedef struct { |
||||
size_t subchannel_idx; /**< Index over p->subchannels */ |
||||
void *p; /**< round_robin_lb_policy instance */ |
||||
} connectivity_changed_cb_arg; |
||||
|
||||
typedef struct { |
||||
/** base policy: must be first */ |
||||
grpc_lb_policy base; |
||||
|
||||
/** all our subchannels */ |
||||
grpc_subchannel **subchannels; |
||||
size_t num_subchannels; |
||||
|
||||
/** Callbacks, one per subchannel being watched, to be called when their
|
||||
* respective connectivity changes */ |
||||
grpc_iomgr_closure *connectivity_changed_cbs; |
||||
connectivity_changed_cb_arg *cb_args; |
||||
|
||||
/** mutex protecting remaining members */ |
||||
gpr_mu mu; |
||||
/** have we started picking? */ |
||||
int started_picking; |
||||
/** are we shutting down? */ |
||||
int shutdown; |
||||
/** Connectivity state of the subchannels being watched */ |
||||
grpc_connectivity_state *subchannel_connectivity; |
||||
/** List of picks that are waiting on connectivity */ |
||||
pending_pick *pending_picks; |
||||
|
||||
/** our connectivity state tracker */ |
||||
grpc_connectivity_state_tracker state_tracker; |
||||
|
||||
/** (Dummy) root of the doubly linked list containing READY subchannels */ |
||||
ready_list ready_list; |
||||
/** Last pick from the ready list. */ |
||||
ready_list *ready_list_last_pick; |
||||
|
||||
/** Subchannel index to ready_list node.
|
||||
* |
||||
* Kept in order to remove nodes from the ready list associated with a |
||||
* subchannel */ |
||||
ready_list **subchannel_index_to_readylist_node; |
||||
} round_robin_lb_policy; |
||||
|
||||
/** Returns the next subchannel from the connected list or NULL if the list is
|
||||
* empty. |
||||
* |
||||
* Note that this function does *not* advance p->ready_list_last_pick. Use \a |
||||
* advance_last_picked_locked() for that. */ |
||||
static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { |
||||
ready_list *selected; |
||||
selected = p->ready_list_last_pick->next; |
||||
|
||||
while (selected != NULL) { |
||||
if (selected == &p->ready_list) { |
||||
GPR_ASSERT(selected->subchannel == NULL); |
||||
/* skip dummy root */ |
||||
selected = selected->next; |
||||
} else { |
||||
GPR_ASSERT(selected->subchannel != NULL); |
||||
return selected; |
||||
} |
||||
} |
||||
return NULL; |
||||
} |
||||
|
||||
/** Advance the \a ready_list picking head. */ |
||||
static void advance_last_picked_locked(round_robin_lb_policy *p) { |
||||
if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ |
||||
p->ready_list_last_pick = p->ready_list_last_pick->next; |
||||
if (p->ready_list_last_pick == &p->ready_list) { |
||||
/* skip dummy root */ |
||||
p->ready_list_last_pick = p->ready_list_last_pick->next; |
||||
} |
||||
} else { /* should be an empty list */ |
||||
GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); |
||||
} |
||||
|
||||
if (grpc_lb_round_robin_trace) { |
||||
gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", |
||||
p->ready_list_last_pick, p->ready_list_last_pick->subchannel); |
||||
} |
||||
} |
||||
|
||||
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
|
||||
* csc to the list of ready subchannels. */ |
||||
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, |
||||
grpc_subchannel *csc) { |
||||
ready_list *new_elem = gpr_malloc(sizeof(ready_list)); |
||||
new_elem->subchannel = csc; |
||||
if (p->ready_list.prev == NULL) { |
||||
/* first element */ |
||||
new_elem->next = &p->ready_list; |
||||
new_elem->prev = &p->ready_list; |
||||
p->ready_list.next = new_elem; |
||||
p->ready_list.prev = new_elem; |
||||
} else { |
||||
new_elem->next = &p->ready_list; |
||||
new_elem->prev = p->ready_list.prev; |
||||
p->ready_list.prev->next = new_elem; |
||||
p->ready_list.prev = new_elem; |
||||
} |
||||
if (grpc_lb_round_robin_trace) { |
||||
gpr_log(GPR_DEBUG, |
||||
"[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc); |
||||
} |
||||
return new_elem; |
||||
} |
||||
|
||||
/** Removes \a node from the list of connected subchannels */ |
||||
static void remove_disconnected_sc_locked(round_robin_lb_policy *p, |
||||
ready_list *node) { |
||||
if (node == NULL) { |
||||
return; |
||||
} |
||||
if (node == p->ready_list_last_pick) { |
||||
/* If removing the lastly picked node, reset the last pick pointer to the
|
||||
* dummy root of the list */ |
||||
p->ready_list_last_pick = &p->ready_list; |
||||
} |
||||
|
||||
/* removing last item */ |
||||
if (node->next == &p->ready_list && node->prev == &p->ready_list) { |
||||
GPR_ASSERT(p->ready_list.next == node); |
||||
GPR_ASSERT(p->ready_list.prev == node); |
||||
p->ready_list.next = NULL; |
||||
p->ready_list.prev = NULL; |
||||
} else { |
||||
node->prev->next = node->next; |
||||
node->next->prev = node->prev; |
||||
} |
||||
|
||||
if (grpc_lb_round_robin_trace) { |
||||
gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, |
||||
node->subchannel); |
||||
} |
||||
|
||||
node->next = NULL; |
||||
node->prev = NULL; |
||||
node->subchannel = NULL; |
||||
|
||||
gpr_free(node); |
||||
} |
||||
|
||||
void rr_destroy(grpc_lb_policy *pol) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
size_t i; |
||||
ready_list *elem; |
||||
for (i = 0; i < p->num_subchannels; i++) { |
||||
GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "round_robin"); |
||||
} |
||||
gpr_free(p->connectivity_changed_cbs); |
||||
gpr_free(p->subchannel_connectivity); |
||||
|
||||
grpc_connectivity_state_destroy(&p->state_tracker); |
||||
gpr_free(p->subchannels); |
||||
gpr_mu_destroy(&p->mu); |
||||
|
||||
elem = p->ready_list.next; |
||||
while (elem != NULL && elem != &p->ready_list) { |
||||
ready_list *tmp; |
||||
tmp = elem->next; |
||||
elem->next = NULL; |
||||
elem->prev = NULL; |
||||
elem->subchannel = NULL; |
||||
gpr_free(elem); |
||||
elem = tmp; |
||||
} |
||||
gpr_free(p->subchannel_index_to_readylist_node); |
||||
gpr_free(p->cb_args); |
||||
gpr_free(p); |
||||
} |
||||
|
||||
void rr_shutdown(grpc_lb_policy *pol) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
pending_pick *pp; |
||||
gpr_mu_lock(&p->mu); |
||||
|
||||
p->shutdown = 1; |
||||
while ((pp = p->pending_picks)) { |
||||
p->pending_picks = pp->next; |
||||
*pp->target = NULL; |
||||
grpc_iomgr_add_delayed_callback(pp->on_complete, 0); |
||||
gpr_free(pp); |
||||
} |
||||
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, |
||||
"shutdown"); |
||||
gpr_mu_unlock(&p->mu); |
||||
} |
||||
|
||||
static void start_picking(round_robin_lb_policy *p) { |
||||
size_t i; |
||||
p->started_picking = 1; |
||||
|
||||
for (i = 0; i < p->num_subchannels; i++) { |
||||
p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; |
||||
grpc_subchannel_notify_on_state_change(p->subchannels[i], |
||||
&p->subchannel_connectivity[i], |
||||
&p->connectivity_changed_cbs[i]); |
||||
GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); |
||||
} |
||||
} |
||||
|
||||
void rr_exit_idle(grpc_lb_policy *pol) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
gpr_mu_lock(&p->mu); |
||||
if (!p->started_picking) { |
||||
start_picking(p); |
||||
} |
||||
gpr_mu_unlock(&p->mu); |
||||
} |
||||
|
||||
void rr_pick(grpc_lb_policy *pol, grpc_pollset *pollset, |
||||
grpc_metadata_batch *initial_metadata, grpc_subchannel **target, |
||||
grpc_iomgr_closure *on_complete) { |
||||
size_t i; |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
pending_pick *pp; |
||||
ready_list *selected; |
||||
gpr_mu_lock(&p->mu); |
||||
if ((selected = peek_next_connected_locked(p))) { |
||||
gpr_mu_unlock(&p->mu); |
||||
*target = selected->subchannel; |
||||
if (grpc_lb_round_robin_trace) { |
||||
gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)", |
||||
selected->subchannel, selected); |
||||
} |
||||
/* only advance the last picked pointer if the selection was used */ |
||||
advance_last_picked_locked(p); |
||||
on_complete->cb(on_complete->cb_arg, 1); |
||||
} else { |
||||
if (!p->started_picking) { |
||||
start_picking(p); |
||||
} |
||||
for (i = 0; i < p->num_subchannels; i++) { |
||||
grpc_subchannel_add_interested_party(p->subchannels[i], pollset); |
||||
} |
||||
pp = gpr_malloc(sizeof(*pp)); |
||||
pp->next = p->pending_picks; |
||||
pp->pollset = pollset; |
||||
pp->target = target; |
||||
pp->on_complete = on_complete; |
||||
p->pending_picks = pp; |
||||
gpr_mu_unlock(&p->mu); |
||||
} |
||||
} |
||||
|
||||
static void rr_connectivity_changed(void *arg, int iomgr_success) { |
||||
connectivity_changed_cb_arg *cb_arg = arg; |
||||
round_robin_lb_policy *p = cb_arg->p; |
||||
/* index over p->subchannels of this cb's subchannel */ |
||||
const size_t this_idx = cb_arg->subchannel_idx; |
||||
pending_pick *pp; |
||||
ready_list *selected; |
||||
|
||||
int unref = 0; |
||||
|
||||
/* connectivity state of this cb's subchannel */ |
||||
grpc_connectivity_state *this_connectivity; |
||||
|
||||
gpr_mu_lock(&p->mu); |
||||
|
||||
this_connectivity = |
||||
&p->subchannel_connectivity[this_idx]; |
||||
|
||||
if (p->shutdown) { |
||||
unref = 1; |
||||
} else { |
||||
switch (*this_connectivity) { |
||||
case GRPC_CHANNEL_READY: |
||||
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, |
||||
"connecting_ready"); |
||||
/* add the newly connected subchannel to the list of connected ones.
|
||||
* Note that it goes to the "end of the line". */ |
||||
p->subchannel_index_to_readylist_node[this_idx] = |
||||
add_connected_sc_locked(p, p->subchannels[this_idx]); |
||||
/* at this point we know there's at least one suitable subchannel. Go
|
||||
* ahead and pick one and notify the pending suitors in |
||||
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */ |
||||
selected = peek_next_connected_locked(p); |
||||
if (p->pending_picks != NULL) { |
||||
/* if the selected subchannel is going to be used for the pending
|
||||
* picks, update the last picked pointer */ |
||||
advance_last_picked_locked(p); |
||||
} |
||||
while ((pp = p->pending_picks)) { |
||||
p->pending_picks = pp->next; |
||||
*pp->target = selected->subchannel; |
||||
if (grpc_lb_round_robin_trace) { |
||||
gpr_log(GPR_DEBUG, |
||||
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", |
||||
selected->subchannel, selected); |
||||
} |
||||
grpc_subchannel_del_interested_party(selected->subchannel, pp->pollset); |
||||
grpc_iomgr_add_delayed_callback(pp->on_complete, 1); |
||||
gpr_free(pp); |
||||
} |
||||
grpc_subchannel_notify_on_state_change( |
||||
p->subchannels[this_idx], this_connectivity, |
||||
&p->connectivity_changed_cbs[this_idx]); |
||||
break; |
||||
case GRPC_CHANNEL_CONNECTING: |
||||
case GRPC_CHANNEL_IDLE: |
||||
grpc_connectivity_state_set(&p->state_tracker, *this_connectivity, |
||||
"connecting_changed"); |
||||
grpc_subchannel_notify_on_state_change( |
||||
p->subchannels[this_idx], this_connectivity, |
||||
&p->connectivity_changed_cbs[this_idx]); |
||||
break; |
||||
case GRPC_CHANNEL_TRANSIENT_FAILURE: |
||||
grpc_connectivity_state_set(&p->state_tracker, |
||||
GRPC_CHANNEL_TRANSIENT_FAILURE, |
||||
"connecting_transient_failure"); |
||||
|
||||
/* renew state notification */ |
||||
grpc_subchannel_notify_on_state_change( |
||||
p->subchannels[this_idx], this_connectivity, |
||||
&p->connectivity_changed_cbs[this_idx]); |
||||
|
||||
/* remove for ready list if still present */ |
||||
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { |
||||
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); |
||||
p->subchannel_index_to_readylist_node[this_idx] = NULL; |
||||
} |
||||
break; |
||||
case GRPC_CHANNEL_FATAL_FAILURE: |
||||
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { |
||||
remove_disconnected_sc_locked(p, p->subchannel_index_to_readylist_node[this_idx]); |
||||
p->subchannel_index_to_readylist_node[this_idx] = NULL; |
||||
} |
||||
|
||||
GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx], |
||||
p->subchannels[p->num_subchannels - 1]); |
||||
p->num_subchannels--; |
||||
GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], |
||||
"round_robin"); |
||||
|
||||
if (p->num_subchannels == 0) { |
||||
grpc_connectivity_state_set(&p->state_tracker, |
||||
GRPC_CHANNEL_FATAL_FAILURE, |
||||
"no_more_channels"); |
||||
while ((pp = p->pending_picks)) { |
||||
p->pending_picks = pp->next; |
||||
*pp->target = NULL; |
||||
grpc_iomgr_add_delayed_callback(pp->on_complete, 1); |
||||
gpr_free(pp); |
||||
} |
||||
unref = 1; |
||||
} else { |
||||
grpc_connectivity_state_set(&p->state_tracker, |
||||
GRPC_CHANNEL_TRANSIENT_FAILURE, |
||||
"subchannel_failed"); |
||||
} |
||||
} /* switch */ |
||||
} /* !unref */ |
||||
|
||||
gpr_mu_unlock(&p->mu); |
||||
|
||||
if (unref) { |
||||
GRPC_LB_POLICY_UNREF(&p->base, "round_robin_connectivity"); |
||||
} |
||||
} |
||||
|
||||
static void rr_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
size_t i; |
||||
size_t n; |
||||
grpc_subchannel **subchannels; |
||||
|
||||
gpr_mu_lock(&p->mu); |
||||
n = p->num_subchannels; |
||||
subchannels = gpr_malloc(n * sizeof(*subchannels)); |
||||
for (i = 0; i < n; i++) { |
||||
subchannels[i] = p->subchannels[i]; |
||||
GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast"); |
||||
} |
||||
gpr_mu_unlock(&p->mu); |
||||
|
||||
for (i = 0; i < n; i++) { |
||||
grpc_subchannel_process_transport_op(subchannels[i], op); |
||||
GRPC_SUBCHANNEL_UNREF(subchannels[i], "rr_broadcast"); |
||||
} |
||||
gpr_free(subchannels); |
||||
} |
||||
|
||||
static grpc_connectivity_state rr_check_connectivity(grpc_lb_policy *pol) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
grpc_connectivity_state st; |
||||
gpr_mu_lock(&p->mu); |
||||
st = grpc_connectivity_state_check(&p->state_tracker); |
||||
gpr_mu_unlock(&p->mu); |
||||
return st; |
||||
} |
||||
|
||||
static void rr_notify_on_state_change(grpc_lb_policy *pol, |
||||
grpc_connectivity_state *current, |
||||
grpc_iomgr_closure *notify) { |
||||
round_robin_lb_policy *p = (round_robin_lb_policy *)pol; |
||||
gpr_mu_lock(&p->mu); |
||||
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, |
||||
notify); |
||||
gpr_mu_unlock(&p->mu); |
||||
} |
||||
|
||||
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { |
||||
rr_destroy, |
||||
rr_shutdown, |
||||
rr_pick, |
||||
rr_exit_idle, |
||||
rr_broadcast, |
||||
rr_check_connectivity, |
||||
rr_notify_on_state_change}; |
||||
|
||||
|
||||
static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} |
||||
|
||||
static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} |
||||
|
||||
static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory, |
||||
grpc_subchannel **subchannels, |
||||
size_t num_subchannels) { |
||||
size_t i; |
||||
round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); |
||||
GPR_ASSERT(num_subchannels); |
||||
memset(p, 0, sizeof(*p)); |
||||
grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); |
||||
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); |
||||
p->num_subchannels = num_subchannels; |
||||
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, |
||||
"round_robin"); |
||||
memcpy(p->subchannels, subchannels, |
||||
sizeof(grpc_subchannel *) * num_subchannels); |
||||
|
||||
gpr_mu_init(&p->mu); |
||||
p->connectivity_changed_cbs = |
||||
gpr_malloc(sizeof(grpc_iomgr_closure) * num_subchannels); |
||||
p->subchannel_connectivity = |
||||
gpr_malloc(sizeof(grpc_connectivity_state) * num_subchannels); |
||||
|
||||
p->cb_args = |
||||
gpr_malloc(sizeof(connectivity_changed_cb_arg) * num_subchannels); |
||||
for(i = 0; i < num_subchannels; i++) { |
||||
p->cb_args[i].subchannel_idx = i; |
||||
p->cb_args[i].p = p; |
||||
grpc_iomgr_closure_init(&p->connectivity_changed_cbs[i], |
||||
rr_connectivity_changed, &p->cb_args[i]); |
||||
} |
||||
|
||||
/* The (dummy node) root of the ready list */ |
||||
p->ready_list.subchannel = NULL; |
||||
p->ready_list.prev = NULL; |
||||
p->ready_list.next = NULL; |
||||
p->ready_list_last_pick = &p->ready_list; |
||||
|
||||
p->subchannel_index_to_readylist_node = |
||||
gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); |
||||
memset(p->subchannel_index_to_readylist_node, 0, |
||||
sizeof(grpc_subchannel *) * num_subchannels); |
||||
return &p->base; |
||||
} |
||||
|
||||
static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { |
||||
round_robin_factory_ref, round_robin_factory_unref, create_round_robin, |
||||
"round_robin"}; |
||||
|
||||
static grpc_lb_policy_factory round_robin_lb_policy_factory = { |
||||
&round_robin_factory_vtable}; |
||||
|
||||
grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { |
||||
return &round_robin_lb_policy_factory; |
||||
} |
@ -0,0 +1,47 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H |
||||
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_ROUND_ROBIN_H |
||||
|
||||
#include "src/core/client_config/lb_policy.h" |
||||
|
||||
extern int grpc_lb_round_robin_trace; |
||||
|
||||
#include "src/core/client_config/lb_policy_factory.h" |
||||
|
||||
/** Returns a load balancing factory for the round robin policy */ |
||||
grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); |
||||
|
||||
|
||||
#endif |
@ -0,0 +1,651 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/host_port.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/channel/channel_stack.h" |
||||
#include "src/core/surface/channel.h" |
||||
#include "src/core/channel/client_channel.h" |
||||
#include "src/core/surface/server.h" |
||||
#include "src/core/support/string.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/end2end/cq_verifier.h" |
||||
|
||||
typedef struct servers_fixture { |
||||
size_t num_servers; |
||||
grpc_server **servers; |
||||
grpc_call **server_calls; |
||||
grpc_completion_queue *cq; |
||||
char **servers_hostports; |
||||
grpc_metadata_array *request_metadata_recv; |
||||
} servers_fixture; |
||||
|
||||
typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *, |
||||
const int *, const size_t); |
||||
|
||||
typedef struct test_spec { |
||||
size_t num_iters; |
||||
size_t num_servers; |
||||
|
||||
int **kill_at; |
||||
int **revive_at; |
||||
|
||||
verifier_fn verifier; |
||||
|
||||
} test_spec; |
||||
|
||||
static void test_spec_reset(test_spec *spec) { |
||||
size_t i, j; |
||||
|
||||
for (i = 0; i < spec->num_iters; i++) { |
||||
for (j = 0; j < spec->num_servers; j++) { |
||||
spec->kill_at[i][j] = 0; |
||||
spec->revive_at[i][j] = 0; |
||||
} |
||||
} |
||||
} |
||||
|
||||
static test_spec *test_spec_create(size_t num_iters, int num_servers) { |
||||
test_spec *spec; |
||||
size_t i; |
||||
|
||||
spec = gpr_malloc(sizeof(test_spec)); |
||||
spec->num_iters = num_iters; |
||||
spec->num_servers = num_servers; |
||||
spec->kill_at = gpr_malloc(sizeof(int*) * num_iters); |
||||
spec->revive_at = gpr_malloc(sizeof(int*) * num_iters); |
||||
for (i = 0; i < num_iters; i++) { |
||||
spec->kill_at[i] = gpr_malloc(sizeof(int) * num_servers); |
||||
spec->revive_at[i] = gpr_malloc(sizeof(int) * num_servers); |
||||
} |
||||
|
||||
test_spec_reset(spec); |
||||
return spec; |
||||
} |
||||
|
||||
static void test_spec_destroy(test_spec *spec) { |
||||
size_t i; |
||||
for (i = 0; i < spec->num_iters; i++) { |
||||
gpr_free(spec->kill_at[i]); |
||||
gpr_free(spec->revive_at[i]); |
||||
} |
||||
|
||||
gpr_free(spec->kill_at); |
||||
gpr_free(spec->revive_at); |
||||
|
||||
gpr_free(spec); |
||||
} |
||||
|
||||
static void *tag(gpr_intptr t) { return (void *)t; } |
||||
|
||||
static gpr_timespec n_seconds_time(int n) { |
||||
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); |
||||
} |
||||
|
||||
static void drain_cq(grpc_completion_queue *cq) { |
||||
grpc_event ev; |
||||
do { |
||||
ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL); |
||||
} while (ev.type != GRPC_QUEUE_SHUTDOWN); |
||||
} |
||||
|
||||
static void kill_server(const servers_fixture *f, int i) { |
||||
gpr_log(GPR_INFO, "KILLING SERVER %d", i); |
||||
GPR_ASSERT(f->servers[i] != NULL); |
||||
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); |
||||
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), |
||||
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), |
||||
NULL) |
||||
.type == GRPC_OP_COMPLETE); |
||||
grpc_server_destroy(f->servers[i]); |
||||
f->servers[i] = NULL; |
||||
} |
||||
|
||||
static void revive_server(const servers_fixture *f, int i) { |
||||
int got_port; |
||||
gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i); |
||||
GPR_ASSERT(f->servers[i] == NULL); |
||||
f->servers[i] = grpc_server_create(NULL, NULL); |
||||
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL); |
||||
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port( |
||||
f->servers[i], f->servers_hostports[i])) > 0); |
||||
grpc_server_start(f->servers[i]); |
||||
} |
||||
|
||||
static servers_fixture *setup_servers(const char *server_host, |
||||
const size_t num_servers) { |
||||
servers_fixture *f = gpr_malloc(sizeof(servers_fixture)); |
||||
int *ports; |
||||
int got_port; |
||||
size_t i; |
||||
|
||||
f->num_servers = num_servers; |
||||
f->server_calls = gpr_malloc(sizeof(grpc_call*) * num_servers); |
||||
f->request_metadata_recv = gpr_malloc(sizeof(grpc_metadata_array) * num_servers); |
||||
/* Create servers. */ |
||||
ports = gpr_malloc(sizeof(int*) * num_servers); |
||||
f->servers = gpr_malloc(sizeof(grpc_server*) * num_servers); |
||||
f->servers_hostports = gpr_malloc(sizeof(char*) * num_servers); |
||||
f->cq = grpc_completion_queue_create(NULL); |
||||
for (i = 0; i < num_servers; i++) { |
||||
ports[i] = grpc_pick_unused_port_or_die(); |
||||
|
||||
gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]); |
||||
|
||||
f->servers[i] = grpc_server_create(NULL, NULL); |
||||
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL); |
||||
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port( |
||||
f->servers[i], f->servers_hostports[i])) > 0); |
||||
GPR_ASSERT(ports[i] == got_port); |
||||
grpc_server_start(f->servers[i]); |
||||
} |
||||
gpr_free(ports); |
||||
return f; |
||||
} |
||||
|
||||
static void teardown_servers(servers_fixture *f) { |
||||
size_t i; |
||||
/* Destroy server. */ |
||||
for (i = 0; i < f->num_servers; i++) { |
||||
if (f->servers[i] == NULL) continue; |
||||
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); |
||||
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), |
||||
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), |
||||
NULL) |
||||
.type == GRPC_OP_COMPLETE); |
||||
grpc_server_destroy(f->servers[i]); |
||||
} |
||||
grpc_completion_queue_shutdown(f->cq); |
||||
drain_cq(f->cq); |
||||
grpc_completion_queue_destroy(f->cq); |
||||
|
||||
gpr_free(f->servers); |
||||
|
||||
for (i = 0; i < f->num_servers; i++) { |
||||
gpr_free(f->servers_hostports[i]); |
||||
} |
||||
|
||||
gpr_free(f->servers_hostports); |
||||
gpr_free(f->request_metadata_recv); |
||||
gpr_free(f->server_calls); |
||||
gpr_free(f); |
||||
} |
||||
|
||||
/** Returns connection sequence (server indices), which must be freed */ |
||||
int *perform_request(servers_fixture *f, grpc_channel *client, |
||||
const test_spec *spec) { |
||||
grpc_call *c; |
||||
int s_idx; |
||||
int *s_valid; |
||||
gpr_timespec deadline; |
||||
grpc_op ops[6]; |
||||
grpc_op *op; |
||||
grpc_status_code status; |
||||
char *details; |
||||
size_t details_capacity; |
||||
int was_cancelled; |
||||
grpc_call_details *call_details; |
||||
size_t i, iter_num; |
||||
grpc_event ev; |
||||
int read_tag; |
||||
int *connection_sequence; |
||||
grpc_metadata_array initial_metadata_recv; |
||||
grpc_metadata_array trailing_metadata_recv; |
||||
|
||||
s_valid = gpr_malloc(sizeof(int) * f->num_servers); |
||||
call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers); |
||||
connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters); |
||||
|
||||
/* Send a trivial request. */ |
||||
deadline = n_seconds_time(60); |
||||
|
||||
for (iter_num = 0; iter_num < spec->num_iters; iter_num++) { |
||||
cq_verifier *cqv = cq_verifier_create(f->cq); |
||||
details = NULL; |
||||
details_capacity = 0; |
||||
was_cancelled = 2; |
||||
|
||||
for (i = 0; i < f->num_servers; i++) { |
||||
if (spec->kill_at[iter_num][i] != 0) { |
||||
kill_server(f, i); |
||||
} else if (spec->revive_at[iter_num][i] != 0) { |
||||
/* killing takes precedence */ |
||||
revive_server(f, i); |
||||
} |
||||
} |
||||
|
||||
connection_sequence[iter_num] = -1; |
||||
grpc_metadata_array_init(&initial_metadata_recv); |
||||
grpc_metadata_array_init(&trailing_metadata_recv); |
||||
|
||||
for (i = 0; i < f->num_servers; i++) { |
||||
grpc_call_details_init(&call_details[i]); |
||||
} |
||||
memset(s_valid, 0, f->num_servers * sizeof(int)); |
||||
|
||||
c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq, |
||||
"/foo", "foo.test.google.fr", deadline, NULL); |
||||
GPR_ASSERT(c); |
||||
|
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
op->data.recv_initial_metadata = &initial_metadata_recv; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; |
||||
op->data.recv_status_on_client.status = &status; |
||||
op->data.recv_status_on_client.status_details = &details; |
||||
op->data.recv_status_on_client.status_details_capacity = &details_capacity; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
GPR_ASSERT(GRPC_CALL_OK == |
||||
grpc_call_start_batch(c, ops, op - ops, tag(1), NULL)); |
||||
|
||||
/* "listen" on all servers */ |
||||
for (i = 0; i < f->num_servers; i++) { |
||||
grpc_metadata_array_init(&f->request_metadata_recv[i]); |
||||
if (f->servers[i] != NULL) { |
||||
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( |
||||
f->servers[i], &f->server_calls[i], |
||||
&call_details[i], &f->request_metadata_recv[i], |
||||
f->cq, f->cq, tag(1000 + i))); |
||||
} |
||||
} |
||||
|
||||
s_idx = -1; |
||||
while ((ev = grpc_completion_queue_next( |
||||
f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)) |
||||
.type != GRPC_QUEUE_TIMEOUT) { |
||||
read_tag = ((int)(gpr_intptr)ev.tag); |
||||
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d", |
||||
ev.success, ev.type, read_tag, iter_num); |
||||
if (ev.success && read_tag >= 1000) { |
||||
GPR_ASSERT(s_idx == -1); /* only one server must reply */ |
||||
/* only server notifications for non-shutdown events */ |
||||
s_idx = read_tag - 1000; |
||||
s_valid[s_idx] = 1; |
||||
connection_sequence[iter_num] = s_idx; |
||||
} |
||||
} |
||||
|
||||
if (s_idx >= 0) { |
||||
op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
||||
op->data.send_status_from_server.trailing_metadata_count = 0; |
||||
op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; |
||||
op->data.send_status_from_server.status_details = "xyz"; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; |
||||
op->data.recv_close_on_server.cancelled = &was_cancelled; |
||||
op->flags = 0; |
||||
op->reserved = NULL; |
||||
op++; |
||||
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(f->server_calls[s_idx], |
||||
ops, op - ops, tag(102), |
||||
NULL)); |
||||
|
||||
cq_expect_completion(cqv, tag(102), 1); |
||||
cq_expect_completion(cqv, tag(1), 1); |
||||
cq_verify(cqv); |
||||
|
||||
|
||||
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); |
||||
GPR_ASSERT(0 == strcmp(details, "xyz")); |
||||
GPR_ASSERT(0 == strcmp(call_details[s_idx].method, "/foo")); |
||||
GPR_ASSERT(0 == strcmp(call_details[s_idx].host, "foo.test.google.fr")); |
||||
GPR_ASSERT(was_cancelled == 1); |
||||
} |
||||
|
||||
for (i = 0; i < f->num_servers; i++) { |
||||
if (s_valid[i] != 0){ |
||||
grpc_call_destroy(f->server_calls[i]); |
||||
} |
||||
grpc_metadata_array_destroy(&f->request_metadata_recv[i]); |
||||
} |
||||
grpc_metadata_array_destroy(&initial_metadata_recv); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv); |
||||
|
||||
cq_verifier_destroy(cqv); |
||||
|
||||
grpc_call_destroy(c); |
||||
|
||||
for (i = 0; i < f->num_servers; i++) { |
||||
grpc_call_details_destroy(&call_details[i]); |
||||
} |
||||
gpr_free(details); |
||||
} |
||||
|
||||
gpr_free(call_details); |
||||
gpr_free(s_valid); |
||||
|
||||
return connection_sequence; |
||||
} |
||||
|
||||
static void assert_channel_connectivity( |
||||
grpc_channel *ch, grpc_connectivity_state expected_conn_state) { |
||||
grpc_channel_stack *client_stack; |
||||
grpc_channel_element *client_channel_filter; |
||||
grpc_connectivity_state actual_conn_state; |
||||
|
||||
client_stack = grpc_channel_get_channel_stack(ch); |
||||
client_channel_filter = grpc_channel_stack_last_element(client_stack); |
||||
actual_conn_state = grpc_client_channel_check_connectivity_state( |
||||
client_channel_filter, 0 /* don't try to connect */); |
||||
GPR_ASSERT(actual_conn_state == expected_conn_state); |
||||
} |
||||
|
||||
void run_spec(const test_spec *spec) { |
||||
grpc_channel *client; |
||||
char *client_hostport; |
||||
char *servers_hostports_str; |
||||
int *actual_connection_sequence; |
||||
servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers); |
||||
|
||||
/* Create client. */ |
||||
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports, |
||||
f->num_servers, ",", NULL); |
||||
gpr_asprintf(&client_hostport, "ipv4:%s", servers_hostports_str); |
||||
client = grpc_insecure_channel_create(client_hostport, NULL, NULL); |
||||
|
||||
gpr_log(GPR_INFO, "Testing with servers=%s client=%s", |
||||
servers_hostports_str, client_hostport); |
||||
|
||||
actual_connection_sequence = perform_request(f, client, spec); |
||||
|
||||
spec->verifier(f, client, actual_connection_sequence, spec->num_iters); |
||||
|
||||
gpr_free(client_hostport); |
||||
gpr_free(servers_hostports_str); |
||||
gpr_free(actual_connection_sequence); |
||||
|
||||
grpc_channel_destroy(client); |
||||
teardown_servers(f); |
||||
} |
||||
|
||||
static void print_failed_expectations(const int *expected_connection_sequence, |
||||
const int *actual_connection_sequence, |
||||
const size_t expected_seq_length, |
||||
const size_t num_iters) { |
||||
size_t i; |
||||
for (i = 0; i < num_iters; i++) { |
||||
gpr_log(GPR_ERROR, "FAILURE: Iter, expected, actual:%d (%d, %d)", i, |
||||
expected_connection_sequence[i % expected_seq_length], |
||||
actual_connection_sequence[i]); |
||||
} |
||||
} |
||||
|
||||
static void verify_vanilla_round_robin(const servers_fixture *f, |
||||
grpc_channel *client, |
||||
const int *actual_connection_sequence, |
||||
const size_t num_iters) { |
||||
int *expected_connection_sequence; |
||||
size_t i; |
||||
const int expected_seq_length = f->num_servers; |
||||
|
||||
/* verify conn. seq. expectation */ |
||||
/* get the first sequence of "num_servers" elements */ |
||||
expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length); |
||||
memcpy(expected_connection_sequence, actual_connection_sequence, |
||||
sizeof(int) * expected_seq_length); |
||||
|
||||
for (i = 0; i < num_iters; i++) { |
||||
const int actual = actual_connection_sequence[i]; |
||||
const int expected = expected_connection_sequence[i % expected_seq_length]; |
||||
if (actual != expected) { |
||||
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected, |
||||
actual, i); |
||||
print_failed_expectations(expected_connection_sequence, |
||||
actual_connection_sequence, expected_seq_length, |
||||
num_iters); |
||||
abort(); |
||||
} |
||||
} |
||||
assert_channel_connectivity(client, GRPC_CHANNEL_READY); |
||||
|
||||
gpr_free(expected_connection_sequence); |
||||
} |
||||
|
||||
/* At the start of the second iteration, all but the first and last servers (as
|
||||
* given in "f") are killed */ |
||||
static void verify_vanishing_floor_round_robin( |
||||
const servers_fixture *f, grpc_channel *client, |
||||
const int *actual_connection_sequence, const size_t num_iters) { |
||||
int *expected_connection_sequence; |
||||
const int expected_seq_length = 2; |
||||
size_t i; |
||||
|
||||
/* verify conn. seq. expectation */ |
||||
/* copy the first full sequence (without -1s) */ |
||||
expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length); |
||||
memcpy(expected_connection_sequence, actual_connection_sequence + 2, |
||||
expected_seq_length * sizeof(int)); |
||||
|
||||
/* first three elements of the sequence should be [<1st>, -1] */ |
||||
GPR_ASSERT(actual_connection_sequence[0] == expected_connection_sequence[0]); |
||||
GPR_ASSERT(actual_connection_sequence[1] == -1); |
||||
|
||||
for (i = 2; i < num_iters; i++) { |
||||
const int actual = actual_connection_sequence[i]; |
||||
const int expected = expected_connection_sequence[i % expected_seq_length]; |
||||
if (actual != expected) { |
||||
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected, |
||||
actual, i); |
||||
print_failed_expectations(expected_connection_sequence, |
||||
actual_connection_sequence, expected_seq_length, |
||||
num_iters); |
||||
abort(); |
||||
} |
||||
} |
||||
gpr_free(expected_connection_sequence); |
||||
} |
||||
|
||||
static void verify_total_carnage_round_robin( |
||||
const servers_fixture *f, grpc_channel *client, |
||||
const int *actual_connection_sequence, const size_t num_iters) { |
||||
size_t i; |
||||
|
||||
for (i = 0; i < num_iters; i++) { |
||||
const int actual = actual_connection_sequence[i]; |
||||
const int expected = -1; |
||||
if (actual != expected) { |
||||
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected, |
||||
actual, i); |
||||
abort(); |
||||
} |
||||
} |
||||
|
||||
/* even though we know all the servers are dead, the client is still trying
|
||||
* retrying, believing it's in a transient failure situation */ |
||||
assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE); |
||||
} |
||||
|
||||
static void verify_partial_carnage_round_robin( |
||||
const servers_fixture *f, grpc_channel *client, |
||||
const int *actual_connection_sequence, const size_t num_iters) { |
||||
int *expected_connection_sequence; |
||||
size_t i; |
||||
const int expected_seq_length = f->num_servers; |
||||
|
||||
/* verify conn. seq. expectation */ |
||||
/* get the first sequence of "num_servers" elements */ |
||||
expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length); |
||||
memcpy(expected_connection_sequence, actual_connection_sequence, |
||||
sizeof(int) * expected_seq_length); |
||||
|
||||
for (i = 0; i < num_iters/2; i++) { |
||||
const int actual = actual_connection_sequence[i]; |
||||
const int expected = expected_connection_sequence[i % expected_seq_length]; |
||||
if (actual != expected) { |
||||
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected, |
||||
actual, i); |
||||
print_failed_expectations(expected_connection_sequence, |
||||
actual_connection_sequence, expected_seq_length, |
||||
num_iters); |
||||
abort(); |
||||
} |
||||
} |
||||
|
||||
/* second half of the iterations go without response */ |
||||
for (; i < num_iters; i++) { |
||||
GPR_ASSERT(actual_connection_sequence[i] == -1); |
||||
} |
||||
|
||||
/* even though we know all the servers are dead, the client is still trying
|
||||
* retrying, believing it's in a transient failure situation */ |
||||
assert_channel_connectivity(client, GRPC_CHANNEL_TRANSIENT_FAILURE); |
||||
gpr_free(expected_connection_sequence); |
||||
} |
||||
|
||||
static void verify_rebirth_round_robin(const servers_fixture *f, |
||||
grpc_channel *client, |
||||
const int *actual_connection_sequence, |
||||
const size_t num_iters) { |
||||
int *expected_connection_sequence; |
||||
size_t i; |
||||
const int expected_seq_length = f->num_servers; |
||||
|
||||
/* verify conn. seq. expectation */ |
||||
/* get the first sequence of "num_servers" elements */ |
||||
expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length); |
||||
memcpy(expected_connection_sequence, actual_connection_sequence + 4, |
||||
sizeof(int) * expected_seq_length); |
||||
|
||||
/* first iteration succeeds */ |
||||
GPR_ASSERT(actual_connection_sequence[0] != -1); |
||||
|
||||
/* back up on the third iteration */ |
||||
for (i = 3; i < num_iters; i++) { |
||||
const int actual = actual_connection_sequence[i]; |
||||
const int expected = expected_connection_sequence[i % expected_seq_length]; |
||||
if (actual != expected) { |
||||
gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected, |
||||
actual, i); |
||||
print_failed_expectations(expected_connection_sequence, |
||||
actual_connection_sequence, expected_seq_length, |
||||
num_iters); |
||||
abort(); |
||||
} |
||||
} |
||||
|
||||
/* things are fine once the servers are brought back up */ |
||||
assert_channel_connectivity(client, GRPC_CHANNEL_READY); |
||||
gpr_free(expected_connection_sequence); |
||||
} |
||||
|
||||
|
||||
int main(int argc, char **argv) { |
||||
test_spec *spec; |
||||
size_t i; |
||||
const size_t NUM_ITERS = 10; |
||||
const size_t NUM_SERVERS = 4; |
||||
|
||||
grpc_test_init(argc, argv); |
||||
grpc_init(); |
||||
|
||||
/* everything is fine, all servers stay up the whole time and life's peachy */ |
||||
spec = test_spec_create(NUM_ITERS, NUM_SERVERS); |
||||
spec->verifier = verify_vanilla_round_robin; |
||||
gpr_log(GPR_DEBUG, "test_all_server_up"); |
||||
run_spec(spec); |
||||
|
||||
/* Kill all servers first thing in the morning */ |
||||
test_spec_reset(spec); |
||||
spec->verifier = verify_total_carnage_round_robin; |
||||
for (i = 0; i < NUM_SERVERS; i++) { |
||||
spec->kill_at[0][i] = 1; |
||||
} |
||||
gpr_log(GPR_DEBUG, "test_kill_all_server"); |
||||
run_spec(spec); |
||||
|
||||
/* at the start of the 2nd iteration, kill all but the first and last servers.
|
||||
* This should knock down the server bound to be selected next */ |
||||
test_spec_reset(spec); |
||||
spec->verifier = verify_vanishing_floor_round_robin; |
||||
for (i = 1; i < NUM_SERVERS - 1; i++) { |
||||
spec->kill_at[1][i] = 1; |
||||
} |
||||
gpr_log(GPR_DEBUG, "test_kill_all_server_at_2nd_iteration"); |
||||
run_spec(spec); |
||||
|
||||
/* Midway, kill all servers. */ |
||||
test_spec_reset(spec); |
||||
spec->verifier = verify_partial_carnage_round_robin; |
||||
for (i = 0; i < NUM_SERVERS; i++) { |
||||
spec->kill_at[spec->num_iters / 2][i] = 1; |
||||
} |
||||
gpr_log(GPR_DEBUG, "test_kill_all_server_midway"); |
||||
run_spec(spec); |
||||
|
||||
|
||||
/* After first iteration, kill all servers. On the third one, bring them all
|
||||
* back up. */ |
||||
test_spec_reset(spec); |
||||
spec->verifier = verify_rebirth_round_robin; |
||||
for (i = 0; i < NUM_SERVERS; i++) { |
||||
spec->kill_at[1][i] = 1; |
||||
spec->revive_at[3][i] = 1; |
||||
} |
||||
gpr_log(GPR_DEBUG, "test_kill_all_server_after_1st_resurrect_at_3rd"); |
||||
run_spec(spec); |
||||
|
||||
test_spec_destroy(spec); |
||||
|
||||
grpc_shutdown(); |
||||
return 0; |
||||
} |
File diff suppressed because one or more lines are too long
@ -0,0 +1,154 @@ |
||||
<?xml version="1.0" encoding="utf-8"?> |
||||
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
||||
<ItemGroup Label="ProjectConfigurations"> |
||||
<ProjectConfiguration Include="Debug|Win32"> |
||||
<Configuration>Debug</Configuration> |
||||
<Platform>Win32</Platform> |
||||
</ProjectConfiguration> |
||||
<ProjectConfiguration Include="Debug|x64"> |
||||
<Configuration>Debug</Configuration> |
||||
<Platform>x64</Platform> |
||||
</ProjectConfiguration> |
||||
<ProjectConfiguration Include="Release|Win32"> |
||||
<Configuration>Release</Configuration> |
||||
<Platform>Win32</Platform> |
||||
</ProjectConfiguration> |
||||
<ProjectConfiguration Include="Release|x64"> |
||||
<Configuration>Release</Configuration> |
||||
<Platform>x64</Platform> |
||||
</ProjectConfiguration> |
||||
</ItemGroup> |
||||
<PropertyGroup Label="Globals"> |
||||
<ProjectGuid>{62D58A08-3B5E-D6A8-ABBB-77995AA0A8C6}</ProjectGuid> |
||||
</PropertyGroup> |
||||
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> |
||||
<PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration"> |
||||
<PlatformToolset>v100</PlatformToolset> |
||||
</PropertyGroup> |
||||
<PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration"> |
||||
<PlatformToolset>v110</PlatformToolset> |
||||
</PropertyGroup> |
||||
<PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration"> |
||||
<PlatformToolset>v120</PlatformToolset> |
||||
</PropertyGroup> |
||||
<PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration"> |
||||
<ConfigurationType>Application</ConfigurationType> |
||||
<UseDebugLibraries>true</UseDebugLibraries> |
||||
<CharacterSet>Unicode</CharacterSet> |
||||
</PropertyGroup> |
||||
<PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration"> |
||||
<ConfigurationType>Application</ConfigurationType> |
||||
<UseDebugLibraries>false</UseDebugLibraries> |
||||
<WholeProgramOptimization>true</WholeProgramOptimization> |
||||
<CharacterSet>Unicode</CharacterSet> |
||||
</PropertyGroup> |
||||
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> |
||||
<ImportGroup Label="ExtensionSettings"> |
||||
</ImportGroup> |
||||
<ImportGroup Label="PropertySheets"> |
||||
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> |
||||
<Import Project="..\..\..\..\vsprojects\global.props" /> |
||||
<Import Project="..\..\..\..\vsprojects\openssl.props" /> |
||||
<Import Project="..\..\..\..\vsprojects\protobuf.props" /> |
||||
<Import Project="..\..\..\..\vsprojects\winsock.props" /> |
||||
<Import Project="..\..\..\..\vsprojects\zlib.props" /> |
||||
</ImportGroup> |
||||
<PropertyGroup Label="UserMacros" /> |
||||
<PropertyGroup Condition="'$(Configuration)'=='Debug'"> |
||||
<TargetName>lb_policies_test</TargetName> |
||||
</PropertyGroup> |
||||
<PropertyGroup Condition="'$(Configuration)'=='Release'"> |
||||
<TargetName>lb_policies_test</TargetName> |
||||
</PropertyGroup> |
||||
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'"> |
||||
<ClCompile> |
||||
<PrecompiledHeader>NotUsing</PrecompiledHeader> |
||||
<WarningLevel>Level3</WarningLevel> |
||||
<Optimization>Disabled</Optimization> |
||||
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;_USE_32BIT_TIME_T;%(PreprocessorDefinitions)</PreprocessorDefinitions> |
||||
<SDLCheck>true</SDLCheck> |
||||
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> |
||||
</ClCompile> |
||||
<Link> |
||||
<SubSystem>Console</SubSystem> |
||||
<GenerateDebugInformation>true</GenerateDebugInformation> |
||||
</Link> |
||||
</ItemDefinitionGroup> |
||||
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> |
||||
<ClCompile> |
||||
<PrecompiledHeader>NotUsing</PrecompiledHeader> |
||||
<WarningLevel>Level3</WarningLevel> |
||||
<Optimization>Disabled</Optimization> |
||||
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> |
||||
<SDLCheck>true</SDLCheck> |
||||
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> |
||||
</ClCompile> |
||||
<Link> |
||||
<SubSystem>Console</SubSystem> |
||||
<GenerateDebugInformation>true</GenerateDebugInformation> |
||||
</Link> |
||||
</ItemDefinitionGroup> |
||||
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'"> |
||||
<ClCompile> |
||||
<WarningLevel>Level3</WarningLevel> |
||||
<PrecompiledHeader>NotUsing</PrecompiledHeader> |
||||
<Optimization>MaxSpeed</Optimization> |
||||
<FunctionLevelLinking>true</FunctionLevelLinking> |
||||
<IntrinsicFunctions>true</IntrinsicFunctions> |
||||
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;_USE_32BIT_TIME_T;%(PreprocessorDefinitions)</PreprocessorDefinitions> |
||||
<SDLCheck>true</SDLCheck> |
||||
<RuntimeLibrary>MultiThreaded</RuntimeLibrary> |
||||
</ClCompile> |
||||
<Link> |
||||
<SubSystem>Console</SubSystem> |
||||
<GenerateDebugInformation>true</GenerateDebugInformation> |
||||
<EnableCOMDATFolding>true</EnableCOMDATFolding> |
||||
<OptimizeReferences>true</OptimizeReferences> |
||||
</Link> |
||||
</ItemDefinitionGroup> |
||||
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> |
||||
<ClCompile> |
||||
<WarningLevel>Level3</WarningLevel> |
||||
<PrecompiledHeader>NotUsing</PrecompiledHeader> |
||||
<Optimization>MaxSpeed</Optimization> |
||||
<FunctionLevelLinking>true</FunctionLevelLinking> |
||||
<IntrinsicFunctions>true</IntrinsicFunctions> |
||||
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> |
||||
<SDLCheck>true</SDLCheck> |
||||
<RuntimeLibrary>MultiThreaded</RuntimeLibrary> |
||||
</ClCompile> |
||||
<Link> |
||||
<SubSystem>Console</SubSystem> |
||||
<GenerateDebugInformation>true</GenerateDebugInformation> |
||||
<EnableCOMDATFolding>true</EnableCOMDATFolding> |
||||
<OptimizeReferences>true</OptimizeReferences> |
||||
</Link> |
||||
</ItemDefinitionGroup> |
||||
<ItemGroup> |
||||
<ClCompile Include="..\..\..\..\test\core\client_config\lb_policies_test.c"> |
||||
</ClCompile> |
||||
</ItemGroup> |
||||
<ItemGroup> |
||||
<ProjectReference Include="..\..\..\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj"> |
||||
<Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project> |
||||
</ProjectReference> |
||||
<ProjectReference Include="..\..\..\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj"> |
||||
<Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project> |
||||
</ProjectReference> |
||||
<ProjectReference Include="..\..\..\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj"> |
||||
<Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project> |
||||
</ProjectReference> |
||||
<ProjectReference Include="..\..\..\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj"> |
||||
<Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project> |
||||
</ProjectReference> |
||||
</ItemGroup> |
||||
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> |
||||
<ImportGroup Label="ExtensionTargets"> |
||||
</ImportGroup> |
||||
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild"> |
||||
<PropertyGroup> |
||||
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText> |
||||
</PropertyGroup> |
||||
</Target> |
||||
</Project> |
||||
|
@ -0,0 +1,21 @@ |
||||
<?xml version="1.0" encoding="utf-8"?> |
||||
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
||||
<ItemGroup> |
||||
<ClCompile Include="..\..\..\..\test\core\client_config\lb_policies_test.c"> |
||||
<Filter>test\core\client_config</Filter> |
||||
</ClCompile> |
||||
</ItemGroup> |
||||
|
||||
<ItemGroup> |
||||
<Filter Include="test"> |
||||
<UniqueIdentifier>{58736598-65ad-bf09-4484-a4de1bb9b51f}</UniqueIdentifier> |
||||
</Filter> |
||||
<Filter Include="test\core"> |
||||
<UniqueIdentifier>{6e194f4b-ceb1-0e6b-e77a-8149b0411d99}</UniqueIdentifier> |
||||
</Filter> |
||||
<Filter Include="test\core\client_config"> |
||||
<UniqueIdentifier>{f948fe8f-47f8-fcce-2740-6c390af3c30b}</UniqueIdentifier> |
||||
</Filter> |
||||
</ItemGroup> |
||||
</Project> |
||||
|
Loading…
Reference in new issue