diff --git a/Makefile b/Makefile index 830dcaeff33..9be3e5784cc 100644 --- a/Makefile +++ b/Makefile @@ -4376,7 +4376,7 @@ LIBINTEROP_SERVER_MAIN_SRC = \ $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \ $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc \ - test/cpp/interop/server_main.cc \ + test/cpp/interop/interop_server.cc \ PUBLIC_HEADERS_CXX += \ @@ -4422,7 +4422,7 @@ ifneq ($(NO_DEPS),true) -include $(LIBINTEROP_SERVER_MAIN_OBJS:.o=.dep) endif endif -$(OBJDIR)/$(CONFIG)/test/cpp/interop/server_main.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc +$(OBJDIR)/$(CONFIG)/test/cpp/interop/interop_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc LIBQPS_SRC = \ @@ -14939,8 +14939,8 @@ test/cpp/end2end/test_service_impl.cc: $(OPENSSL_DEP) test/cpp/interop/client.cc: $(OPENSSL_DEP) test/cpp/interop/client_helper.cc: $(OPENSSL_DEP) test/cpp/interop/interop_client.cc: $(OPENSSL_DEP) +test/cpp/interop/interop_server.cc: $(OPENSSL_DEP) test/cpp/interop/server_helper.cc: $(OPENSSL_DEP) -test/cpp/interop/server_main.cc: $(OPENSSL_DEP) test/cpp/qps/client_async.cc: $(OPENSSL_DEP) test/cpp/qps/client_sync.cc: $(OPENSSL_DEP) test/cpp/qps/driver.cc: $(OPENSSL_DEP) diff --git a/build.yaml b/build.yaml index d41f4195947..1f06e20cf9b 100644 --- a/build.yaml +++ b/build.yaml @@ -1117,7 +1117,7 @@ libs: - src/proto/grpc/testing/empty.proto - src/proto/grpc/testing/messages.proto - src/proto/grpc/testing/test.proto - - test/cpp/interop/server_main.cc + - test/cpp/interop/interop_server.cc deps: - interop_server_helper - grpc++_test_util diff --git a/doc/interop-test-descriptions.md b/doc/interop-test-descriptions.md index 7fd21c7022e..a4f9abecfae 100644 --- a/doc/interop-test-descriptions.md +++ b/doc/interop-test-descriptions.md @@ -68,14 +68,12 @@ control (even if compression is enabled on the channel). Server features: * [UnaryCall][] -* [Compressable Payload][] Procedure: 1. Client calls UnaryCall with: ``` { - response_type: COMPRESSABLE response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -85,56 +83,106 @@ Procedure: Client asserts: * call was successful -* response payload type is COMPRESSABLE * response payload body is 314159 bytes in size * clients are free to assert that the response payload body contents are zero and comparing the entire response message against a golden response -### large_compressed_unary - -This test verifies compressed unary calls succeed in sending messages. It -sends one unary request for every payload type, with and without requesting a -compressed response from the server. - -In all scenarios, whether compression was actually performed is determined by -the compression bit in the response's message flags. +### client_compressed_unary +This test verifies the client can compress unary messages by sending two unary +calls, for compressed and uncompressed payloads. It also sends an initial +probing request to verify whether the server supports the [CompressedRequest][] +feature by checking if the probing call fails with an `INVALID_ARGUMENT` status. Server features: * [UnaryCall][] -* [Compressable Payload][] -* [Uncompressable Payload][] +* [CompressedRequest][] Procedure: - 1. Client calls UnaryCall with: + 1. Client calls UnaryCall with the feature probe, an *uncompressed* message: + ``` + { + expect_compressed:{ + value: true + } + response_size: 314159 + payload:{ + body: 271828 bytes of zeros + } + } + ``` + + 1. Client calls UnaryCall with the *compressed* message: + + ``` + { + expect_compressed:{ + value: true + } + response_size: 314159 + payload:{ + body: 271828 bytes of zeros + } + } + ``` + + 1. Client calls UnaryCall with the *uncompressed* message: ``` { - request_compressed_response: bool - response_type: COMPRESSABLE + expect_compressed:{ + value: false + } response_size: 314159 payload:{ body: 271828 bytes of zeros } } ``` + Client asserts: - * call was successful - * response payload type is COMPRESSABLE - * if `request_compressed_response` is false, the response MUST NOT have the - compressed message flag set. - * if `request_compressed_response` is true, the response MUST have the - compressed message flag set. - * response payload body is 314159 bytes in size - * clients are free to assert that the response payload body contents are - zero and comparing the entire response message against a golden response + * First call failed with `INVALID_ARGUMENT` status. + * Subsequent calls were successful. + * Response payload body is 314159 bytes in size. + * Clients are free to assert that the response payload body contents are + zeros and comparing the entire response message against a golden response. - 2. Client calls UnaryCall with: +### server_compressed_unary + +This test verifies the server can compress unary messages. It sends two unary +requests, expecting the server's response to be compressed or not according to +the `response_compressed` boolean. + +Whether compression was actually performed is determined by the compression bit +in the response's message flags. *Note that some languages may not have access +to the message flags*. + + +Server features: +* [UnaryCall][] +* [CompressedResponse][] + +Procedure: + 1. Client calls UnaryCall with `SimpleRequest`: + + ``` + { + response_compressed:{ + value: true + } + response_size: 314159 + payload:{ + body: 271828 bytes of zeros + } + } + ``` + ``` { - request_compressed_response: bool - response_type: UNCOMPRESSABLE + response_compressed:{ + value: false + } response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -143,11 +191,13 @@ Procedure: ``` Client asserts: * call was successful - * response payload type is UNCOMPRESSABLE - * the response MAY have the compressed message flag set. Some - implementations will choose to compress the payload even when the output - size if larger than the input. - * response payload body is 314159 bytes in size + * when `response_compressed` is true, the response MUST have the + compressed message flag set. + * when `response_compressed` is false, the response MUST NOT have + the compressed message flag set. + * response payload body is 314159 bytes in size in both cases. + * clients are free to assert that the response payload body contents are + zero and comparing the entire response message against a golden response ### client_streaming @@ -156,7 +206,6 @@ This test verifies that client-only streaming succeeds. Server features: * [StreamingInputCall][] -* [Compressable Payload][] Procedure: 1. Client calls StreamingInputCall @@ -206,25 +255,81 @@ Client asserts: * call was successful * response aggregated_payload_size is 74922 + +### client_compressed_streaming + +This test verifies the client can compress requests on per-message basis by +performing a two-request streaming call. It also sends an initial probing +request to verify whether the server supports the [CompressedRequest][] feature +by checking if the probing call fails with an `INVALID_ARGUMENT` status. + +Procedure: + 1. Client calls `StreamingInputCall` and sends the following feature-probing + *uncompressed* `StreamingInputCallRequest` message + + ``` + { + expect_compressed:{ + value: true + } + payload:{ + body: 27182 bytes of zeros + } + } + ``` + If the call fails with `INVALID_ARGUMENT`, the test fails. Otherwise, we + continue. + + 1. Client calls `StreamingInputCall` again, sending the *compressed* message + + ``` + { + expect_compressed:{ + value: true + } + payload:{ + body: 27182 bytes of zeros + } + } + ``` + + 1. And finally, the *uncompressed* message + ``` + { + expect_compressed:{ + value: false + } + payload:{ + body: 45904 bytes of zeros + } + } + ``` + + 1. Client half-closes + +Client asserts: +* First call fails with `INVALID_ARGUMENT`. +* Next calls succeeds. +* Response aggregated payload size is 73086. + + ### server_streaming This test verifies that server-only streaming succeeds. Server features: * [StreamingOutputCall][] -* [Compressable Payload][] Procedure: - 1. Client calls StreamingOutputCall with: + 1. Client calls StreamingOutputCall with `StreamingOutputCallRequest`: ``` { - response_type:COMPRESSABLE response_parameters:{ size: 31415 } response_parameters:{ - size: 59 + size: 9 } response_parameters:{ size: 2653 @@ -238,103 +343,64 @@ Procedure: Client asserts: * call was successful * exactly four responses -* response payloads are COMPRESSABLE * response payload bodies are sized (in order): 31415, 9, 2653, 58979 * clients are free to assert that the response payload body contents are zero and comparing the entire response messages against golden responses ### server_compressed_streaming -This test verifies that server-only compressed streaming succeeds. +This test verifies that the server can compress streaming messages and disable +compression on individual messages. Server features: * [StreamingOutputCall][] -* [Compressable Payload][] -* [Uncompressable Payload][] +* [CompressedResponse][] Procedure: - 1. Client calls StreamingOutputCall with: + 1. Client calls StreamingOutputCall with `StreamingOutputCallRequest`: ``` { - request_compressed_response: bool - response_type:COMPRESSABLE response_parameters:{ + compressed: { + value: true + } size: 31415 } response_parameters:{ - size: 59 - } - response_parameters:{ - size: 2653 - } - response_parameters:{ - size: 58979 + compressed: { + value: false + } + size: 92653 } } ``` Client asserts: * call was successful - * exactly four responses - * response payloads are COMPRESSABLE - * if `request_compressed_response` is false, the response's messages MUST + * exactly two responses + * when `response_compressed` is false, the response's messages MUST NOT have the compressed message flag set. - * if `request_compressed_response` is true, the response's messages MUST + * when `response_compressed` is true, the response's messages MUST have the compressed message flag set. - * response payload bodies are sized (in order): 31415, 59, 2653, 58979 + * response payload bodies are sized (in order): 31415, 92653 * clients are free to assert that the response payload body contents are zero and comparing the entire response messages against golden responses - 2. Client calls StreamingOutputCall with: - - ``` - { - request_compressed_response: bool - response_type:UNCOMPRESSABLE - response_parameters:{ - size: 31415 - } - response_parameters:{ - size: 59 - } - response_parameters:{ - size: 2653 - } - response_parameters:{ - size: 58979 - } - } - ``` - - Client asserts: - * call was successful - * exactly four responses - * response payloads are UNCOMPRESSABLE - * the response MAY have the compressed message flag set. Some - implementations will choose to compress the payload even when the output - size if larger than the input. - * response payload bodies are sized (in order): 31415, 59, 2653, 58979 - * clients are free to assert that the body of the responses are identical to - the golden uncompressable data at `test/cpp/interop/rnd.dat`. - - ### ping_pong This test verifies that full duplex bidi is supported. Server features: * [FullDuplexCall][] -* [Compressable Payload][] Procedure: 1. Client calls FullDuplexCall with: ``` { - response_type: COMPRESSABLE response_parameters:{ size: 31415 } @@ -348,9 +414,8 @@ Procedure: ``` { - response_type: COMPRESSABLE response_parameters:{ - size: 59 + size: 9 } payload:{ body: 8 bytes of zeros @@ -362,7 +427,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_parameters:{ size: 2653 } @@ -376,7 +440,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_parameters:{ size: 58979 } @@ -391,7 +454,6 @@ Procedure: Client asserts: * call was successful * exactly four responses -* response payloads are COMPRESSABLE * response payload bodies are sized (in order): 31415, 9, 2653, 58979 * clients are free to assert that the response payload body contents are zero and comparing the entire response messages against golden responses @@ -421,12 +483,12 @@ with desired oauth scope. The test uses `--default_service_account` with GCE service account email and `--oauth_scope` with the OAuth scope to use. For testing against -grpc-test.sandbox.googleapis.com, "https://www.googleapis.com/auth/xapi.zoo" should +grpc-test.sandbox.googleapis.com, "https://www.googleapis.com/auth/xapi.zoo" +should be passed in as `--oauth_scope`. Server features: * [UnaryCall][] -* [Compressable Payload][] * [Echo Authenticated Username][] * [Echo OAuth Scope][] @@ -436,7 +498,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -448,7 +509,8 @@ Procedure: Client asserts: * call was successful -* received SimpleResponse.username equals the value of `--default_service_account` flag +* received SimpleResponse.username equals the value of + `--default_service_account` flag * received SimpleResponse.oauth_scope is in `--oauth_scope` * response payload body is 314159 bytes in size * clients are free to assert that the response payload body contents are zero @@ -469,7 +531,6 @@ variable GOOGLE_APPLICATION_CREDENTIALS. Server features: * [UnaryCall][] -* [Compressable Payload][] * [Echo Authenticated Username][] * [Echo OAuth Scope][] @@ -479,7 +540,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -492,7 +552,8 @@ Client asserts: * call was successful * received SimpleResponse.username is not empty and is in the json key file used by the auth library. The client can optionally check the username matches the -email address in the key file or equals the value of `--default_service_account` flag. +email address in the key file or equals the value of `--default_service_account` +flag. * response payload body is 314159 bytes in size * clients are free to assert that the response payload body contents are zero and comparing the entire response message against a golden response @@ -518,18 +579,18 @@ variable GOOGLE_APPLICATION_CREDENTIALS, *OR* if GCE credentials is used to fetch the token, `--default_service_account` can be used to pass in GCE service account email. - uses the flag `--oauth_scope` for the oauth scope. For testing against -grpc-test.sandbox.googleapis.com, "https://www.googleapis.com/auth/xapi.zoo" should -be passed as the `--oauth_scope`. +grpc-test.sandbox.googleapis.com, "https://www.googleapis.com/auth/xapi.zoo" +should be passed as the `--oauth_scope`. Server features: * [UnaryCall][] -* [Compressable Payload][] * [Echo Authenticated Username][] * [Echo OAuth Scope][] Procedure: 1. Client uses the auth library to obtain an authorization token - 2. Client configures the channel to use AccessTokenCredentials with the access token obtained in step 1 + 2. Client configures the channel to use AccessTokenCredentials with the access + token obtained in step 1 3. Client calls UnaryCall with the following message ``` @@ -550,22 +611,21 @@ json key file or GCE default service account email. Similar to the other auth tests, this test is only for cloud-to-prod path. -This test verifies unary calls succeed in sending messages using a JWT or a service account -credentials set on the RPC. +This test verifies unary calls succeed in sending messages using a JWT or a +service account credentials set on the RPC. The test - uses the flag `--service_account_key_file` with the path to a json key file downloaded from https://console.developers.google.com. Alternately, if using a usable auth implementation, it may specify the file location in the environment variable GOOGLE_APPLICATION_CREDENTIALS -- optionally uses the flag `--oauth_scope` for the oauth scope if implementator +- optionally uses the flag `--oauth_scope` for the oauth scope if implementator wishes to use service account credential instead of JWT credential. For testing -against grpc-test.sandbox.googleapis.com, oauth scope +against grpc-test.sandbox.googleapis.com, oauth scope "https://www.googleapis.com/auth/xapi.zoo" should be used. Server features: * [UnaryCall][] -* [Compressable Payload][] * [Echo Authenticated Username][] * [Echo OAuth Scope][] @@ -596,7 +656,6 @@ by the server. Server features: * [UnaryCall][] * [FullDuplexCall][] -* [Compressable Payload][] * [Echo Metadata][] Procedure: @@ -611,7 +670,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -630,7 +688,6 @@ Procedure: ``` { - response_type: COMPRESSABLE response_size: 314159 payload:{ body: 271828 bytes of zeros @@ -736,14 +793,12 @@ from the server. Server features: * [FullDuplexCall][] -* [Compressable Payload][] Procedure: 1. Client starts FullDuplexCall with ``` { - response_type: COMPRESSABLE response_parameters:{ size: 31415 } @@ -887,6 +942,21 @@ payload body of size `SimpleRequest.response_size` bytes and type as appropriate for the `SimpleRequest.response_type`. If the server does not support the `response_type`, then it should fail the RPC with `INVALID_ARGUMENT`. +### CompressedResponse +[CompressedResponse]: #compressedresponse + +When the client sets `response_compressed` to true, the server's response is +sent back compressed. Note that `response_compressed` is present on both +`SimpleRequest` (unary) and `StreamingOutputCallRequest` (streaming). + +### CompressedRequest +[CompressedRequest]: #compressedrequest + +When the client sets `expect_compressed` to true, the server expects the client +request to be compressed. If it's not, it fails the RPC with `INVALID_ARGUMENT`. +Note that `response_compressed` is present on both `SimpleRequest` (unary) and +`StreamingOutputCallRequest` (streaming). + ### StreamingInputCall [StreamingInputCall]: #streaminginputcall @@ -913,20 +983,6 @@ payload body of size ResponseParameters.size bytes, as specified by its respective ResponseParameters. After receiving half close and sending all responses, it closes with OK. -### Compressable Payload -[Compressable Payload]: #compressable-payload - -When the client requests COMPRESSABLE payload, the response includes a payload -of the size requested containing all zeros and the payload type is -COMPRESSABLE. - -### Uncompressable Payload -[Uncompressable Payload]: #uncompressable-payload - -When the client requests UNCOMPRESSABLE payload, the response includes a payload -of the size requested containing uncompressable data and the payload type is -UNCOMPRESSABLE. - ### Echo Status [Echo Status]: #echo-status When the client sends a response_status in the request payload, the server closes diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 72807623054..05f52a1083d 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -45,9 +45,6 @@ var testProto = grpc.load({ var ECHO_INITIAL_KEY = 'x-grpc-test-echo-initial'; var ECHO_TRAILING_KEY = 'x-grpc-test-echo-trailing-bin'; -var incompressible_data = fs.readFileSync( - __dirname + '/../../../test/cpp/interop/rnd.dat'); - /** * Create a buffer filled with size zeroes * @param {number} size The length of the buffer @@ -88,15 +85,7 @@ function getEchoTrailer(call) { } function getPayload(payload_type, size) { - if (payload_type === 'RANDOM') { - payload_type = ['COMPRESSABLE', - 'UNCOMPRESSABLE'][Math.random() < 0.5 ? 0 : 1]; - } - var body; - switch (payload_type) { - case 'COMPRESSABLE': body = zeroBuffer(size); break; - case 'UNCOMPRESSABLE': incompressible_data.slice(size); break; - } + var body = zeroBuffer(size); return {type: payload_type, body: body}; } diff --git a/src/proto/grpc/binary_log/v1alpha/log.proto b/src/proto/grpc/binary_log/v1alpha/log.proto index 83166cd4104..46656109bc9 100644 --- a/src/proto/grpc/binary_log/v1alpha/log.proto +++ b/src/proto/grpc/binary_log/v1alpha/log.proto @@ -29,20 +29,20 @@ syntax = "proto3"; -import "google/protobuf/timestamp.proto" +import "google/protobuf/timestamp.proto"; package grpc.binary_log.v1alpha; enum Direction { - SERVER_SEND; - SERVER_RECV; - CLIENT_SEND; - CLIENT_RECV; + SERVER_SEND = 0; + SERVER_RECV = 1; + CLIENT_SEND = 2; + CLIENT_RECV = 3; } message KeyValuePair { - string key; - string value; + string key = 1; + string value = 2; } // Any sort of metadata that may be sent in either direction during a call diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index e1090156ab4..64998c2f231 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -34,17 +34,24 @@ syntax = "proto3"; package grpc.testing; +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} + +// DEPRECATED, don't use. To be removed shortly. // The type of payload that should be returned. enum PayloadType { // Compressable text format. COMPRESSABLE = 0; - - // Uncompressable binary format. - UNCOMPRESSABLE = 1; } // A block of data, to simply increase gRPC message size. message Payload { + // DEPRECATED, don't use. To be removed shortly. // The type of data in body. PayloadType type = 1; // Primary contents of payload. @@ -60,12 +67,12 @@ message EchoStatus { // Unary request. message SimpleRequest { + // DEPRECATED, don't use. To be removed shortly. // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. PayloadType response_type = 1; // Desired payload size in the response from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. int32 response_size = 2; // Optional input payload sent along with the request. @@ -77,11 +84,17 @@ message SimpleRequest { // Whether SimpleResponse should include OAuth scope. bool fill_oauth_scope = 5; - // Whether to request the server to compress the response. - bool request_compressed_response = 6; + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; // Whether server should return a given status EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; } // Unary response, as configured by the request. @@ -100,6 +113,12 @@ message StreamingInputCallRequest { // Optional input payload sent along with the request. Payload payload = 1; + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + // Not expecting any payload from the response. } @@ -112,16 +131,22 @@ message StreamingInputCallResponse { // Configuration for a particular response. message ResponseParameters { // Desired payload sizes in responses from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. int32 size = 1; // Desired interval between consecutive responses in the response stream in // microseconds. int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; } // Server-streaming request. message StreamingOutputCallRequest { + // DEPRECATED, don't use. To be removed shortly. // Desired payload type in the response from the server. // If response_type is RANDOM, the payload from each response in the stream // might be of different types. This is to simulate a mixed type of payload @@ -134,9 +159,6 @@ message StreamingOutputCallRequest { // Optional input payload sent along with the request. Payload payload = 3; - // Whether to request the server to compress the response. - bool request_compressed_response = 6; - // Whether server should return a given status EchoStatus response_status = 7; } diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 3e974eba0a3..f498ed41901 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -182,6 +182,8 @@ class BuildProtoModules(setuptools.Command): '--plugin=protoc-gen-python-grpc={}'.format( self.grpc_python_plugin_command), '-I {}'.format(GRPC_STEM), + '-I .', + '-I {}/third_party/protobuf/src'.format(GRPC_STEM), '--python_out={}'.format(PROTO_GEN_STEM), '--python-grpc_out={}'.format(PROTO_GEN_STEM), ] + [path] diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index addaf174f2b..e8ae6ee5723 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -40,7 +40,9 @@ #include #include #include +#include +#include "src/core/lib/support/string.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" #include "test/cpp/util/test_config.h" @@ -52,30 +54,31 @@ DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); DEFINE_string(test_case, "large_unary", - "Configure different test cases. Valid options are: " - "empty_unary : empty (zero bytes) request and response; " - "large_unary : single request and (large) response; " - "large_compressed_unary : single request and compressed (large) " - "response; " - "client_streaming : request streaming with single response; " - "server_streaming : single request with response streaming; " + "Configure different test cases. Valid options are:\n\n" + "all : all test cases;\n" + "cancel_after_begin : cancel stream after starting it;\n" + "cancel_after_first_response: cancel on first response;\n" + "client_compressed_streaming : compressed request streaming with " + "client_compressed_unary : single compressed request;\n" + "client_streaming : request streaming with single response;\n" + "compute_engine_creds: large_unary with compute engine auth;\n" + "custom_metadata: server will echo custom metadata;\n" + "empty_stream : bi-di stream with no request/response;\n" + "empty_unary : empty (zero bytes) request and response;\n" + "half_duplex : half-duplex streaming;\n" + "jwt_token_creds: large_unary with JWT token auth;\n" + "large_unary : single request and (large) response;\n" + "oauth2_auth_token: raw oauth2 access token auth;\n" + "per_rpc_creds: raw oauth2 access token on a single rpc;\n" + "ping_pong : full-duplex streaming;\n" + "response streaming;\n" "server_compressed_streaming : single request with compressed " - "response streaming; " - "slow_consumer : single request with response; " - " streaming with slow client consumer; " - "half_duplex : half-duplex streaming; " - "ping_pong : full-duplex streaming; " - "cancel_after_begin : cancel stream after starting it; " - "cancel_after_first_response: cancel on first response; " - "timeout_on_sleeping_server: deadline exceeds on stream; " - "empty_stream : bi-di stream with no request/response; " - "compute_engine_creds: large_unary with compute engine auth; " - "jwt_token_creds: large_unary with JWT token auth; " - "oauth2_auth_token: raw oauth2 access token auth; " - "per_rpc_creds: raw oauth2 access token on a single rpc; " - "status_code_and_message: verify status code & message; " - "custom_metadata: server will echo custom metadata;" - "all : all of above."); + "server_compressed_unary : single compressed response;\n" + "server_streaming : single request with response streaming;\n" + "slow_consumer : single request with response streaming with " + "slow client consumer;\n" + "status_code_and_message: verify status code & message;\n" + "timeout_on_sleeping_server: deadline exceeds on stream;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -104,14 +107,18 @@ int main(int argc, char** argv) { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { client.DoLargeUnary(); - } else if (FLAGS_test_case == "large_compressed_unary") { - client.DoLargeCompressedUnary(); + } else if (FLAGS_test_case == "server_compressed_unary") { + client.DoServerCompressedUnary(); + } else if (FLAGS_test_case == "client_compressed_unary") { + client.DoClientCompressedUnary(); } else if (FLAGS_test_case == "client_streaming") { client.DoRequestStreaming(); } else if (FLAGS_test_case == "server_streaming") { client.DoResponseStreaming(); } else if (FLAGS_test_case == "server_compressed_streaming") { - client.DoResponseCompressedStreaming(); + client.DoServerCompressedStreaming(); + } else if (FLAGS_test_case == "client_compressed_streaming") { + client.DoClientCompressedStreaming(); } else if (FLAGS_test_case == "slow_consumer") { client.DoResponseStreamingWithSlowConsumer(); } else if (FLAGS_test_case == "half_duplex") { @@ -144,9 +151,12 @@ int main(int argc, char** argv) { } else if (FLAGS_test_case == "all") { client.DoEmpty(); client.DoLargeUnary(); + client.DoClientCompressedUnary(); + client.DoServerCompressedUnary(); client.DoRequestStreaming(); client.DoResponseStreaming(); - client.DoResponseCompressedStreaming(); + client.DoClientCompressedStreaming(); + client.DoServerCompressedStreaming(); client.DoHalfDuplex(); client.DoPingPong(); client.DoCancelAfterBegin(); @@ -165,15 +175,35 @@ int main(int argc, char** argv) { } // compute_engine_creds only runs in GCE. } else { - gpr_log( - GPR_ERROR, - "Unsupported test case %s. Valid options are all|empty_unary|" - "large_unary|large_compressed_unary|client_streaming|server_streaming|" - "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|" - "cancel_after_first_response|timeout_on_sleeping_server|empty_stream|" - "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds|" - "status_code_and_message|custom_metadata", - FLAGS_test_case.c_str()); + const char* testcases[] = {"all", + "cancel_after_begin", + "cancel_after_first_response", + "client_compressed_streaming", + "client_compressed_unary", + "client_streaming", + "compute_engine_creds", + "custom_metadata", + "empty_stream", + "empty_unary", + "half_duplex", + "jwt_token_creds", + "large_unary", + "oauth2_auth_token", + "oauth2_auth_token", + "per_rpc_creds", + "per_rpc_creds", + "ping_pong", + "server_compressed_streaming", + "server_compressed_unary", + "server_streaming", + "status_code_and_message", + "timeout_on_sleeping_server"}; + char* joined_testcases = + gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); + + gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s", + FLAGS_test_case.c_str(), joined_testcases); + gpr_free(joined_testcases); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 608f902d6fe..89f841dbe96 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,7 +57,7 @@ namespace testing { namespace { // The same value is defined by the Java client. const std::vector request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector response_stream_sizes = {31415, 59, 2653, 58979}; +const std::vector response_stream_sizes = {31415, 9, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -67,28 +67,23 @@ const int kLargeResponseSize = 314159; void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} -void CompressionChecks(const InteropClientContextInspector& inspector, - const SimpleRequest* request, - const SimpleResponse* response) { +void UnaryCompressionChecks(const InteropClientContextInspector& inspector, + const SimpleRequest* request, + const SimpleResponse* response) { const grpc_compression_algorithm received_compression = inspector.GetCallCompressionAlgorithm(); - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { + if (request->response_compressed().value()) { + if (received_compression == GRPC_COMPRESS_NONE) { // Requested some compression, got NONE. This is an error. gpr_log(GPR_ERROR, "Failure: Requested compression but got uncompressed response " "from server."); abort(); } - } - if (!request->request_compressed_response()) { - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request->response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should always - // be compressed. GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // Didn't request compression -> make sure the response is uncompressed + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } } } // namespace @@ -190,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, CheckerFn custom_checks_fn) { ClientContext context; InteropClientContextInspector inspector(context); - // If the request doesn't already specify the response type, default to - // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + if (request->has_expect_compressed()) { + if (request->expect_compressed().value()) { + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + } else { + context.set_compression_algorithm(GRPC_COMPRESS_NONE); + } + } Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); if (!AssertStatusOk(s)) { @@ -204,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - GPR_ASSERT(response->payload().type() == request->response_type()); - switch (response->payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); - break; - case PayloadType::UNCOMPRESSABLE: { - // We don't really check anything: We can't assert that the payload is - // uncompressed because it's the server's prerogative to decide on that, - // and different implementations decide differently (ie, Java always - // compresses when requested to do so, whereas C core throws away the - // compressed payload if the output is larger than the input). - // In addition, we don't compare the actual random bytes received because - // asserting that data is sent/received properly isn't the purpose of this - // test. Moreover, different implementations are also free to use - // different sets of random bytes. - } break; - default: - GPR_ASSERT(false); - } - + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); return true; } @@ -237,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -311,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -327,7 +306,6 @@ bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; } @@ -335,32 +313,73 @@ bool InteropClient::DoLargeUnary() { return true; } -bool InteropClient::DoLargeCompressedUnary() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", - log_suffix); - SimpleRequest request; - SimpleResponse response; - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - if (!PerformLargeUnary(&request, &response, CompressionChecks)) { - gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); - gpr_free(log_suffix); - return false; - } - - gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); +bool InteropClient::DoClientCompressedUnary() { + // Probing for compression-checks support. + ClientContext probe_context; + SimpleRequest probe_req; + SimpleResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + + probe_req.set_response_size(kLargeResponseSize); + probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); + const Status s = + serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); + + const std::vector compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_expect_compressed()->set_value(compressions[i]); + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + + gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + } + + return true; +} + +bool InteropClient::DoServerCompressedUnary() { + const std::vector compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", + log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_response_compressed()->set_value(compressions[i]); + + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); gpr_free(log_suffix); + return false; } + + gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); + gpr_free(log_suffix); } return true; @@ -387,7 +406,7 @@ bool InteropClient::DoRequestStreaming() { serviceStub_.Get()->StreamingInputCall(&context, &response)); int aggregated_payload_size = 0; - for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + for (size_t i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); if (!stream->Write(request)) { @@ -396,7 +415,7 @@ bool InteropClient::DoRequestStreaming() { } aggregated_payload_size += request_stream_sizes[i]; } - stream->WritesDone(); + GPR_ASSERT(stream->WritesDone()); Status s = stream->Finish(); if (!AssertStatusOk(s)) { @@ -446,92 +465,129 @@ bool InteropClient::DoResponseStreaming() { return true; } -bool InteropClient::DoResponseCompressedStreaming() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - ClientContext context; - InteropClientContextInspector inspector(context); - StreamingOutputCallRequest request; - - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); - - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - for (size_t k = 0; k < response_stream_sizes.size(); ++k) { - ResponseParameters* response_parameter = - request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[k]); - } - StreamingOutputCallResponse response; - - std::unique_ptr> stream( - serviceStub_.Get()->StreamingOutputCall(&context, request)); - - size_t k = 0; - while (stream->Read(&response)) { - // Payload related checks. - GPR_ASSERT(response.payload().type() == request.response_type()); - switch (response.payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[k], '\0')); - break; - case PayloadType::UNCOMPRESSABLE: - break; - default: - GPR_ASSERT(false); - } - - // Compression related checks. - if (request.request_compressed_response()) { - GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > - GRPC_COMPRESS_NONE); - if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); - } - } else { - // requested *no* compression. - GPR_ASSERT( - !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } - - ++k; - } - - gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); - gpr_free(log_suffix); +bool InteropClient::DoClientCompressedStreaming() { + // Probing for compression-checks support. + ClientContext probe_context; + StreamingInputCallRequest probe_req; + StreamingInputCallResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); + + std::unique_ptr> probe_stream( + serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); + + if (!probe_stream->Write(probe_req)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + Status s = probe_stream->Finish(); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, + "Compressed streaming request probe succeeded. Proceeding."); + + ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::unique_ptr> stream( + serviceStub_.Get()->StreamingInputCall(&context, &response)); + + request.mutable_payload()->set_body(grpc::string(27182, '\0')); + request.mutable_expect_compressed()->set_value(true); + gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + + WriteOptions wopts; + wopts.set_no_compression(); + request.mutable_payload()->set_body(grpc::string(45904, '\0')); + request.mutable_expect_compressed()->set_value(false); + gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); + if (!stream->Write(request, wopts)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + GPR_ASSERT(stream->WritesDone()); + + s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } + + return true; +} - if (k < response_stream_sizes.size()) { - // stream->Read() failed before reading all the expected messages. This - // is most likely due to a connection failure. - gpr_log(GPR_ERROR, - "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR - ") is " - "less than the expected messages (i.e " - "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR - ", j=%" PRIuPTR ")", - k, response_stream_sizes.size(), i, j); - return TransientFailureOrAbort(); - } - - Status s = stream->Finish(); - if (!AssertStatusOk(s)) { - return false; - } +bool InteropClient::DoServerCompressedStreaming() { + const std::vector compressions = {true, false}; + const std::vector sizes = {31415, 92653}; + + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + GPR_ASSERT(compressions.size() == sizes.size()); + for (size_t i = 0; i < sizes.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", + compressions[i] ? "true" : "false", sizes[i]); + + gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); + gpr_free(log_suffix); + + ResponseParameters* const response_parameter = + request.add_response_parameters(); + response_parameter->mutable_compressed()->set_value(compressions[i]); + response_parameter->set_size(sizes[i]); + } + std::unique_ptr> stream( + serviceStub_.Get()->StreamingOutputCall(&context, request)); + + size_t k = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + // Payload size checks. + GPR_ASSERT(response.payload().body() == + grpc::string(request.response_parameters(k).size(), '\0')); + + // Compression checks. + GPR_ASSERT(request.response_parameters(k).has_compressed()); + if (request.response_parameters(k).compressed().value()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // requested *no* compression. + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } + ++k; + } + + if (k < sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR + ") is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%" PRIuPTR ")).", + __func__, k, response_stream_sizes.size()); + return TransientFailureOrAbort(); } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } return true; } @@ -632,7 +688,6 @@ bool InteropClient::DoPingPong() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; @@ -699,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); @@ -839,7 +893,6 @@ bool InteropClient::DoCustomMetadata() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index c8d6810c12e..eb886fcb7e2 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -64,12 +64,14 @@ class InteropClient { bool DoEmpty(); bool DoLargeUnary(); - bool DoLargeCompressedUnary(); + bool DoServerCompressedUnary(); + bool DoClientCompressedUnary(); bool DoPingPong(); bool DoHalfDuplex(); bool DoRequestStreaming(); bool DoResponseStreaming(); - bool DoResponseCompressedStreaming(); + bool DoServerCompressedStreaming(); + bool DoClientCompressedStreaming(); bool DoResponseStreamingWithSlowConsumer(); bool DoCancelAfterBegin(); bool DoCancelAfterFirstResponse(); diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/interop_server.cc similarity index 72% rename from test/cpp/interop/server_main.cc rename to test/cpp/interop/interop_server.cc index 062857f3d6b..ebef0002a3c 100644 --- a/test/cpp/interop/server_main.cc +++ b/test/cpp/interop/interop_server.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,7 @@ #include #include +#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -64,10 +65,10 @@ using grpc::ServerCredentials; using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; +using grpc::WriteOptions; using grpc::SslServerCredentialsOptions; using grpc::testing::InteropServerContextInspector; using grpc::testing::Payload; -using grpc::testing::PayloadType; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StreamingInputCallRequest; @@ -78,7 +79,6 @@ using grpc::testing::TestService; using grpc::Status; static bool got_sigint = false; -static const char* kRandomFile = "test/cpp/interop/rnd.dat"; const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; @@ -110,34 +110,41 @@ void MaybeEchoMetadata(ServerContext* context) { } } -bool SetPayload(PayloadType response_type, int size, Payload* payload) { - payload->set_type(response_type); - switch (response_type) { - case PayloadType::COMPRESSABLE: { - std::unique_ptr body(new char[size]()); - payload->set_body(body.get(), size); - } break; - case PayloadType::UNCOMPRESSABLE: { - std::unique_ptr body(new char[size]()); - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - rnd_file.read(body.get(), size); - GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available - payload->set_body(body.get(), size); - } break; - default: - GPR_ASSERT(false); - } +bool SetPayload(int size, Payload* payload) { + std::unique_ptr body(new char[size]()); + payload->set_body(body.get(), size); return true; } -template -void SetResponseCompression(ServerContext* context, - const RequestType& request) { - if (request.request_compressed_response()) { - // Any level would do, let's go for HIGH because we are overachievers. - context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); +bool CheckExpectedCompression(const ServerContext& context, + const bool compression_expected) { + const InteropServerContextInspector inspector(context); + const grpc_compression_algorithm received_compression = + inspector.GetCallCompressionAlgorithm(); + + if (compression_expected) { + if (received_compression == GRPC_COMPRESS_NONE) { + // Expected some compression, got NONE. This is an error. + gpr_log(GPR_ERROR, + "Expected compression but got uncompressed request from client."); + return false; + } + if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { + gpr_log(GPR_ERROR, + "Failure: Requested compression in a compressable request, but " + "compression bit in message flags not set."); + return false; + } + } else { + // Didn't expect compression -> make sure the request is uncompressed + if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { + gpr_log(GPR_ERROR, + "Failure: Didn't requested compression, but compression bit in " + "message flags set."); + return false; + } } + return true; } class TestServiceImpl : public TestService::Service { @@ -151,11 +158,26 @@ class TestServiceImpl : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { MaybeEchoMetadata(context); - SetResponseCompression(context, *request); + if (request->has_response_compressed()) { + const bool compression_requested = request->response_compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (compression_requested) { + // Any level would do, let's go for HIGH because we are overachievers. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + } else { + context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE); + } + } + if (!CheckExpectedCompression(*context, + request->expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + if (!SetPayload(request->response_size(), response->mutable_payload())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); } } @@ -171,15 +193,26 @@ class TestServiceImpl : public TestService::Service { Status StreamingOutputCall( ServerContext* context, const StreamingOutputCallRequest* request, ServerWriter* writer) { - SetResponseCompression(context, *request); StreamingOutputCallResponse response; bool write_success = true; for (int i = 0; write_success && i < request->response_parameters_size(); i++) { - if (!SetPayload(request->response_type(), - request->response_parameters(i).size(), + if (!SetPayload(request->response_parameters(i).size(), response.mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); + } + WriteOptions wopts; + if (request->response_parameters(i).has_compressed()) { + // Compress by default. Disabled on a per-message basis. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + const bool compression_requested = + request->response_parameters(i).compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (!compression_requested) { + wopts.set_no_compression(); + } // else, compression is already enabled via the context. } int time_us; if ((time_us = request->response_parameters(i).interval_us()) > 0) { @@ -189,7 +222,7 @@ class TestServiceImpl : public TestService::Service { gpr_time_from_micros(time_us, GPR_TIMESPAN)); gpr_sleep_until(sleep_time); } - write_success = writer->Write(response); + write_success = writer->Write(response, wopts); } if (write_success) { return Status::OK; @@ -204,6 +237,11 @@ class TestServiceImpl : public TestService::Service { StreamingInputCallRequest request; int aggregated_payload_size = 0; while (reader->Read(&request)) { + if (!CheckExpectedCompression(*context, + request.expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request.has_payload()) { aggregated_payload_size += request.payload().body().size(); } @@ -221,7 +259,6 @@ class TestServiceImpl : public TestService::Service { StreamingOutputCallResponse response; bool write_success = true; while (write_success && stream->Read(&request)) { - SetResponseCompression(context, request); if (request.response_parameters_size() != 0) { response.mutable_payload()->set_type(request.payload().type()); response.mutable_payload()->set_body( diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat deleted file mode 100644 index 8c7f38f9e0e..00000000000 Binary files a/test/cpp/interop/rnd.dat and /dev/null differ diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc index c6d891ad71b..8b0b511bcb8 100644 --- a/test/cpp/interop/server_helper.cc +++ b/test/cpp/interop/server_helper.cc @@ -72,6 +72,10 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const { return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_); } +uint32_t InteropServerContextInspector::GetMessageFlags() const { + return grpc_call_test_only_get_message_flags(context_.call_); +} + std::shared_ptr InteropServerContextInspector::GetAuthContext() const { return context_.auth_context(); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 12865e40324..a1da14a4c8d 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -54,6 +54,7 @@ class InteropServerContextInspector { bool IsCancelled() const; grpc_compression_algorithm GetCallCompressionAlgorithm() const; uint32_t GetEncodingsAcceptedByClient() const; + uint32_t GetMessageFlags() const; private: const ::grpc::ServerContext& context_; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index aa95682e741..1d5fc80cf26 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -138,8 +138,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoLargeUnary(); break; } - case LARGE_COMPRESSED_UNARY: { - is_success = interop_client_->DoLargeCompressedUnary(); + case CLIENT_COMPRESSED_UNARY: { + is_success = interop_client_->DoClientCompressedUnary(); + break; + } + case CLIENT_COMPRESSED_STREAMING: { + is_success = interop_client_->DoClientCompressedStreaming(); break; } case CLIENT_STREAMING: { @@ -150,8 +154,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoResponseStreaming(); break; } + case SERVER_COMPRESSED_UNARY: { + is_success = interop_client_->DoServerCompressedUnary(); + break; + } case SERVER_COMPRESSED_STREAMING: { - is_success = interop_client_->DoResponseCompressedStreaming(); + is_success = interop_client_->DoServerCompressedStreaming(); break; } case SLOW_CONSUMER: { diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index aa93b58b4a7..cf6a7134739 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -51,29 +51,33 @@ using std::vector; enum TestCaseType { UNKNOWN_TEST = -1, - EMPTY_UNARY = 0, - LARGE_UNARY = 1, - LARGE_COMPRESSED_UNARY = 2, - CLIENT_STREAMING = 3, - SERVER_STREAMING = 4, - SERVER_COMPRESSED_STREAMING = 5, - SLOW_CONSUMER = 6, - HALF_DUPLEX = 7, - PING_PONG = 8, - CANCEL_AFTER_BEGIN = 9, - CANCEL_AFTER_FIRST_RESPONSE = 10, - TIMEOUT_ON_SLEEPING_SERVER = 11, - EMPTY_STREAM = 12, - STATUS_CODE_AND_MESSAGE = 13, - CUSTOM_METADATA = 14 + EMPTY_UNARY, + LARGE_UNARY, + CLIENT_COMPRESSED_UNARY, + CLIENT_COMPRESSED_STREAMING, + CLIENT_STREAMING, + SERVER_STREAMING, + SERVER_COMPRESSED_UNARY, + SERVER_COMPRESSED_STREAMING, + SLOW_CONSUMER, + HALF_DUPLEX, + PING_PONG, + CANCEL_AFTER_BEGIN, + CANCEL_AFTER_FIRST_RESPONSE, + TIMEOUT_ON_SLEEPING_SERVER, + EMPTY_STREAM, + STATUS_CODE_AND_MESSAGE, + CUSTOM_METADATA }; const vector> kTestCaseList = { {EMPTY_UNARY, "empty_unary"}, {LARGE_UNARY, "large_unary"}, - {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, + {CLIENT_COMPRESSED_UNARY, "client_compressed_unary"}, + {CLIENT_COMPRESSED_STREAMING, "client_compressed_streaming"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, + {SERVER_COMPRESSED_UNARY, "server_compressed_unary"}, {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, {SLOW_CONSUMER, "slow_consumer"}, {HALF_DUPLEX, "half_duplex"}, diff --git a/tools/run_tests/run_interop_tests.py b/tools/run_tests/run_interop_tests.py index 0787637d758..e3af721ee53 100755 --- a/tools/run_tests/run_interop_tests.py +++ b/tools/run_tests/run_interop_tests.py @@ -54,7 +54,9 @@ os.chdir(ROOT) _DEFAULT_SERVER_PORT=8080 -_SKIP_COMPRESSION = ['large_compressed_unary', +_SKIP_COMPRESSION = ['client_compressed_unary', + 'client_compressed_streaming', + 'server_compressed_unary', 'server_compressed_streaming'] _SKIP_ADVANCED = ['custom_metadata', 'status_code_and_message', @@ -345,7 +347,8 @@ _TEST_CASES = ['large_unary', 'empty_unary', 'ping_pong', 'cancel_after_begin', 'cancel_after_first_response', 'timeout_on_sleeping_server', 'custom_metadata', 'status_code_and_message', 'unimplemented_method', - 'large_compressed_unary', 'server_compressed_streaming'] + 'client_compressed_unary', 'server_compressed_unary', + 'client_compressed_streaming', 'server_compressed_streaming'] _AUTH_TEST_CASES = ['compute_engine_creds', 'jwt_token_creds', 'oauth2_auth_token', 'per_rpc_creds'] diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 750d98b0be0..00018834af9 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -4624,7 +4624,7 @@ "language": "c++", "name": "interop_server_main", "src": [ - "test/cpp/interop/server_main.cc" + "test/cpp/interop/interop_server.cc" ], "third_party": false, "type": "lib" diff --git a/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj b/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj index 075750afc6d..18971d6a341 100644 --- a/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj +++ b/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj @@ -171,7 +171,7 @@ - + diff --git a/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj.filters b/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj.filters index 51a6b9e73c3..4ee8135c045 100644 --- a/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj.filters +++ b/vsprojects/vcxproj/interop_server_main/interop_server_main.vcxproj.filters @@ -10,7 +10,7 @@ src\proto\grpc\testing - + test\cpp\interop