|
|
|
@ -47,7 +47,9 @@ DEFINE_int32(stats_port, 50052, |
|
|
|
|
"Port to expose peer distribution stats service."); |
|
|
|
|
|
|
|
|
|
using grpc::Channel; |
|
|
|
|
using grpc::ClientAsyncResponseReader; |
|
|
|
|
using grpc::ClientContext; |
|
|
|
|
using grpc::CompletionQueue; |
|
|
|
|
using grpc::Server; |
|
|
|
|
using grpc::ServerBuilder; |
|
|
|
|
using grpc::ServerContext; |
|
|
|
@ -109,8 +111,8 @@ class XdsStatsWatcher { |
|
|
|
|
int start_id_; |
|
|
|
|
int end_id_; |
|
|
|
|
int rpcs_needed_; |
|
|
|
|
int no_remote_peer_ = 0; |
|
|
|
|
std::map<std::string, int> rpcs_by_peer_; |
|
|
|
|
int no_remote_peer_; |
|
|
|
|
std::mutex m_; |
|
|
|
|
std::condition_variable cv_; |
|
|
|
|
}; |
|
|
|
@ -120,7 +122,7 @@ class TestClient { |
|
|
|
|
TestClient(const std::shared_ptr<Channel>& channel) |
|
|
|
|
: stub_(TestService::NewStub(channel)) {} |
|
|
|
|
|
|
|
|
|
void UnaryCall() { |
|
|
|
|
void AsyncUnaryCall() { |
|
|
|
|
SimpleResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
|
|
|
|
@ -133,29 +135,55 @@ class TestClient { |
|
|
|
|
std::chrono::system_clock::now() + |
|
|
|
|
std::chrono::seconds(FLAGS_rpc_timeout_sec); |
|
|
|
|
context.set_deadline(deadline); |
|
|
|
|
Status status = stub_->UnaryCall( |
|
|
|
|
&context, SimpleRequest::default_instance(), &response); |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
std::lock_guard<std::mutex> lk(mu); |
|
|
|
|
for (auto watcher : watchers) { |
|
|
|
|
watcher->RpcCompleted(saved_request_id, response.hostname()); |
|
|
|
|
AsyncClientCall* call = new AsyncClientCall; |
|
|
|
|
call->saved_request_id = saved_request_id; |
|
|
|
|
call->response_reader = stub_->PrepareAsyncUnaryCall( |
|
|
|
|
&call->context, SimpleRequest::default_instance(), &cq_); |
|
|
|
|
call->response_reader->StartCall(); |
|
|
|
|
call->response_reader->Finish(&call->response, &call->status, (void*)call); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void AsyncCompleteRpc() { |
|
|
|
|
void* got_tag; |
|
|
|
|
bool ok = false; |
|
|
|
|
while (cq_.Next(&got_tag, &ok)) { |
|
|
|
|
AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); |
|
|
|
|
GPR_ASSERT(ok); |
|
|
|
|
{ |
|
|
|
|
std::lock_guard<std::mutex> lk(mu); |
|
|
|
|
for (auto watcher : watchers) { |
|
|
|
|
watcher->RpcCompleted(call->saved_request_id, |
|
|
|
|
call->response.hostname()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (FLAGS_print_response) { |
|
|
|
|
if (status.ok()) { |
|
|
|
|
std::cout << "Greeting: Hello world, this is " << response.hostname() |
|
|
|
|
<< ", from " << context.peer() << std::endl; |
|
|
|
|
} else { |
|
|
|
|
std::cout << "RPC failed: " << status.error_code() << ": " |
|
|
|
|
<< status.error_message() << std::endl; |
|
|
|
|
if (FLAGS_print_response) { |
|
|
|
|
if (call->status.ok()) { |
|
|
|
|
std::cout << "Greeting: Hello world, this is " |
|
|
|
|
<< call->response.hostname() << ", from " |
|
|
|
|
<< call->context.peer() << std::endl; |
|
|
|
|
} else { |
|
|
|
|
std::cout << "RPC failed: " << call->status.error_code() << ": " |
|
|
|
|
<< call->status.error_message() << std::endl; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
delete call; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
struct AsyncClientCall { |
|
|
|
|
SimpleResponse response; |
|
|
|
|
ClientContext context; |
|
|
|
|
Status status; |
|
|
|
|
int saved_request_id; |
|
|
|
|
std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>> response_reader; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
std::unique_ptr<TestService::Stub> stub_; |
|
|
|
|
CompletionQueue cq_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service { |
|
|
|
@ -191,13 +219,16 @@ void RunTestLoop(const std::string& server, |
|
|
|
|
std::chrono::system_clock::now(); |
|
|
|
|
std::chrono::duration<double> elapsed; |
|
|
|
|
|
|
|
|
|
std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client); |
|
|
|
|
|
|
|
|
|
while (true) { |
|
|
|
|
elapsed = std::chrono::system_clock::now() - start; |
|
|
|
|
if (elapsed > duration_per_query) { |
|
|
|
|
start = std::chrono::system_clock::now(); |
|
|
|
|
client.UnaryCall(); |
|
|
|
|
client.AsyncUnaryCall(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
thread.join(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void RunServer(const int port) { |
|
|
|
|