reviewer feedback

pull/16904/head
ncteisen 6 years ago
parent aa7b8e5bc6
commit 265eace8e6
  1. 58
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/lb_policy.h
  3. 5
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  4. 13
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  5. 7
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  6. 5
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc

@ -525,7 +525,9 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
// (c) Address resolution that causes number of backends to go from
// non-zero to zero.
// (d) Address resolution that causes a new LB policy to be created.
bool trace_this_address_resolution = false;
//
// we track a list of strings to eventually be concatenated and traced.
grpc_core::InlinedVector<char*, 3> trace_strings;
grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_error* connectivity_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
@ -553,44 +555,51 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
chand, lb_policy_name.get(), chand->lb_policy.get());
}
// case (b) or (c)
trace_this_address_resolution =
chand->lb_policy->UpdateLocked(*chand->resolver_result);
chand->lb_policy->UpdateLocked(*chand->resolver_result);
// No need to set the channel's connectivity state; the existing
// watch on the LB policy will take care of that.
set_connectivity_state = false;
} else {
trace_this_address_resolution = true; // case (d)
// Instantiate new LB policy.
create_new_lb_policy_locked(chand, lb_policy_name.get(),
&connectivity_state, &connectivity_error);
// we also log the name of the new LB policy in addition to logging this
// resolution event.
if (chand->channelz_channel != nullptr) {
char* str;
gpr_asprintf(&str, "Switched LB policy to %s", lb_policy_name.get());
chand->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_copied_string(str));
gpr_free(str);
trace_strings.push_back(str);
}
}
// Find service config.
grpc_core::UniquePtr<char> service_config_json =
get_service_config_from_resolver_result_locked(chand);
if ((service_config_json == nullptr &&
chand->info_service_config_json != nullptr) ||
(service_config_json != nullptr &&
chand->info_service_config_json == nullptr) ||
(service_config_json != nullptr &&
chand->info_service_config_json != nullptr &&
gpr_stricmp(service_config_json.get(),
chand->info_service_config_json.get()) != 0)) {
trace_this_address_resolution = true; // case (a)
if (chand->channelz_channel != nullptr) {
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
if (chand->channelz_channel != nullptr) {
if ((service_config_json == nullptr &&
chand->info_service_config_json != nullptr) ||
(service_config_json != nullptr &&
chand->info_service_config_json == nullptr) ||
(service_config_json != nullptr &&
chand->info_service_config_json != nullptr &&
strcmp(service_config_json.get(),
chand->info_service_config_json.get()) != 0)) {
trace_strings.push_back(gpr_strdup("Service config reloaded"));
}
//TODO
// const grpc_arg* channel_arg =
// grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
// if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
// grpc_lb_addresses* addresses =
// static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
// } else {
// }
if (!trace_strings.empty()) {
// TODO, concatenate the strings from trace_strings.
chand->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("Service config reloaded"));
grpc_slice_from_static_string("New addresses resolved"));
}
}
// Swap out the data used by cc_get_channel_info().
@ -598,11 +607,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
chand->info_lb_policy_name = std::move(lb_policy_name);
chand->info_service_config_json = std::move(service_config_json);
gpr_mu_unlock(&chand->info_mu);
if (trace_this_address_resolution && chand->channelz_channel != nullptr) {
chand->channelz_channel->AddTraceEvent(
grpc_core::channelz::ChannelTrace::Severity::Info,
grpc_slice_from_static_string("New addresses resolved"));
}
// Clean up.
grpc_channel_args_destroy(chand->resolver_result);
chand->resolver_result = nullptr;

@ -95,9 +95,7 @@ class LoadBalancingPolicy
/// Updates the policy with a new set of \a args from the resolver.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_LB_ADDRESSES channel arg.
/// Returns true if the update caused the number of backends to go from
/// zero to non-zero, or non-zero to zero.
virtual bool UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT;
virtual void UpdateLocked(const grpc_channel_args& args) GRPC_ABSTRACT;
/// Finds an appropriate subchannel for a call, based on data in \a pick.
/// \a pick must remain alive until the pick is complete.

