Merge pull request #16912 from markdroth/fail_rpcs_on_transient_failure

Fail wait_for_ready=false RPCs when channel is in TRANSIENT_FAILURE.
reviewable/pr16507/r3^2
Mark D. Roth 6 years ago committed by GitHub
commit 55906e4d23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      src/core/ext/filters/client_channel/client_channel.cc
  2. 18
      test/cpp/end2end/client_lb_end2end_test.cc
  3. 8
      test/cpp/end2end/grpclb_end2end_test.cc

@ -2951,6 +2951,27 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
}
}
// If the channel is in TRANSIENT_FAILURE and the call is not
// wait_for_ready=true, fails the call and returns true.
static bool fail_call_if_in_transient_failure(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_transport_stream_op_batch* batch = calld->pending_batches[0].batch;
if (grpc_connectivity_state_check(&chand->state_tracker) ==
GRPC_CHANNEL_TRANSIENT_FAILURE &&
(batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
pending_batches_fail(
elem,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"channel is in state TRANSIENT_FAILURE"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
true /* yield_call_combiner */);
return true;
}
return false;
}
// Invoked once resolver results are available.
static void process_service_config_and_start_lb_pick_locked(
grpc_call_element* elem) {
@ -2958,6 +2979,9 @@ static void process_service_config_and_start_lb_pick_locked(
// Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
apply_service_config_to_call_locked(elem);
// Check this after applying service config, since it may have
// affected the call's wait_for_ready value.
if (fail_call_if_in_transient_failure(elem)) return;
}
// Start LB pick.
grpc_core::LbPicker::StartLocked(elem);
@ -3127,6 +3151,16 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
// We do not yet have an LB policy, so wait for a resolver result.
if (GPR_UNLIKELY(!chand->started_resolving)) {
start_resolving_locked(chand);
} else {
// Normally, we want to do this check in
// process_service_config_and_start_lb_pick_locked(), so that we
// can honor the wait_for_ready setting in the service config.
// However, if the channel is in TRANSIENT_FAILURE at this point, that
// means that the resolver has returned a failure, so we're not going
// to get a service config right away. In that case, we fail the
// call now based on the wait_for_ready value passed in from the
// application.
if (fail_call_if_in_transient_failure(elem)) return;
}
// Create a new waiter, which will delete itself when done.
grpc_core::New<grpc_core::ResolverResultWaiter>(elem);

@ -212,13 +212,14 @@ class ClientLbEnd2endTest : public ::testing::Test {
bool SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
EchoResponse* response = nullptr, int timeout_ms = 1000,
Status* result = nullptr) {
Status* result = nullptr, bool wait_for_ready = false) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
if (wait_for_ready) context.set_wait_for_ready(true);
Status status = stub->Echo(&context, request, response);
if (result != nullptr) *result = status;
if (local_response) delete response;
@ -227,10 +228,11 @@ class ClientLbEnd2endTest : public ::testing::Test {
void CheckRpcSendOk(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
const grpc_core::DebugLocation& location) {
const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
EchoResponse response;
Status status;
const bool success = SendRpc(stub, &response, 2000, &status);
const bool success =
SendRpc(stub, &response, 2000, &status, wait_for_ready);
ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
<< "\n"
<< "Error: " << status.error_message() << " "
@ -301,7 +303,7 @@ class ClientLbEnd2endTest : public ::testing::Test {
if (ignore_failure) {
SendRpc(stub);
} else {
CheckRpcSendOk(stub, location);
CheckRpcSendOk(stub, location, true);
}
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
@ -506,7 +508,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
do {
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
// Next update introduces servers_[1], making the channel recover.
@ -830,7 +832,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
do {
channel_state = channel->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
GPR_ASSERT(channel_state != GRPC_CHANNEL_READY);
ASSERT_NE(channel_state, GRPC_CHANNEL_READY);
servers_[0]->service_.ResetCounters();
// Next update introduces servers_[1], making the channel recover.
@ -839,7 +841,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
SetNextResolution(ports);
WaitForServer(stub, 1, DEBUG_LOCATION);
channel_state = channel->GetState(false /* try to connect */);
GPR_ASSERT(channel_state == GRPC_CHANNEL_READY);
ASSERT_EQ(channel_state, GRPC_CHANNEL_READY);
// Check LB policy name for the channel.
EXPECT_EQ("round_robin", channel->GetLoadBalancingPolicyName());
@ -952,7 +954,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
if (SendRpc(stub)) break;
now = gpr_now(GPR_CLOCK_MONOTONIC);
}
GPR_ASSERT(gpr_time_cmp(deadline, now) > 0);
ASSERT_GT(gpr_time_cmp(deadline, now), 0);
}
TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {

@ -539,13 +539,15 @@ class GrpclbEnd2endTest : public ::testing::Test {
balancers_.at(i)->add_response(response, delay_ms);
}
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000) {
Status SendRpc(EchoResponse* response = nullptr, int timeout_ms = 1000,
bool wait_for_ready = false) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
if (wait_for_ready) context.set_wait_for_ready(true);
Status status = stub_->Echo(&context, request, response);
if (local_response) delete response;
return status;
@ -1366,7 +1368,7 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
{}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses},
{"load_balancing", num_of_drop_by_load_balancing_addresses}}),
0);
const Status status = SendRpc();
const Status status = SendRpc(nullptr, 1000, true);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");
}
@ -1391,7 +1393,7 @@ TEST_F(SingleBalancerTest, DropAll) {
// fail.
Status status;
do {
status = SendRpc();
status = SendRpc(nullptr, 1000, true);
} while (status.ok());
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Call dropped by load balancing policy");

Loading…
Cancel
Save