Add a results_upon_error setter to fake resolver

pull/14281/head
Juanli Shen 7 years ago
parent f4a94b6126
commit d82e137c7b
  1. 69
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  2. 15
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  3. 194
      test/core/client_channel/resolvers/fake_resolver_test.cc
  4. 23
      test/cpp/end2end/grpclb_end2end_test.cc

@ -47,10 +47,10 @@
//
typedef struct {
// base class -- must be first
// Base class -- must be first
grpc_resolver base;
// passed-in parameters
// Passed-in parameters
grpc_channel_args* channel_args;
// If not NULL, the next set of resolution results to be returned to
@ -61,9 +61,16 @@ typedef struct {
// fake_resolver_channel_saw_error_locked().
grpc_channel_args* results_upon_error;
// pending next completion, or NULL
// TODO(juanlishen): This can go away once pick_first is changed to not throw
// away its subchannels, since that will eliminate its dependence on
// channel_saw_error_locked() causing an immediate resolver return.
// A copy of the most-recently used resolution results.
grpc_channel_args* last_used_results;
// Pending next completion, or NULL
grpc_closure* next_completion;
// target result address for next completion
// Target result address for next completion
grpc_channel_args** target_result;
} fake_resolver;
@ -71,6 +78,7 @@ static void fake_resolver_destroy(grpc_resolver* gr) {
fake_resolver* r = (fake_resolver*)gr;
grpc_channel_args_destroy(r->next_results);
grpc_channel_args_destroy(r->results_upon_error);
grpc_channel_args_destroy(r->last_used_results);
grpc_channel_args_destroy(r->channel_args);
gpr_free(r);
}
@ -98,9 +106,15 @@ static void fake_resolver_maybe_finish_next_locked(fake_resolver* r) {
static void fake_resolver_channel_saw_error_locked(grpc_resolver* resolver) {
fake_resolver* r = (fake_resolver*)resolver;
if (r->next_results == nullptr && r->results_upon_error != nullptr) {
// Pretend we re-resolved.
// A resolution must have been returned before an error is seen.
GPR_ASSERT(r->last_used_results != nullptr);
grpc_channel_args_destroy(r->next_results);
if (r->results_upon_error != nullptr) {
r->next_results = grpc_channel_args_copy(r->results_upon_error);
} else {
// If results_upon_error is unavailable, re-resolve with the most-recently
// used results to avoid a no-op re-resolution.
r->next_results = grpc_channel_args_copy(r->last_used_results);
}
fake_resolver_maybe_finish_next_locked(r);
}
@ -149,35 +163,56 @@ void grpc_fake_resolver_response_generator_unref(
typedef struct set_response_closure_arg {
grpc_closure set_response_closure;
grpc_fake_resolver_response_generator* generator;
grpc_channel_args* next_response;
grpc_channel_args* response;
bool upon_error;
} set_response_closure_arg;
static void set_response_closure_fn(void* arg, grpc_error* error) {
static void set_response_closure_locked(void* arg, grpc_error* error) {
set_response_closure_arg* closure_arg = (set_response_closure_arg*)arg;
grpc_fake_resolver_response_generator* generator = closure_arg->generator;
fake_resolver* r = generator->resolver;
if (r->next_results != nullptr) {
if (!closure_arg->upon_error) {
grpc_channel_args_destroy(r->next_results);
}
r->next_results = closure_arg->next_response;
if (r->results_upon_error != nullptr) {
r->next_results = closure_arg->response;
grpc_channel_args_destroy(r->last_used_results);
r->last_used_results = grpc_channel_args_copy(closure_arg->response);
fake_resolver_maybe_finish_next_locked(r);
} else {
grpc_channel_args_destroy(r->results_upon_error);
r->results_upon_error = closure_arg->response;
}
r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response);
gpr_free(closure_arg);
fake_resolver_maybe_finish_next_locked(r);
}
void grpc_fake_resolver_response_generator_set_response(
grpc_fake_resolver_response_generator* generator,
grpc_channel_args* next_response) {
grpc_channel_args* response) {
GPR_ASSERT(generator->resolver != nullptr);
GPR_ASSERT(response != nullptr);
set_response_closure_arg* closure_arg =
(set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg));
closure_arg->generator = generator;
closure_arg->response = grpc_channel_args_copy(response);
closure_arg->upon_error = false;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
set_response_closure_locked, closure_arg,
grpc_combiner_scheduler(
generator->resolver->base.combiner)),
GRPC_ERROR_NONE);
}
void grpc_fake_resolver_response_generator_set_response_upon_error(
grpc_fake_resolver_response_generator* generator,
grpc_channel_args* response) {
GPR_ASSERT(generator->resolver != nullptr);
set_response_closure_arg* closure_arg =
(set_response_closure_arg*)gpr_zalloc(sizeof(*closure_arg));
closure_arg->generator = generator;
closure_arg->next_response = grpc_channel_args_copy(next_response);
closure_arg->response =
response != nullptr ? grpc_channel_args_copy(response) : nullptr;
closure_arg->upon_error = true;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
set_response_closure_fn, closure_arg,
set_response_closure_locked, closure_arg,
grpc_combiner_scheduler(
generator->resolver->base.combiner)),
GRPC_ERROR_NONE);

@ -36,11 +36,20 @@ typedef struct grpc_fake_resolver_response_generator
grpc_fake_resolver_response_generator*
grpc_fake_resolver_response_generator_create();
// Instruct the fake resolver associated with the \a response_generator instance
// to trigger a new resolution for \a uri and \a args.
// Set next response of the fake resolver associated with the \a
// response_generator instance and trigger a new resolution.
void grpc_fake_resolver_response_generator_set_response(
grpc_fake_resolver_response_generator* generator,
grpc_channel_args* next_response);
grpc_channel_args* response);
// Set results_upon_error of the fake resolver associated with the \a
// response_generator instance. When fake_resolver_channel_saw_error_locked() is
// called, results_upon_error will be returned as long as it's non-NULL,
// otherwise the last value set by
// grpc_fake_resolver_response_generator_set_response() will be returned.
void grpc_fake_resolver_response_generator_set_response_upon_error(
grpc_fake_resolver_response_generator* generator,
grpc_channel_args* response);
// Return a \a grpc_arg for a \a grpc_fake_resolver_response_generator instance.
grpc_arg grpc_fake_resolver_response_generator_arg(

@ -55,6 +55,7 @@ typedef struct on_resolution_arg {
gpr_event ev;
} on_resolution_arg;
// Callback to check the resolution result is as expected.
void on_resolution_cb(void* arg, grpc_error* error) {
on_resolution_arg* res = static_cast<on_resolution_arg*>(arg);
// We only check the addresses channel arg because that's the only one
@ -71,85 +72,167 @@ void on_resolution_cb(void* arg, grpc_error* error) {
gpr_event_set(&res->ev, (void*)1);
}
static void test_fake_resolver() {
grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create();
// Create resolver.
grpc_fake_resolver_response_generator* response_generator =
grpc_fake_resolver_response_generator_create();
grpc_resolver* resolver = build_fake_resolver(combiner, response_generator);
GPR_ASSERT(resolver != nullptr);
// Setup expectations.
grpc_uri* uris[] = {grpc_uri_parse("ipv4:10.2.1.1:1234", true),
grpc_uri_parse("ipv4:127.0.0.1:4321", true)};
const char* balancer_names[] = {"name1", "name2"};
const bool is_balancer[] = {true, false};
grpc_lb_addresses* addresses = grpc_lb_addresses_create(3, nullptr);
for (size_t i = 0; i < GPR_ARRAY_SIZE(uris); ++i) {
// Create a new resolution containing 2 addresses.
static grpc_channel_args* create_new_resolver_result() {
static size_t test_counter = 0;
const size_t num_addresses = 2;
char* uri_string;
char* balancer_name;
// Create grpc_lb_addresses.
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(num_addresses, nullptr);
for (size_t i = 0; i < num_addresses; ++i) {
gpr_asprintf(&uri_string, "ipv4:127.0.0.1:100%" PRIuPTR,
test_counter * num_addresses + i);
grpc_uri* uri = grpc_uri_parse(uri_string, true);
gpr_asprintf(&balancer_name, "balancer%" PRIuPTR,
test_counter * num_addresses + i);
grpc_lb_addresses_set_address_from_uri(
addresses, i, uris[i], is_balancer[i], balancer_names[i], nullptr);
grpc_uri_destroy(uris[i]);
addresses, i, uri, bool(num_addresses % 2), balancer_name, nullptr);
gpr_free(balancer_name);
grpc_uri_destroy(uri);
gpr_free(uri_string);
}
// Convert grpc_lb_addresses to grpc_channel_args.
const grpc_arg addresses_arg =
grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args* results =
grpc_channel_args_copy_and_add(nullptr, &addresses_arg, 1);
grpc_lb_addresses_destroy(addresses);
++test_counter;
return results;
}
static on_resolution_arg create_on_resolution_arg(grpc_channel_args* results) {
on_resolution_arg on_res_arg;
memset(&on_res_arg, 0, sizeof(on_res_arg));
on_res_arg.expected_resolver_result = results;
gpr_event_init(&on_res_arg.ev);
return on_res_arg;
}
static void test_fake_resolver() {
grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create();
// Create resolver.
grpc_fake_resolver_response_generator* response_generator =
grpc_fake_resolver_response_generator_create();
grpc_resolver* resolver = build_fake_resolver(combiner, response_generator);
GPR_ASSERT(resolver != nullptr);
// Test 1: normal resolution.
// next_results != NULL, results_upon_error == NULL, last_used_results ==
// NULL. Expected response is next_results.
grpc_channel_args* results = create_new_resolver_result();
on_resolution_arg on_res_arg = create_on_resolution_arg(results);
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
on_resolution_cb, &on_res_arg, grpc_combiner_scheduler(combiner));
// Set resolver results and trigger first resolution. on_resolution_cb
// performs the checks.
// Resolution won't be triggered until next_results is set.
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
grpc_fake_resolver_response_generator_set_response(response_generator,
results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 2: update resolution.
// next_results != NULL, results_upon_error == NULL, last_used_results !=
// NULL. Expected response is next_results.
results = create_new_resolver_result();
grpc_channel_args* last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
// Resolution won't be triggered until next_results is set.
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
grpc_fake_resolver_response_generator_set_response(response_generator,
results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Setup update.
grpc_uri* uris_update[] = {grpc_uri_parse("ipv4:192.168.1.0:31416", true)};
const char* balancer_names_update[] = {"name3"};
const bool is_balancer_update[] = {false};
grpc_lb_addresses* addresses_update = grpc_lb_addresses_create(1, nullptr);
for (size_t i = 0; i < GPR_ARRAY_SIZE(uris_update); ++i) {
grpc_lb_addresses_set_address_from_uri(addresses_update, i, uris_update[i],
is_balancer_update[i],
balancer_names_update[i], nullptr);
grpc_uri_destroy(uris_update[i]);
}
grpc_arg addresses_update_arg =
grpc_lb_addresses_create_channel_arg(addresses_update);
grpc_channel_args* results_update =
grpc_channel_args_copy_and_add(nullptr, &addresses_update_arg, 1);
grpc_lb_addresses_destroy(addresses_update);
// Setup expectations for the update.
on_resolution_arg on_res_arg_update;
memset(&on_res_arg_update, 0, sizeof(on_res_arg_update));
on_res_arg_update.expected_resolver_result = results_update;
gpr_event_init(&on_res_arg_update.ev);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg_update,
// Test 3: fallback re-resolution.
// next_results == NULL, results_upon_error == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
// Set updated resolver results and trigger a second resolution.
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
// Trigger a re-resolution.
grpc_resolver_channel_saw_error_locked(resolver);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 4: normal re-resolution.
// next_results == NULL, results_upon_error != NULL, last_used_results !=
// NULL. Expected response is results_upon_error.
grpc_channel_args* results_upon_error = create_new_resolver_result();
on_res_arg =
create_on_resolution_arg(grpc_channel_args_copy(results_upon_error));
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
// Set results_upon_error.
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator, results_upon_error);
// Flush here to guarantee that the response has been set.
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
grpc_resolver_channel_saw_error_locked(resolver);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 5: repeat re-resolution.
// next_results == NULL, results_upon_error != NULL, last_used_results !=
// NULL. Expected response is results_upon_error.
on_res_arg = create_on_resolution_arg(results_upon_error);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
// Trigger a re-resolution.
grpc_resolver_channel_saw_error_locked(resolver);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 6: normal resolution.
// next_results != NULL, results_upon_error != NULL, last_used_results !=
// NULL. Expected response is next_results.
results = create_new_resolver_result();
last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
// Resolution won't be triggered until next_results is set.
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
grpc_fake_resolver_response_generator_set_response(response_generator,
results_update);
grpc_resolver_next_locked(resolver, &on_res_arg_update.resolver_result,
results);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 7: fallback re-resolution.
// next_results == NULL, results_upon_error == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
// Reset results_upon_error.
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator, nullptr);
// Flush here to guarantee that results_upon_error has been reset.
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
grpc_resolver_channel_saw_error_locked(resolver);
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg_update.ev,
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Requesting a new resolution without re-senting the response shouldn't
// trigger the resolution callback.
// Test 8: no-op.
// Requesting a new resolution without setting the response shouldn't trigger
// the resolution callback.
memset(&on_res_arg, 0, sizeof(on_res_arg));
grpc_resolver_next_locked(resolver, &on_res_arg.resolver_result,
on_resolution);
@ -157,10 +240,9 @@ static void test_fake_resolver() {
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_milliseconds_to_deadline(100)) ==
nullptr);
// Clean up.
GRPC_COMBINER_UNREF(combiner, "test_fake_resolver");
GRPC_RESOLVER_UNREF(resolver, "test_fake_resolver");
grpc_fake_resolver_response_generator_unref(response_generator);
}

@ -454,8 +454,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc::string balancer_name;
};
void SetNextResolution(const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* CreateLbAddressesFromAddressDataList(
const std::vector<AddressData>& address_data) {
grpc_lb_addresses* addresses =
grpc_lb_addresses_create(address_data.size(), nullptr);
for (size_t i = 0; i < address_data.size(); ++i) {
@ -469,6 +469,13 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
return addresses;
}
void SetNextResolution(const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
CreateLbAddressesFromAddressDataList(address_data);
grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
grpc_fake_resolver_response_generator_set_response(response_generator_,
@ -476,6 +483,18 @@ class GrpclbEnd2endTest : public ::testing::Test {
grpc_lb_addresses_destroy(addresses);
}
void SetNextResolutionUponError(
const std::vector<AddressData>& address_data) {
grpc_core::ExecCtx exec_ctx;
grpc_lb_addresses* addresses =
CreateLbAddressesFromAddressDataList(address_data);
grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
grpc_channel_args fake_result = {1, &fake_addresses};
grpc_fake_resolver_response_generator_set_response_upon_error(
response_generator_, &fake_result);
grpc_lb_addresses_destroy(addresses);
}
const std::vector<int> GetBackendPorts(const size_t start_index = 0) const {
std::vector<int> backend_ports;
for (size_t i = start_index; i < backend_servers_.size(); ++i) {

Loading…
Cancel
Save