Merge pull request #15896 from markdroth/client_channel_refactor

Refactor resolver result callback
Mark D. Roth committed 7 years ago (via GitHub)
commit 4decd144dc
3 changed files:
  1. src/core/ext/filters/client_channel/client_channel.cc (397 changed lines)
  2. src/core/ext/filters/client_channel/lb_policy_factory.cc (8 changed lines)
  3. src/core/ext/filters/client_channel/lb_policy_factory.h (4 changed lines)

src/core/ext/filters/client_channel/client_channel.cc

@@ -126,9 +126,9 @@ typedef struct client_channel_channel_data {
   /* the following properties are guarded by a mutex since APIs require them
      to be instantaneously available */
   gpr_mu info_mu;
-  char* info_lb_policy_name;
+  grpc_core::UniquePtr<char> info_lb_policy_name;
   /** service config in JSON form */
-  char* info_service_config_json;
+  grpc_core::UniquePtr<char> info_service_config_json;
 } channel_data;
 
 typedef struct {
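The change above replaces manually managed `char*` strings with `grpc_core::UniquePtr<char>`, which frees its string through gRPC's allocator when reassigned, reset, or destroyed. A minimal sketch of the ownership pattern (standalone illustration, not part of the patch):

```cpp
#include <grpc/support/string_util.h>  // gpr_strdup

#include "src/core/lib/gprpp/memory.h"  // grpc_core::UniquePtr

// Illustration only: UniquePtr<char> owns a gpr_strdup()'d string and
// frees it via gpr_free() when reassigned, reset, or destroyed.
void unique_ptr_char_example() {
  grpc_core::UniquePtr<char> name(gpr_strdup("round_robin"));
  name.reset(gpr_strdup("pick_first"));  // old string freed here
  const char* view = name.get();         // borrow without taking ownership
  (void)view;
}  // remaining string freed here
```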
@@ -284,51 +284,53 @@ static void parse_retry_throttle_params(
   }
 }
 
-static void request_reresolution_locked(void* arg, grpc_error* error) {
-  reresolution_request_args* args =
-      static_cast<reresolution_request_args*>(arg);
-  channel_data* chand = args->chand;
-  // If this invocation is for a stale LB policy, treat it as an LB shutdown
-  // signal.
-  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
-      chand->resolver == nullptr) {
-    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
-    gpr_free(args);
-    return;
-  }
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
-  }
-  chand->resolver->RequestReresolutionLocked();
-  // Give back the closure to the LB policy.
-  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
-}
-
-// TODO(roth): The logic in this function is very hard to follow.  We
-// should refactor this so that it's easier to understand, perhaps as
-// part of changing the resolver API to more clearly differentiate
-// between transient failures and shutdown.
-static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
-  channel_data* chand = static_cast<channel_data*>(arg);
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
-            chand->resolver_result, grpc_error_string(error));
-  }
-  // Extract the following fields from the resolver result, if non-nullptr.
-  bool lb_policy_updated = false;
-  bool lb_policy_created = false;
-  char* lb_policy_name_dup = nullptr;
-  bool lb_policy_name_changed = false;
-  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy;
-  char* service_config_json = nullptr;
-  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
-  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
-  if (chand->resolver_result != nullptr) {
-    if (chand->resolver != nullptr) {
-      // Find LB policy name.
-      const grpc_arg* channel_arg = grpc_channel_args_find(
-          chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
-      const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
-      // Special case: If at least one balancer address is present, we use
-      // the grpclb policy, regardless of what the resolver actually specified.
+// Invoked from the resolver NextLocked() callback when the resolver
+// is shutting down.
+static void on_resolver_shutdown_locked(channel_data* chand,
+                                        grpc_error* error) {
+  if (grpc_client_channel_trace.enabled()) {
+    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
+  }
+  if (chand->lb_policy != nullptr) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
+              chand->lb_policy.get());
+    }
+    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
+                                     chand->interested_parties);
+    chand->lb_policy.reset();
+  }
+  if (chand->resolver != nullptr) {
+    // This should never happen; it can only be triggered by a resolver
+    // implementation spontaneously deciding to report shutdown without
+    // being orphaned.  This code is included just to be defensive.
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
+              chand, chand->resolver.get());
+    }
+    chand->resolver.reset();
+    set_channel_connectivity_state_locked(
+        chand, GRPC_CHANNEL_SHUTDOWN,
+        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+            "Resolver spontaneous shutdown", &error, 1),
+        "resolver_spontaneous_shutdown");
+  }
+  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
+                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+                                 "Channel disconnected", &error, 1));
+  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
+  grpc_channel_args_destroy(chand->resolver_result);
+  chand->resolver_result = nullptr;
+  GRPC_ERROR_UNREF(error);
+}
+
+// Returns the LB policy name from the resolver result.
+static grpc_core::UniquePtr<char>
+get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
+  // Find LB policy name in channel args.
+  const grpc_arg* channel_arg =
+      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
+  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
+  // Special case: If at least one balancer address is present, we use
+  // the grpclb policy, regardless of what the resolver actually specified.
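A note on the error handling in `on_resolver_shutdown_locked()` above: `GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING` takes its own reference on each child error, and `grpc_closure_list_fail_all()` consumes the error it is given, which is why the function can wrap `error` twice and still must unref its own reference at the end. A hedged sketch of that convention (the helper name is hypothetical):

```cpp
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"

// Hypothetical helper illustrating the refcounting convention:
// `wrapped` holds its own ref on `error`, and fail_all() consumes
// `wrapped`, so the caller's original ref must still be released.
static void fail_closures_with_wrapped_error(grpc_closure_list* closures,
                                             grpc_error* error) {
  grpc_error* wrapped = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
      "Channel disconnected", &error, 1);
  grpc_closure_list_fail_all(closures, wrapped);
  GRPC_ERROR_UNREF(error);
}
```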
@@ -337,16 +339,9 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
     grpc_lb_addresses* addresses =
         static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
-    bool found_balancer_address = false;
-    for (size_t i = 0; i < addresses->num_addresses; ++i) {
-      if (addresses->addresses[i].is_balancer) {
-        found_balancer_address = true;
-        break;
-      }
-    }
-    if (found_balancer_address) {
+    if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
       if (lb_policy_name != nullptr &&
-          strcmp(lb_policy_name, "grpclb") != 0) {
+          gpr_stricmp(lb_policy_name, "grpclb") != 0) {
         gpr_log(GPR_INFO,
                 "resolver requested LB policy %s but provided at least one "
                 "balancer address -- forcing use of grpclb LB policy",
@ -358,59 +353,105 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// Use pick_first if nothing was specified and we didn't select grpclb // Use pick_first if nothing was specified and we didn't select grpclb
// above. // above.
if (lb_policy_name == nullptr) lb_policy_name = "pick_first"; if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
// Check to see if we're already using the right LB policy. return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
// Note: It's safe to use chand->info_lb_policy_name here without }
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked static void request_reresolution_locked(void* arg, grpc_error* error) {
// once at any given time. reresolution_request_args* args =
lb_policy_name_changed = static_cast<reresolution_request_args*>(arg);
chand->info_lb_policy_name == nullptr || channel_data* chand = args->chand;
gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0; // If this invocation is for a stale LB policy, treat it as an LB shutdown
if (chand->lb_policy != nullptr && !lb_policy_name_changed) { // signal.
// Continue using the same LB policy. Update with new addresses. if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
lb_policy_updated = true; chand->resolver == nullptr) {
chand->lb_policy->UpdateLocked(*chand->resolver_result); GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
} else { gpr_free(args);
// Instantiate new LB policy. return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
}
chand->resolver->RequestReresolutionLocked();
// Give back the closure to the LB policy.
chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}
// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
channel_data* chand, char* lb_policy_name,
grpc_connectivity_state* connectivity_state,
grpc_error** connectivity_error) {
grpc_core::LoadBalancingPolicy::Args lb_policy_args; grpc_core::LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = chand->combiner; lb_policy_args.combiner = chand->combiner;
lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.args = chand->resolver_result; lb_policy_args.args = chand->resolver_result;
new_lb_policy = grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
lb_policy_name, lb_policy_args); lb_policy_name, lb_policy_args);
if (GPR_UNLIKELY(new_lb_policy == nullptr)) { if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
lb_policy_name);
} else { } else {
lb_policy_created = true; if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
lb_policy_name, new_lb_policy.get());
}
// Swap out the LB policy and update the fds in
// chand->interested_parties.
if (chand->lb_policy != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
chand->lb_policy.get());
}
grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
}
chand->lb_policy = std::move(new_lb_policy);
grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
chand->interested_parties);
// Set up re-resolution callback.
reresolution_request_args* args = reresolution_request_args* args =
static_cast<reresolution_request_args*>( static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
gpr_zalloc(sizeof(*args)));
args->chand = chand; args->chand = chand;
args->lb_policy = new_lb_policy.get(); args->lb_policy = chand->lb_policy.get();
GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args, GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
grpc_combiner_scheduler(chand->combiner)); grpc_combiner_scheduler(chand->combiner));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution"); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
new_lb_policy->SetReresolutionClosureLocked(&args->closure); chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
// Get the new LB policy's initial connectivity state and start a
// connectivity watch.
GRPC_ERROR_UNREF(*connectivity_error);
*connectivity_state =
chand->lb_policy->CheckConnectivityLocked(connectivity_error);
if (chand->exit_idle_when_lb_policy_arrives) {
chand->lb_policy->ExitIdleLocked();
chand->exit_idle_when_lb_policy_arrives = false;
} }
watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
} }
// Before we clean up, save a copy of lb_policy_name, since it might }
// be pointing to data inside chand->resolver_result.
// The copy will be saved in chand->lb_policy_name below. // Returns the service config (as a JSON string) from the resolver result.
lb_policy_name_dup = gpr_strdup(lb_policy_name); // Also updates state in chand.
// Find service config. static grpc_core::UniquePtr<char>
channel_arg = grpc_channel_args_find(chand->resolver_result, get_service_config_from_resolver_result_locked(channel_data* chand) {
GRPC_ARG_SERVICE_CONFIG); const grpc_arg* channel_arg =
service_config_json = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
gpr_strdup(grpc_channel_arg_get_string(channel_arg)); const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
chand, service_config_json);
}
grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config = grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
grpc_core::ServiceConfig::Create(service_config_json); grpc_core::ServiceConfig::Create(service_config_json);
if (service_config != nullptr) { if (service_config != nullptr) {
if (chand->enable_retries) { if (chand->enable_retries) {
channel_arg = grpc_channel_args_find(chand->resolver_result, channel_arg =
GRPC_ARG_SERVER_URI); grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(channel_arg); const char* server_uri = grpc_channel_arg_get_string(channel_arg);
GPR_ASSERT(server_uri != nullptr); GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true); grpc_uri* uri = grpc_uri_parse(server_uri, true);
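For reference, `get_service_config_from_resolver_result_locked()` above feeds the raw JSON into `grpc_core::ServiceConfig::Create()`; the retry-throttle block is consumed by `parse_retry_throttle_params` and the per-method entries populate the method-params table. A hypothetical service config of the shape this path consumes (the service name and values are made up):

```cpp
// Hypothetical example of the JSON this code path consumes.
static const char* kExampleServiceConfigJson =
    "{"
    "  \"retryThrottling\": {"
    "    \"maxTokens\": 100,"
    "    \"tokenRatio\": 0.1"
    "  },"
    "  \"methodConfig\": [ {"
    "    \"name\": [ { \"service\": \"example.EchoService\" } ],"
    "    \"waitForReady\": true,"
    "    \"timeout\": \"5s\""
    "  } ]"
    "}";
```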
@@ -422,116 +463,96 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
         service_config->ParseGlobalParams(parse_retry_throttle_params,
                                           &parsing_state);
         grpc_uri_destroy(uri);
-        retry_throttle_data = std::move(parsing_state.retry_throttle_data);
+        chand->retry_throttle_data =
+            std::move(parsing_state.retry_throttle_data);
       }
-      method_params_table = service_config->CreateMethodConfigTable(
+      chand->method_params_table = service_config->CreateMethodConfigTable(
           ClientChannelMethodParams::CreateFromJson);
     }
   }
-  }
-  if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_INFO,
-            "chand=%p: resolver result: lb_policy_name=\"%s\"%s, "
-            "service_config=\"%s\"",
-            chand, lb_policy_name_dup,
-            lb_policy_name_changed ? " (changed)" : "", service_config_json);
-  }
-  // Now swap out fields in chand.  Note that the new values may still
-  // be nullptr if (e.g.) the resolver failed to return results or the
-  // results did not contain the necessary data.
-  //
-  // First, swap out the data used by cc_get_channel_info().
-  gpr_mu_lock(&chand->info_mu);
-  if (lb_policy_name_dup != nullptr) {
-    gpr_free(chand->info_lb_policy_name);
-    chand->info_lb_policy_name = lb_policy_name_dup;
-  }
-  if (service_config_json != nullptr) {
-    gpr_free(chand->info_service_config_json);
-    chand->info_service_config_json = service_config_json;
-  }
-  gpr_mu_unlock(&chand->info_mu);
-  // Swap out the retry throttle data.
-  chand->retry_throttle_data = std::move(retry_throttle_data);
-  // Swap out the method params table.
-  chand->method_params_table = std::move(method_params_table);
-  // If we have a new LB policy or are shutting down (in which case
-  // new_lb_policy will be nullptr), swap out the LB policy, unreffing the
-  // old one and removing its fds from chand->interested_parties.
-  // Note that we do NOT do this if either (a) we updated the existing
-  // LB policy above or (b) we failed to create the new LB policy (in
-  // which case we want to continue using the most recent one we had).
-  if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
-      chand->resolver == nullptr) {
-    if (chand->lb_policy != nullptr) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: unreffing lb_policy=%p", chand,
-                chand->lb_policy.get());
-      }
-      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
-                                       chand->interested_parties);
-      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
-      chand->lb_policy.reset();
-    }
-    chand->lb_policy = std::move(new_lb_policy);
-  }
-  // Now that we've swapped out the relevant fields of chand, check for
-  // error or shutdown.
-  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
-    }
-    if (chand->resolver != nullptr) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: shutting down resolver", chand);
-      }
-      chand->resolver.reset();
-    }
-    set_channel_connectivity_state_locked(
-        chand, GRPC_CHANNEL_SHUTDOWN,
-        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-            "Got resolver result after disconnection", &error, 1),
-        "resolver_gone");
-    grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
-                               GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-                                   "Channel disconnected", &error, 1));
-    GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
-    grpc_channel_args_destroy(chand->resolver_result);
-    chand->resolver_result = nullptr;
-  } else {  // Not shutting down.
-    grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
-    grpc_error* state_error =
-        GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
-    if (lb_policy_created) {
-      if (grpc_client_channel_trace.enabled()) {
-        gpr_log(GPR_INFO, "chand=%p: initializing new LB policy", chand);
-      }
-      GRPC_ERROR_UNREF(state_error);
-      state = chand->lb_policy->CheckConnectivityLocked(&state_error);
-      grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
-                                       chand->interested_parties);
-      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-      if (chand->exit_idle_when_lb_policy_arrives) {
-        chand->lb_policy->ExitIdleLocked();
-        chand->exit_idle_when_lb_policy_arrives = false;
-      }
-      watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
-    } else if (chand->resolver_result == nullptr) {
-      // Transient failure.
-      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
-    }
-    if (!lb_policy_updated) {
-      set_channel_connectivity_state_locked(
-          chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
-    }
-    grpc_channel_args_destroy(chand->resolver_result);
-    chand->resolver_result = nullptr;
-    chand->resolver->NextLocked(&chand->resolver_result,
-                                &chand->on_resolver_result_changed);
-    GRPC_ERROR_UNREF(state_error);
-  }
+  return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
+}
+
+// Callback invoked when a resolver result is available.
+static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
+  channel_data* chand = static_cast<channel_data*>(arg);
+  if (grpc_client_channel_trace.enabled()) {
+    const char* disposition =
+        chand->resolver_result != nullptr
+            ? ""
+            : (error == GRPC_ERROR_NONE ? " (transient error)"
+                                        : " (resolver shutdown)");
+    gpr_log(GPR_INFO,
+            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
+            chand, chand->resolver_result, grpc_error_string(error),
+            disposition);
+  }
+  // Handle shutdown.
+  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
+    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
+    return;
+  }
+  // Data used to set the channel's connectivity state.
+  bool set_connectivity_state = true;
+  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+  grpc_error* connectivity_error =
+      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
+  // chand->resolver_result will be null in the case of a transient
+  // resolution error.  In that case, we don't have any new result to
+  // process, which means that we keep using the previous result (if any).
+  if (chand->resolver_result == nullptr) {
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
+    }
+  } else {
+    grpc_core::UniquePtr<char> lb_policy_name =
+        get_lb_policy_name_from_resolver_result_locked(chand);
+    // Check to see if we're already using the right LB policy.
+    // Note: It's safe to use chand->info_lb_policy_name here without
+    // taking a lock on chand->info_mu, because this function is the
+    // only thing that modifies its value, and it can only be invoked
+    // once at any given time.
+    bool lb_policy_name_changed =
+        chand->info_lb_policy_name == nullptr ||
+        gpr_stricmp(chand->info_lb_policy_name.get(),
+                    lb_policy_name.get()) != 0;
+    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
+      // Continue using the same LB policy.  Update with new addresses.
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
+                chand, lb_policy_name.get(), chand->lb_policy.get());
+      }
+      chand->lb_policy->UpdateLocked(*chand->resolver_result);
+      // No need to set the channel's connectivity state; the existing
+      // watch on the LB policy will take care of that.
+      set_connectivity_state = false;
+    } else {
+      // Instantiate new LB policy.
+      create_new_lb_policy_locked(chand, lb_policy_name.get(),
+                                  &connectivity_state, &connectivity_error);
+    }
+    // Find service config.
+    grpc_core::UniquePtr<char> service_config_json =
+        get_service_config_from_resolver_result_locked(chand);
+    // Swap out the data used by cc_get_channel_info().
+    gpr_mu_lock(&chand->info_mu);
+    chand->info_lb_policy_name = std::move(lb_policy_name);
+    chand->info_service_config_json = std::move(service_config_json);
+    gpr_mu_unlock(&chand->info_mu);
+    // Clean up.
+    grpc_channel_args_destroy(chand->resolver_result);
+    chand->resolver_result = nullptr;
+  }
+  // Set the channel's connectivity state if needed.
+  if (set_connectivity_state) {
+    set_channel_connectivity_state_locked(
+        chand, connectivity_state, connectivity_error, "resolver_result");
+  } else {
+    GRPC_ERROR_UNREF(connectivity_error);
+  }
+  // Invoke closures that were waiting for results and renew the watch.
+  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
+  chand->resolver->NextLocked(&chand->resolver_result,
+                              &chand->on_resolver_result_changed);
 }
 
 static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
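The restructured callback above makes the resolver's contract easier to see: each result delivered via `NextLocked()` must be acknowledged by calling `NextLocked()` again, except on the shutdown path. A simplified sketch of that re-arm pattern (not the actual function body):

```cpp
// Simplified sketch of the NextLocked() re-arm pattern used above.
static void on_result_sketch(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    return;  // shutdown: do not re-arm the watch
  }
  // ... process chand->resolver_result, then release it ...
  grpc_channel_args_destroy(chand->resolver_result);
  chand->resolver_result = nullptr;
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}
```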
@@ -611,15 +632,11 @@ static void cc_get_channel_info(grpc_channel_element* elem,
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   gpr_mu_lock(&chand->info_mu);
   if (info->lb_policy_name != nullptr) {
-    *info->lb_policy_name = chand->info_lb_policy_name == nullptr
-                                ? nullptr
-                                : gpr_strdup(chand->info_lb_policy_name);
+    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
   }
   if (info->service_config_json != nullptr) {
-    *info->service_config_json =
-        chand->info_service_config_json == nullptr
-            ? nullptr
-            : gpr_strdup(chand->info_service_config_json);
+    *info->service_config_json =
+        gpr_strdup(chand->info_service_config_json.get());
   }
   gpr_mu_unlock(&chand->info_mu);
 }
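The simplification in `cc_get_channel_info()` above relies on `gpr_strdup(nullptr)` returning `nullptr`, so the explicit null checks are no longer needed. From the caller's side, the same data is reached through the public `grpc_channel_get_info()` API, and the returned copies are owned by the caller (a usage sketch, with a hypothetical function name):

```cpp
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>

// Usage sketch: the strings written through grpc_channel_info are
// gpr_strdup()'d copies (possibly nullptr) that the caller must free.
void log_channel_info(grpc_channel* channel) {
  char* lb_policy_name = nullptr;
  char* service_config_json = nullptr;
  grpc_channel_info info = {};
  info.lb_policy_name = &lb_policy_name;
  info.service_config_json = &service_config_json;
  grpc_channel_get_info(channel, &info);
  // ... inspect the strings here ...
  gpr_free(lb_policy_name);
  gpr_free(service_config_json);
}
```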
@@ -699,19 +716,15 @@ static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
   return GRPC_ERROR_NONE;
 }
 
-static void shutdown_resolver_locked(void* arg, grpc_error* error) {
-  grpc_core::Resolver* resolver = static_cast<grpc_core::Resolver*>(arg);
-  resolver->Orphan();
-}
-
 /* Destructor for channel_data */
 static void cc_destroy_channel_elem(grpc_channel_element* elem) {
   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
   if (chand->resolver != nullptr) {
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver.release(),
-                            grpc_combiner_scheduler(chand->combiner)),
-        GRPC_ERROR_NONE);
+    // The only way we can get here is if we never started resolving,
+    // because we take a ref to the channel stack when we start
+    // resolving and do not release it until the resolver callback is
+    // invoked after the resolver shuts down.
+    chand->resolver.reset();
   }
   if (chand->client_channel_factory != nullptr) {
     grpc_client_channel_factory_unref(chand->client_channel_factory);
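The deleted `shutdown_resolver_locked()` closure is unnecessary because `chand->resolver` is an `OrphanablePtr`: resetting it invokes `Orphan()` on the resolver, which is responsible for its own (possibly asynchronous) teardown. A toy sketch of that contract (the class name is illustrative, not a real resolver):

```cpp
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"

// Toy type showing the Orphanable contract; a real resolver would cancel
// pending work and delete itself only once it is safe to do so.
class ToyResolver : public grpc_core::Orphanable {
 public:
  void Orphan() override { grpc_core::Delete(this); }
};

void orphanable_example() {
  grpc_core::OrphanablePtr<ToyResolver> r =
      grpc_core::MakeOrphanable<ToyResolver>();
  r.reset();  // calls ToyResolver::Orphan(), not `delete`
}
```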
@@ -721,8 +734,10 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) {
                                      chand->interested_parties);
     chand->lb_policy.reset();
   }
-  gpr_free(chand->info_lb_policy_name);
-  gpr_free(chand->info_service_config_json);
+  // TODO(roth): Once we convert the filter API to C++, there will no
+  // longer be any need to explicitly reset these smart pointer data members.
+  chand->info_lb_policy_name.reset();
+  chand->info_service_config_json.reset();
   chand->retry_throttle_data.reset();
   chand->method_params_table.reset();
   grpc_client_channel_stop_backup_polling(chand->interested_parties);

src/core/ext/filters/client_channel/lb_policy_factory.cc

@@ -153,3 +153,11 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
     return nullptr;
   return static_cast<grpc_lb_addresses*>(lb_addresses_arg->value.pointer.p);
 }
+
+bool grpc_lb_addresses_contains_balancer_address(
+    const grpc_lb_addresses& addresses) {
+  for (size_t i = 0; i < addresses.num_addresses; ++i) {
+    if (addresses.addresses[i].is_balancer) return true;
+  }
+  return false;
+}
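A usage sketch of the new helper, mirroring its call site in client_channel.cc (the wrapper function name is hypothetical):

```cpp
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"

// Returns true if the resolver result carries at least one balancer
// address, i.e. the case in which client_channel forces grpclb.
static bool resolver_result_has_balancer(
    const grpc_channel_args* resolver_result) {
  grpc_lb_addresses* addresses =
      grpc_lb_addresses_find_channel_arg(resolver_result);
  return addresses != nullptr &&
         grpc_lb_addresses_contains_balancer_address(*addresses);
}
```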

src/core/ext/filters/client_channel/lb_policy_factory.h

@@ -101,6 +101,10 @@ grpc_arg grpc_lb_addresses_create_channel_arg(
 grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
     const grpc_channel_args* channel_args);
 
+// Returns true if addresses contains at least one balancer address.
+bool grpc_lb_addresses_contains_balancer_address(
+    const grpc_lb_addresses& addresses);
+
 //
 // LB policy factory
 //
