mirror of https://github.com/grpc/grpc.git
commit
268685bcbd
87 changed files with 2505 additions and 1641 deletions
@ -0,0 +1,158 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/backup_poller.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/sync.h> |
||||||
|
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||||
|
#include "src/core/lib/iomgr/error.h" |
||||||
|
#include "src/core/lib/iomgr/pollset.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
#include "src/core/lib/support/env.h" |
||||||
|
#include "src/core/lib/support/string.h" |
||||||
|
#include "src/core/lib/surface/channel.h" |
||||||
|
#include "src/core/lib/surface/completion_queue.h" |
||||||
|
|
||||||
|
#define DEFAULT_POLL_INTERVAL_MS 5000 |
||||||
|
|
||||||
|
typedef struct backup_poller { |
||||||
|
grpc_timer polling_timer; |
||||||
|
grpc_closure run_poller_closure; |
||||||
|
grpc_closure shutdown_closure; |
||||||
|
gpr_mu* pollset_mu; |
||||||
|
grpc_pollset* pollset; // guarded by pollset_mu
|
||||||
|
bool shutting_down; // guarded by pollset_mu
|
||||||
|
gpr_refcount refs; |
||||||
|
gpr_refcount shutdown_refs; |
||||||
|
} backup_poller; |
||||||
|
|
||||||
|
static gpr_once g_once = GPR_ONCE_INIT; |
||||||
|
static gpr_mu g_poller_mu; |
||||||
|
static backup_poller* g_poller = NULL; // guarded by g_poller_mu
|
||||||
|
// g_poll_interval_ms is set only once at the first time
|
||||||
|
// grpc_client_channel_start_backup_polling() is called, after that it is
|
||||||
|
// treated as const.
|
||||||
|
static int g_poll_interval_ms = DEFAULT_POLL_INTERVAL_MS; |
||||||
|
|
||||||
|
static void init_globals() { |
||||||
|
gpr_mu_init(&g_poller_mu); |
||||||
|
char* env = gpr_getenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS"); |
||||||
|
if (env != NULL) { |
||||||
|
int poll_interval_ms = gpr_parse_nonnegative_int(env); |
||||||
|
if (poll_interval_ms == -1) { |
||||||
|
gpr_log(GPR_ERROR, |
||||||
|
"Invalid GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS: %s, " |
||||||
|
"default value %d will be used.", |
||||||
|
env, g_poll_interval_ms); |
||||||
|
} else { |
||||||
|
g_poll_interval_ms = poll_interval_ms; |
||||||
|
} |
||||||
|
} |
||||||
|
gpr_free(env); |
||||||
|
} |
||||||
|
|
||||||
|
static void backup_poller_shutdown_unref(grpc_exec_ctx* exec_ctx, |
||||||
|
backup_poller* p) { |
||||||
|
if (gpr_unref(&p->shutdown_refs)) { |
||||||
|
grpc_pollset_destroy(exec_ctx, p->pollset); |
||||||
|
gpr_free(p->pollset); |
||||||
|
gpr_free(p); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void done_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { |
||||||
|
backup_poller_shutdown_unref(exec_ctx, (backup_poller*)arg); |
||||||
|
} |
||||||
|
|
||||||
|
static void g_poller_unref(grpc_exec_ctx* exec_ctx) { |
||||||
|
if (gpr_unref(&g_poller->refs)) { |
||||||
|
gpr_mu_lock(&g_poller_mu); |
||||||
|
backup_poller* p = g_poller; |
||||||
|
g_poller = NULL; |
||||||
|
gpr_mu_unlock(&g_poller_mu); |
||||||
|
gpr_mu_lock(p->pollset_mu); |
||||||
|
p->shutting_down = true; |
||||||
|
grpc_pollset_shutdown(exec_ctx, p->pollset, |
||||||
|
GRPC_CLOSURE_INIT(&p->shutdown_closure, done_poller, |
||||||
|
p, grpc_schedule_on_exec_ctx)); |
||||||
|
gpr_mu_unlock(p->pollset_mu); |
||||||
|
grpc_timer_cancel(exec_ctx, &p->polling_timer); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static void run_poller(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { |
||||||
|
backup_poller* p = (backup_poller*)arg; |
||||||
|
if (error != GRPC_ERROR_NONE) { |
||||||
|
if (error != GRPC_ERROR_CANCELLED) { |
||||||
|
GRPC_LOG_IF_ERROR("run_poller", GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
backup_poller_shutdown_unref(exec_ctx, p); |
||||||
|
return; |
||||||
|
} |
||||||
|
gpr_mu_lock(p->pollset_mu); |
||||||
|
if (p->shutting_down) { |
||||||
|
gpr_mu_unlock(p->pollset_mu); |
||||||
|
backup_poller_shutdown_unref(exec_ctx, p); |
||||||
|
return; |
||||||
|
} |
||||||
|
grpc_error* err = grpc_pollset_work(exec_ctx, p->pollset, NULL, |
||||||
|
grpc_exec_ctx_now(exec_ctx)); |
||||||
|
gpr_mu_unlock(p->pollset_mu); |
||||||
|
GRPC_LOG_IF_ERROR("Run client channel backup poller", err); |
||||||
|
grpc_timer_init(exec_ctx, &p->polling_timer, |
||||||
|
grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms, |
||||||
|
&p->run_poller_closure); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_client_channel_start_backup_polling( |
||||||
|
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) { |
||||||
|
gpr_once_init(&g_once, init_globals); |
||||||
|
if (g_poll_interval_ms == 0) { |
||||||
|
return; |
||||||
|
} |
||||||
|
gpr_mu_lock(&g_poller_mu); |
||||||
|
if (g_poller == NULL) { |
||||||
|
g_poller = (backup_poller*)gpr_zalloc(sizeof(backup_poller)); |
||||||
|
g_poller->pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); |
||||||
|
g_poller->shutting_down = false; |
||||||
|
grpc_pollset_init(g_poller->pollset, &g_poller->pollset_mu); |
||||||
|
gpr_ref_init(&g_poller->refs, 0); |
||||||
|
// one for timer cancellation, one for pollset shutdown
|
||||||
|
gpr_ref_init(&g_poller->shutdown_refs, 2); |
||||||
|
GRPC_CLOSURE_INIT(&g_poller->run_poller_closure, run_poller, g_poller, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
grpc_timer_init(exec_ctx, &g_poller->polling_timer, |
||||||
|
grpc_exec_ctx_now(exec_ctx) + g_poll_interval_ms, |
||||||
|
&g_poller->run_poller_closure); |
||||||
|
} |
||||||
|
gpr_ref(&g_poller->refs); |
||||||
|
gpr_mu_unlock(&g_poller_mu); |
||||||
|
grpc_pollset_set_add_pollset(exec_ctx, interested_parties, g_poller->pollset); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_client_channel_stop_backup_polling( |
||||||
|
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties) { |
||||||
|
if (g_poll_interval_ms == 0) { |
||||||
|
return; |
||||||
|
} |
||||||
|
grpc_pollset_set_del_pollset(exec_ctx, interested_parties, g_poller->pollset); |
||||||
|
g_poller_unref(exec_ctx); |
||||||
|
} |
@ -0,0 +1,34 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include "src/core/lib/channel/channel_stack.h" |
||||||
|
#include "src/core/lib/iomgr/exec_ctx.h" |
||||||
|
|
||||||
|
/* Start polling \a interested_parties periodically in the timer thread */ |
||||||
|
void grpc_client_channel_start_backup_polling( |
||||||
|
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties); |
||||||
|
|
||||||
|
/* Stop polling \a interested_parties */ |
||||||
|
void grpc_client_channel_stop_backup_polling( |
||||||
|
grpc_exec_ctx* exec_ctx, grpc_pollset_set* interested_parties); |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_BACKUP_POLLER_H */ |
@ -0,0 +1,265 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/debug/trace.h" |
||||||
|
#include "src/core/lib/iomgr/closure.h" |
||||||
|
#include "src/core/lib/iomgr/combiner.h" |
||||||
|
#include "src/core/lib/iomgr/sockaddr_utils.h" |
||||||
|
#include "src/core/lib/transport/connectivity_state.h" |
||||||
|
|
||||||
|
void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, |
||||||
|
grpc_lb_subchannel_data *sd, |
||||||
|
const char *reason) { |
||||||
|
if (sd->subchannel != NULL) { |
||||||
|
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { |
||||||
|
gpr_log( |
||||||
|
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR |
||||||
|
" of %" PRIuPTR " (subchannel %p): unreffing subchannel", |
||||||
|
sd->subchannel_list->tracer->name, sd->subchannel_list->policy, |
||||||
|
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), |
||||||
|
sd->subchannel_list->num_subchannels, sd->subchannel); |
||||||
|
} |
||||||
|
GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); |
||||||
|
sd->subchannel = NULL; |
||||||
|
if (sd->connected_subchannel != NULL) { |
||||||
|
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, sd->connected_subchannel, |
||||||
|
reason); |
||||||
|
sd->connected_subchannel = NULL; |
||||||
|
} |
||||||
|
if (sd->user_data != NULL) { |
||||||
|
GPR_ASSERT(sd->user_data_vtable != NULL); |
||||||
|
sd->user_data_vtable->destroy(exec_ctx, sd->user_data); |
||||||
|
sd->user_data = NULL; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_data_start_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { |
||||||
|
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR |
||||||
|
" (subchannel %p): requesting connectivity change notification", |
||||||
|
sd->subchannel_list->tracer->name, sd->subchannel_list->policy, |
||||||
|
sd->subchannel_list, |
||||||
|
(size_t)(sd - sd->subchannel_list->subchannels), |
||||||
|
sd->subchannel_list->num_subchannels, sd->subchannel); |
||||||
|
} |
||||||
|
sd->connectivity_notification_pending = true; |
||||||
|
grpc_subchannel_notify_on_state_change( |
||||||
|
exec_ctx, sd->subchannel, sd->subchannel_list->policy->interested_parties, |
||||||
|
&sd->pending_connectivity_state_unsafe, |
||||||
|
&sd->connectivity_changed_closure); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_data_stop_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { |
||||||
|
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { |
||||||
|
gpr_log( |
||||||
|
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR |
||||||
|
" (subchannel %p): stopping connectivity watch", |
||||||
|
sd->subchannel_list->tracer->name, sd->subchannel_list->policy, |
||||||
|
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), |
||||||
|
sd->subchannel_list->num_subchannels, sd->subchannel); |
||||||
|
} |
||||||
|
GPR_ASSERT(sd->connectivity_notification_pending); |
||||||
|
sd->connectivity_notification_pending = false; |
||||||
|
} |
||||||
|
|
||||||
|
grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, |
||||||
|
const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, |
||||||
|
grpc_iomgr_cb_func connectivity_changed_cb) { |
||||||
|
grpc_lb_subchannel_list *subchannel_list = |
||||||
|
(grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); |
||||||
|
if (GRPC_TRACER_ON(*tracer)) { |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", |
||||||
|
tracer->name, p, subchannel_list, addresses->num_addresses); |
||||||
|
} |
||||||
|
subchannel_list->policy = p; |
||||||
|
subchannel_list->tracer = tracer; |
||||||
|
gpr_ref_init(&subchannel_list->refcount, 1); |
||||||
|
subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( |
||||||
|
sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); |
||||||
|
// We need to remove the LB addresses in order to be able to compare the
|
||||||
|
// subchannel keys of subchannels from a different batch of addresses.
|
||||||
|
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, |
||||||
|
GRPC_ARG_LB_ADDRESSES}; |
||||||
|
// Create a subchannel for each address.
|
||||||
|
grpc_subchannel_args sc_args; |
||||||
|
size_t subchannel_index = 0; |
||||||
|
for (size_t i = 0; i < addresses->num_addresses; i++) { |
||||||
|
// If there were any balancer, we would have chosen grpclb policy instead.
|
||||||
|
GPR_ASSERT(!addresses->addresses[i].is_balancer); |
||||||
|
memset(&sc_args, 0, sizeof(grpc_subchannel_args)); |
||||||
|
grpc_arg addr_arg = |
||||||
|
grpc_create_subchannel_address_arg(&addresses->addresses[i].address); |
||||||
|
grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( |
||||||
|
args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, |
||||||
|
1); |
||||||
|
gpr_free(addr_arg.value.string); |
||||||
|
sc_args.args = new_args; |
||||||
|
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( |
||||||
|
exec_ctx, args->client_channel_factory, &sc_args); |
||||||
|
grpc_channel_args_destroy(exec_ctx, new_args); |
||||||
|
if (subchannel == NULL) { |
||||||
|
// Subchannel could not be created.
|
||||||
|
if (GRPC_TRACER_ON(*tracer)) { |
||||||
|
char *address_uri = |
||||||
|
grpc_sockaddr_to_uri(&addresses->addresses[i].address); |
||||||
|
gpr_log(GPR_DEBUG, |
||||||
|
"[%s %p] could not create subchannel for address uri %s, " |
||||||
|
"ignoring", |
||||||
|
tracer->name, subchannel_list->policy, address_uri); |
||||||
|
gpr_free(address_uri); |
||||||
|
} |
||||||
|
continue; |
||||||
|
} |
||||||
|
if (GRPC_TRACER_ON(*tracer)) { |
||||||
|
char *address_uri = |
||||||
|
grpc_sockaddr_to_uri(&addresses->addresses[i].address); |
||||||
|
gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR |
||||||
|
": Created subchannel %p for address uri %s", |
||||||
|
tracer->name, p, subchannel_list, subchannel_index, subchannel, |
||||||
|
address_uri); |
||||||
|
gpr_free(address_uri); |
||||||
|
} |
||||||
|
grpc_lb_subchannel_data *sd = |
||||||
|
&subchannel_list->subchannels[subchannel_index++]; |
||||||
|
sd->subchannel_list = subchannel_list; |
||||||
|
sd->subchannel = subchannel; |
||||||
|
GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure, |
||||||
|
connectivity_changed_cb, sd, |
||||||
|
grpc_combiner_scheduler(args->combiner)); |
||||||
|
// We assume that the current state is IDLE. If not, we'll get a
|
||||||
|
// callback telling us that.
|
||||||
|
sd->prev_connectivity_state = GRPC_CHANNEL_IDLE; |
||||||
|
sd->curr_connectivity_state = GRPC_CHANNEL_IDLE; |
||||||
|
sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE; |
||||||
|
sd->user_data_vtable = addresses->user_data_vtable; |
||||||
|
if (sd->user_data_vtable != NULL) { |
||||||
|
sd->user_data = |
||||||
|
sd->user_data_vtable->copy(addresses->addresses[i].user_data); |
||||||
|
} |
||||||
|
} |
||||||
|
subchannel_list->num_subchannels = subchannel_index; |
||||||
|
subchannel_list->num_idle = subchannel_index; |
||||||
|
return subchannel_list; |
||||||
|
} |
||||||
|
|
||||||
|
static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, |
||||||
|
grpc_lb_subchannel_list *subchannel_list) { |
||||||
|
if (GRPC_TRACER_ON(*subchannel_list->tracer)) { |
||||||
|
gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", |
||||||
|
subchannel_list->tracer->name, subchannel_list->policy, |
||||||
|
subchannel_list); |
||||||
|
} |
||||||
|
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { |
||||||
|
grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; |
||||||
|
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, |
||||||
|
"subchannel_list_destroy"); |
||||||
|
} |
||||||
|
gpr_free(subchannel_list->subchannels); |
||||||
|
gpr_free(subchannel_list); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason) { |
||||||
|
gpr_ref_non_zero(&subchannel_list->refcount); |
||||||
|
if (GRPC_TRACER_ON(*subchannel_list->tracer)) { |
||||||
|
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); |
||||||
|
gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p REF %lu->%lu (%s)", |
||||||
|
subchannel_list->tracer->name, subchannel_list->policy, |
||||||
|
subchannel_list, (unsigned long)(count - 1), (unsigned long)count, |
||||||
|
reason); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, |
||||||
|
grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason) { |
||||||
|
const bool done = gpr_unref(&subchannel_list->refcount); |
||||||
|
if (GRPC_TRACER_ON(*subchannel_list->tracer)) { |
||||||
|
const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); |
||||||
|
gpr_log(GPR_DEBUG, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)", |
||||||
|
subchannel_list->tracer->name, subchannel_list->policy, |
||||||
|
subchannel_list, (unsigned long)(count + 1), (unsigned long)count, |
||||||
|
reason); |
||||||
|
} |
||||||
|
if (done) { |
||||||
|
subchannel_list_destroy(exec_ctx, subchannel_list); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_ref_for_connectivity_watch( |
||||||
|
grpc_lb_subchannel_list *subchannel_list, const char *reason) { |
||||||
|
GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); |
||||||
|
grpc_lb_subchannel_list_ref(subchannel_list, reason); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_unref_for_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason) { |
||||||
|
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason); |
||||||
|
grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); |
||||||
|
} |
||||||
|
|
||||||
|
static void subchannel_data_cancel_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { |
||||||
|
if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { |
||||||
|
gpr_log( |
||||||
|
GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR |
||||||
|
" (subchannel %p): canceling connectivity watch (%s)", |
||||||
|
sd->subchannel_list->tracer->name, sd->subchannel_list->policy, |
||||||
|
sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), |
||||||
|
sd->subchannel_list->num_subchannels, sd->subchannel, reason); |
||||||
|
} |
||||||
|
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, |
||||||
|
&sd->connectivity_changed_closure); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_shutdown_and_unref( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason) { |
||||||
|
if (GRPC_TRACER_ON(*subchannel_list->tracer)) { |
||||||
|
gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", |
||||||
|
subchannel_list->tracer->name, subchannel_list->policy, |
||||||
|
subchannel_list, reason); |
||||||
|
} |
||||||
|
GPR_ASSERT(!subchannel_list->shutting_down); |
||||||
|
subchannel_list->shutting_down = true; |
||||||
|
for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { |
||||||
|
grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; |
||||||
|
// If there's a pending notification for this subchannel, cancel it;
|
||||||
|
// the callback is responsible for unreffing the subchannel.
|
||||||
|
// Otherwise, unref the subchannel directly.
|
||||||
|
if (sd->connectivity_notification_pending) { |
||||||
|
subchannel_data_cancel_connectivity_watch(exec_ctx, sd, reason); |
||||||
|
} else if (sd->subchannel != NULL) { |
||||||
|
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, reason); |
||||||
|
} |
||||||
|
} |
||||||
|
grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); |
||||||
|
} |
@ -0,0 +1,153 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||||
|
#include "src/core/ext/filters/client_channel/subchannel.h" |
||||||
|
#include "src/core/lib/debug/trace.h" |
||||||
|
#include "src/core/lib/transport/connectivity_state.h" |
||||||
|
|
||||||
|
// TODO(roth): This code is intended to be shared between pick_first and
|
||||||
|
// round_robin. However, the interface needs more work to provide clean
|
||||||
|
// encapsulation. For example, the structs here have some fields that are
|
||||||
|
// only used in one of the two (e.g., the state counters in
|
||||||
|
// grpc_lb_subchannel_list and the prev_connectivity_state field in
|
||||||
|
// grpc_lb_subchannel_data are only used in round_robin, and the
|
||||||
|
// checking_subchannel field in grpc_lb_subchannel_list is only used by
|
||||||
|
// pick_first). Also, there is probably some code duplication between the
|
||||||
|
// connectivity state notification callback code in both pick_first and
|
||||||
|
// round_robin that could be refactored and moved here. In a future PR,
|
||||||
|
// need to clean this up.
|
||||||
|
|
||||||
|
#ifdef __cplusplus |
||||||
|
extern "C" { |
||||||
|
#endif |
||||||
|
|
||||||
|
typedef struct grpc_lb_subchannel_list grpc_lb_subchannel_list; |
||||||
|
|
||||||
|
typedef struct { |
||||||
|
/** backpointer to owning subchannel list */ |
||||||
|
grpc_lb_subchannel_list *subchannel_list; |
||||||
|
/** subchannel itself */ |
||||||
|
grpc_subchannel *subchannel; |
||||||
|
grpc_connected_subchannel *connected_subchannel; |
||||||
|
/** Is a connectivity notification pending? */ |
||||||
|
bool connectivity_notification_pending; |
||||||
|
/** notification that connectivity has changed on subchannel */ |
||||||
|
grpc_closure connectivity_changed_closure; |
||||||
|
/** previous and current connectivity states. Updated by \a
|
||||||
|
* \a connectivity_changed_closure based on |
||||||
|
* \a pending_connectivity_state_unsafe. */ |
||||||
|
grpc_connectivity_state prev_connectivity_state; |
||||||
|
grpc_connectivity_state curr_connectivity_state; |
||||||
|
/** connectivity state to be updated by
|
||||||
|
* grpc_subchannel_notify_on_state_change(), not guarded by |
||||||
|
* the combiner. To be copied to \a curr_connectivity_state by |
||||||
|
* \a connectivity_changed_closure. */ |
||||||
|
grpc_connectivity_state pending_connectivity_state_unsafe; |
||||||
|
/** the subchannel's target user data */ |
||||||
|
void *user_data; |
||||||
|
/** vtable to operate over \a user_data */ |
||||||
|
const grpc_lb_user_data_vtable *user_data_vtable; |
||||||
|
} grpc_lb_subchannel_data; |
||||||
|
|
||||||
|
/// Unrefs the subchannel contained in sd.
|
||||||
|
void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, |
||||||
|
grpc_lb_subchannel_data *sd, |
||||||
|
const char *reason); |
||||||
|
|
||||||
|
/// Starts watching the connectivity state of the subchannel.
|
||||||
|
/// The connectivity_changed_cb callback must invoke either
|
||||||
|
/// grpc_lb_subchannel_data_stop_connectivity_watch() or again call
|
||||||
|
/// grpc_lb_subchannel_data_start_connectivity_watch().
|
||||||
|
void grpc_lb_subchannel_data_start_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); |
||||||
|
|
||||||
|
/// Stops watching the connectivity state of the subchannel.
|
||||||
|
void grpc_lb_subchannel_data_stop_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd); |
||||||
|
|
||||||
|
struct grpc_lb_subchannel_list { |
||||||
|
/** backpointer to owning policy */ |
||||||
|
grpc_lb_policy *policy; |
||||||
|
|
||||||
|
grpc_tracer_flag *tracer; |
||||||
|
|
||||||
|
/** all our subchannels */ |
||||||
|
size_t num_subchannels; |
||||||
|
grpc_lb_subchannel_data *subchannels; |
||||||
|
|
||||||
|
/** Index into subchannels of the one we're currently checking.
|
||||||
|
* Used when connecting to subchannels serially instead of in parallel. */ |
||||||
|
// TODO(roth): When we have time, we can probably make this go away
|
||||||
|
// and compute the index dynamically by subtracting
|
||||||
|
// subchannel_list->subchannels from the subchannel_data pointer.
|
||||||
|
size_t checking_subchannel; |
||||||
|
|
||||||
|
/** how many subchannels are in state READY */ |
||||||
|
size_t num_ready; |
||||||
|
/** how many subchannels are in state TRANSIENT_FAILURE */ |
||||||
|
size_t num_transient_failures; |
||||||
|
/** how many subchannels are in state SHUTDOWN */ |
||||||
|
size_t num_shutdown; |
||||||
|
/** how many subchannels are in state IDLE */ |
||||||
|
size_t num_idle; |
||||||
|
|
||||||
|
/** There will be one ref for each entry in subchannels for which there is a
|
||||||
|
* pending connectivity state watcher callback. */ |
||||||
|
gpr_refcount refcount; |
||||||
|
|
||||||
|
/** Is this list shutting down? This may be true due to the shutdown of the
|
||||||
|
* policy itself or because a newer update has arrived while this one hadn't |
||||||
|
* finished processing. */ |
||||||
|
bool shutting_down; |
||||||
|
}; |
||||||
|
|
||||||
|
grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, |
||||||
|
const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, |
||||||
|
grpc_iomgr_cb_func connectivity_changed_cb); |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason); |
||||||
|
|
||||||
|
void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, |
||||||
|
grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason); |
||||||
|
|
||||||
|
/// Takes and releases refs needed for a connectivity notification.
|
||||||
|
/// This includes a ref to subchannel_list and a weak ref to the LB policy.
|
||||||
|
void grpc_lb_subchannel_list_ref_for_connectivity_watch( |
||||||
|
grpc_lb_subchannel_list *subchannel_list, const char *reason); |
||||||
|
void grpc_lb_subchannel_list_unref_for_connectivity_watch( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason); |
||||||
|
|
||||||
|
/// Mark subchannel_list as discarded. Unsubscribes all its subchannels. The
|
||||||
|
/// connectivity state notification callback will ultimately unref it.
|
||||||
|
void grpc_lb_subchannel_list_shutdown_and_unref( |
||||||
|
grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, |
||||||
|
const char *reason); |
||||||
|
|
||||||
|
#ifdef __cplusplus |
||||||
|
} |
||||||
|
#endif |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */ |
@ -1 +1 @@ |
|||||||
Subproject commit 44c25c892a6229b20db7cd9dc05584ea865896de |
Subproject commit 5b7683f49e1e9223cf9927b24f6fd3d6bd82e3f8 |
Loading…
Reference in new issue