|
|
|
@ -31,8 +31,9 @@ |
|
|
|
|
* |
|
|
|
|
*/ |
|
|
|
|
|
|
|
|
|
#include <stdarg.h> |
|
|
|
|
#include <string.h> |
|
|
|
|
#include <cstdarg> |
|
|
|
|
#include <cstring> |
|
|
|
|
#include <string> |
|
|
|
|
|
|
|
|
|
extern "C" { |
|
|
|
|
#include <grpc/grpc.h> |
|
|
|
@ -55,8 +56,13 @@ extern "C" { |
|
|
|
|
#include "test/core/util/test_config.h" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#include "src/proto/grpc/lb/v1/load_balancer.pb.h" |
|
|
|
|
|
|
|
|
|
#define NUM_BACKENDS 4 |
|
|
|
|
|
|
|
|
|
namespace grpc { |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
typedef struct client_fixture { |
|
|
|
|
grpc_channel *client; |
|
|
|
|
char *server_uri; |
|
|
|
@ -86,8 +92,9 @@ static gpr_timespec n_seconds_time(int n) { |
|
|
|
|
|
|
|
|
|
static void *tag(intptr_t t) { return (void *)t; } |
|
|
|
|
|
|
|
|
|
static gpr_slice build_response_payload_slice(const char *host, int *ports, |
|
|
|
|
size_t nports) { |
|
|
|
|
static gpr_slice build_response_payload_slice( |
|
|
|
|
const char *host, int *ports, size_t nports, |
|
|
|
|
int64_t expiration_interval_secs, int32_t expiration_interval_nanos) { |
|
|
|
|
/*
|
|
|
|
|
server_list { |
|
|
|
|
servers { |
|
|
|
@ -97,45 +104,30 @@ static gpr_slice build_response_payload_slice(const char *host, int *ports, |
|
|
|
|
} |
|
|
|
|
... |
|
|
|
|
} */ |
|
|
|
|
char **hostports_vec = |
|
|
|
|
static_cast<char **>(gpr_malloc(sizeof(char *) * nports)); |
|
|
|
|
for (size_t i = 0; i < nports; i++) { |
|
|
|
|
gpr_join_host_port(&hostports_vec[i], "127.0.0.1", ports[i]); |
|
|
|
|
} |
|
|
|
|
char *hostports_str = |
|
|
|
|
gpr_strjoin_sep((const char **)hostports_vec, nports, " ", NULL); |
|
|
|
|
gpr_log(GPR_INFO, "generating response for %s", hostports_str); |
|
|
|
|
|
|
|
|
|
char *output_fname; |
|
|
|
|
FILE *tmpfd = gpr_tmpfile("grpclb_test", &output_fname); |
|
|
|
|
fclose(tmpfd); |
|
|
|
|
char *cmdline; |
|
|
|
|
gpr_asprintf(&cmdline, |
|
|
|
|
"./tools/codegen/core/gen_grpclb_test_response.py --lb_proto " |
|
|
|
|
"src/proto/grpc/lb/v1/load_balancer.proto %s " |
|
|
|
|
"--output %s --quiet", |
|
|
|
|
hostports_str, output_fname); |
|
|
|
|
GPR_ASSERT(system(cmdline) == 0); |
|
|
|
|
FILE *f = fopen(output_fname, "rb"); |
|
|
|
|
fseek(f, 0, SEEK_END); |
|
|
|
|
const size_t fsize = (size_t)ftell(f); |
|
|
|
|
rewind(f); |
|
|
|
|
|
|
|
|
|
char *serialized_response = static_cast<char *>(gpr_malloc(fsize)); |
|
|
|
|
GPR_ASSERT(fread(serialized_response, fsize, 1, f) == 1); |
|
|
|
|
fclose(f); |
|
|
|
|
gpr_free(output_fname); |
|
|
|
|
gpr_free(cmdline); |
|
|
|
|
grpc::lb::v1::LoadBalanceResponse response; |
|
|
|
|
auto *serverlist = response.mutable_server_list(); |
|
|
|
|
|
|
|
|
|
if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) { |
|
|
|
|
auto *expiration_interval = serverlist->mutable_expiration_interval(); |
|
|
|
|
if (expiration_interval_secs > 0) { |
|
|
|
|
expiration_interval->set_seconds(expiration_interval_secs); |
|
|
|
|
} |
|
|
|
|
if (expiration_interval_nanos > 0) { |
|
|
|
|
expiration_interval->set_nanos(expiration_interval_nanos); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for (size_t i = 0; i < nports; i++) { |
|
|
|
|
gpr_free(hostports_vec[i]); |
|
|
|
|
auto *server = serverlist->add_servers(); |
|
|
|
|
server->set_ip_address(host); |
|
|
|
|
server->set_port(ports[i]); |
|
|
|
|
server->set_load_balance_token("token" + std::to_string(ports[i])); |
|
|
|
|
} |
|
|
|
|
gpr_free(hostports_vec); |
|
|
|
|
gpr_free(hostports_str); |
|
|
|
|
|
|
|
|
|
gpr_log(GPR_INFO, "generating response: %s", |
|
|
|
|
response.ShortDebugString().c_str()); |
|
|
|
|
|
|
|
|
|
const gpr_slice response_slice = |
|
|
|
|
gpr_slice_from_copied_buffer(serialized_response, fsize); |
|
|
|
|
gpr_free(serialized_response); |
|
|
|
|
gpr_slice_from_copied_string(response.SerializeAsString().c_str()); |
|
|
|
|
return response_slice; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -211,12 +203,13 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, |
|
|
|
|
if (i == 0) { |
|
|
|
|
// First half of the ports.
|
|
|
|
|
response_payload_slice = |
|
|
|
|
build_response_payload_slice("127.0.0.1", ports, nports / 2); |
|
|
|
|
build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1); |
|
|
|
|
} else { |
|
|
|
|
// Second half of the ports.
|
|
|
|
|
sleep_ms(update_delay_ms); |
|
|
|
|
response_payload_slice = build_response_payload_slice( |
|
|
|
|
"127.0.0.1", ports + (nports / 2), (nports + 1) / 2 /* ceil */); |
|
|
|
|
response_payload_slice = |
|
|
|
|
build_response_payload_slice("127.0.0.1", ports + (nports / 2), |
|
|
|
|
(nports + 1) / 2 /* ceil */, -1, -1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); |
|
|
|
@ -606,11 +599,16 @@ static void teardown_test_fixture(test_fixture *tf) { |
|
|
|
|
teardown_server(&tf->lb_server); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The LB server will send two updates: batch 1 and batch 2. Each batch contains
|
|
|
|
|
// two addresses, both of a valid and running backend server. Batch 1 is readily
|
|
|
|
|
// available and provided as soon as the client establishes the streaming call.
|
|
|
|
|
// Batch 2 is sent after a delay of \a lb_server_update_delay_ms milliseconds.
|
|
|
|
|
// The LB server will send two updates: batch 1 and batch 2. Each batch
|
|
|
|
|
// contains
|
|
|
|
|
// two addresses, both of a valid and running backend server. Batch 1 is
|
|
|
|
|
// readily
|
|
|
|
|
// available and provided as soon as the client establishes the streaming
|
|
|
|
|
// call.
|
|
|
|
|
// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
|
|
|
|
|
// milliseconds.
|
|
|
|
|
static test_fixture test_update(int lb_server_update_delay_ms) { |
|
|
|
|
gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms); |
|
|
|
|
test_fixture tf; |
|
|
|
|
memset(&tf, 0, sizeof(tf)); |
|
|
|
|
setup_test_fixture(&tf, lb_server_update_delay_ms); |
|
|
|
@ -625,14 +623,18 @@ static test_fixture test_update(int lb_server_update_delay_ms) { |
|
|
|
|
&tf.client); // "consumes" 2nd backend server of 2nd serverlist
|
|
|
|
|
|
|
|
|
|
teardown_test_fixture(&tf); |
|
|
|
|
gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms); |
|
|
|
|
return tf; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
} // namespace grpc
|
|
|
|
|
|
|
|
|
|
int main(int argc, char **argv) { |
|
|
|
|
grpc_test_init(argc, argv); |
|
|
|
|
grpc_init(); |
|
|
|
|
|
|
|
|
|
test_fixture tf_result; |
|
|
|
|
grpc::test_fixture tf_result; |
|
|
|
|
// Clients take a bit over one second to complete a call (the last part of the
|
|
|
|
|
// call sleeps for 1 second while verifying the client's completion queue is
|
|
|
|
|
// empty). Therefore:
|
|
|
|
@ -641,7 +643,7 @@ int main(int argc, char **argv) { |
|
|
|
|
// before the first client request is done, skipping the second server from
|
|
|
|
|
// batch 1 altogether: the 2nd client request will go to the 1st server of
|
|
|
|
|
// batch 2 (ie, the third one out of the four total servers).
|
|
|
|
|
tf_result = test_update(800); |
|
|
|
|
tf_result = grpc::test_update(800); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2); |
|
|
|
@ -650,17 +652,17 @@ int main(int argc, char **argv) { |
|
|
|
|
// If the LB server waits 1500ms, the update arrives after having picked the
|
|
|
|
|
// 2nd server from batch 1 but before the next pick for the first server of
|
|
|
|
|
// batch 2. All server are used.
|
|
|
|
|
tf_result = test_update(1500); |
|
|
|
|
tf_result = grpc::test_update(1500); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); |
|
|
|
|
|
|
|
|
|
// If the LB server waits >= 2000ms, the update arrives after the first two
|
|
|
|
|
// If the LB server waits > 2000ms, the update arrives after the first two
|
|
|
|
|
// request are done and the third pick is performed, which returns, in RR
|
|
|
|
|
// fashion, the 1st server of the 1st update. Therefore, the second server of
|
|
|
|
|
// batch 1 is hit twice, whereas the first server of batch 2 is never hit.
|
|
|
|
|
tf_result = test_update(2000); |
|
|
|
|
tf_result = grpc::test_update(2100); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 2); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); |
|
|
|
|
GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); |
|
|
|
|