diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 4a5e4f751b9..2ae4fe862e9 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -36,7 +36,6 @@ void grpc_resolver_init(grpc_resolver *resolver, const grpc_resolver_vtable *vtable) { resolver->vtable = vtable; - resolver->pollset_set = grpc_pollset_set_create(); gpr_ref_init(&resolver->refs, 1); } @@ -63,7 +62,6 @@ void grpc_resolver_unref(grpc_resolver *resolver, void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { #endif if (gpr_unref(&resolver->refs)) { - grpc_pollset_set_destroy(resolver->pollset_set); resolver->vtable->destroy(exec_ctx, resolver); } } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 5b4527bbf68..96ece92b9d8 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -43,7 +43,6 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; /** \a grpc_resolver provides \a grpc_channel_args objects to its caller */ struct grpc_resolver { const grpc_resolver_vtable *vtable; - grpc_pollset_set *pollset_set; gpr_refcount refs; }; diff --git a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c index 01ed0114394..4301673a8bc 100644 --- a/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/resolver/dns/c_ares/dns_resolver_ares.c @@ -52,10 +52,11 @@ #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" -#define BACKOFF_MULTIPLIER 1.6 -#define BACKOFF_JITTER 0.2 -#define BACKOFF_MIN_SECONDS 1 -#define BACKOFF_MAX_SECONDS 120 +#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 +#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_DNS_RECONNECT_JITTER 0.2 typedef struct { /** base class: must be first */ @@ -66,6 +67,8 @@ typedef struct { char *default_port; /** channel args. */ grpc_channel_args *channel_args; + /** pollset_set to drive the name resolution process */ + grpc_pollset_set *interested_parties; /** Closures used by the combiner */ grpc_closure dns_ares_shutdown_locked; @@ -124,8 +127,8 @@ static void dns_ares_shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (r->next_completion != NULL) { *r->target_result = NULL; - grpc_exec_ctx_sched(exec_ctx, r->next_completion, - GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); + grpc_closure_sched(exec_ctx, r->next_completion, + GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-ares-shutdown"); @@ -135,8 +138,7 @@ static void dns_ares_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { ares_dns_resolver *r = (ares_dns_resolver *)resolver; GRPC_RESOLVER_REF(&r->base, "dns-ares-shutdown"); - grpc_combiner_execute(exec_ctx, r->combiner, &r->dns_ares_shutdown_locked, - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, &r->dns_ares_shutdown_locked, GRPC_ERROR_NONE); } static void dns_ares_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, @@ -154,9 +156,8 @@ static void dns_ares_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { ares_dns_resolver *r = (ares_dns_resolver *)resolver; GRPC_RESOLVER_REF(&r->base, "ares-channel-saw-error"); - grpc_combiner_execute(exec_ctx, r->combiner, - &r->dns_ares_channel_saw_error_locked, GRPC_ERROR_NONE, - false); + grpc_closure_sched(exec_ctx, &r->dns_ares_channel_saw_error_locked, + GRPC_ERROR_NONE); } static void dns_ares_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -171,14 +172,6 @@ static void dns_ares_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_ares_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - ares_dns_resolver *r = arg; - grpc_combiner_execute(exec_ctx, r->combiner, - &r->dns_ares_on_retry_timer_locked, - GRPC_ERROR_REF(error), false); -} - static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { ares_dns_resolver *r = arg; @@ -197,7 +190,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses); - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); } else { const char *msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); @@ -215,10 +208,10 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "retrying immediately"); } grpc_timer_init(exec_ctx, &r->retry_timer, next_try, - dns_ares_on_retry_timer, r, now); + &r->dns_ares_on_retry_timer_locked, now); } if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } r->resolved_result = result; r->resolved_version++; @@ -226,13 +219,6 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_ares_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - ares_dns_resolver *r = arg; - grpc_combiner_execute(exec_ctx, r->combiner, &r->dns_ares_on_resolved_locked, - GRPC_ERROR_REF(error), false); -} - typedef struct dns_ares_next_locked_args { grpc_resolver *resolver; grpc_channel_args **target_result; @@ -268,9 +254,10 @@ static void dns_ares_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, args->on_complete = on_complete; args->resolver = resolver; GRPC_RESOLVER_REF(resolver, "ares-next"); - grpc_combiner_execute(exec_ctx, r->combiner, - grpc_closure_create(dns_ares_next_locked, args), - GRPC_ERROR_NONE, false); + grpc_closure_sched(exec_ctx, grpc_closure_create( + dns_ares_next_locked, args, + grpc_combiner_scheduler(r->combiner, false)), + GRPC_ERROR_NONE); } static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, @@ -279,9 +266,9 @@ static void dns_ares_start_resolving_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!r->resolving); r->resolving = true; r->addresses = NULL; - grpc_resolve_address( - exec_ctx, r->name_to_resolve, r->default_port, r->base.pollset_set, - grpc_closure_create(dns_ares_on_resolved, r), &r->addresses); + grpc_resolve_address(exec_ctx, r->name_to_resolve, r->default_port, + r->interested_parties, &r->dns_ares_on_resolved_locked, + &r->addresses); } static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -291,7 +278,7 @@ static void dns_ares_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, *r->target_result = r->resolved_result == NULL ? NULL : grpc_channel_args_copy(r->resolved_result); - grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; r->published_version = r->resolved_version; } @@ -302,15 +289,16 @@ static void dns_ares_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { ares_dns_resolver *r = (ares_dns_resolver *)gr; grpc_combiner_destroy(exec_ctx, r->combiner); if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } gpr_free(r->name_to_resolve); gpr_free(r->default_port); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } -static grpc_resolver *dns_ares_create(grpc_resolver_args *args, +static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, + grpc_resolver_args *args, const char *default_port) { // Get name from args. const char *path = args->uri->path; @@ -329,21 +317,28 @@ static grpc_resolver *dns_ares_create(grpc_resolver_args *args, r->combiner = grpc_combiner_create(NULL); r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name; r->default_port = gpr_strdup(default_port); - grpc_arg server_name_arg; - server_name_arg.type = GRPC_ARG_STRING; - server_name_arg.key = GRPC_ARG_SERVER_NAME; - server_name_arg.value.string = (char *)path; - r->channel_args = - grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1); - gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, - BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); - grpc_closure_init(&r->dns_ares_shutdown_locked, dns_ares_shutdown_locked, r); + r->channel_args = grpc_channel_args_copy(args->args); + r->interested_parties = grpc_pollset_set_create(); + if (args->pollset_set != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, + args->pollset_set); + } + gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_DNS_RECONNECT_JITTER, + GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_closure_init(&r->dns_ares_shutdown_locked, dns_ares_shutdown_locked, r, + grpc_combiner_scheduler(r->combiner, false)); grpc_closure_init(&r->dns_ares_channel_saw_error_locked, - dns_ares_channel_saw_error_locked, r); + dns_ares_channel_saw_error_locked, r, + grpc_combiner_scheduler(r->combiner, false)); grpc_closure_init(&r->dns_ares_on_retry_timer_locked, - dns_ares_on_retry_timer_locked, r); + dns_ares_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->combiner, false)); grpc_closure_init(&r->dns_ares_on_resolved_locked, - dns_ares_on_resolved_locked, r); + dns_ares_on_resolved_locked, r, + grpc_combiner_scheduler(r->combiner, false)); return &r->base; } @@ -356,8 +351,9 @@ static void dns_ares_factory_ref(grpc_resolver_factory *factory) {} static void dns_ares_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args) { - return dns_ares_create(args, "https"); + grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, + grpc_resolver_args *args) { + return dns_ares_create(exec_ctx, args, "https"); } static char *dns_ares_factory_get_default_host_name( diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 4f7485c5f55..013832e6265 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -256,8 +256,10 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, fdn->readable_registered = false; fdn->writable_registered = false; gpr_mu_init(&fdn->mu); - grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn); - grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn); + grpc_closure_init(&fdn->read_closure, on_readable_cb, fdn, + grpc_schedule_on_exec_ctx); + grpc_closure_init(&fdn->write_closure, on_writable_cb, fdn, + grpc_schedule_on_exec_ctx); grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->grpc_fd); gpr_free(fd_name); diff --git a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c index a9e94a509dc..a1cbf9b91ce 100644 --- a/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -108,10 +108,10 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, // acquire locks in on_done. ares_dns_resolver is using combiner to // protect resources needed by on_done. grpc_exec_ctx new_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_exec_ctx_sched(&new_exec_ctx, r->on_done, r->error, NULL); + grpc_closure_sched(&new_exec_ctx, r->on_done, r->error); grpc_exec_ctx_finish(&new_exec_ctx); } else { - grpc_exec_ctx_sched(exec_ctx, r->on_done, r->error, NULL); + grpc_closure_sched(exec_ctx, r->on_done, r->error); } gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy(r->ev_driver); @@ -207,13 +207,13 @@ void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_error *err = grpc_error_set_str(GRPC_ERROR_CREATE("unparseable host:port"), GRPC_ERROR_STR_TARGET_ADDRESS, name); - grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } else if (port == NULL) { if (default_port == NULL) { grpc_error *err = grpc_error_set_str(GRPC_ERROR_CREATE("no port in name"), GRPC_ERROR_STR_TARGET_ADDRESS, name); - grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + grpc_closure_sched(exec_ctx, on_done, err); goto error_cleanup; } port = gpr_strdup(default_port); diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c index d5ee057657f..e6c4d63cff0 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c @@ -64,7 +64,7 @@ static void my_resolve_address(grpc_exec_ctx *exec_ctx, const char *addr, (*addrs)->addrs = gpr_malloc(sizeof(*(*addrs)->addrs)); (*addrs)->addrs[0].len = 123; } - grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); + grpc_closure_sched(exec_ctx, on_done, error); } static grpc_resolver *create_resolver(grpc_exec_ctx *exec_ctx, @@ -105,7 +105,7 @@ int main(int argc, char **argv) { grpc_init(); gpr_mu_init(&g_mu); - grpc_blocking_resolve_address = my_resolve_address; + grpc_resolve_address = my_resolve_address; grpc_channel_args *result = (grpc_channel_args *)1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/test/core/end2end/goaway_server_test.c b/test/core/end2end/goaway_server_test.c index dba80118dd8..e01e68819ee 100644 --- a/test/core/end2end/goaway_server_test.c +++ b/test/core/end2end/goaway_server_test.c @@ -86,7 +86,7 @@ static void my_resolve_address(grpc_exec_ctx *exec_ctx, const char *addr, (*addrs)->addrs[0].len = sizeof(*sa); gpr_mu_unlock(&g_mu); } - grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); + grpc_closure_sched(exec_ctx, on_done, error); } int main(int argc, char **argv) {