Merge pull request #2603 from dgquintas/compression-interop

Updated interop tests with support for compression.
pull/2970/head
Yang Gao 9 years ago
commit e4fec620b5
  1. 36
      Makefile
  2. 15
      build.json
  3. 5
      include/grpc++/client_context.h
  4. 4
      include/grpc++/server_context.h
  5. 9
      include/grpc/compression.h
  6. 42
      src/core/channel/compress_filter.c
  7. 15
      src/core/compression/algorithm.c
  8. 52
      src/core/surface/call.c
  9. 19
      src/core/surface/call.h
  10. 12
      src/core/surface/channel.c
  11. 2
      src/core/surface/channel.h
  12. 77
      test/core/compression/compression_test.c
  13. 9
      test/core/end2end/tests/request_with_compressed_payload.c
  14. 20
      test/cpp/interop/client.cc
  15. 34
      test/cpp/interop/client_helper.cc
  16. 19
      test/cpp/interop/client_helper.h
  17. 136
      test/cpp/interop/interop_client.cc
  18. 2
      test/cpp/interop/interop_client.h
  19. BIN
      test/cpp/interop/rnd.dat
  20. 61
      test/cpp/interop/server.cc
  21. 21
      test/cpp/interop/server_helper.cc
  22. 7
      test/cpp/interop/server_helper.h
  23. 18
      tools/run_tests/sources_and_headers.json
  24. 17
      tools/run_tests/tests.json
  25. 14
      vsprojects/Grpc.mak

File diff suppressed because one or more lines are too long

