Update zookeeper test to have two servers

pull/2549/head
Hongwei Wang 10 years ago
parent fbc86857e5
commit b0453fba84
  1. 13
      src/core/client_config/resolvers/zookeeper_resolver.c
  2. 113
      test/cpp/end2end/zookeeper_test.cc

@ -213,7 +213,7 @@ static void zookeeper_dns_resolved(void *arg,
grpc_resolved_addresses *addresses) {
size_t i;
zookeeper_resolver *r = arg;
int resolve_all = 0;
int resolve_done = 0;
gpr_mu_lock(&r->mu);
r->resolved_num++;
@ -232,9 +232,9 @@ static void zookeeper_dns_resolved(void *arg,
grpc_resolved_addresses_destroy(addresses);
/** Wait for all addresses to be resolved */
resolve_all = (r->resolved_num == r->resolved_total);
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_all) {
if (resolve_done) {
zookeeper_on_resolved(r, r->resolved_addrs);
}
}
@ -281,7 +281,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
char *address = NULL;
char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
int resolve_all = 0;
int resolve_done = 0;
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
@ -297,11 +297,12 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
} else {
gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name);
gpr_mu_lock(&r->mu);
r->resolved_total--;
resolve_all = (r->resolved_num == r->resolved_total);
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_all) {
if (resolve_done) {
zookeeper_on_resolved(r, r->resolved_addrs);
}
}

@ -71,11 +71,40 @@ class ZookeeperTest : public ::testing::Test {
ZookeeperTest() {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
server_address_ = "localhost:" + std::to_string(port);
SetUpZookeeper();
// Setup two servers
int port1 = grpc_pick_unused_port_or_die();
int port2 = grpc_pick_unused_port_or_die();
server1_ = SetUpServer(port1);
server2_ = SetUpServer(port2);
// Register service /test in zookeeper
RegisterService("/test", "test");
// Register service instance /test/1 in zookeeper
string value = "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port1) + "\"}";
RegisterService("/test/1", value);
// Register service instance /test/2 in zookeeper
value = "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port2) + "\"}";
RegisterService("/test/2", value);
}
std::unique_ptr<Server> SetUpServer(int port) {
string server_address = "localhost:" + std::to_string(port);
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.RegisterService(&service_);
std::unique_ptr<Server> server = builder.BuildAndStart();
return server;
}
// Setup zookeeper
// Require zookeeper server running in grpc-jenkins-master
// Require zookeeper server running beforehand
void SetUpZookeeper() {
// Find zookeeper server address in environment
// Default is localhost:2181
zookeeper_address_ = "localhost:2181";
char* addr = gpr_getenv("GRPC_ZOOKEEPER_SERVER_TEST");
if (addr != NULL) {
@ -84,72 +113,37 @@ class ZookeeperTest : public ::testing::Test {
gpr_free(addr);
}
gpr_log(GPR_DEBUG, zookeeper_address_.c_str());
ZookeeperSetUp(port);
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_, InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
}
void ZookeeperSetUp(int port) {
// Connect to zookeeper server
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zookeeper_handle_ = zookeeper_init(zookeeper_address_.c_str(), NULL, 15000, 0, 0, 0);
GPR_ASSERT(zookeeper_handle_ != NULL);
// Register service /test in zookeeper
char service_path[] = "/test";
char service_value[] = "test";
int status = zoo_exists(zookeeper_handle_, service_path, 0, NULL);
if (status != 0) {
status = zoo_create(zookeeper_handle_, service_path, service_value,
strlen(service_value), &ZOO_OPEN_ACL_UNSAFE, 0,
service_path, sizeof(service_path));
GPR_ASSERT(status == 0);
}
// Register service instance /test/1 in zookeeper
char instance_path[] = "/test/1";
string instance_value =
"{\"host\":\"localhost\",\"port\":\"" + std::to_string(port) + "\"}";
status = zoo_exists(zookeeper_handle_, instance_path, 0, NULL);
if (status == ZNONODE) {
status =
zoo_create(zookeeper_handle_, instance_path, instance_value.c_str(),
instance_value.size(), &ZOO_OPEN_ACL_UNSAFE, 0,
instance_path, sizeof(instance_path));
GPR_ASSERT(status == 0);
} else {
status = zoo_set(zookeeper_handle_, instance_path, instance_value.c_str(),
instance_value.size(), -1);
GPR_ASSERT(status == 0);
}
GPR_ASSERT(status == 0);
// Register zookeeper name resolver in grpc
grpc_zookeeper_register();
}
void ZookeeperStateChange() {
char instance_path[] = "/test/2";
string instance_value = "2222";
void RegisterService(string name, string value) {
char *path = (char *)gpr_malloc(name.size());
int status = zoo_exists(zookeeper_handle_, instance_path, 0, NULL);
int status = zoo_exists(zookeeper_handle_, name.c_str(), 0, NULL);
if (status == ZNONODE) {
status =
zoo_create(zookeeper_handle_, instance_path, instance_value.c_str(),
instance_value.size(), &ZOO_OPEN_ACL_UNSAFE, 0,
instance_path, sizeof(instance_path));
GPR_ASSERT(status == 0);
status = zoo_create(zookeeper_handle_, name.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, 0, path, name.size());
} else {
status = zoo_delete(zookeeper_handle_, instance_path, -1);
GPR_ASSERT(status == 0);
status = zoo_set(zookeeper_handle_, name.c_str(), value.c_str(), value.size(), -1);
}
gpr_free(path);
GPR_ASSERT(status == 0);
}
void DeleteService(string name) {
int status = zoo_delete(zookeeper_handle_, name.c_str(), -1);
GPR_ASSERT(status == 0);
}
void TearDown() GRPC_OVERRIDE {
server_->Shutdown();
server1_->Shutdown();
server2_->Shutdown();
zookeeper_close(zookeeper_handle_);
}
@ -161,14 +155,15 @@ class ZookeeperTest : public ::testing::Test {
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
string server_address_;
std::unique_ptr<Server> server1_;
std::unique_ptr<Server> server2_;
ZookeeperTestServiceImpl service_;
zhandle_t* zookeeper_handle_;
string zookeeper_address_;
};
// Test zookeeper state change between two RPCs
// TODO: Handle leaked objects
TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) {
ResetStub();
@ -183,15 +178,15 @@ TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) {
EXPECT_TRUE(s1.ok());
// Zookeeper state change
ZookeeperStateChange();
sleep(1);
DeleteService("/test/2");
sleep(1);
// Second RPC
EchoRequest request2;
EchoResponse response2;
ClientContext context2;
context2.set_authority("test");
request2.set_message("Hello");
request2.set_message("World");
Status s2 = stub_->Echo(&context2, request2, &response2);
EXPECT_EQ(response2.message(), request2.message());
EXPECT_TRUE(s2.ok());

Loading…
Cancel
Save