From 6ecd8ad519c360d18b32c8b6a2dbb3f6e9e6b2ef Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 22 Sep 2015 17:01:34 -0700 Subject: [PATCH] Make zookeeper compile with execution contexts --- .../resolvers/zookeeper_resolver.c | 66 ++++++++++--------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 213d5a172fc..f640a0084ad 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -87,24 +87,27 @@ typedef struct { int resolved_num; } zookeeper_resolver; -static void zookeeper_destroy(grpc_resolver *r); +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) - GRPC_MUST_USE_RESULT; +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r); -static void zookeeper_shutdown(grpc_resolver *r); -static void zookeeper_channel_saw_error(grpc_resolver *r, +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); -static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_client_config **target_config, grpc_closure *on_complete); static const grpc_resolver_vtable zookeeper_resolver_vtable = { zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, zookeeper_next}; -static void zookeeper_shutdown(grpc_resolver *resolver) { +static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; grpc_closure *call = NULL; gpr_mu_lock(&r->mu); @@ -116,11 +119,12 @@ static void zookeeper_shutdown(grpc_resolver *resolver) { zookeeper_close(r->zookeeper_handle); gpr_mu_unlock(&r->mu); if (call != NULL) { - call->cb(call->cb_arg, 1); + call->cb(exec_ctx, call->cb_arg, 1); } } -static void zookeeper_channel_saw_error(grpc_resolver *resolver, +static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, struct sockaddr *sa, int len) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; gpr_mu_lock(&r->mu); @@ -130,11 +134,10 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } -static void zookeeper_next(grpc_resolver *resolver, +static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, grpc_client_config **target_config, grpc_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_closure *call; gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -142,10 +145,9 @@ static void zookeeper_next(grpc_resolver *resolver, if (r->resolved_version == 0 && r->resolving == 0) { zookeeper_start_resolving_locked(r); } else { - call = zookeeper_maybe_finish_next_locked(r); + zookeeper_maybe_finish_next_locked(exec_ctx, r); } gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); } /** Zookeeper global watcher for connection management @@ -180,14 +182,13 @@ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, /** Callback function after getting all resolved addresses Creates a subchannel for each address */ -static void zookeeper_on_resolved(void *arg, +static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_closure *call; size_t i; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; @@ -198,13 +199,13 @@ static void zookeeper_on_resolved(void *arg, args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr_len = addresses->addrs[i].len; subchannels[i] = grpc_subchannel_factory_create_subchannel( - r->subchannel_factory, &args); + exec_ctx, r->subchannel_factory, &args); } lb_policy_args.subchannels = subchannels; lb_policy_args.num_subchannels = addresses->naddrs; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); } @@ -212,20 +213,18 @@ static void zookeeper_on_resolved(void *arg, GPR_ASSERT(r->resolving == 1); r->resolving = 0; if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(exec_ctx, r->resolved_config); } r->resolved_config = config; r->resolved_version++; - call = zookeeper_maybe_finish_next_locked(r); + zookeeper_maybe_finish_next_locked(exec_ctx, r); gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); - - GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving"); } /** Callback function for each DNS resolved address */ -static void zookeeper_dns_resolved(void *arg, +static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { size_t i; zookeeper_resolver *r = arg; @@ -251,7 +250,7 @@ static void zookeeper_dns_resolved(void *arg, resolve_done = (r->resolved_num == r->resolved_total); gpr_mu_unlock(&r->mu); if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); + zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs); } } @@ -300,9 +299,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, char *address = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; int resolve_done = 0; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (rc != 0) { gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + grpc_exec_ctx_finish(&exec_ctx); return; } @@ -318,9 +319,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, resolve_done = (r->resolved_num == r->resolved_total); gpr_mu_unlock(&r->mu); if (resolve_done) { - zookeeper_on_resolved(r, r->resolved_addrs); + zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs); } } + + grpc_exec_ctx_finish(&exec_ctx); } static void zookeeper_get_children_completion( @@ -411,28 +414,27 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { zookeeper_resolve_address(r); } -static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { - grpc_closure *call = NULL; +static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, + zookeeper_resolver *r) { if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } - call = r->next_completion; + grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } - return call; } -static void zookeeper_destroy(grpc_resolver *gr) { +static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { zookeeper_resolver *r = (zookeeper_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_config != NULL) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(exec_ctx, r->resolved_config); } - grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory); gpr_free(r->name); gpr_free(r->lb_policy_name); gpr_free(r);