@ -733,6 +733,7 @@
"test/cpp/interop/client_helper.h"
],
"src": [
"test/proto/messages.proto",
"test/cpp/interop/client_helper.cc"
],
"deps": [
@ -992,6 +993,20 @@
"gpr"
]
},
{
"name": "compression_test",
"build": "test",
"language": "c",
"src": [
"test/core/compression/compression_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "dualstack_socket_test",
"build": "test",

@ -121,6 +121,10 @@ class PropagationOptions {
gpr_uint32 propagate_;
};
namespace testing {
class InteropClientContextInspector;
} // namespace testing
class ClientContext {
public:
ClientContext();
@ -190,6 +194,7 @@ class ClientContext {
ClientContext(const ClientContext&);
ClientContext& operator=(const ClientContext&);
friend class ::grpc::testing::InteropClientContextInspector;
friend class CallOpClientRecvStatus;
friend class CallOpRecvInitialMetadata;
friend class Channel;

@ -81,7 +81,7 @@ class CompletionQueue;
class Server;
namespace testing {
class InteropContextInspector;
class InteropServerContextInspector;
} // namespace testing
// Interface of server side rpc context.
@ -136,7 +136,7 @@ class ServerContext {
}
private:
friend class ::grpc::testing::InteropContextInspector;
friend class ::grpc::testing::InteropServerContextInspector;
friend class ::grpc::Server;
template <class W, class R>
friend class ::grpc::ServerAsyncReader;

@ -34,6 +34,8 @@
#ifndef GRPC_COMPRESSION_H
#define GRPC_COMPRESSION_H
#include <stdlib.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -58,9 +60,10 @@ typedef enum {
GRPC_COMPRESS_LEVEL_COUNT
} grpc_compression_level;
/** Parses \a name as a grpc_compression_algorithm instance, updating \a
* algorithm. Returns 1 upon success, 0 otherwise. */
int grpc_compression_algorithm_parse(const char *name,
/** Parses the first \a name_length bytes of \a name as a
* grpc_compression_algorithm instance, updating \a algorithm. Returns 1 upon
* success, 0 otherwise. */
int grpc_compression_algorithm_parse(const char *name, size_t name_length,
grpc_compression_algorithm *algorithm);
/** Updates \a name with the encoding name corresponding to a valid \a

@ -35,16 +35,19 @@
#include <string.h>
#include <grpc/compression.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include "src/core/channel/compress_filter.h"
#include "src/core/channel/channel_args.h"
#include "src/core/compression/message_compress.h"
#include "src/core/support/string.h"
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;
int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */
int written_initial_metadata; /**< Already processed initial md? */
/** Compression algorithm we'll try to use. It may be given by incoming
@ -59,8 +62,12 @@ typedef struct channel_data {
grpc_mdstr *mdstr_request_compression_algorithm_key;
/** Metadata key for the outgoing (used) compression algorithm */
grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
/** Metadata key for the accepted encodings */
grpc_mdstr *mdstr_compression_capabilities_key;
/** Precomputed metadata elements for all available compression algorithms */
grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
/** Precomputed metadata elements for the accepted encodings */
grpc_mdelem *mdelem_accept_encoding;
/** The default, channel-level, compression algorithm */
grpc_compression_algorithm default_compression_algorithm;
} channel_data;
@ -93,7 +100,7 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
if (md->key == channeld->mdstr_request_compression_algorithm_key) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str,
if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&calld->compression_algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.",
md_c_str);
@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
/* hint compression algorithm */
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
/* convey supported compression algorithms */
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->accept_encoding_storage,
GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
calld->written_initial_metadata = 1; /* GPR_TRUE */
}
break;
@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1];
char *accept_encoding_str;
size_t accept_encoding_str_len;
channeld->default_compression_algorithm =
grpc_channel_args_get_compression_algorithm(args);
@ -289,6 +306,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
channeld->mdstr_outgoing_compression_algorithm_key =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
channeld->mdstr_compression_capabilities_key =
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
char *algorithm_name;
GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
@ -297,8 +317,26 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
mdctx,
GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
grpc_mdstr_from_string(mdctx, algorithm_name, 0));
if (algo_idx > 0) {
supported_algorithms_names[algo_idx-1] = algorithm_name;
}
}
/* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
* arrays, as to avoid the heap allocs */
accept_encoding_str =
gpr_strjoin_sep(supported_algorithms_names,
GPR_ARRAY_SIZE(supported_algorithms_names),
", ",
&accept_encoding_str_len);
channeld->mdelem_accept_encoding =
grpc_mdelem_from_metadata_strings(
mdctx,
GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
grpc_mdstr_from_string(mdctx, accept_encoding_str, 0));
gpr_free(accept_encoding_str);
GPR_ASSERT(!is_last);
}
@ -309,10 +347,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
}
GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
}
const grpc_channel_filter grpc_compress_filter = {

@ -35,13 +35,20 @@
#include <string.h>
#include <grpc/compression.h>
int grpc_compression_algorithm_parse(const char* name,
int grpc_compression_algorithm_parse(const char* name, size_t name_length,
grpc_compression_algorithm *algorithm) {
if (strcmp(name, "identity") == 0) {
/* we use strncmp not only because it's safer (even though in this case it
* doesn't matter, given that we are comparing against string literals, but
* because this way we needn't have "name" nil-terminated (useful for slice
* data, for example) */
if (name_length == 0) {
return 0;
}
if (strncmp(name, "identity", name_length) == 0) {
*algorithm = GRPC_COMPRESS_NONE;
} else if (strcmp(name, "gzip") == 0) {
} else if (strncmp(name, "gzip", name_length) == 0) {
*algorithm = GRPC_COMPRESS_GZIP;
} else if (strcmp(name, "deflate") == 0) {
} else if (strncmp(name, "deflate", name_length) == 0) {
*algorithm = GRPC_COMPRESS_DEFLATE;
} else {
return 0;

@ -39,6 +39,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
@ -242,6 +243,9 @@ struct grpc_call {
/* Compression algorithm for the call */
grpc_compression_algorithm compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
gpr_uint32 encodings_accepted_by_peer;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@ -532,6 +536,46 @@ grpc_compression_algorithm grpc_call_get_compression_algorithm(
return call->compression_algorithm;
}
static void set_encodings_accepted_by_peer(grpc_call *call,
const gpr_slice accept_encoding_slice) {
size_t i;
grpc_compression_algorithm algorithm;
gpr_slice_buffer accept_encoding_parts;
gpr_slice_buffer_init(&accept_encoding_parts);
gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts);
/* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
* zeroes the whole grpc_call */
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
for (i = 0; i < accept_encoding_parts.count; i++) {
const gpr_slice *accept_encoding_entry_slice =
&accept_encoding_parts.slices[i];
if (grpc_compression_algorithm_parse(
(const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
} else {
char *accept_encoding_entry_str =
gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR,
"Invalid entry in accept encoding metadata: '%s'. Ignoring.",
accept_encoding_entry_str);
gpr_free(accept_encoding_entry_str);
}
}
}
gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) {
return call->encodings_accepted_by_peer;
}
gpr_uint32 grpc_call_get_message_flags(const grpc_call *call) {
return call->incoming_message_flags;
}
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@ -1408,10 +1452,11 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
if (user_data) {
algorithm =
((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) {
if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
&algorithm)) {
gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
assert(0);
}
@ -1440,6 +1485,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md));
} else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
call->channel)) {
set_encodings_accepted_by_peer(call, md->value->slice);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {

@ -38,6 +38,10 @@
#include "src/core/channel/context.h"
#include <grpc/grpc.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Primitive operation types - grpc_op's get rewritten into these */
typedef enum {
GRPC_IOREQ_RECV_INITIAL_METADATA,
@ -162,4 +166,19 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem);
gpr_uint8 grpc_call_is_client(grpc_call *call);
grpc_compression_algorithm grpc_call_get_compression_algorithm(
const grpc_call *call);
gpr_uint32 grpc_call_get_message_flags(const grpc_call *call);
/** Returns a bitset for the encodings (compression algorithms) supported by \a
* call's peer.
*
* To be indexed by grpc_compression_algorithm enum values. */
gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call);
#ifdef __cplusplus
}
#endif
#endif /* GRPC_INTERNAL_CORE_SURFACE_CALL_H */

@ -66,6 +66,7 @@ struct grpc_channel {
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_encodings_accepted_by_peer_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@ -104,7 +105,10 @@ grpc_channel *grpc_channel_create_from_filters(
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0);
channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-encoding", 0);
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message", 0);
channel->grpc_encodings_accepted_by_peer_string =
grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0);
channel->grpc_message_string =
grpc_mdstr_from_string(mdctx, "grpc-message", 0);
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(i, buf);
@ -230,6 +234,7 @@ static void destroy_channel(void *p, int ok) {
}
GRPC_MDSTR_UNREF(channel->grpc_status_string);
GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string);
GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string);
GRPC_MDSTR_UNREF(channel->grpc_message_string);
GRPC_MDSTR_UNREF(channel->path_string);
GRPC_MDSTR_UNREF(channel->authority_string);
@ -290,6 +295,11 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string(
return channel->grpc_compression_algorithm_string;
}
grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_channel *channel) {
return channel->grpc_encodings_accepted_by_peer_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return GRPC_MDELEM_REF(channel->grpc_status_elem[i]);

@ -56,6 +56,8 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compression_algorithm_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);

