LB policy API improvements (#26481)

* LB policy API improvements

* clang-format

* fix build

* a bit more cleanup

* use absl::variant<> for pick result

* fix retry_lb_drop test

* clang-format

* fix grpclb_end2end_test

* fix xds_end2end_test

* try to make variant code a bit cleaner

* clang-format

* fix memory leak

* fix build

* clang-format

* fix error refcount bug

* remove PickResult factory functions

* clang-format

* add ctors to structs

* clang-format

* fix clang-tidy

* update comments

* move LB recv_trailing_metadata callback instead of copying it

* use Match() instead of providing PickResult::Handle()

* don't use Match() for now, since it breaks lock annotations

* update retry_lb_fail test
Mark D. Roth (committed by GitHub)
parent 826da6136c
commit 6b71ec3704
Changed files (lines changed):
  240  src/core/ext/filters/client_channel/client_channel.cc
    2  src/core/ext/filters/client_channel/client_channel.h
   16  src/core/ext/filters/client_channel/lb_policy.cc
  116  src/core/ext/filters/client_channel/lb_policy.h
   15  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
   33  src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
    8  src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
   51  src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
   23  src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
    9  src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
   20  src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
   36  src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
   11  src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
   13  src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
    9  test/core/end2end/tests/retry_lb_drop.cc
   20  test/core/end2end/tests/retry_lb_fail.cc
   15  test/core/util/test_lb_policies.cc
   10  test/cpp/end2end/grpclb_end2end_test.cc
   32  test/cpp/end2end/xds_end2end_test.cc

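The heart of the change is the new PickResult shape in lb_policy.h (full diff below): the old type/subchannel/error triple becomes an absl::variant<> over four result structs. A minimal standalone sketch of that shape, with Subchannel and shared_ptr as simplified stand-ins for the real gRPC types and the recv_trailing_metadata_ready callback omitted:

    #include <memory>
    #include <utility>

    #include "absl/status/status.h"
    #include "absl/types/variant.h"

    // Stand-in for grpc_core::RefCountedPtr<SubchannelInterface>.
    struct Subchannel {};

    struct PickResult {
      // A successful pick: the call proceeds on this subchannel.
      struct Complete {
        std::shared_ptr<Subchannel> subchannel;
        explicit Complete(std::shared_ptr<Subchannel> sc)
            : subchannel(std::move(sc)) {}
      };
      // Pick cannot complete yet; the channel queues the call.
      struct Queue {};
      // Pick failed; wait_for_ready calls are re-queued, others fail.
      struct Fail {
        absl::Status status;
        explicit Fail(absl::Status s) : status(std::move(s)) {}
      };
      // Call is dropped unconditionally, inhibiting retries.
      struct Drop {
        absl::Status status;
        explicit Drop(absl::Status s) : status(std::move(s)) {}
      };

      // Exactly one alternative; defaults to Queue so the type is
      // default-constructible.
      absl::variant<Complete, Queue, Fail, Drop> result = Queue();

      PickResult() = default;
      PickResult(Complete c) : result(std::move(c)) {}  // NOLINT
      PickResult(Queue q) : result(q) {}                // NOLINT
      PickResult(Fail f) : result(std::move(f)) {}      // NOLINT
      PickResult(Drop d) : result(std::move(d)) {}      // NOLINT
    };

    // A picker now returns whichever alternative applies:
    PickResult PickOrQueue(bool ready) {
      if (ready) return PickResult::Complete(std::make_shared<Subchannel>());
      return PickResult::Queue();
    }

The implicit converting constructors are what let the pickers in the diffs below write simply `return PickResult::Queue();` or `return PickResult::Drop(...)`.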
@@ -1356,11 +1356,12 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
grpc_error_handle state_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
absl::Status status = grpc_error_to_absl_status(state_error);
{
MutexLock lock(&resolution_mu_);
// Update resolver transient failure.
GRPC_ERROR_UNREF(resolver_transient_failure_error_);
resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
resolver_transient_failure_error_ = state_error;
// Process calls that were queued waiting for the resolver result.
for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
call = call->next) {
@@ -1374,10 +1375,8 @@ void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
}
// Update connectivity state.
UpdateStateAndPickerLocked(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
"resolver failure",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
state_error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status, "resolver failure",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(status));
}
GRPC_ERROR_UNREF(error);
}
@@ -1688,6 +1687,40 @@ void ClientChannel::UpdateStateAndPickerLocked(
pending_subchannel_updates_.clear();
}
namespace {
// TODO(roth): Remove this in favor of the gprpp Match() function once
// we can do that without breaking lock annotations.
template <typename T>
T HandlePickResult(
LoadBalancingPolicy::PickResult* result,
std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
auto* complete_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
if (complete_pick != nullptr) {
return complete_func(complete_pick);
}
auto* queue_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
if (queue_pick != nullptr) {
return queue_func(queue_pick);
}
auto* fail_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
if (fail_pick != nullptr) {
return fail_func(fail_pick);
}
auto* drop_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
GPR_ASSERT(drop_pick != nullptr);
return drop_func(drop_pick);
}
} // namespace
grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
if (state_tracker_.state() != GRPC_CHANNEL_READY) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
@@ -1697,21 +1730,31 @@ grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
MutexLock lock(&data_plane_mu_);
result = picker_->Pick(LoadBalancingPolicy::PickArgs());
}
ConnectedSubchannel* connected_subchannel = nullptr;
if (result.subchannel != nullptr) {
SubchannelWrapper* subchannel =
static_cast<SubchannelWrapper*>(result.subchannel.get());
connected_subchannel = subchannel->connected_subchannel();
}
if (connected_subchannel != nullptr) {
connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
} else {
if (result.error == GRPC_ERROR_NONE) {
result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"LB policy dropped call on ping");
}
}
return result.error;
return HandlePickResult<grpc_error_handle>(
&result,
// Complete pick.
[op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
complete_pick->subchannel.get());
ConnectedSubchannel* connected_subchannel =
subchannel->connected_subchannel();
connected_subchannel->Ping(op->send_ping.on_initiate,
op->send_ping.on_ack);
return GRPC_ERROR_NONE;
},
// Queue pick.
[](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("LB picker queued call");
},
// Fail pick.
[](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
return absl_status_to_grpc_error(fail_pick->status);
},
// Drop pick.
[](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
return absl_status_to_grpc_error(drop_pick->status);
});
}
void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
@@ -1766,7 +1809,7 @@ void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
UpdateStateAndPickerLocked(
GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
GRPC_ERROR_REF(op->disconnect_with_error)));
grpc_error_to_absl_status(op->disconnect_with_error)));
}
}
GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
@@ -2919,12 +2962,10 @@ void ClientChannel::LoadBalancedCall::RecvTrailingMetadataReady(
// If the LB policy requested a callback for trailing metadata, invoke
// the callback.
if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
grpc_error_handle error_for_lb = absl_status_to_grpc_error(status);
Metadata trailing_metadata(self, self->recv_trailing_metadata_);
LbCallState lb_call_state(self);
self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
self->lb_recv_trailing_metadata_ready_(status, &trailing_metadata,
&lb_call_state);
GRPC_ERROR_UNREF(error_for_lb);
}
}
// Chain to original callback.
@@ -3057,23 +3098,6 @@ void ClientChannel::LoadBalancedCall::PickDone(void* arg,
self->CreateSubchannelCall();
}
namespace {
const char* PickResultTypeName(
LoadBalancingPolicy::PickResult::ResultType type) {
switch (type) {
case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
return "COMPLETE";
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
return "QUEUE";
case LoadBalancingPolicy::PickResult::PICK_FAILED:
return "FAILED";
}
GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
} // namespace
void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg,
grpc_error_handle error) {
auto* self = static_cast<LoadBalancedCall*>(arg);
@@ -3107,64 +3131,82 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
Metadata initial_metadata(this, initial_metadata_batch);
pick_args.initial_metadata = &initial_metadata;
auto result = chand_->picker_->Pick(pick_args);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(
GPR_INFO,
"chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
grpc_error_std_string(result.error).c_str());
}
switch (result.type) {
case LoadBalancingPolicy::PickResult::PICK_FAILED: {
// If we're shutting down, fail all RPCs.
grpc_error_handle disconnect_error = chand_->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(result.error);
MaybeRemoveCallFromLbQueuedCallsLocked();
*error = GRPC_ERROR_REF(disconnect_error);
return true;
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if ((send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
grpc_error_handle new_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to pick subchannel", &result.error, 1);
GRPC_ERROR_UNREF(result.error);
*error = new_error;
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
GRPC_ERROR_UNREF(result.error);
}
// Fallthrough
case LoadBalancingPolicy::PickResult::PICK_QUEUE:
MaybeAddCallToLbQueuedCallsLocked();
return false;
default: // PICK_COMPLETE
MaybeRemoveCallFromLbQueuedCallsLocked();
// Handle drops.
if (GPR_UNLIKELY(result.subchannel == nullptr)) {
result.error = grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE),
GRPC_ERROR_INT_LB_POLICY_DROP, 1);
} else {
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
connected_subchannel_ =
chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
*error = result.error;
return true;
}
return HandlePickResult<bool>(
&result,
// CompletePick
[this](LoadBalancingPolicy::PickResult::Complete* complete_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
chand_, this, complete_pick->subchannel.get());
}
GPR_ASSERT(complete_pick->subchannel != nullptr);
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
connected_subchannel_ = chand_->GetConnectedSubchannelInDataPlane(
complete_pick->subchannel.get());
GPR_ASSERT(connected_subchannel_ != nullptr);
lb_recv_trailing_metadata_ready_ =
std::move(complete_pick->recv_trailing_metadata_ready);
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
},
// QueuePick
[this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
this);
}
MaybeAddCallToLbQueuedCallsLocked();
return false;
},
// FailPick
[this, send_initial_metadata_flags,
&error](LoadBalancingPolicy::PickResult::Fail* fail_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s",
chand_, this, fail_pick->status.ToString().c_str());
}
// If we're shutting down, fail all RPCs.
grpc_error_handle disconnect_error = chand_->disconnect_error();
if (disconnect_error != GRPC_ERROR_NONE) {
MaybeRemoveCallFromLbQueuedCallsLocked();
*error = GRPC_ERROR_REF(disconnect_error);
return true;
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if ((send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
grpc_error_handle lb_error =
absl_status_to_grpc_error(fail_pick->status);
*error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to pick subchannel", &lb_error, 1);
GRPC_ERROR_UNREF(lb_error);
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
MaybeAddCallToLbQueuedCallsLocked();
return false;
},
// DropPick
[this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s",
chand_, this, drop_pick->status.ToString().c_str());
}
*error =
grpc_error_set_int(absl_status_to_grpc_error(drop_pick->status),
GRPC_ERROR_INT_LB_POLICY_DROP, 1);
MaybeRemoveCallFromLbQueuedCallsLocked();
return true;
});
}
} // namespace grpc_core

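HandlePickResult() above dispatches over the variant with sequential absl::get_if calls rather than gprpp Match(), since Match() broke the lock annotations. A self-contained sketch of that idiom, using empty stand-in types:

    #include <cassert>
    #include <string>

    #include "absl/types/variant.h"

    struct Complete {};
    struct Queue {};
    struct Fail {};
    struct Drop {};
    using Result = absl::variant<Complete, Queue, Fail, Drop>;

    // Try each alternative in order; the last one must hold if all
    // earlier get_if calls returned nullptr.
    std::string Describe(Result* r) {
      if (absl::get_if<Complete>(r) != nullptr) return "complete";
      if (absl::get_if<Queue>(r) != nullptr) return "queue";
      if (absl::get_if<Fail>(r) != nullptr) return "fail";
      assert(absl::get_if<Drop>(r) != nullptr);
      return "drop";
    }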
@@ -494,7 +494,7 @@ class ClientChannel::LoadBalancedCall
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*,
std::function<void(absl::Status, LoadBalancingPolicy::MetadataInterface*,
LoadBalancingPolicy::CallState*)>
lb_recv_trailing_metadata_ready_;

@@ -125,21 +125,7 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
parent, nullptr),
GRPC_ERROR_NONE);
}
PickResult result;
result.type = PickResult::PICK_QUEUE;
return result;
}
//
// LoadBalancingPolicy::TransientFailurePicker
//
LoadBalancingPolicy::PickResult
LoadBalancingPolicy::TransientFailurePicker::Pick(PickArgs /*args*/) {
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error = GRPC_ERROR_REF(error_);
return result;
return PickResult::Queue();
}
} // namespace grpc_core

@@ -26,6 +26,7 @@
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/variant.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h"
@@ -204,47 +205,69 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// The result of picking a subchannel for a call.
struct PickResult {
enum ResultType {
/// Pick complete. If \a subchannel is non-null, the client channel
/// will immediately proceed with the call on that subchannel;
/// otherwise, it will drop the call.
PICK_COMPLETE,
/// Pick cannot be completed until something changes on the control
/// plane. The client channel will queue the pick and try again the
/// next time the picker is updated.
PICK_QUEUE,
/// Pick failed. If the call is wait_for_ready, the client channel
/// will wait for the next picker and try again; otherwise, it
/// will immediately fail the call with the status indicated via
/// \a error (although the call may be retried if the client channel
/// is configured to do so).
PICK_FAILED,
/// A successful pick.
struct Complete {
/// The subchannel to be used for the call. Must be non-null.
RefCountedPtr<SubchannelInterface> subchannel;
/// Callback set by LB policy to be notified of trailing metadata.
/// If non-null, the client channel will invoke the callback
/// when trailing metadata is returned.
/// The metadata may be modified by the callback. However, the callback
/// does not take ownership, so any data that needs to be used after
/// returning must be copied.
/// The call state can be used to obtain backend metric data.
// TODO(roth): The arguments to this callback should be moved into a
// struct, so that we can later add new fields without breaking
// existing implementations.
std::function<void(absl::Status, MetadataInterface*, CallState*)>
recv_trailing_metadata_ready;
explicit Complete(
RefCountedPtr<SubchannelInterface> sc,
std::function<void(absl::Status, MetadataInterface*, CallState*)> cb =
nullptr)
: subchannel(std::move(sc)),
recv_trailing_metadata_ready(std::move(cb)) {}
};
ResultType type;
/// Used only if type is PICK_COMPLETE. Will be set to the selected
/// subchannel, or nullptr if the LB policy decides to drop the call.
RefCountedPtr<SubchannelInterface> subchannel;
/// Used only if type is PICK_FAILED.
/// Error to be set when returning a failure.
// TODO(roth): Replace this with something similar to grpc::Status,
// so that we don't expose grpc_error to this API.
grpc_error_handle error = GRPC_ERROR_NONE;
/// Used only if type is PICK_COMPLETE.
/// Callback set by LB policy to be notified of trailing metadata.
/// If set by LB policy, the client channel will invoke the callback
/// when trailing metadata is returned.
/// The metadata may be modified by the callback. However, the callback
/// does not take ownership, so any data that needs to be used after
/// returning must be copied.
/// The call state can be used to obtain backend metric data.
// TODO(roth): The arguments to this callback should be moved into a
// struct, so that we can later add new fields without breaking
// existing implementations.
std::function<void(grpc_error_handle, MetadataInterface*, CallState*)>
recv_trailing_metadata_ready;
/// Pick cannot be completed until something changes on the control
/// plane. The client channel will queue the pick and try again the
/// next time the picker is updated.
struct Queue {};
/// Pick failed. If the call is wait_for_ready, the client channel
/// will wait for the next picker and try again; otherwise, it
/// will immediately fail the call with the status indicated (although
/// the call may be retried if the client channel is configured to do so).
struct Fail {
absl::Status status;
explicit Fail(absl::Status s) : status(s) {}
};
/// Pick will be dropped with the status specified.
/// Unlike FailPick, the call will be dropped even if it is
/// wait_for_ready, and retries (if configured) will be inhibited.
struct Drop {
absl::Status status;
explicit Drop(absl::Status s) : status(s) {}
};
// A pick result must be one of these types.
// Default to Queue, just to allow default construction.
absl::variant<Complete, Queue, Fail, Drop> result = Queue();
PickResult() = default;
// NOLINTNEXTLINE(google-explicit-constructor)
PickResult(Complete complete) : result(std::move(complete)) {}
// NOLINTNEXTLINE(google-explicit-constructor)
PickResult(Queue queue) : result(queue) {}
// NOLINTNEXTLINE(google-explicit-constructor)
PickResult(Fail fail) : result(std::move(fail)) {}
// NOLINTNEXTLINE(google-explicit-constructor)
PickResult(Drop drop) : result(std::move(drop)) {}
};
/// A subchannel picker is the object used to pick the subchannel to
@@ -367,7 +390,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
// Note: This must be invoked while holding the work_serializer.
void Orphan() override;
// A picker that returns PICK_QUEUE for all picks.
// A picker that returns PickResult::Queue for all picks.
// Also calls the parent LB policy's ExitIdleLocked() method when the
// first pick is seen.
class QueuePicker : public SubchannelPicker {
@@ -384,16 +407,17 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
bool exit_idle_called_ = false;
};
// A picker that returns PICK_TRANSIENT_FAILURE for all picks.
// A picker that returns PickResult::Fail for all picks.
class TransientFailurePicker : public SubchannelPicker {
public:
explicit TransientFailurePicker(grpc_error_handle error) : error_(error) {}
~TransientFailurePicker() override { GRPC_ERROR_UNREF(error_); }
explicit TransientFailurePicker(absl::Status status) : status_(status) {}
PickResult Pick(PickArgs args) override;
PickResult Pick(PickArgs /*args*/) override {
return PickResult::Fail(status_);
}
private:
grpc_error_handle error_;
absl::Status status_;
};
protected:

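Storing absl::Status instead of grpc_error_handle also lets TransientFailurePicker drop its destructor, since there is no longer an error ref to release. A simplified standalone sketch of that shape (PickArgs and PickResult here are stand-ins, not the real classes):

    #include <utility>

    #include "absl/status/status.h"

    struct PickArgs {};
    struct PickResult {
      absl::Status fail_status;  // stand-in for PickResult::Fail
    };

    class TransientFailurePicker {
     public:
      explicit TransientFailurePicker(absl::Status status)
          : status_(std::move(status)) {}
      // No destructor needed: absl::Status is value-semantic, unlike the
      // old grpc_error_handle member, which required GRPC_ERROR_UNREF.
      PickResult Pick(PickArgs /*args*/) { return PickResult{status_}; }

     private:
      absl::Status status_;
    };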
@@ -606,7 +606,6 @@ const char* GrpcLb::Serverlist::ShouldDrop() {
//
GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
PickResult result;
// Check if we should drop the call.
const char* drop_token =
serverlist_ == nullptr ? nullptr : serverlist_->ShouldDrop();
@@ -619,16 +618,16 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
if (client_stats_ != nullptr) {
client_stats_->AddCallDropped(drop_token);
}
result.type = PickResult::PICK_COMPLETE;
return result;
return PickResult::Drop(
absl::UnavailableError("drop directed by grpclb balancer"));
}
// Forward pick to child policy.
result = child_picker_->Pick(args);
PickResult result = child_picker_->Pick(args);
// If pick succeeded, add LB token to initial metadata.
if (result.type == PickResult::PICK_COMPLETE &&
result.subchannel != nullptr) {
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
const SubchannelWrapper* subchannel_wrapper =
static_cast<SubchannelWrapper*>(result.subchannel.get());
static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
// Encode client stats object into metadata for use by
// client_load_reporting filter.
GrpcLbClientStats* client_stats = subchannel_wrapper->client_stats();
@@ -654,7 +653,7 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
args.initial_metadata->Add(kGrpcLbLbTokenMetadataKey, lb_token);
}
// Unwrap subchannel to pass up to the channel.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
return result;
}

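The grpclb picker illustrates the delegate-then-rewrite pattern: pick via the child policy, then mutate the Complete alternative in place through absl::get_if. A toy self-contained version, with an int token standing in for the subchannel ref:

    #include <iostream>

    #include "absl/types/variant.h"

    struct Complete { int token = 0; };  // stand-in for the subchannel
    struct Queue {};
    using PickResult = absl::variant<Complete, Queue>;

    PickResult ChildPick() { return Complete{1}; }

    PickResult ParentPick() {
      PickResult result = ChildPick();
      // Only touch the result if the child actually completed the pick;
      // Queue (and, in the real API, Fail/Drop) pass through unchanged.
      auto* complete = absl::get_if<Complete>(&result);
      if (complete != nullptr) {
        complete->token = 42;  // e.g. swap wrapper for wrapped subchannel
      }
      return result;
    }

    int main() {
      std::cout << absl::get<Complete>(ParentPick()).token << "\n";  // 42
    }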
@@ -114,10 +114,7 @@ class PickFirst : public LoadBalancingPolicy {
: subchannel_(std::move(subchannel)) {}
PickResult Pick(PickArgs /*args*/) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
result.subchannel = subchannel_;
return result;
return PickResult::Complete(subchannel_);
}
private:
@@ -197,12 +194,10 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// (If we are idle, then this will happen in ExitIdleLocked() if we
// haven't gotten a non-empty update by the time the application tries
// to start a new call.)
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError("Empty update");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return;
}
// If one of the subchannels in the new list is already in state
@@ -314,13 +309,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
if (p->subchannel_list_->in_transient_failure()) {
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"selected subchannel failed; switching to pending update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError(
"selected subchannel failed; switching to pending update");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
} else {
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, absl::Status(),
@@ -393,13 +386,11 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
subchannel_list()->set_in_transient_failure(true);
// Only report new state in case 1.
if (subchannel_list() == p->subchannel_list_.get()) {
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"failed to connect to all addresses"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status =
absl::UnavailableError("failed to connect to all addresses");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
}
}
sd->CheckConnectivityStateAndStartWatchingLocked();

@@ -472,12 +472,10 @@ void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
this);
}
current_child_from_before_update_ = nullptr;
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError("no ready priority");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
}
void PriorityLb::SelectPriorityLocked(uint32_t priority) {

@@ -380,18 +380,12 @@ RingHash::Picker::Picker(RefCountedPtr<RingHash> parent,
}
RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
PickResult result;
// Initialize to PICK_FAILED.
result.type = PickResult::PICK_FAILED;
auto hash =
args.call_state->ExperimentalGetCallAttribute(kRequestRingHashAttribute);
uint64_t h;
if (!absl::SimpleAtoi(hash, &h)) {
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds ring hash value is not a number").c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
return PickResult::Fail(
absl::InternalError("xds ring hash value is not a number"));
}
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
// (ketama_get_server) NOTE: The algorithm depends on using signed integers
@@ -431,15 +425,12 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
};
switch (ring_[first_index].connectivity_state) {
case GRPC_CHANNEL_READY:
result.type = PickResult::PICK_COMPLETE;
result.subchannel = ring_[first_index].subchannel;
return result;
return PickResult::Complete(ring_[first_index].subchannel);
case GRPC_CHANNEL_IDLE:
ScheduleSubchannelConnectionAttempt(ring_[first_index].subchannel);
// fallthrough
case GRPC_CHANNEL_CONNECTING:
result.type = PickResult::PICK_QUEUE;
return result;
return PickResult::Queue();
default: // GRPC_CHANNEL_TRANSIENT_FAILURE
break;
}
@@ -455,9 +446,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
continue;
}
if (entry.connectivity_state == GRPC_CHANNEL_READY) {
result.type = PickResult::PICK_COMPLETE;
result.subchannel = entry.subchannel;
return result;
return PickResult::Complete(entry.subchannel);
}
if (!found_second_subchannel) {
switch (entry.connectivity_state) {
@@ -465,8 +454,7 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
ScheduleSubchannelConnectionAttempt(entry.subchannel);
// fallthrough
case GRPC_CHANNEL_CONNECTING:
result.type = PickResult::PICK_QUEUE;
return result;
return PickResult::Queue();
default:
break;
}
@@ -483,13 +471,8 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
}
}
}
result.error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds ring hash found a subchannel "
"that is in TRANSIENT_FAILURE state")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
return PickResult::Fail(absl::UnavailableError(
"xds ring hash found a subchannel that is in TRANSIENT_FAILURE state"));
}
//
@@ -580,13 +563,11 @@ bool RingHash::RingHashSubchannelList::UpdateRingHashConnectivityStateLocked() {
this));
return false;
}
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to backend failing or idle"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status =
absl::UnavailableError("connections to backend failing or idle");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return true;
}
@@ -724,12 +705,10 @@ void RingHash::UpdateLocked(UpdateArgs args) {
this, &grpc_lb_ring_hash_trace, std::move(addresses), *args.args);
if (subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately transition to TRANSIENT_FAILURE.
grpc_error* error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError("Empty update");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
} else {
// Start watching the new list.
subchannel_list_->StartWatchingLocked();

@@ -213,10 +213,7 @@ RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs /*args*/) {
parent_, this, last_picked_index_,
subchannels_[last_picked_index_].get());
}
PickResult result;
result.type = PickResult::PICK_COMPLETE;
result.subchannel = subchannels_[last_picked_index_];
return result;
return PickResult::Complete(subchannels_[last_picked_index_]);
}
//
@@ -331,13 +328,11 @@ void RoundRobin::RoundRobinSubchannelList::
absl::make_unique<QueuePicker>(p->Ref(DEBUG_LOCATION, "QueuePicker")));
} else if (num_transient_failure_ == num_subchannels()) {
/* 3) TRANSIENT_FAILURE */
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connections to all backends failing"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status =
absl::UnavailableError("connections to all backends failing");
p->channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
}
}
@@ -449,12 +444,10 @@ void RoundRobin::UpdateLocked(UpdateArgs args) {
if (latest_pending_subchannel_list_->num_subchannels() == 0) {
// If the new list is empty, immediately promote the new list to the
// current list and transition to TRANSIENT_FAILURE.
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError("Empty update");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
subchannel_list_ = std::move(latest_pending_subchannel_list_);
} else if (subchannel_list_ == nullptr) {
// If there is no current list, immediately promote the new list to

@@ -387,12 +387,9 @@ void WeightedTargetLb::UpdateStateLocked() {
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
break;
default:
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"weighted_target: all children report state TRANSIENT_FAILURE"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
status = grpc_error_to_absl_status(error);
picker = absl::make_unique<TransientFailurePicker>(error);
status = absl::UnavailableError(
"weighted_target: all children report state TRANSIENT_FAILURE");
picker = absl::make_unique<TransientFailurePicker>(status);
}
channel_control_helper()->UpdateState(connectivity_state, status,
std::move(picker));

@@ -536,12 +536,12 @@ void CdsLb::OnError(const std::string& name, grpc_error_handle error) {
// policy (i.e., we have not yet received data from xds). Otherwise,
// we keep running with the data we had previously.
if (child_policy_ == nullptr) {
absl::Status status = grpc_error_to_absl_status(error);
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
} else {
GRPC_ERROR_UNREF(error);
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
}
GRPC_ERROR_UNREF(error);
}
void CdsLb::OnResourceDoesNotExist(const std::string& name) {
@@ -549,15 +549,11 @@ void CdsLb::OnResourceDoesNotExist(const std::string& name) {
"[cdslb %p] CDS resource for %s does not exist -- reporting "
"TRANSIENT_FAILURE",
this, name.c_str());
grpc_error_handle error =
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("CDS resource \"", config_->cluster(),
"\" does not exist")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
absl::Status status = absl::UnavailableError(
absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist"));
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
MaybeDestroyChildPolicyLocked();
}

@@ -284,59 +284,51 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
const std::string* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
return PickResult::Drop(absl::UnavailableError(
absl::StrCat("EDS-configured drop: ", *drop_category)));
}
// Handle circuit breaking.
uint32_t current = call_counter_->Load();
// Check and see if we exceeded the max concurrent requests count.
if (current >= max_concurrent_requests_) {
if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
}
call_counter_->Increment();
// If we're not dropping the call, we should always have a child picker.
if (picker_ == nullptr) { // Should never happen.
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds_cluster_impl picker not given any child picker"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
call_counter_->Decrement();
return result;
return PickResult::Fail(absl::InternalError(
"xds_cluster_impl picker not given any child picker"));
}
// Not dropping, so delegate to child picker.
PickResult result = picker_->Pick(args);
if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
XdsClusterLocalityStats* locality_stats = nullptr;
if (drop_stats_ != nullptr) { // If load reporting is enabled.
auto* subchannel_wrapper =
static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
// Handle load reporting.
locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
// Record a call started.
locality_stats->AddCallStarted();
// Unwrap subchannel to pass back up the stack.
result.subchannel = subchannel_wrapper->wrapped_subchannel();
complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
}
// Intercept the recv_trailing_metadata op to record call completion.
auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
auto original_recv_trailing_metadata_ready =
result.recv_trailing_metadata_ready;
result.recv_trailing_metadata_ready =
complete_pick->recv_trailing_metadata_ready;
complete_pick->recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// work serializer or in the data plane mutex.
[locality_stats, original_recv_trailing_metadata_ready, call_counter](
grpc_error_handle error, MetadataInterface* metadata,
absl::Status status, MetadataInterface* metadata,
CallState* call_state) {
// Record call completion for load reporting.
if (locality_stats != nullptr) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->AddCallFinished(!status.ok());
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}
// Decrement number of calls in flight.
@@ -344,7 +336,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
call_counter->Unref(DEBUG_LOCATION, "call");
// Invoke the original recv_trailing_metadata_ready callback, if any.
if (original_recv_trailing_metadata_ready != nullptr) {
original_recv_trailing_metadata_ready(error, metadata, call_state);
original_recv_trailing_metadata_ready(status, metadata, call_state);
}
};
} else {

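xds_cluster_impl also shows the callback-interception idiom the std::function member enables: move the original callback out, install a wrapper, and chain to the original. A minimal sketch with a simplified single-argument signature (the real callback also receives metadata and call state):

    #include <functional>
    #include <iostream>
    #include <utility>

    #include "absl/status/status.h"

    // Stand-in for PickResult::Complete's recv_trailing_metadata_ready.
    struct CompletePick {
      std::function<void(absl::Status)> recv_trailing_metadata_ready;
    };

    void InterceptTrailingMetadata(CompletePick* complete_pick) {
      // Save the original callback (may be null), then install a wrapper
      // that records completion and chains to the original.
      auto original = std::move(complete_pick->recv_trailing_metadata_ready);
      complete_pick->recv_trailing_metadata_ready =
          [original](absl::Status status) {
            std::cout << "call finished, ok=" << status.ok() << "\n";
            if (original != nullptr) original(status);
          };
    }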
@@ -212,15 +212,8 @@ XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
if (it != cluster_map_.end()) {
return it->second->Pick(args);
}
PickResult result;
result.type = PickResult::PICK_FAILED;
result.error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(
absl::StrCat("xds cluster manager picker: unknown cluster \"",
cluster_name, "\"")
.c_str()),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
return result;
return PickResult::Fail(absl::InternalError(absl::StrCat(
"xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
}
//

@@ -1027,15 +1027,12 @@ XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
"config -- "
"will put channel in TRANSIENT_FAILURE: %s",
this, grpc_error_std_string(error).c_str());
error = grpc_error_set_int(
grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"xds_cluster_resolver LB policy: error "
"parsing generated child policy config"),
error),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
absl::Status status = absl::InternalError(
"xds_cluster_resolver LB policy: error parsing generated child policy "
"config");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
absl::make_unique<TransientFailurePicker>(error));
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<TransientFailurePicker>(status));
return nullptr;
}
return config;

@@ -60,9 +60,8 @@ class DropPolicy : public LoadBalancingPolicy {
class DropPicker : public SubchannelPicker {
public:
PickResult Pick(PickArgs /*args*/) override {
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
return PickResult::Drop(
absl::UnavailableError("Call dropped by drop LB policy"));
}
};
};
@@ -246,8 +245,8 @@ static void test_retry_lb_drop(grpc_end2end_test_config config) {
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
GPR_ASSERT(0 == grpc_slice_str_cmp(details,
"Call dropped by load balancing policy"));
GPR_ASSERT(0 ==
grpc_slice_str_cmp(details, "Call dropped by drop LB policy"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);

@@ -51,12 +51,10 @@ class FailPolicy : public LoadBalancingPolicy {
const char* name() const override { return kFailPolicyName; }
void UpdateLocked(UpdateArgs) override {
grpc_error_handle error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("LB pick failed"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED);
channel_control_helper()->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_to_absl_status(error),
absl::make_unique<FailPicker>(error));
absl::Status status = absl::AbortedError("LB pick failed");
channel_control_helper()->UpdateState(
GRPC_CHANNEL_TRANSIENT_FAILURE, status,
absl::make_unique<FailPicker>(status));
}
void ResetBackoffLocked() override {}
@@ -65,19 +63,15 @@ class FailPolicy : public LoadBalancingPolicy {
private:
class FailPicker : public SubchannelPicker {
public:
explicit FailPicker(grpc_error_handle error) : error_(error) {}
~FailPicker() override { GRPC_ERROR_UNREF(error_); }
explicit FailPicker(absl::Status status) : status_(status) {}
PickResult Pick(PickArgs /*args*/) override {
PickResult result;
g_num_lb_picks.FetchAdd(1);
result.type = PickResult::PICK_FAILED;
result.error = GRPC_ERROR_REF(error_);
return result;
return PickResult::Fail(status_);
}
private:
grpc_error_handle error_;
absl::Status status_;
};
};

@@ -229,10 +229,10 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
// Do pick.
PickResult result = delegate_picker_->Pick(args);
// Intercept trailing metadata.
if (result.type == PickResult::PICK_COMPLETE &&
result.subchannel != nullptr) {
auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
if (complete_pick != nullptr) {
new (args.call_state->Alloc(sizeof(TrailingMetadataHandler)))
TrailingMetadataHandler(&result, cb_);
TrailingMetadataHandler(complete_pick, cb_);
}
return result;
}
@@ -277,19 +277,18 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
class TrailingMetadataHandler {
public:
TrailingMetadataHandler(PickResult* result,
TrailingMetadataHandler(PickResult::Complete* result,
InterceptRecvTrailingMetadataCallback cb)
: cb_(std::move(cb)) {
result->recv_trailing_metadata_ready = [this](grpc_error_handle error,
result->recv_trailing_metadata_ready = [this](absl::Status /*status*/,
MetadataInterface* metadata,
CallState* call_state) {
RecordRecvTrailingMetadata(error, metadata, call_state);
RecordRecvTrailingMetadata(metadata, call_state);
};
}
private:
void RecordRecvTrailingMetadata(grpc_error_handle /*error*/,
MetadataInterface* recv_trailing_metadata,
void RecordRecvTrailingMetadata(MetadataInterface* recv_trailing_metadata,
CallState* call_state) {
TrailingMetadataArgsSeen args_seen;
args_seen.backend_metric_data = call_state->GetBackendMetricData();

@@ -495,7 +495,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
if (status.ok()) {
++*num_ok;
} else {
if (status.error_message() == "Call dropped by load balancing policy") {
if (status.error_message() == "drop directed by grpclb balancer") {
++*num_drops;
} else {
++*num_failure;
@@ -1781,7 +1781,7 @@ TEST_F(SingleBalancerTest, Drop) {
EchoResponse response;
const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
status.error_message() == "drop directed by grpclb balancer") {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -1813,7 +1813,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
0);
const Status status = SendRpc(nullptr, 1000, true);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
}
TEST_F(SingleBalancerTest, DropAll) {
@@ -1838,7 +1838,7 @@ TEST_F(SingleBalancerTest, DropAll) {
status = SendRpc(nullptr, 1000, true);
} while (status.ok());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_EQ(status.error_message(), "drop directed by grpclb balancer");
}
class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
@@ -1970,7 +1970,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
EchoResponse response;
const Status status = SendRpc(&response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
status.error_message() == "drop directed by grpclb balancer") {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()

@@ -2021,16 +2021,15 @@ class XdsEnd2endTest : public ::testing::TestWithParam<TestType> {
return true;
}
void SendRpcAndCount(int* num_total, int* num_ok, int* num_failure,
int* num_drops,
const RpcOptions& rpc_options = RpcOptions(),
const char* drop_error_message =
"Call dropped by load balancing policy") {
void SendRpcAndCount(
int* num_total, int* num_ok, int* num_failure, int* num_drops,
const RpcOptions& rpc_options = RpcOptions(),
const char* drop_error_message_prefix = "EDS-configured drop: ") {
const Status status = SendRpc(rpc_options);
if (status.ok()) {
++*num_ok;
} else {
if (status.error_message() == drop_error_message) {
if (absl::StartsWith(status.error_message(), drop_error_message_prefix)) {
++*num_drops;
} else {
++*num_failure;
@@ -3211,7 +3210,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreaking) {
// we hit the max concurrent requests limit and got dropped.
Status status = SendRpc();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_EQ(status.error_message(), "circuit breaker drop");
// Cancel one RPC to allow another one through
rpcs[0].CancelRpc();
status = SendRpc();
@@ -3272,7 +3271,7 @@ TEST_P(XdsResolverOnlyTest, CircuitBreakingMultipleChannelsShareCallCounter) {
// we hit the max concurrent requests limit and got dropped.
Status status = SendRpc();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_EQ(status.error_message(), "circuit breaker drop");
// Cancel one RPC to allow another one through
rpcs[0].CancelRpc();
status = SendRpc();
@@ -10965,7 +10964,7 @@ TEST_P(DropTest, Vanilla) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11002,7 +11001,7 @@ TEST_P(DropTest, DropPerHundred) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11039,7 +11038,7 @@ TEST_P(DropTest, DropPerTenThousand) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11083,7 +11082,7 @@ TEST_P(DropTest, Update) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11113,7 +11112,7 @@ TEST_P(DropTest, Update) {
const Status status = SendRpc(RpcOptions(), &response);
++num_rpcs;
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11129,7 +11128,7 @@ TEST_P(DropTest, Update) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
@@ -11163,7 +11162,8 @@ TEST_P(DropTest, DropAll) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE);
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
EXPECT_THAT(status.error_message(),
::testing::StartsWith("EDS-configured drop: "));
}
}
@@ -11630,7 +11630,7 @@ TEST_P(ClientLoadReportingWithDropTest, Vanilla) {
EchoResponse response;
const Status status = SendRpc(RpcOptions(), &response);
if (!status.ok() &&
status.error_message() == "Call dropped by load balancing policy") {
absl::StartsWith(status.error_message(), "EDS-configured drop: ")) {
++num_drops;
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
