diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index b1a4aa9a1ca..6cbc333b832 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -230,7 +230,7 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, if (w->lb_policy == w->chand->lb_policy) { if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); w->chand->lb_policy = NULL; } @@ -386,11 +386,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } else { if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } @@ -451,7 +452,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { @@ -550,7 +551,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->resolver = grpc_resolver_create( exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string, new_args != NULL ? new_args : args->channel_args, - chand->interested_parties); + chand->interested_parties, chand->combiner); if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { @@ -559,13 +560,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_resolver *resolver = arg; + grpc_resolver_shutdown_locked(exec_ctx, resolver); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); +} + /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); @@ -846,8 +857,9 @@ static bool pick_subchannel_locked( if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -1210,8 +1222,9 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 2ae4fe862e9..b1a1faa6c96 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -32,10 +32,13 @@ */ #include "src/core/ext/client_channel/resolver.h" +#include "src/core/lib/iomgr/combiner.h" void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable) { + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner) { resolver->vtable = vtable; + resolver->combiner = GRPC_COMBINER_REF(combiner, "resolver"); gpr_ref_init(&resolver->refs, 1); } @@ -62,20 +65,24 @@ void grpc_resolver_unref(grpc_resolver *resolver, void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { #endif if (gpr_unref(&resolver->refs)) { + grpc_combiner *combiner = resolver->combiner; resolver->vtable->destroy(exec_ctx, resolver); + GRPC_COMBINER_UNREF(exec_ctx, combiner, "resolver"); } } -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { - resolver->vtable->shutdown(exec_ctx, resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->shutdown_locked(exec_ctx, resolver); } -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { - resolver->vtable->channel_saw_error(exec_ctx, resolver); +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error_locked(exec_ctx, resolver); } -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete) { - resolver->vtable->next(exec_ctx, resolver, result, on_complete); +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete); } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 96ece92b9d8..bbba424ca5d 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -44,14 +44,16 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; struct grpc_resolver { const grpc_resolver_vtable *vtable; gpr_refcount refs; + grpc_combiner *combiner; }; struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); + void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -70,21 +72,30 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); #endif void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable); + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner); -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. - Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver); + Can be used as a hint that re-resolution is desirable soon. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Get the next result from the resolver. Expected to set \a *result with new channel args and then schedule \a on_complete for execution. If resolution is fatally broken, set \a *result to NULL and - schedule \a on_complete. */ -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + schedule \a on_complete. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 3792ddca18c..e3cd99ec5a4 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -50,6 +50,7 @@ typedef struct grpc_resolver_args { grpc_uri *uri; const grpc_channel_args *args; grpc_pollset_set *pollset_set; + grpc_combiner *combiner; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5110a7cad9e..f8e8bc9c393 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -133,7 +133,8 @@ static grpc_resolver_factory *resolve_factory(const char *target, grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_combiner *combiner) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = @@ -144,6 +145,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; + resolver_args.combiner = combiner; resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index a4606463ebd..e2c189cf0cd 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); should not be NULL. */ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_combiner *combiner); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index c08b53ea040..96ac521a91b 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -40,6 +40,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/backoff.h" @@ -63,8 +64,6 @@ typedef struct { /** pollset_set to drive the name resolution process */ grpc_pollset_set *interested_parties; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** are we currently resolving? */ bool resolving; /** which version of the result have we published? */ @@ -95,18 +94,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, + dns_next_locked}; -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->have_retry_timer) { grpc_timer_cancel(exec_ctx, &r->retry_timer); } @@ -116,25 +117,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; @@ -144,30 +141,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, } else { dns_maybe_finish_next_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; - gpr_mu_lock(&r->mu); r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_start_resolving_locked(exec_ctx, r); } } - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; grpc_channel_args *result = NULL; - gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { @@ -198,8 +191,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -208,7 +201,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -221,7 +213,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + grpc_closure_create(dns_on_resolved_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)), &r->addresses); } @@ -240,7 +233,6 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } @@ -264,8 +256,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, // Create resolver. dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &dns_resolver_vtable); + grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a1365f64656..e7f66649b5d 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -45,6 +45,7 @@ #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,8 +59,6 @@ typedef struct { grpc_lb_addresses *addresses; /** channel args */ grpc_channel_args *channel_args; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** have we published? */ bool published; /** pending next completion, or NULL */ @@ -73,48 +72,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r); -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r); -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; + sockaddr_destroy, sockaddr_shutdown_locked, + sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +125,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); grpc_lb_addresses_destroy(exec_ctx, r->addresses); grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); @@ -201,8 +194,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, memset(r, 0, sizeof(*r)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); return &r->base; } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 48a1e586e17..0fae3effb63 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -101,6 +101,17 @@ typedef struct { grpc_error *error; } received_status; +static gpr_atm pack_received_status(received_status r) { + return r.is_set ? (1 | (gpr_atm)r.error) : 0; +} + +static received_status unpack_received_status(gpr_atm atm) { + return (atm & 1) == 0 + ? (received_status){.is_set = false, .error = GRPC_ERROR_NONE} + : (received_status){.is_set = true, + .error = (grpc_error *)(atm & ~(gpr_atm)1)}; +} + #define MAX_ERRORS_PER_BATCH 3 typedef struct batch_control { @@ -165,8 +176,8 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; - /* Received call statuses from various sources */ - received_status status[STATUS_SOURCE_COUNT]; + /* Packed received call statuses from various sources */ + gpr_atm status[STATUS_SOURCE_COUNT]; /* Call data useful used for reporting. Only valid after the call has * completed */ @@ -446,7 +457,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF(c->status[i].error); + GRPC_ERROR_UNREF( + unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); @@ -614,13 +626,12 @@ static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, */ static bool get_final_status_from( - grpc_call *call, status_source from_source, bool allow_ok_status, + grpc_call *call, grpc_error *error, bool allow_ok_status, void (*set_value)(grpc_status_code code, void *user_data), void *set_value_user_data, grpc_slice *details) { grpc_status_code code; const char *msg = NULL; - grpc_error_get_status(call->status[from_source].error, call->send_deadline, - &code, &msg, NULL); + grpc_error_get_status(error, call->send_deadline, &code, &msg, NULL); if (code == GRPC_STATUS_OK && !allow_ok_status) { return false; } @@ -638,12 +649,15 @@ static void get_final_status(grpc_call *call, void *user_data), void *set_value_user_data, grpc_slice *details) { int i; + received_status status[STATUS_SOURCE_COUNT]; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i])); + } if (grpc_call_error_trace) { gpr_log(GPR_DEBUG, "get_final_status %s", call->is_client ? "CLI" : "SVR"); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - gpr_log(GPR_DEBUG, " %d: %s", i, - grpc_error_string(call->status[i].error)); + if (status[i].is_set) { + gpr_log(GPR_DEBUG, " %d: %s", i, grpc_error_string(status[i].error)); } } } @@ -653,9 +667,9 @@ static void get_final_status(grpc_call *call, /* search for the best status we can present: ideally the error we use has a clearly defined grpc-status, and we'll prefer that. */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set && - grpc_error_has_clear_grpc_status(call->status[i].error)) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set && + grpc_error_has_clear_grpc_status(status[i].error)) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -663,8 +677,8 @@ static void get_final_status(grpc_call *call, } /* If no clearly defined status exists, search for 'anything' */ for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - if (get_final_status_from(call, (status_source)i, allow_ok_status != 0, + if (status[i].is_set) { + if (get_final_status_from(call, status[i].error, allow_ok_status != 0, set_value, set_value_user_data, details)) { return; } @@ -681,12 +695,13 @@ static void get_final_status(grpc_call *call, static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { - if (call->status[source].is_set) { + if (!gpr_atm_rel_cas(&call->status[source], + pack_received_status((received_status){ + .is_set = false, .error = GRPC_ERROR_NONE}), + pack_received_status((received_status){ + .is_set = true, .error = error}))) { GRPC_ERROR_UNREF(error); - return; } - call->status[source].is_set = true; - call->status[source].error = error; } /******************************************************************************* diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c index cfd37052de4..3e3401165cd 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.c @@ -36,14 +36,17 @@ #include #include +#include "src/core/ext/client_channel/resolver.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "test/core/util/test_config.h" static gpr_mu g_mu; static bool g_fail_resolution = true; +static grpc_combiner *g_combiner; static grpc_error *my_resolve_address(const char *name, const char *addr, grpc_resolved_addresses **addrs) { @@ -71,6 +74,7 @@ static grpc_resolver *create_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; grpc_resolver *resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &args); grpc_resolver_factory_unref(factory); @@ -96,11 +100,41 @@ static bool wait_loop(int deadline_seconds, gpr_event *ev) { return false; } +typedef struct next_args { + grpc_resolver *resolver; + grpc_channel_args **result; + grpc_closure *on_complete; +} next_args; + +static void call_resolver_next_now_lock_taken(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_unused) { + next_args *a = arg; + grpc_resolver_next_locked(exec_ctx, a->resolver, a->result, a->on_complete); + gpr_free(a); +} + +static void call_resolver_next_after_locking(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + next_args *a = gpr_malloc(sizeof(*a)); + a->resolver = resolver; + a->result = result; + a->on_complete = on_complete; + grpc_closure_sched( + exec_ctx, + grpc_closure_create(call_resolver_next_now_lock_taken, a, + grpc_combiner_scheduler(resolver->combiner, false)), + GRPC_ERROR_NONE); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); gpr_mu_init(&g_mu); + g_combiner = grpc_combiner_create(NULL); grpc_blocking_resolve_address = my_resolve_address; grpc_channel_args *result = (grpc_channel_args *)1; @@ -108,7 +142,7 @@ int main(int argc, char **argv) { grpc_resolver *resolver = create_resolver(&exec_ctx, "dns:test"); gpr_event ev1; gpr_event_init(&ev1); - grpc_resolver_next( + call_resolver_next_after_locking( &exec_ctx, resolver, &result, grpc_closure_create(on_done, &ev1, grpc_schedule_on_exec_ctx)); grpc_exec_ctx_flush(&exec_ctx); @@ -117,7 +151,7 @@ int main(int argc, char **argv) { gpr_event ev2; gpr_event_init(&ev2); - grpc_resolver_next( + call_resolver_next_after_locking( &exec_ctx, resolver, &result, grpc_closure_create(on_done, &ev2, grpc_schedule_on_exec_ctx)); grpc_exec_ctx_flush(&exec_ctx); @@ -126,6 +160,7 @@ int main(int argc, char **argv) { grpc_channel_args_destroy(&exec_ctx, result); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test"); + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); diff --git a/test/core/client_channel/resolvers/dns_resolver_test.c b/test/core/client_channel/resolvers/dns_resolver_test.c index 5603a57b5fc..9dd5aed0919 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.c +++ b/test/core/client_channel/resolvers/dns_resolver_test.c @@ -36,8 +36,11 @@ #include #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/lib/iomgr/combiner.h" #include "test/core/util/test_config.h" +static grpc_combiner *g_combiner; + static void test_succeeds(grpc_resolver_factory *factory, const char *string) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_uri *uri = grpc_uri_parse(string, 0); @@ -48,6 +51,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver != NULL); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); @@ -65,6 +69,7 @@ static void test_fails(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver == NULL); grpc_uri_destroy(uri); @@ -76,6 +81,8 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + g_combiner = grpc_combiner_create(NULL); + dns = grpc_resolver_factory_lookup("dns"); test_succeeds(dns, "dns:10.2.1.1"); @@ -84,6 +91,11 @@ int main(int argc, char **argv) { test_fails(dns, "ipv4://8.8.8.8/8.8.8.8:8888"); grpc_resolver_factory_unref(dns); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); + grpc_exec_ctx_finish(&exec_ctx); + } grpc_shutdown(); return 0; diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.c b/test/core/client_channel/resolvers/sockaddr_resolver_test.c index 10df78537c8..68831ab7c79 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.c +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.c @@ -39,9 +39,12 @@ #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "test/core/util/test_config.h" +static grpc_combiner *g_combiner; + typedef struct on_resolution_arg { char *expected_server_name; grpc_channel_args *resolver_result; @@ -62,6 +65,7 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver != NULL); @@ -71,8 +75,8 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { grpc_closure *on_resolution = grpc_closure_create( on_resolution_cb, &on_res_arg, grpc_schedule_on_exec_ctx); - grpc_resolver_next(&exec_ctx, resolver, &on_res_arg.resolver_result, - on_resolution); + grpc_resolver_next_locked(&exec_ctx, resolver, &on_res_arg.resolver_result, + on_resolution); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); grpc_exec_ctx_finish(&exec_ctx); grpc_uri_destroy(uri); @@ -88,6 +92,7 @@ static void test_fails(grpc_resolver_factory *factory, const char *string) { GPR_ASSERT(uri); memset(&args, 0, sizeof(args)); args.uri = uri; + args.combiner = g_combiner; resolver = grpc_resolver_factory_create_resolver(&exec_ctx, factory, &args); GPR_ASSERT(resolver == NULL); grpc_uri_destroy(uri); @@ -99,6 +104,8 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); + g_combiner = grpc_combiner_create(NULL); + ipv4 = grpc_resolver_factory_lookup("ipv4"); ipv6 = grpc_resolver_factory_lookup("ipv6"); @@ -118,6 +125,12 @@ int main(int argc, char **argv) { grpc_resolver_factory_unref(ipv4); grpc_resolver_factory_unref(ipv6); + + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_COMBINER_UNREF(&exec_ctx, g_combiner, "test"); + grpc_exec_ctx_finish(&exec_ctx); + } grpc_shutdown(); return 0; diff --git a/test/core/end2end/fake_resolver.c b/test/core/end2end/fake_resolver.c index 4f05f69f01f..8a37531449e 100644 --- a/test/core/end2end/fake_resolver.c +++ b/test/core/end2end/fake_resolver.c @@ -213,7 +213,7 @@ static grpc_resolver* fake_resolver_create(grpc_exec_ctx* exec_ctx, r->channel_args = grpc_channel_args_copy(args->args); r->addresses = addresses; gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &fake_resolver_vtable); + grpc_resolver_init(&r->base, &fake_resolver_vtable, args->combiner); return &r->base; }