@ -0,0 +1,77 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdlib.h>
#include <string.h>
#include <grpc/compression.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "test/core/util/test_config.h"
static void test_compression_algorithm_parse(void) {
size_t i;
const char* valid_names[] = {"identity", "gzip", "deflate"};
const grpc_compression_algorithm valid_algorithms[] = {
GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_DEFLATE};
const char* invalid_names[] = {"gzip2", "foo", "", "2gzip"};
gpr_log(GPR_DEBUG, "test_compression_algorithm_parse");
for (i = 0; i < GPR_ARRAY_SIZE(valid_names); i++) {
const char* valid_name = valid_names[i];
grpc_compression_algorithm algorithm;
int success;
success = grpc_compression_algorithm_parse(valid_name, strlen(valid_name),
&algorithm);
GPR_ASSERT(success != 0);
GPR_ASSERT(algorithm == valid_algorithms[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(invalid_names); i++) {
const char* invalid_name = invalid_names[i];
grpc_compression_algorithm algorithm;
int success;
success = grpc_compression_algorithm_parse(
invalid_name, strlen(invalid_name), &algorithm);
GPR_ASSERT(success == 0);
/* the value of "algorithm" is undefined upon failure */
}
}
int main(int argc, char **argv) {
test_compression_algorithm_parse();
return 0;
}

@ -46,6 +46,7 @@
#include "test/core/end2end/cq_verifier.h"
#include "src/core/channel/channel_args.h"
#include "src/core/channel/compress_filter.h"
#include "src/core/surface/call.h"
enum { TIMEOUT = 200000 };
@ -195,6 +196,14 @@ static void request_with_payload_template(
cq_expect_completion(cqv, tag(101), 1);
cq_verify(cqv);
GPR_ASSERT(GPR_BITCOUNT(grpc_call_get_encodings_accepted_by_peer(s)) == 3);
GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
GRPC_COMPRESS_NONE) != 0);
GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
GRPC_COMPRESS_DEFLATE) != 0);
GPR_ASSERT(GPR_BITGET(grpc_call_get_encodings_accepted_by_peer(s),
GRPC_COMPRESS_GZIP) != 0);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;

