pull/2882/head
Hongwei Wang 10 years ago
parent cd62075eb6
commit 35d5a0fd64
  1. 4
      include/grpc/grpc_zookeeper.h
  2. 122
      src/core/client_config/resolvers/zookeeper_resolver.c
  3. 4
      src/core/client_config/resolvers/zookeeper_resolver.h
  4. 12
      src/core/surface/init.c

@ -38,11 +38,11 @@
extern "C" {
#endif
/* Register zookeeper name resolver in grpc */
/** Register zookeeper name resolver in grpc */
void grpc_zookeeper_register();
#ifdef __cplusplus
}
#endif
#endif /* GRPC_GRPC_ZOOKEEPER_H */
#endif /* GRPC_GRPC_ZOOKEEPER_H */

@ -47,9 +47,10 @@
#include "src/core/support/string.h"
#include "src/core/json/json.h"
#define GRPC_MAX_ZOOKEEPER_BUFFER_SIZE 1024
#define GRPC_ZOOKEEPER_TIMEOUT 15000
#define GRPC_ZOOKEEPER_WATCH 0
/** Zookeeper session expiration time in milliseconds */
#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
/** Set zookeeper watch */
#define GRPC_ZOOKEEPER_WATCH 1
typedef struct {
/** base class: must be first */
@ -121,7 +122,7 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *sa, int len) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (!r->resolving) {
if (r->resolving == 0) {
zookeeper_start_resolving_locked(r);
}
gpr_mu_unlock(&r->mu);
@ -132,10 +133,10 @@ static void zookeeper_next(grpc_resolver *resolver,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
r->target_config = target_config;
if (r->resolved_version == 0 && !r->resolving) {
if (r->resolved_version == 0 && r->resolving == 0) {
zookeeper_start_resolving_locked(r);
} else {
zookeeper_maybe_finish_next_locked(r);
@ -151,7 +152,7 @@ static void zookeeper_on_resolved(void *arg,
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
size_t i;
if (addresses) {
if (addresses != NULL) {
config = grpc_client_config_create();
subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
for (i = 0; i < addresses->naddrs; i++) {
@ -168,9 +169,9 @@ static void zookeeper_on_resolved(void *arg,
gpr_free(subchannels);
}
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
GPR_ASSERT(r->resolving == 1);
r->resolving = 0;
if (r->resolved_config) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(r->resolved_config);
}
r->resolved_config = config;
@ -181,11 +182,13 @@ static void zookeeper_on_resolved(void *arg,
GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
}
/* Callback function for each DNS resolved address */
/** Callback function for each DNS resolved address */
static void zookeeper_dns_resolved(void *arg,
grpc_resolved_addresses *addresses) {
size_t i;
zookeeper_resolver *r = arg;
int resolve_all = 0;
gpr_mu_lock(&r->mu);
r->resolved_num++;
r->resolved_addrs->addrs =
@ -202,19 +205,18 @@ static void zookeeper_dns_resolved(void *arg,
r->resolved_addrs->naddrs += addresses->naddrs;
grpc_resolved_addresses_destroy(addresses);
/* Wait for all addresses to be resolved */
if (r->resolved_num == r->resolved_total) {
gpr_mu_unlock(&r->mu);
/** Wait for all addresses to be resolved */
resolve_all = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_all) {
zookeeper_on_resolved(r, r->resolved_addrs);
} else {
gpr_mu_unlock(&r->mu);
}
}
/* Parse json format address of a zookeeper node */
/** Parse json format address of a zookeeper node */
static char *zookeeper_parse_address(char *buffer, int buffer_len) {
char *host;
char *port;
const char *host;
const char *port;
char *address;
grpc_json *json;
grpc_json *cur;
@ -226,11 +228,15 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) {
port = NULL;
for (cur = json->child; cur != NULL; cur = cur->next) {
if (!strcmp(cur->key, "host")) {
host = (char *)cur->value;
if (port != NULL) break;
host = cur->value;
if (port != NULL) {
break;
}
} else if (!strcmp(cur->key, "port")) {
port = (char *)cur->value;
if (host != NULL) break;
port = cur->value;
if (host != NULL) {
break;
}
}
}
if (host != NULL && port != NULL) {
@ -251,38 +257,43 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
const struct Stat *stat,
const void *arg) {
char *address = NULL;
char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
int resolve_all = 0;
if (rc) {
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
return;
}
address = zookeeper_parse_address((char *)value, value_len);
buffer = gpr_malloc(value_len);
memcpy(buffer, value, value_len);
address = zookeeper_parse_address(buffer, value_len);
gpr_free(buffer);
if (address != NULL) {
/* Further resolve address by DNS */
/** Further resolve address by DNS */
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
} else {
gpr_mu_lock(&r->mu);
r->resolved_total--;
if (r->resolved_num == r->resolved_total) {
gpr_mu_unlock(&r->mu);
resolve_all = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_all) {
zookeeper_on_resolved(r, r->resolved_addrs);
} else {
gpr_mu_unlock(&r->mu);
}
}
}
static void zookeeper_get_children_completion(
int rc, const struct String_vector *children, const void *arg) {
char *path;
int path_length;
int status;
char path[GRPC_MAX_ZOOKEEPER_BUFFER_SIZE];
int i;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
if (rc) {
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
return;
}
@ -298,13 +309,18 @@ static void zookeeper_get_children_completion(
r->resolved_total = children->count;
for (i = 0; i < children->count; i++) {
memset(path, 0, GRPC_MAX_ZOOKEEPER_BUFFER_SIZE);
path_length = strlen(r->name) + strlen(children->data[i]) + 2;
path = gpr_malloc(path_length);
memset(path, 0, path_length);
strcat(path, r->name);
strcat(path, "/");
strcat(path, children->data[i]);
status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_node_completion, r);
if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
gpr_free(path);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
}
}
}
@ -314,27 +330,30 @@ static void zookeeper_get_node_completion(int rc, const char *value,
const void *arg) {
int status;
char *address = NULL;
char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
r->resolved_addrs = NULL;
r->resolved_total = 0;
r->resolved_num = 0;
if (rc) {
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
return;
}
/* If zookeeper node of path r->name does not have address (i.e. service
node),
get its children */
address = zookeeper_parse_address((char *)value, value_len);
/** If zookeeper node of path r->name does not have address
(i.e. service node), get its children */
buffer = gpr_malloc(value_len);
memcpy(buffer, value, value_len);
address = zookeeper_parse_address(buffer, value_len);
gpr_free(buffer);
if (address != NULL) {
r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
r->resolved_addrs->addrs = NULL;
r->resolved_addrs->naddrs = 0;
r->resolved_total = 1;
/* Further resolve address by DNS */
/** Further resolve address by DNS */
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
return;
@ -342,20 +361,23 @@ static void zookeeper_get_node_completion(int rc, const char *value,
status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_completion, r);
if (status)
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
}
}
static void zookeeper_resolve_address(zookeeper_resolver *r) {
int status;
status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_node_completion, r);
if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
}
}
static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
GPR_ASSERT(!r->resolving);
GPR_ASSERT(r->resolving == 0);
r->resolving = 1;
zookeeper_resolve_address(r);
@ -365,7 +387,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config) {
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
grpc_iomgr_add_callback(r->next_completion);
@ -377,7 +399,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
static void zookeeper_destroy(grpc_resolver *gr) {
zookeeper_resolver *r = (zookeeper_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_config) {
if (r->resolved_config != NULL) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
@ -385,9 +407,11 @@ static void zookeeper_destroy(grpc_resolver *gr) {
gpr_free(r);
}
/* Zookeeper watcher function - handle updates to any watched nodes */
/** Zookeeper watcher function - handle updates to any watched nodes */
static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
const char *path, void *watcher_ctx) {}
const char *path, void *watcher_ctx) {
}
static grpc_resolver *zookeeper_create(
grpc_uri *uri,
@ -414,10 +438,10 @@ static grpc_resolver *zookeeper_create(
r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
/* Initialize zookeeper client */
/** Initialize zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher,
GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
return NULL;
@ -458,4 +482,4 @@ static grpc_resolver_factory zookeeper_resolver_factory = {
grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() {
return &zookeeper_resolver_factory;
}
}

@ -36,7 +36,7 @@
#include "src/core/client_config/resolver_factory.h"
/** Create a zookeeper resolver for \a name */
/** Create a zookeeper resolver factory */
grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */

@ -110,8 +110,10 @@ void grpc_init(void) {
gpr_log(GPR_ERROR, "Could not initialize census.");
}
grpc_timers_global_init();
for (plugin = g_plugins_head; plugin; plugin = plugin->next) {
if (plugin->init) plugin->init();
for (plugin = g_plugins_head; plugin != NULL; plugin = plugin->next) {
if (plugin->init) {
plugin->init();
}
}
}
gpr_mu_unlock(&g_init_mu);
@ -128,8 +130,10 @@ void grpc_shutdown(void) {
grpc_timers_global_destroy();
grpc_tracer_shutdown();
grpc_resolver_registry_shutdown();
for (plugin = g_plugins_head; plugin; plugin = next) {
if (plugin->deinit) plugin->deinit();
for (plugin = g_plugins_head; plugin != NULL; plugin = next) {
if (plugin->deinit) {
plugin->deinit();
}
next = plugin->next;
gpr_free(plugin);
}

Loading…
Cancel
Save