Added new tests for compression

pull/2603/head
David Garcia Quintas 10 years ago
parent 49a5130318
commit cd37d5852b
  1. 18
      test/cpp/interop/client.cc
  2. 52
      test/cpp/interop/interop_client.cc
  3. 4
      test/cpp/interop/interop_client.h
  4. 18
      test/cpp/interop/server.cc
  5. 3
      test/proto/test.proto

@ -56,8 +56,12 @@ DEFINE_string(test_case, "large_unary",
"Configure different test cases. Valid options are: "
"empty_unary : empty (zero bytes) request and response; "
"large_unary : single request and (large) response; "
"large_compressed_unary : single request and compressed (large) "
"response; "
"client_streaming : request streaming with single response; "
"server_streaming : single request with response streaming; "
"server_compressed_streaming : single request with compressed "
"response streaming; "
"slow_consumer : single request with response; "
" streaming with slow client consumer; "
"half_duplex : half-duplex streaming; "
@ -91,10 +95,14 @@ int main(int argc, char** argv) {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {
client.DoLargeUnary();
} else if (FLAGS_test_case == "large_compressed_unary") {
client.DoLargeCompressedUnary();
} else if (FLAGS_test_case == "client_streaming") {
client.DoRequestStreaming();
} else if (FLAGS_test_case == "server_streaming") {
client.DoResponseStreaming();
} else if (FLAGS_test_case == "server_compressed_streaming") {
client.DoResponseCompressedStreaming();
} else if (FLAGS_test_case == "slow_consumer") {
client.DoResponseStreamingWithSlowConsumer();
} else if (FLAGS_test_case == "half_duplex") {
@ -129,6 +137,7 @@ int main(int argc, char** argv) {
client.DoLargeUnary();
client.DoRequestStreaming();
client.DoResponseStreaming();
client.DoResponseCompressedStreaming();
client.DoHalfDuplex();
client.DoPingPong();
client.DoCancelAfterBegin();
@ -148,10 +157,11 @@ int main(int argc, char** argv) {
gpr_log(
GPR_ERROR,
"Unsupported test case %s. Valid options are all|empty_unary|"
"large_unary|client_streaming|server_streaming|half_duplex|ping_pong|"
"cancel_after_begin|cancel_after_first_response|"
"timeout_on_sleeping_server|service_account_creds|compute_engine_creds|"
"jwt_token_creds|oauth2_auth_token|per_rpc_creds",
"large_unary|large_compressed_unary|client_streaming|server_streaming|"
"server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|"
"cancel_after_first_response|timeout_on_sleeping_server|"
"service_account_creds|compute_engine_creds|jwt_token_creds|"
"oauth2_auth_token|per_rpc_creds",
FLAGS_test_case.c_str());
ret = 1;
}

@ -101,13 +101,29 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
ClientContext context;
InteropClientContextInspector inspector(context);
request->set_response_type(PayloadType::COMPRESSABLE);
request->set_response_size(kLargeResponseSize);
grpc::string payload(kLargeRequestSize, '\0');
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = stub->UnaryCall(&context, *request, response);
AssertOkOrPrintErrorStatus(s);
}
// Shared code to set large payload, make rpc and check response payload.
void InteropClient::PerformLargeCompressedUnary(SimpleRequest* request,
SimpleResponse* response) {
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
ClientContext context;
InteropClientContextInspector inspector(context);
request->set_response_size(kLargeResponseSize);
grpc::string payload(kLargeRequestSize, '\0');
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = stub->CompressedUnaryCall(&context, *request, response);
// Compression related checks.
GPR_ASSERT(request->response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
@ -245,6 +261,14 @@ void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
}
void InteropClient::DoLargeUnary() {
gpr_log(GPR_INFO, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large unary done.");
}
void InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
@ -293,6 +317,32 @@ void InteropClient::DoRequestStreaming() {
}
void InteropClient::DoResponseStreaming() {
gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
ClientContext context;
StreamingOutputCallRequest request;
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
ResponseParameters* response_parameter = request.add_response_parameters();
response_parameter->set_size(response_stream_sizes[i]);
}
StreamingOutputCallResponse response;
std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
stub->StreamingOutputCall(&context, request));
unsigned int i = 0;
while (stream->Read(&response)) {
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done.");
}
void InteropClient::DoResponseCompressedStreaming() {
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};

@ -52,10 +52,12 @@ class InteropClient {
void DoEmpty();
void DoLargeUnary();
void DoLargeCompressedUnary();
void DoPingPong();
void DoHalfDuplex();
void DoRequestStreaming();
void DoResponseStreaming();
void DoResponseCompressedStreaming();
void DoResponseStreamingWithSlowConsumer();
void DoCancelAfterBegin();
void DoCancelAfterFirstResponse();
@ -78,6 +80,8 @@ class InteropClient {
private:
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
void PerformLargeCompressedUnary(SimpleRequest* request,
SimpleResponse* response);
void AssertOkOrPrintErrorStatus(const Status& s);
std::shared_ptr<ChannelInterface> channel_;

@ -140,16 +140,12 @@ class TestServiceImpl : public TestService::Service {
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) {
InteropServerContextInspector inspector(*context);
SetResponseCompression(context, *request);
if (request->has_response_size() && request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
}
const gpr_uint32 client_accept_encodings_bitset =
inspector.GetEncodingsAcceptedByClient();
if (request->has_response_status()) {
return Status(static_cast<grpc::StatusCode>
@ -160,6 +156,13 @@ class TestServiceImpl : public TestService::Service {
return Status::OK;
}
Status CompressedUnaryCall(ServerContext* context,
const SimpleRequest* request,
SimpleResponse* response) {
SetResponseCompression(context, *request);
return UnaryCall(context, request, response);
}
Status StreamingOutputCall(
ServerContext* context, const StreamingOutputCallRequest* request,
ServerWriter<StreamingOutputCallResponse>* writer) {
@ -180,6 +183,13 @@ class TestServiceImpl : public TestService::Service {
}
}
Status CompressedStreamingOutputCall(
ServerContext* context, const StreamingOutputCallRequest* request,
ServerWriter<StreamingOutputCallResponse>* writer) {
SetResponseCompression(context, *request);
return StreamingOutputCall(context, request, writer);
}
Status StreamingInputCall(ServerContext* context,
ServerReader<StreamingInputCallRequest>* reader,
StreamingInputCallResponse* response) {

@ -48,6 +48,9 @@ service TestService {
// TODO(Issue 527): Describe required server behavior.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one compressed response.
rpc CompressedUnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by a sequence of responses (streamed download).
// The server returns the payload with client desired type and sizes.
rpc StreamingOutputCall(StreamingOutputCallRequest)

Loading…
Cancel
Save