@ -56,8 +56,12 @@ DEFINE_string(test_case, "large_unary",
"Configure different test cases. Valid options are: "
"empty_unary : empty (zero bytes) request and response; "
"large_unary : single request and (large) response; "
"large_compressed_unary : single request and compressed (large) "
"response; "
"client_streaming : request streaming with single response; "
"server_streaming : single request with response streaming; "
"server_compressed_streaming : single request with compressed "
"response streaming; "
"slow_consumer : single request with response; "
" streaming with slow client consumer; "
"half_duplex : half-duplex streaming; "
@ -70,7 +74,7 @@ DEFINE_string(test_case, "large_unary",
"jwt_token_creds: large_unary with JWT token auth; "
"oauth2_auth_token: raw oauth2 access token auth; "
"per_rpc_creds: raw oauth2 access token on a single rpc; "
"status_code_and_message: verify status code & message; "
"status_code_and_message: verify status code & message; "
"all : all of above.");
DEFINE_string(default_service_account, "",
"Email of GCE default service account");
@ -91,10 +95,14 @@ int main(int argc, char** argv) {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {
client.DoLargeUnary();
} else if (FLAGS_test_case == "large_compressed_unary") {
client.DoLargeCompressedUnary();
} else if (FLAGS_test_case == "client_streaming") {
client.DoRequestStreaming();
} else if (FLAGS_test_case == "server_streaming") {
client.DoResponseStreaming();
} else if (FLAGS_test_case == "server_compressed_streaming") {
client.DoResponseCompressedStreaming();
} else if (FLAGS_test_case == "slow_consumer") {
client.DoResponseStreamingWithSlowConsumer();
} else if (FLAGS_test_case == "half_duplex") {
@ -129,6 +137,7 @@ int main(int argc, char** argv) {
client.DoLargeUnary();
client.DoRequestStreaming();
client.DoResponseStreaming();
client.DoResponseCompressedStreaming();
client.DoHalfDuplex();
client.DoPingPong();
client.DoCancelAfterBegin();
@ -148,10 +157,11 @@ int main(int argc, char** argv) {
gpr_log(
GPR_ERROR,
"Unsupported test case %s. Valid options are all|empty_unary|"
"large_unary|client_streaming|server_streaming|half_duplex|ping_pong|"
"cancel_after_begin|cancel_after_first_response|"
"timeout_on_sleeping_server|service_account_creds|compute_engine_creds|"
"jwt_token_creds|oauth2_auth_token|per_rpc_creds",
"large_unary|large_compressed_unary|client_streaming|server_streaming|"
"server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|"
"cancel_after_first_response|timeout_on_sleeping_server|"
"service_account_creds|compute_engine_creds|jwt_token_creds|"
"oauth2_auth_token|per_rpc_creds",
FLAGS_test_case.c_str());
ret = 1;
}

@ -48,10 +48,13 @@
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
#include <grpc++/stream.h>
#include "src/cpp/client/secure_credentials.h"
#include "test/core/security/oauth2_utils.h"
#include "test/cpp/util/create_test_channel.h"
#include "src/core/surface/call.h"
#include "src/cpp/client/secure_credentials.h"
DECLARE_bool(enable_ssl);
DECLARE_bool(use_prod_roots);
DECLARE_int32(server_port);
@ -62,6 +65,8 @@ DECLARE_string(default_service_account);
DECLARE_string(service_account_key_file);
DECLARE_string(oauth_scope);
using grpc::testing::CompressionType;
namespace grpc {
namespace testing {
@ -138,5 +143,32 @@ std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
}
}
CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
grpc_compression_algorithm algorithm) {
switch (algorithm) {
case GRPC_COMPRESS_NONE:
return CompressionType::NONE;
case GRPC_COMPRESS_GZIP:
return CompressionType::GZIP;
case GRPC_COMPRESS_DEFLATE:
return CompressionType::DEFLATE;
default:
GPR_ASSERT(false);
}
}
InteropClientContextInspector::InteropClientContextInspector(
const ::grpc::ClientContext& context)
: context_(context) {}
grpc_compression_algorithm
InteropClientContextInspector::GetCallCompressionAlgorithm() const {
return grpc_call_get_compression_algorithm(context_.call_);
}
gpr_uint32 InteropClientContextInspector::GetMessageFlags() const {
return grpc_call_get_message_flags(context_.call_);
}
} // namespace testing
} // namespace grpc

@ -39,6 +39,8 @@
#include <grpc++/config.h>
#include <grpc++/channel_interface.h>
#include "test/proto/messages.grpc.pb.h"
namespace grpc {
namespace testing {
@ -49,6 +51,23 @@ grpc::string GetOauth2AccessToken();
std::shared_ptr<ChannelInterface> CreateChannelForTestCase(
const grpc::string& test_case);
grpc::testing::CompressionType
GetInteropCompressionTypeFromCompressionAlgorithm(
grpc_compression_algorithm algorithm);
class InteropClientContextInspector {
public:
InteropClientContextInspector(const ::grpc::ClientContext& context);
// Inspector methods, able to peek inside ClientContext, follow.
grpc_compression_algorithm GetCallCompressionAlgorithm() const;
gpr_uint32 GetMessageFlags() const;
private:
const ::grpc::ClientContext& context_;
};
} // namespace testing
} // namespace grpc

@ -33,25 +33,31 @@
#include "test/cpp/interop/interop_client.h"
#include <fstream>
#include <memory>
#include <unistd.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/credentials.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include "test/cpp/interop/client_helper.h"
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
#include "src/core/transport/stream_op.h"
namespace grpc {
namespace testing {
static const char* kRandomFile = "test/cpp/interop/rnd.dat";
namespace {
// The same value is defined by the Java client.
const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
@ -95,17 +101,49 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
ClientContext context;
request->set_response_type(PayloadType::COMPRESSABLE);
InteropClientContextInspector inspector(context);
// If the request doesn't already specify the response type, default to
// COMPRESSABLE.
request->set_response_size(kLargeResponseSize);
grpc::string payload(kLargeRequestSize, '\0');
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = stub->UnaryCall(&context, *request, response);
// Compression related checks.
GPR_ASSERT(request->response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
inspector.GetCallCompressionAlgorithm()));
if (request->response_compression() == NONE) {
GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
} else if (request->response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should always
// be compressed.
GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
}
AssertOkOrPrintErrorStatus(s);
GPR_ASSERT(response->payload().type() == PayloadType::COMPRESSABLE);
GPR_ASSERT(response->payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
// Payload related checks.
if (request->response_type() != PayloadType::RANDOM) {
GPR_ASSERT(response->payload().type() == request->response_type());
}
switch (response->payload().type()) {
case PayloadType::COMPRESSABLE:
GPR_ASSERT(response->payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
break;
case PayloadType::UNCOMPRESSABLE: {
std::ifstream rnd_file(kRandomFile);
GPR_ASSERT(rnd_file.good());
for (int i = 0; i < kLargeResponseSize; i++) {
GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
}
}
break;
default:
GPR_ASSERT(false);
}
}
void InteropClient::DoComputeEngineCreds(
@ -117,6 +155,7 @@ void InteropClient::DoComputeEngineCreds(
SimpleResponse response;
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
@ -136,6 +175,7 @@ void InteropClient::DoServiceAccountCreds(const grpc::string& username,
SimpleResponse response;
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(!response.oauth_scope().empty());
@ -199,6 +239,7 @@ void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
@ -209,10 +250,33 @@ void InteropClient::DoLargeUnary() {
gpr_log(GPR_INFO, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large unary done.");
}
void InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
for (const auto compression_type : compression_types) {
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_type).c_str(),
PayloadType_Name(payload_type).c_str());
gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
SimpleRequest request;
SimpleResponse response;
request.set_response_type(payload_type);
request.set_response_compression(compression_type);
PerformLargeUnary(&request, &response);
gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
}
}
}
void InteropClient::DoRequestStreaming() {
gpr_log(GPR_INFO, "Sending request steaming rpc ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
@ -261,11 +325,73 @@ void InteropClient::DoResponseStreaming() {
}
GPR_ASSERT(response_stream_sizes.size() == i);
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done.");
}
void InteropClient::DoResponseCompressedStreaming() {
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (const auto payload_type : payload_types) {
for (const auto compression_type : compression_types) {
ClientContext context;
InteropClientContextInspector inspector(context);
StreamingOutputCallRequest request;
char* log_suffix;
gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
CompressionType_Name(compression_type).c_str(),
PayloadType_Name(payload_type).c_str());
gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
request.set_response_type(payload_type);
request.set_response_compression(compression_type);
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
ResponseParameters* response_parameter =
request.add_response_parameters();
response_parameter->set_size(response_stream_sizes[i]);
}
StreamingOutputCallResponse response;
std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
stub->StreamingOutputCall(&context, request));
unsigned int i = 0;
while (stream->Read(&response)) {
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
// Compression related checks.
GPR_ASSERT(request.response_compression() ==
GetInteropCompressionTypeFromCompressionAlgorithm(
inspector.GetCallCompressionAlgorithm()));
if (request.response_compression() == NONE) {
GPR_ASSERT(
!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
} else if (request.response_type() == PayloadType::COMPRESSABLE) {
// requested compression and compressable response => results should
// always be compressed.
GPR_ASSERT(inspector.GetMessageFlags() &
GRPC_WRITE_INTERNAL_COMPRESS);
}
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
gpr_free(log_suffix);
}
}
}
void InteropClient::DoResponseStreamingWithSlowConsumer() {
gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));

@ -52,10 +52,12 @@ class InteropClient {
void DoEmpty();
void DoLargeUnary();
void DoLargeCompressedUnary();
void DoPingPong();
void DoHalfDuplex();
void DoRequestStreaming();
void DoResponseStreaming();
void DoResponseCompressedStreaming();
void DoResponseStreamingWithSlowConsumer();
void DoCancelAfterBegin();
void DoCancelAfterFirstResponse();

Binary file not shown.

@ -31,6 +31,7 @@
*
*/
#include <fstream>
#include <memory>
#include <sstream>
#include <thread>
@ -41,6 +42,8 @@
#include <gflags/gflags.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include <grpc++/config.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@ -48,6 +51,7 @@
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include "test/proto/test.grpc.pb.h"
#include "test/proto/empty.grpc.pb.h"
#include "test/proto/messages.grpc.pb.h"
@ -65,6 +69,7 @@ using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using grpc::SslServerCredentialsOptions;
using grpc::testing::InteropServerContextInspector;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::SimpleRequest;
@ -77,19 +82,54 @@ using grpc::testing::TestService;
using grpc::Status;
static bool got_sigint = false;
static const char* kRandomFile = "test/cpp/interop/rnd.dat";
bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
return false;
PayloadType response_type;
if (type == PayloadType::RANDOM) {
response_type =
rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
} else {
response_type = type;
}
payload->set_type(response_type);
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
switch (response_type) {
case PayloadType::COMPRESSABLE: {
std::unique_ptr<char[]> body(new char[size]());
payload->set_body(body.get(), size);
} break;
case PayloadType::UNCOMPRESSABLE: {
std::unique_ptr<char[]> body(new char[size]());
std::ifstream rnd_file(kRandomFile);
GPR_ASSERT(rnd_file.good());
rnd_file.read(body.get(), size);
GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available
payload->set_body(body.get(), size);
} break;
default:
GPR_ASSERT(false);
}
return true;
}
template <typename RequestType>
void SetResponseCompression(ServerContext* context,
const RequestType& request) {
switch (request.response_compression()) {
case grpc::testing::NONE:
context->set_compression_algorithm(GRPC_COMPRESS_NONE);
break;
case grpc::testing::GZIP:
context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
break;
case grpc::testing::DEFLATE:
context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE);
break;
default:
abort();
}
}
class TestServiceImpl : public TestService::Service {
public:
Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
@ -99,6 +139,7 @@ class TestServiceImpl : public TestService::Service {
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) {
SetResponseCompression(context, *request);
if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@ -107,9 +148,9 @@ class TestServiceImpl : public TestService::Service {
}
if (request->has_response_status()) {
return Status(static_cast<grpc::StatusCode>
(request->response_status().code()),
request->response_status().message());
return Status(
static_cast<grpc::StatusCode>(request->response_status().code()),
request->response_status().message());
}
return Status::OK;
@ -118,6 +159,7 @@ class TestServiceImpl : public TestService::Service {
Status StreamingOutputCall(
ServerContext* context, const StreamingOutputCallRequest* request,
ServerWriter<StreamingOutputCallResponse>* writer) {
SetResponseCompression(context, *request);
StreamingOutputCallResponse response;
bool write_success = true;
response.mutable_payload()->set_type(request->response_type());
@ -156,6 +198,7 @@ class TestServiceImpl : public TestService::Service {
StreamingOutputCallResponse response;
bool write_success = true;
while (write_success && stream->Read(&request)) {
SetResponseCompression(context, request);
if (request.response_parameters_size() != 0) {
response.mutable_payload()->set_type(request.payload().type());
response.mutable_payload()->set_body(

@ -36,10 +36,12 @@
#include <memory>
#include <gflags/gflags.h>
#include "test/core/end2end/data/ssl_test_data.h"
#include <grpc++/config.h>
#include <grpc++/server_credentials.h>
#include "src/core/surface/call.h"
#include "test/core/end2end/data/ssl_test_data.h"
DECLARE_bool(enable_ssl);
namespace grpc {
@ -58,16 +60,25 @@ std::shared_ptr<ServerCredentials> CreateInteropServerCredentials() {
}
}
InteropContextInspector::InteropContextInspector(
InteropServerContextInspector::InteropServerContextInspector(
const ::grpc::ServerContext& context)
: context_(context) {}
std::shared_ptr<const AuthContext> InteropContextInspector::GetAuthContext()
const {
grpc_compression_algorithm
InteropServerContextInspector::GetCallCompressionAlgorithm() const {
return grpc_call_get_compression_algorithm(context_.call_);
}
gpr_uint32 InteropServerContextInspector::GetEncodingsAcceptedByClient() const {
return grpc_call_get_encodings_accepted_by_peer(context_.call_);
}
std::shared_ptr<const AuthContext>
InteropServerContextInspector::GetAuthContext() const {
return context_.auth_context();
}
bool InteropContextInspector::IsCancelled() const {
bool InteropServerContextInspector::IsCancelled() const {
return context_.IsCancelled();
}

@ -36,6 +36,7 @@
#include <memory>
#include <grpc/compression.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
@ -44,13 +45,15 @@ namespace testing {
std::shared_ptr<ServerCredentials> CreateInteropServerCredentials();
class InteropContextInspector {
class InteropServerContextInspector {
public:
InteropContextInspector(const ::grpc::ServerContext& context);
InteropServerContextInspector(const ::grpc::ServerContext& context);
// Inspector methods, able to peek inside ServerContext, follow.
std::shared_ptr<const AuthContext> GetAuthContext() const;
bool IsCancelled() const;
grpc_compression_algorithm GetCallCompressionAlgorithm() const;
gpr_uint32 GetEncodingsAcceptedByClient() const;
private:
const ::grpc::ServerContext& context_;

@ -113,6 +113,20 @@
"test/core/transport/chttp2/stream_map_test.c"
]
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"language": "c",
"name": "compression_test",
"src": [
"test/core/compression/compression_test.c"
]
},
{
"deps": [
"gpr",
@ -13432,7 +13446,9 @@
"grpc_test_util"
],
"headers": [
"test/cpp/interop/client_helper.h"
"test/cpp/interop/client_helper.h",
"test/proto/messages.grpc.pb.h",
"test/proto/messages.pb.h"
],
"language": "c++",
"name": "interop_client_helper",

@ -137,6 +137,23 @@
"windows"
]
},
{
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"flaky": false,
"language": "c",
"name": "compression_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
]
},
{
"ci_platforms": [
"linux",

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save