pull/2882/head
Hongwei Wang 9 years ago
parent 00de698355
commit cd62075eb6
  1. 115
      src/core/client_config/resolvers/zookeeper_resolver.c
  2. 2
      test/build/zookeeper.c

@ -82,7 +82,7 @@ typedef struct {
/** zookeeper handle */
zhandle_t *zookeeper_handle;
/** zookeeper resolved addresses */
grpc_resolved_addresses * resolved_addrs;
grpc_resolved_addresses *resolved_addrs;
/** total number of addresses to be resolved */
int resolved_total;
/** resolved number of addresses */
@ -96,13 +96,14 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r);
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
struct sockaddr *failing_address,
int failing_address_len);
static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable zookeeper_resolver_vtable = {
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, zookeeper_next};
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
zookeeper_next};
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
@ -116,8 +117,8 @@ static void zookeeper_shutdown(grpc_resolver *resolver) {
gpr_mu_unlock(&r->mu);
}
static void zookeeper_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
int len) {
static void zookeeper_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *sa, int len) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (!r->resolving) {
@ -127,8 +128,8 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, struct sockaddr
}
static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
@ -142,7 +143,8 @@ static void zookeeper_next(grpc_resolver *resolver,
gpr_mu_unlock(&r->mu);
}
static void zookeeper_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
static void zookeeper_on_resolved(void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
@ -180,17 +182,21 @@ static void zookeeper_on_resolved(void *arg, grpc_resolved_addresses *addresses)
}
/* Callback function for each DNS resolved address */
static void zookeeper_dns_resolved(void *arg, grpc_resolved_addresses *addresses) {
static void zookeeper_dns_resolved(void *arg,
grpc_resolved_addresses *addresses) {
size_t i;
zookeeper_resolver *r = arg;
gpr_mu_lock(&r->mu);
r->resolved_num++;
r->resolved_addrs->addrs = gpr_realloc(r->resolved_addrs->addrs,
sizeof(grpc_resolved_address) * (r->resolved_addrs->naddrs + addresses->naddrs));
r->resolved_addrs->addrs =
gpr_realloc(r->resolved_addrs->addrs,
sizeof(grpc_resolved_address) *
(r->resolved_addrs->naddrs + addresses->naddrs));
for (i = 0; i < addresses->naddrs; i++) {
memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
addresses->addrs[i].addr, addresses->addrs[i].len);
r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = addresses->addrs[i].len;
memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr,
addresses->addrs[i].addr, addresses->addrs[i].len);
r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len =
addresses->addrs[i].len;
}
r->resolved_addrs->naddrs += addresses->naddrs;
@ -211,7 +217,7 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) {
char *port;
char *address;
grpc_json *json;
grpc_json *cur;
grpc_json *cur;
address = NULL;
json = grpc_json_parse_string_with_len(buffer, buffer_len);
@ -221,13 +227,10 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) {
for (cur = json->child; cur != NULL; cur = cur->next) {
if (!strcmp(cur->key, "host")) {
host = (char *)cur->value;
if (port != NULL)
break;
}
else if (!strcmp(cur->key, "port")) {
if (port != NULL) break;
} else if (!strcmp(cur->key, "port")) {
port = (char *)cur->value;
if (host != NULL)
break;
if (host != NULL) break;
}
}
if (host != NULL && port != NULL) {
@ -238,13 +241,15 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) {
strcat(address, port);
}
grpc_json_destroy(json);
}
}
return address;
}
static void zookeeper_get_children_node_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *arg) {
static void zookeeper_get_children_node_completion(int rc, const char *value,
int value_len,
const struct Stat *stat,
const void *arg) {
char *address = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
@ -270,8 +275,8 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, in
}
}
static void zookeeper_get_children_completion(int rc, const struct String_vector *children,
const void *arg) {
static void zookeeper_get_children_completion(
int rc, const struct String_vector *children, const void *arg) {
int status;
char path[GRPC_MAX_ZOOKEEPER_BUFFER_SIZE];
int i;
@ -297,15 +302,16 @@ static void zookeeper_get_children_completion(int rc, const struct String_vector
strcat(path, r->name);
strcat(path, "/");
strcat(path, children->data[i]);
status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_node_completion, r);
if (status)
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_node_completion, r);
if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
}
}
static void zookeeper_get_node_completion(int rc, const char *value, int value_len,
const struct Stat *stat, const void *arg) {
static void zookeeper_get_node_completion(int rc, const char *value,
int value_len,
const struct Stat *stat,
const void *arg) {
int status;
char *address = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
@ -319,7 +325,8 @@ static void zookeeper_get_node_completion(int rc, const char *value, int value_l
return;
}
/* If zookeeper node of path r->name does not have address (i.e. service node),
/* If zookeeper node of path r->name does not have address (i.e. service
node),
get its children */
address = zookeeper_parse_address((char *)value, value_len);
if (address != NULL) {
@ -333,18 +340,17 @@ static void zookeeper_get_node_completion(int rc, const char *value, int value_l
return;
}
status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_completion, r);
status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_children_completion, r);
if (status)
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
}
static void zookeeper_resolve_address(zookeeper_resolver *r) {
int status;
status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_node_completion, r);
if (status)
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
zookeeper_get_node_completion, r);
if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
}
static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
@ -380,8 +386,8 @@ static void zookeeper_destroy(grpc_resolver *gr) {
}
/* Zookeeper watcher function - handle updates to any watched nodes */
static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
const char* path, void* watcher_ctx) {}
static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
const char *path, void *watcher_ctx) {}
static grpc_resolver *zookeeper_create(
grpc_uri *uri,
@ -401,18 +407,18 @@ static grpc_resolver *zookeeper_create(
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
if (r->name[strlen(r->name)-1] == '/') {
r->name[strlen(r->name)-1] = 0;
}
if (r->name[strlen(r->name) - 1] == '/') {
r->name[strlen(r->name) - 1] = 0;
}
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
/* Initialize zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher,
GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher,
GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");
return NULL;
}
@ -421,7 +427,8 @@ static grpc_resolver *zookeeper_create(
}
static void zookeeper_plugin_init() {
grpc_register_resolver_type("zookeeper", grpc_zookeeper_resolver_factory_create());
grpc_register_resolver_type("zookeeper",
grpc_zookeeper_resolver_factory_create());
}
void grpc_zookeeper_register() {
@ -440,12 +447,14 @@ static grpc_resolver *zookeeper_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
return zookeeper_create(uri, grpc_create_pick_first_lb_policy,
subchannel_factory);
subchannel_factory);
}
static const grpc_resolver_factory_vtable zookeeper_factory_vtable = {
zookeeper_factory_ref, zookeeper_factory_unref, zookeeper_factory_create_resolver};
static grpc_resolver_factory zookeeper_resolver_factory = {&zookeeper_factory_vtable};
zookeeper_factory_ref, zookeeper_factory_unref,
zookeeper_factory_create_resolver};
static grpc_resolver_factory zookeeper_resolver_factory = {
&zookeeper_factory_vtable};
grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() {
return &zookeeper_resolver_factory;

@ -31,7 +31,7 @@
*
*/
/* This is just a compilation test, to see if we have Zookeeper C client
/* This is just a compilation test, to see if we have Zookeeper C client
library installed. */
#include <stdlib.h>

Loading…
Cancel
Save