Enable streaming test case for gRPC client in GCE.

Change on 2015/01/08 by chenw <chenw@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83549959
pull/3/merge
chenw 10 years ago committed by Nicolas Noble
parent 4dee1577fb
commit 6edb547c99
  1. 163
      test/cpp/interop/client.cc

@ -128,6 +128,152 @@ void DoLargeUnary(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Large unary done.");
}
void DoRequestStreaming(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Sending request steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
grpc::ClientContext context;
StreamingInputCallRequest request;
StreamingInputCallResponse response;
std::unique_ptr<grpc::ClientWriter<StreamingInputCallRequest>> stream(
stub->StreamingInputCall(&context, &response));
int aggregated_payload_size = 0;
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
grpc::testing::Payload* payload = request.mutable_payload();
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
GPR_ASSERT(stream->Write(request));
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
grpc::Status s = stream->Wait();
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Request streaming done.");
}
void DoResponseStreaming(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
grpc::ClientContext context;
StreamingOutputCallRequest request;
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
ResponseParameters* response_parameter = request.add_response_parameters();
response_parameter->set_size(response_stream_sizes[i]);
}
StreamingOutputCallResponse response;
std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
stub->StreamingOutputCall(&context, &request));
unsigned int i = 0;
while (stream->Read(&response)) {
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
grpc::Status s = stream->Wait();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
}
void DoResponseStreamingWithSlowConsumer(
std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
grpc::ClientContext context;
StreamingOutputCallRequest request;
for (unsigned int i = 0; i < kNumResponseMessages; ++i) {
ResponseParameters* response_parameter = request.add_response_parameters();
response_parameter->set_size(kResponseMessageSize);
}
StreamingOutputCallResponse response;
std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
stub->StreamingOutputCall(&context, &request));
unsigned int i = 0;
while (stream->Read(&response)) {
GPR_ASSERT(response.payload().body() ==
grpc::string(kResponseMessageSize, '\0'));
gpr_log(GPR_INFO, "received message %d", i);
std::this_thread::sleep_for(
std::chrono::milliseconds(kReceiveDelayMilliSeconds));
++i;
}
GPR_ASSERT(kNumResponseMessages == i);
grpc::Status s = stream->Wait();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
}
void DoHalfDuplex(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<StreamingOutputCallRequest,
StreamingOutputCallResponse>>
stream(stub->HalfDuplexCall(&context));
StreamingOutputCallRequest request;
ResponseParameters* response_parameter = request.add_response_parameters();
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
GPR_ASSERT(stream->Write(request));
}
stream->WritesDone();
unsigned int i = 0;
StreamingOutputCallResponse response;
while (stream->Read(&response)) {
GPR_ASSERT(response.payload().has_body());
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
grpc::Status s = stream->Wait();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
}
void DoPingPong(std::shared_ptr<ChannelInterface> channel) {
gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel));
grpc::ClientContext context;
std::unique_ptr<grpc::ClientReaderWriter<StreamingOutputCallRequest,
StreamingOutputCallResponse>>
stream(stub->FullDuplexCall(&context));
StreamingOutputCallRequest request;
request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
ResponseParameters* response_parameter = request.add_response_parameters();
grpc::testing::Payload* payload = request.mutable_payload();
StreamingOutputCallResponse response;
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
GPR_ASSERT(stream->Write(request));
GPR_ASSERT(stream->Read(&response));
GPR_ASSERT(response.payload().has_body());
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
}
stream->WritesDone();
GPR_ASSERT(!stream->Read(&response));
grpc::Status s = stream->Wait();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Ping pong streaming done.");
}
int main(int argc, char** argv) {
grpc_init();
@ -148,6 +294,23 @@ int main(int argc, char** argv) {
DoEmpty(channel);
} else if (FLAGS_test_case == "large_unary") {
DoLargeUnary(channel);
} else if (FLAGS_test_case == "client_streaming") {
DoRequestStreaming(channel);
} else if (FLAGS_test_case == "server_streaming") {
DoResponseStreaming(channel);
} else if (FLAGS_test_case == "slow_consumer") {
DoResponseStreamingWithSlowConsumer(channel);
} else if (FLAGS_test_case == "half_duplex") {
DoHalfDuplex(channel);
} else if (FLAGS_test_case == "ping_pong") {
DoPingPong(channel);
} else if (FLAGS_test_case == "all") {
DoEmpty(channel);
DoLargeUnary(channel);
DoRequestStreaming(channel);
DoResponseStreaming(channel);
DoHalfDuplex(channel);
DoPingPong(channel);
} else {
gpr_log(
GPR_ERROR,

Loading…
Cancel
Save