Fail calls with wait_for_ready=false on transient resolver failure.

pull/14733/head
Author: Mark D. Roth
parent 6fbc1fbce4
commit e63e06d8d7
 src/core/ext/filters/client_channel/client_channel.cc              | 82
 src/core/ext/filters/client_channel/resolver.h                     |  4
 src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc | 30
 src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h  |  5
 test/core/end2end/no_server_test.cc                                | 70
 5 files changed

@@ -303,11 +303,16 @@ static void request_reresolution_locked(void* arg, grpc_error* error) {
   chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
 }
 
+// TODO(roth): The logic in this function is very hard to follow. We
+// should refactor this so that it's easier to understand, perhaps as
+// part of changing the resolver API to more clearly differentiate
+// between transient failures and shutdown.
 static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
   channel_data* chand = static_cast<channel_data*>(arg);
   if (grpc_client_channel_trace.enabled()) {
-    gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
-            grpc_error_string(error));
+    gpr_log(GPR_DEBUG,
+            "chand=%p: got resolver result: resolver_result=%p error=%s", chand,
+            chand->resolver_result, grpc_error_string(error));
   }
   // Extract the following fields from the resolver result, if non-nullptr.
   bool lb_policy_updated = false;
@@ -423,8 +428,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
         }
       }
     }
-    grpc_channel_args_destroy(chand->resolver_result);
-    chand->resolver_result = nullptr;
   }
   if (grpc_client_channel_trace.enabled()) {
     gpr_log(GPR_DEBUG,
@@ -497,6 +500,8 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
             "Channel disconnected", &error, 1));
     GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
     GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
+    grpc_channel_args_destroy(chand->resolver_result);
+    chand->resolver_result = nullptr;
   } else { // Not shutting down.
     grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
     grpc_error* state_error =
@@ -515,11 +520,16 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
         chand->exit_idle_when_lb_policy_arrives = false;
       }
       watch_lb_policy_locked(chand, chand->lb_policy.get(), state);
+    } else if (chand->resolver_result == nullptr) {
+      // Transient failure.
+      GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
     }
     if (!lb_policy_updated) {
       set_channel_connectivity_state_locked(
           chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
     }
+    grpc_channel_args_destroy(chand->resolver_result);
+    chand->resolver_result = nullptr;
     chand->resolver->NextLocked(&chand->resolver_result,
                                 &chand->on_resolver_result_changed);
     GRPC_ERROR_UNREF(state_error);
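
For orientation, the callback above now distinguishes three outcomes rather
than two. A minimal standalone sketch of that classification, using plain
booleans in place of error, chand->resolver, and chand->resolver_result (the
enum and function names here are illustrative, not gRPC API):

#include <cassert>

enum class ResolverOutcome { kNewResult, kTransientFailure, kShutdown };

// Mirrors on_resolver_result_changed_locked(): an error or a null resolver
// means shutdown; a null result with a live resolver is a transient failure.
ResolverOutcome Classify(bool has_error, bool resolver_alive, bool has_result) {
  if (has_error || !resolver_alive) return ResolverOutcome::kShutdown;
  if (!has_result) return ResolverOutcome::kTransientFailure;
  return ResolverOutcome::kNewResult;
}

int main() {
  assert(Classify(false, true, true) == ResolverOutcome::kNewResult);
  assert(Classify(false, true, false) == ResolverOutcome::kTransientFailure);
  assert(Classify(false, false, false) == ResolverOutcome::kShutdown);
  return 0;
}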
@@ -2753,7 +2763,45 @@ static void pick_after_resolver_result_done_locked(void* arg,
               chand, calld);
     }
     async_pick_done_locked(elem, GRPC_ERROR_REF(error));
-  } else if (chand->lb_policy != nullptr) {
+  } else if (chand->resolver == nullptr) {
+    // Shutting down.
+    if (grpc_client_channel_trace.enabled()) {
+      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
+              calld);
+    }
+    async_pick_done_locked(
+        elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+  } else if (chand->lb_policy == nullptr) {
+    // Transient resolver failure.
+    // If call has wait_for_ready=true, try again; otherwise, fail.
+    uint32_t send_initial_metadata_flags =
+        calld->seen_send_initial_metadata
+            ? calld->send_initial_metadata_flags
+            : calld->pending_batches[0]
+                  .batch->payload->send_initial_metadata
+                  .send_initial_metadata_flags;
+    if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_DEBUG,
+                "chand=%p calld=%p: resolver returned but no LB policy; "
+                "wait_for_ready=true; trying again",
+                chand, calld);
+      }
+      pick_after_resolver_result_start_locked(elem);
+    } else {
+      if (grpc_client_channel_trace.enabled()) {
+        gpr_log(GPR_DEBUG,
+                "chand=%p calld=%p: resolver returned but no LB policy; "
+                "wait_for_ready=false; failing",
+                chand, calld);
+      }
+      async_pick_done_locked(
+          elem,
+          grpc_error_set_int(
+              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
+              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+    }
+  } else {
     if (grpc_client_channel_trace.enabled()) {
       gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
               chand, calld);
@@ -2767,30 +2815,6 @@ static void pick_after_resolver_result_done_locked(void* arg,
       async_pick_done_locked(elem, GRPC_ERROR_NONE);
     }
   }
-  // TODO(roth): It should be impossible for chand->lb_policy to be nullptr
-  // here, so the rest of this code should never actually be executed.
-  // However, we have reports of a crash on iOS that triggers this case,
-  // so we are temporarily adding this to restore branches that were
-  // removed in https://github.com/grpc/grpc/pull/12297. Need to figure
-  // out what is actually causing this to occur and then figure out the
-  // right way to deal with it.
-  else if (chand->resolver != nullptr) {
-    // No LB policy, so try again.
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_DEBUG,
-              "chand=%p calld=%p: resolver returned but no LB policy, "
-              "trying again",
-              chand, calld);
-    }
-    pick_after_resolver_result_start_locked(elem);
-  } else {
-    if (grpc_client_channel_trace.enabled()) {
-      gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
-              calld);
-    }
-    async_pick_done_locked(
-        elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
-  }
 }
 
 static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
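
Taken together, the two hunks above turn the pick completion into a four-way
decision. A compilable sketch under assumed names (PickAction, Decide, and the
kWaitForReady value are stand-ins; the real flag is
GRPC_INITIAL_METADATA_WAIT_FOR_READY from grpc_types.h):

#include <cassert>
#include <cstdint>

// Illustrative stand-in bit for GRPC_INITIAL_METADATA_WAIT_FOR_READY; the
// actual value lives in the gRPC headers.
constexpr uint32_t kWaitForReady = 1u << 3;

enum class PickAction { kFailDisconnected, kRetry, kFailUnavailable, kDoPick };

// Mirrors pick_after_resolver_result_done_locked() after this commit:
// shutdown fails the call, transient resolver failure retries only for
// wait_for_ready calls, and an available LB policy proceeds to the pick.
PickAction Decide(bool resolver_alive, bool has_lb_policy, uint32_t md_flags) {
  if (!resolver_alive) return PickAction::kFailDisconnected;
  if (!has_lb_policy) {
    return (md_flags & kWaitForReady) ? PickAction::kRetry
                                      : PickAction::kFailUnavailable;
  }
  return PickAction::kDoPick;
}

int main() {
  assert(Decide(true, false, kWaitForReady) == PickAction::kRetry);
  assert(Decide(true, false, 0) == PickAction::kFailUnavailable);
  assert(Decide(false, false, 0) == PickAction::kFailDisconnected);
  assert(Decide(true, true, 0) == PickAction::kDoPick);
  return 0;
}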

@@ -53,8 +53,12 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
   /// Requests a callback when a new result becomes available.
   /// When the new result is available, sets \a *result to the new result
   /// and schedules \a on_complete for execution.
+  /// Upon transient failure, sets \a *result to nullptr and schedules
+  /// \a on_complete with no error.
   /// If resolution is fatally broken, sets \a *result to nullptr and
   /// schedules \a on_complete with an error.
+  /// TODO(roth): When we have time, improve the way this API represents
+  /// transient failure vs. shutdown.
   ///
   /// Note that the client channel will almost always have a request
   /// to \a NextLocked() pending. When it gets the callback, it will
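
The contract above reduces to two independent signals: the result pointer and
the error. A minimal model with stand-in types (ChannelArgs, Error, and
CompleteNext are assumptions for illustration, not the real
grpc_channel_args/grpc_error API):

#include <functional>

struct ChannelArgs {};              // stand-in for grpc_channel_args
struct Error { const char* msg; };  // stand-in for grpc_error

// Encodes the NextLocked() contract described above:
//   new result        -> *result != nullptr, error == nullptr
//   transient failure -> *result == nullptr, error == nullptr
//   fatally broken    -> *result == nullptr, error != nullptr
void CompleteNext(ChannelArgs* next_result, Error* fatal_error,
                  ChannelArgs** result,
                  const std::function<void(Error*)>& on_complete) {
  if (fatal_error != nullptr) {
    *result = nullptr;
    on_complete(fatal_error);  // resolution is fatally broken
  } else {
    *result = next_result;     // may be nullptr: transient failure
    on_complete(nullptr);
  }
}

int main() {
  ChannelArgs args;
  ChannelArgs* result = nullptr;
  CompleteNext(&args, nullptr, &result,
               [](Error* e) { /* on_complete runs here; e is nullptr */ });
  return result != nullptr ? 0 : 1;
}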

@@ -82,6 +82,8 @@ class FakeResolver : public Resolver {
   grpc_closure* next_completion_ = nullptr;
   // target result address for next completion
   grpc_channel_args** target_result_ = nullptr;
+  // if true, return failure
+  bool return_failure_ = false;
 };
 
 FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
@@ -121,12 +123,16 @@ void FakeResolver::RequestReresolutionLocked() {
 }
 
 void FakeResolver::MaybeFinishNextLocked() {
-  if (next_completion_ != nullptr && next_results_ != nullptr) {
-    *target_result_ = grpc_channel_args_union(next_results_, channel_args_);
+  if (next_completion_ != nullptr &&
+      (next_results_ != nullptr || return_failure_)) {
+    *target_result_ =
+        return_failure_ ? nullptr
+                        : grpc_channel_args_union(next_results_, channel_args_);
     grpc_channel_args_destroy(next_results_);
     next_results_ = nullptr;
     GRPC_CLOSURE_SCHED(next_completion_, GRPC_ERROR_NONE);
     next_completion_ = nullptr;
+    return_failure_ = false;
   }
 }
@@ -197,6 +203,26 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
       GRPC_ERROR_NONE);
 }
 
+void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
+                                                     grpc_error* error) {
+  SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
+  FakeResolver* resolver = closure_arg->generator->resolver_;
+  resolver->return_failure_ = true;
+  resolver->MaybeFinishNextLocked();
+  Delete(closure_arg);
+}
+
+void FakeResolverResponseGenerator::SetFailure() {
+  GPR_ASSERT(resolver_ != nullptr);
+  SetResponseClosureArg* closure_arg = New<SetResponseClosureArg>();
+  closure_arg->generator = this;
+  GRPC_CLOSURE_SCHED(
+      GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
+                        closure_arg,
+                        grpc_combiner_scheduler(resolver_->combiner())),
+      GRPC_ERROR_NONE);
+}
+
 namespace {
 
 static void* response_generator_arg_copy(void* p) {
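
Note that SetFailure() never touches resolver state directly; it schedules
SetFailureLocked() in the resolver's combiner so the mutation is serialized
with other resolver work. A toy model of that hop (Combiner, ToyResolver, and
Sched are stand-ins for grpc_combiner and GRPC_CLOSURE_SCHED, not the real
API):

#include <cassert>
#include <functional>
#include <queue>
#include <utility>

// Toy serializer standing in for grpc_combiner: queued closures run one
// at a time, so state they touch needs no extra locking.
struct Combiner {
  std::queue<std::function<void()>> pending;
  void Sched(std::function<void()> fn) { pending.push(std::move(fn)); }
  void Drain() {
    while (!pending.empty()) {
      pending.front()();
      pending.pop();
    }
  }
};

struct ToyResolver {
  bool return_failure = false;
  void MaybeFinishNext() { /* would deliver a pending result, as above */ }
};

// Like SetFailure() above: hop into the combiner before touching state.
void SetFailure(Combiner* combiner, ToyResolver* resolver) {
  combiner->Sched([resolver] {
    resolver->return_failure = true;  // mirrors resolver->return_failure_
    resolver->MaybeFinishNext();
  });
}

int main() {
  Combiner combiner;
  ToyResolver resolver;
  SetFailure(&combiner, &resolver);
  assert(!resolver.return_failure);  // not yet: runs inside the combiner
  combiner.Drain();
  assert(resolver.return_failure);
  return 0;
}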

@@ -56,6 +56,10 @@ class FakeResolverResponseGenerator
   // resolver will return the last value set via \a SetResponse().
   void SetReresolutionResponse(grpc_channel_args* response);
 
+  // Tells the resolver to return a transient failure (signalled by
+  // returning a null result with no error).
+  void SetFailure();
+
   // Returns a channel arg containing \a generator.
   static grpc_arg MakeChannelArg(FakeResolverResponseGenerator* generator);
@@ -68,6 +72,7 @@ class FakeResolverResponseGenerator
   static void SetResponseLocked(void* arg, grpc_error* error);
   static void SetReresolutionResponseLocked(void* arg, grpc_error* error);
+  static void SetFailureLocked(void* arg, grpc_error* error);
 
   FakeResolver* resolver_ = nullptr; // Do not own.
 };
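
Typical usage, condensed from the test below (every call here appears
verbatim in no_server_test.cc; the fragment assumes grpc_init() has run and
a completion queue drives the call):

grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
    response_generator =
        grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
    response_generator.get());
grpc_channel_args args = {1, &arg};
grpc_channel* chan =
    grpc_insecure_channel_create("fake:nonexistant", &args, nullptr);
// ... start a call on chan, then force a transient resolution failure:
{
  grpc_core::ExecCtx exec_ctx;
  response_generator->SetFailure();
}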

@@ -22,45 +22,47 @@
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
+#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
 #include "test/core/end2end/cq_verifier.h"
 #include "test/core/util/test_config.h"
 
 static void* tag(intptr_t i) { return (void*)i; }
 
-int main(int argc, char** argv) {
-  grpc_channel* chan;
-  grpc_call* call;
-  gpr_timespec deadline = grpc_timeout_seconds_to_deadline(2);
-  grpc_completion_queue* cq;
-  cq_verifier* cqv;
-  grpc_op ops[6];
-  grpc_op* op;
-  grpc_metadata_array trailing_metadata_recv;
-  grpc_status_code status;
-  grpc_slice details;
+void run_test(bool wait_for_ready) {
+  gpr_log(GPR_INFO, "TEST: wait_for_ready=%d", wait_for_ready);
 
-  grpc_test_init(argc, argv);
   grpc_init();
 
-  grpc_metadata_array_init(&trailing_metadata_recv);
-
-  cq = grpc_completion_queue_create_for_next(nullptr);
-  cqv = cq_verifier_create(cq);
+  grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
+  cq_verifier* cqv = cq_verifier_create(cq);
+  grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
+      response_generator =
+          grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
+  grpc_arg arg = grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
+      response_generator.get());
+  grpc_channel_args args = {1, &arg};
 
   /* create a call, channel to a non existant server */
-  chan = grpc_insecure_channel_create("nonexistant:54321", nullptr, nullptr);
-  grpc_slice host = grpc_slice_from_static_string("nonexistant");
-  call = grpc_channel_create_call(chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
-                                  grpc_slice_from_static_string("/Foo"), &host,
-                                  deadline, nullptr);
+  grpc_channel* chan =
+      grpc_insecure_channel_create("fake:nonexistant", &args, nullptr);
+  gpr_timespec deadline = grpc_timeout_seconds_to_deadline(2);
+  grpc_call* call = grpc_channel_create_call(
+      chan, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
+      grpc_slice_from_static_string("/Foo"), nullptr, deadline, nullptr);
 
+  grpc_op ops[6];
   memset(ops, 0, sizeof(ops));
-  op = ops;
+  grpc_op* op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
   op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
+  op->flags = wait_for_ready ? GRPC_INITIAL_METADATA_WAIT_FOR_READY : 0;
   op->reserved = nullptr;
   op++;
+
+  grpc_metadata_array trailing_metadata_recv;
+  grpc_metadata_array_init(&trailing_metadata_recv);
+  grpc_status_code status;
+  grpc_slice details;
   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
   op->data.recv_status_on_client.status = &status;
@@ -71,11 +73,25 @@ int main(int argc, char** argv) {
   GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops,
                                                    (size_t)(op - ops), tag(1),
                                                    nullptr));
+
+  {
+    grpc_core::ExecCtx exec_ctx;
+    response_generator->SetFailure();
+  }
+
   /* verify that all tags get completed */
   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
   cq_verify(cqv);
 
-  GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
+  gpr_log(GPR_INFO, "call status: %d", status);
+  if (wait_for_ready) {
+    GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
+  } else {
+    GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
+  }
+
+  grpc_slice_unref(details);
+  grpc_metadata_array_destroy(&trailing_metadata_recv);
 
   grpc_completion_queue_shutdown(cq);
   while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
@@ -87,10 +103,12 @@ int main(int argc, char** argv) {
   grpc_channel_destroy(chan);
   cq_verifier_destroy(cqv);
 
-  grpc_slice_unref(details);
-  grpc_metadata_array_destroy(&trailing_metadata_recv);
-
   grpc_shutdown();
+}
 
+int main(int argc, char** argv) {
+  grpc_test_init(argc, argv);
+  run_test(true /* wait_for_ready */);
+  run_test(false /* wait_for_ready */);
   return 0;
 }