@ -123,7 +123,7 @@ class GrpcLb : public LoadBalancingPolicy {
public:
GrpcLb(const grpc_lb_addresses* addresses, const Args& args);
bool UpdateLocked(const grpc_channel_args& args) override;
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@ -1331,7 +1331,7 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
bool GrpcLb::UpdateLocked(const grpc_channel_args& args) {
void GrpcLb::UpdateLocked(const grpc_channel_args& args) {
ProcessChannelArgsLocked(args);
// If fallback is configured and the RR policy already exists, update
// it with the new fallback addresses.
@ -1358,7 +1358,6 @@ bool GrpcLb::UpdateLocked(const grpc_channel_args& args) {
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
}
return false; // TODO
}
//

@ -46,7 +46,7 @@ class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
bool UpdateLocked(const grpc_channel_args& args) override;
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@ -333,7 +333,7 @@ void PickFirst::UpdateChildRefsLocked() {
child_subchannels_ = std::move(cs);
}
bool PickFirst::UpdateLocked(const grpc_channel_args& args) {
void PickFirst::UpdateLocked(const grpc_channel_args& args) {
AutoChildRefsUpdater guard(this);
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
if (arg == nullptr || arg->type != GRPC_ARG_POINTER) {
@ -350,12 +350,10 @@ bool PickFirst::UpdateLocked(const grpc_channel_args& args) {
"ignoring.",
this);
}
return false;
return;
}
const grpc_lb_addresses* addresses =
static_cast<const grpc_lb_addresses*>(arg->value.pointer.p);
size_t old_size =
subchannel_list_ == nullptr ? 0 : subchannel_list_->num_subchannels();
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO,
"Pick First %p received update with %" PRIuPTR " addresses", this,
@ -373,7 +371,7 @@ bool PickFirst::UpdateLocked(const grpc_channel_args& args) {
"pf_update_empty");
subchannel_list_ = std::move(subchannel_list); // Empty list.
selected_ = nullptr;
return old_size != 0;
return;
}
if (selected_ == nullptr) {
// We don't yet have a selected subchannel, so replace the current
@ -413,7 +411,7 @@ bool PickFirst::UpdateLocked(const grpc_channel_args& args) {
// not have contained the currently selected subchannel), drop
// it, so that it doesn't override what we've done here.
latest_pending_subchannel_list_.reset();
return false;
return;
}
GRPC_ERROR_UNREF(error);
}
@ -439,7 +437,6 @@ bool PickFirst::UpdateLocked(const grpc_channel_args& args) {
->CheckConnectivityStateAndStartWatchingLocked();
}
}
return old_size == 0;
}
void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(

@ -57,7 +57,7 @@ class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
bool UpdateLocked(const grpc_channel_args& args) override;
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@ -664,7 +664,7 @@ void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current,
notify);
}
bool RoundRobin::UpdateLocked(const grpc_channel_args& args) {
void RoundRobin::UpdateLocked(const grpc_channel_args& args) {
const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES);
AutoChildRefsUpdater guard(this);
if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) {
@ -677,7 +677,7 @@ bool RoundRobin::UpdateLocked(const grpc_channel_args& args) {
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"),
"rr_update_missing");
}
return false; // TODO
return;
}
grpc_lb_addresses* addresses =
static_cast<grpc_lb_addresses*>(arg->value.pointer.p);
@ -711,7 +711,6 @@ bool RoundRobin::UpdateLocked(const grpc_channel_args& args) {
// If we've started picking, start watching the new list.
latest_pending_subchannel_list_->StartWatchingLocked();
}
return false; // todo
}
//

@ -123,7 +123,7 @@ class XdsLb : public LoadBalancingPolicy {
public:
XdsLb(const grpc_lb_addresses* addresses, const Args& args);
bool UpdateLocked(const grpc_channel_args& args) override;
void UpdateLocked(const grpc_channel_args& args) override;
bool PickLocked(PickState* pick, grpc_error** error) override;
void CancelPickLocked(PickState* pick, grpc_error* error) override;
void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
@ -1325,7 +1325,7 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
bool XdsLb::UpdateLocked(const grpc_channel_args& args) {
void XdsLb::UpdateLocked(const grpc_channel_args& args) {
ProcessChannelArgsLocked(args);
// If fallback is configured and the RR policy already exists, update
// it with the new fallback addresses.
@ -1352,7 +1352,6 @@ bool XdsLb::UpdateLocked(const grpc_channel_args& args) {
&lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
nullptr);
}
return false; // TODO
}
//

Loading…
Cancel
Save