diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index ee0f62cf20b..36bc580a96c 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -128,6 +128,152 @@ void DoLargeUnary(std::shared_ptr channel) { gpr_log(GPR_INFO, "Large unary done."); } +void DoRequestStreaming(std::shared_ptr channel) { + gpr_log(GPR_INFO, "Sending request steaming rpc ..."); + std::unique_ptr stub(TestService::NewStub(channel)); + + grpc::ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + std::unique_ptr> stream( + stub->StreamingInputCall(&context, &response)); + + int aggregated_payload_size = 0; + for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + grpc::testing::Payload* payload = request.mutable_payload(); + payload->set_body(grpc::string(request_stream_sizes[i], '\0')); + GPR_ASSERT(stream->Write(request)); + aggregated_payload_size += request_stream_sizes[i]; + } + stream->WritesDone(); + grpc::Status s = stream->Wait(); + + GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); + GPR_ASSERT(s.IsOk()); + gpr_log(GPR_INFO, "Request streaming done."); +} + +void DoResponseStreaming(std::shared_ptr channel) { + gpr_log(GPR_INFO, "Receiving response steaming rpc ..."); + std::unique_ptr stub(TestService::NewStub(channel)); + + grpc::ClientContext context; + StreamingOutputCallRequest request; + for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { + ResponseParameters* response_parameter = request.add_response_parameters(); + response_parameter->set_size(response_stream_sizes[i]); + } + StreamingOutputCallResponse response; + std::unique_ptr> stream( + stub->StreamingOutputCall(&context, &request)); + + unsigned int i = 0; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + ++i; + } + GPR_ASSERT(response_stream_sizes.size() == i); + grpc::Status s = stream->Wait(); + + GPR_ASSERT(s.IsOk()); + gpr_log(GPR_INFO, "Response streaming done."); +} + +void DoResponseStreamingWithSlowConsumer( + std::shared_ptr channel) { + gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ..."); + std::unique_ptr stub(TestService::NewStub(channel)); + + grpc::ClientContext context; + StreamingOutputCallRequest request; + + for (unsigned int i = 0; i < kNumResponseMessages; ++i) { + ResponseParameters* response_parameter = request.add_response_parameters(); + response_parameter->set_size(kResponseMessageSize); + } + StreamingOutputCallResponse response; + std::unique_ptr> stream( + stub->StreamingOutputCall(&context, &request)); + + unsigned int i = 0; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().body() == + grpc::string(kResponseMessageSize, '\0')); + gpr_log(GPR_INFO, "received message %d", i); + std::this_thread::sleep_for( + std::chrono::milliseconds(kReceiveDelayMilliSeconds)); + ++i; + } + GPR_ASSERT(kNumResponseMessages == i); + grpc::Status s = stream->Wait(); + + GPR_ASSERT(s.IsOk()); + gpr_log(GPR_INFO, "Response streaming done."); +} + +void DoHalfDuplex(std::shared_ptr channel) { + gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ..."); + std::unique_ptr stub(TestService::NewStub(channel)); + + grpc::ClientContext context; + std::unique_ptr> + stream(stub->HalfDuplexCall(&context)); + + StreamingOutputCallRequest request; + ResponseParameters* response_parameter = request.add_response_parameters(); + for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) { + response_parameter->set_size(response_stream_sizes[i]); + GPR_ASSERT(stream->Write(request)); + } + stream->WritesDone(); + + unsigned int i = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + GPR_ASSERT(response.payload().has_body()); + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + ++i; + } + GPR_ASSERT(response_stream_sizes.size() == i); + grpc::Status s = stream->Wait(); + GPR_ASSERT(s.IsOk()); + gpr_log(GPR_INFO, "Half-duplex streaming rpc done."); +} + +void DoPingPong(std::shared_ptr channel) { + gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ..."); + std::unique_ptr stub(TestService::NewStub(channel)); + + grpc::ClientContext context; + std::unique_ptr> + stream(stub->FullDuplexCall(&context)); + + StreamingOutputCallRequest request; + request.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + ResponseParameters* response_parameter = request.add_response_parameters(); + grpc::testing::Payload* payload = request.mutable_payload(); + StreamingOutputCallResponse response; + for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + response_parameter->set_size(response_stream_sizes[i]); + payload->set_body(grpc::string(request_stream_sizes[i], '\0')); + GPR_ASSERT(stream->Write(request)); + GPR_ASSERT(stream->Read(&response)); + GPR_ASSERT(response.payload().has_body()); + GPR_ASSERT(response.payload().body() == + grpc::string(response_stream_sizes[i], '\0')); + } + + stream->WritesDone(); + GPR_ASSERT(!stream->Read(&response)); + grpc::Status s = stream->Wait(); + GPR_ASSERT(s.IsOk()); + gpr_log(GPR_INFO, "Ping pong streaming done."); +} int main(int argc, char** argv) { grpc_init(); @@ -148,6 +294,23 @@ int main(int argc, char** argv) { DoEmpty(channel); } else if (FLAGS_test_case == "large_unary") { DoLargeUnary(channel); + } else if (FLAGS_test_case == "client_streaming") { + DoRequestStreaming(channel); + } else if (FLAGS_test_case == "server_streaming") { + DoResponseStreaming(channel); + } else if (FLAGS_test_case == "slow_consumer") { + DoResponseStreamingWithSlowConsumer(channel); + } else if (FLAGS_test_case == "half_duplex") { + DoHalfDuplex(channel); + } else if (FLAGS_test_case == "ping_pong") { + DoPingPong(channel); + } else if (FLAGS_test_case == "all") { + DoEmpty(channel); + DoLargeUnary(channel); + DoRequestStreaming(channel); + DoResponseStreaming(channel); + DoHalfDuplex(channel); + DoPingPong(channel); } else { gpr_log( GPR_ERROR,