WIP. Need to merge and add more tests.

pull/2135/head
David Garcia Quintas 10 years ago
parent f020c10482
commit f74a49ed14
  1. 303
      Makefile
  2. 4
      gRPC.podspec
  3. 6
      src/core/channel/channel_args.c
  4. 9
      src/core/channel/channel_args.h
  5. 109
      src/core/channel/compress_filter.c
  6. 2
      src/core/compression/algorithm.c
  7. 60
      src/core/surface/call.c
  8. 47
      src/core/transport/chttp2/frame_data.c
  9. 13
      test/core/end2end/cq_verifier.c
  10. 1
      test/core/end2end/gen_build_json.py
  11. 251
      test/core/end2end/tests/request_with_compressed_payload.c
  12. 139
      tools/run_tests/tests.json
  13. 85
      vsprojects/Grpc.mak

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

@ -131,11 +131,11 @@ grpc_compression_level grpc_channel_args_get_compression_level(
return GRPC_COMPRESS_LEVEL_NONE; return GRPC_COMPRESS_LEVEL_NONE;
} }
void grpc_channel_args_set_compression_level( grpc_channel_args *grpc_channel_args_set_compression_level(
grpc_channel_args **a, grpc_compression_level level) { grpc_channel_args *a, grpc_compression_level level) {
grpc_arg tmp; grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER; tmp.type = GRPC_ARG_INTEGER;
tmp.key = GRPC_COMPRESSION_LEVEL_ARG; tmp.key = GRPC_COMPRESSION_LEVEL_ARG;
tmp.value.integer = level; tmp.value.integer = level;
*a = grpc_channel_args_copy_and_add(*a, &tmp); return grpc_channel_args_copy_and_add(a, &tmp);
} }

@ -56,9 +56,10 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
grpc_compression_level grpc_channel_args_get_compression_level( grpc_compression_level grpc_channel_args_get_compression_level(
const grpc_channel_args *a); const grpc_channel_args *a);
/** Sets the compression level in \a a to \a level. Setting it to /** Returns an channel arg instance with compression enabled. If \a a is
* GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */ * non-NULL, its args are copied. N.B. GRPC_COMPRESS_LEVEL_NONE disables
void grpc_channel_args_set_compression_level( * compression for the channel. */
grpc_channel_args **a, grpc_compression_level level); grpc_channel_args *grpc_channel_args_set_compression_level(
grpc_channel_args *a, grpc_compression_level level);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */

@ -43,22 +43,31 @@
typedef struct call_data { typedef struct call_data {
gpr_slice_buffer slices; gpr_slice_buffer slices;
int remaining_slice_bytes; int remaining_slice_bytes;
int dont_compress; /**< whether skip compression for this specific call */ int no_compress; /**< whether skip compression for this specific call */
grpc_linked_mdelem compression_algorithm;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
grpc_compression_algorithm compress_algorithm; grpc_compression_algorithm compress_algorithm;
grpc_mdelem *compress_algorithm_md;
grpc_mdelem *no_compression_md;
} channel_data; } channel_data;
static void compress_send_sb(grpc_compression_algorithm algorithm, /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
* actually happen, 0 otherwise (for example if the compressed output size was
* larger than the raw input).
*
* Returns 1 if the data was actually compress and 0 otherwise. */
static int compress_send_sb(grpc_compression_algorithm algorithm,
gpr_slice_buffer *slices) { gpr_slice_buffer *slices) {
int did_compress;
gpr_slice_buffer tmp; gpr_slice_buffer tmp;
gpr_slice_buffer_init(&tmp); gpr_slice_buffer_init(&tmp);
if (!grpc_msg_compress(algorithm, slices, &tmp)) { did_compress = grpc_msg_compress(algorithm, slices, &tmp);
gpr_log(GPR_INFO, "Not compressed!");
}
gpr_slice_buffer_swap(slices, &tmp); gpr_slice_buffer_swap(slices, &tmp);
gpr_slice_buffer_destroy(&tmp); gpr_slice_buffer_destroy(&tmp);
return did_compress;
} }
static void process_send_ops(grpc_call_element *elem, static void process_send_ops(grpc_call_element *elem,
@ -66,50 +75,82 @@ static void process_send_ops(grpc_call_element *elem,
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
size_t i, j; size_t i, j;
int begin_message_index = -1;
int metadata_op_index = -1;
grpc_mdelem *actual_compression_md;
/* buffer up slices until we've processed all the expected ones (as given by /* buffer up slices until we've processed all the expected ones (as given by
* GRPC_OP_BEGIN_MESSAGE) */ * GRPC_OP_BEGIN_MESSAGE) */
for (i = 0; i < send_ops->nops; ++i) { for (i = 0; i < send_ops->nops; ++i) {
grpc_stream_op *sop = &send_ops->ops[i]; grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) { switch (sop->type) {
case GRPC_OP_BEGIN_MESSAGE: case GRPC_OP_BEGIN_MESSAGE:
begin_message_index = i;
calld->remaining_slice_bytes = sop->data.begin_message.length; calld->remaining_slice_bytes = sop->data.begin_message.length;
calld->dont_compress = calld->no_compress =
!!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS); !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS);
break; break;
case GRPC_OP_SLICE: case GRPC_OP_SLICE:
if (calld->dont_compress) return; if (calld->no_compress) continue;
GPR_ASSERT(calld->remaining_slice_bytes > 0); GPR_ASSERT(calld->remaining_slice_bytes > 0);
/* add to calld->slices */ /* add to calld->slices */
gpr_slice_buffer_add(&calld->slices, sop->data.slice); gpr_slice_buffer_add(&calld->slices, sop->data.slice);
calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice);
if (calld->remaining_slice_bytes == 0) { if (calld->remaining_slice_bytes == 0) {
/* compress */ /* compress */
compress_send_sb(channeld->compress_algorithm, &calld->slices); if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) {
calld->no_compress = 1; /* GPR_TRUE */
}
} }
break; break;
case GRPC_NO_OP:
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
/* Save the index of the first metadata op, to be processed after we
* know whether compression actually happened */
if (metadata_op_index < 0) metadata_op_index = i;
break;
case GRPC_NO_OP:
; /* fallthrough, ignore */ ; /* fallthrough, ignore */
} }
} }
/* at this point, calld->slices contains the *compressed* slices from GPR_ASSERT(metadata_op_index >= 0);
* send_ops->ops[*]->data.slice. We now replace these input slices with the GPR_ASSERT(begin_message_index >= 0);
* compressed ones. */
for (i = 0, j = 0; i < send_ops->nops; ++i) { /* update both the metadata and the begin_message's flags */
grpc_stream_op *sop = &send_ops->ops[i]; if (calld->no_compress) {
GPR_ASSERT(j < calld->slices.count); /* either because the user requested the exception or because compressing
switch (sop->type) { * would have resulted in a larger output */
case GRPC_OP_SLICE: channeld->compress_algorithm = GRPC_COMPRESS_NONE;
gpr_slice_unref(sop->data.slice); actual_compression_md = channeld->no_compression_md;
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); /* reset the flag compression bit */
break; send_ops->ops[begin_message_index].data.begin_message.flags &=
case GRPC_OP_BEGIN_MESSAGE: ~GRPC_WRITE_INTERNAL_COMPRESS;
case GRPC_NO_OP: } else { /* DID compress */
case GRPC_OP_METADATA: actual_compression_md = channeld->compress_algorithm_md;
; /* fallthrough, ignore */ /* at this point, calld->slices contains the *compressed* slices from
* send_ops->ops[*]->data.slice. We now replace these input slices with the
* compressed ones. */
for (i = 0, j = 0; i < send_ops->nops; ++i) {
grpc_stream_op *sop = &send_ops->ops[i];
GPR_ASSERT(j < calld->slices.count);
switch (sop->type) {
case GRPC_OP_SLICE:
gpr_slice_unref(sop->data.slice);
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
break;
case GRPC_OP_BEGIN_MESSAGE:
sop->data.begin_message.length = calld->slices.length;
sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
case GRPC_NO_OP:
case GRPC_OP_METADATA:
; /* fallthrough, ignore */
}
} }
} }
grpc_metadata_batch_add_head(
&(send_ops->ops[metadata_op_index].data.metadata),
&calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md));
} }
/* Called either: /* Called either:
@ -167,10 +208,20 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx, const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) { int is_first, int is_last) {
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
channeld->compress_algorithm = grpc_compression_algorithm_for_level( const grpc_compression_level clevel =
grpc_channel_args_get_compression_level(args)); grpc_channel_args_get_compression_level(args);
const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE;
/*We shouldn't be in this filter if compression is disabled. */ /*We shouldn't be in this filter if compression is disabled. */
GPR_ASSERT(channeld->compress_algorithm != GRPC_COMPRESS_NONE); GPR_ASSERT(clevel != GRPC_COMPRESS_LEVEL_NONE);
channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer(
mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel));
channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel);
channeld->no_compression_md = grpc_mdelem_from_string_and_buffer(
mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg,
sizeof(none_alg));
/* The first and the last filters tend to be implemented differently to /* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down handle the case that there's no 'next' filter to call on the up or down
@ -181,7 +232,9 @@ static void init_channel_elem(grpc_channel_element *elem,
/* Destructor for channel data */ /* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) { static void destroy_channel_elem(grpc_channel_element *elem) {
/* empty for now */ channel_data *channeld = elem->channel_data;
grpc_mdelem_unref(channeld->compress_algorithm_md);
grpc_mdelem_unref(channeld->no_compression_md);
} }
const grpc_channel_filter grpc_compress_filter = { const grpc_channel_filter grpc_compress_filter = {

@ -54,7 +54,7 @@ const char *grpc_compression_algorithm_name(
grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_algorithm grpc_compression_algorithm_for_level(
grpc_compression_level level) { grpc_compression_level level) {
switch (level) { switch (level) {
case GRPC_COMPRESS_NONE: case GRPC_COMPRESS_LEVEL_NONE:
return GRPC_COMPRESS_NONE; return GRPC_COMPRESS_NONE;
case GRPC_COMPRESS_LEVEL_LOW: case GRPC_COMPRESS_LEVEL_LOW:
case GRPC_COMPRESS_LEVEL_MED: case GRPC_COMPRESS_LEVEL_MED:

@ -30,24 +30,25 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* *
*/ */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <grpc/compression.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/census/grpc_context.h" #include "src/core/census/grpc_context.h"
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h" #include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h" #include "src/core/iomgr/alarm.h"
#include "src/core/profiling/timers.h" #include "src/core/profiling/timers.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h" #include "src/core/surface/byte_buffer_queue.h"
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h" #include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h" #include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
@ -394,7 +395,7 @@ static void set_status_code(grpc_call *call, status_source source,
} }
} }
static void set_decode_compression_level(grpc_call *call, static void set_compression_level(grpc_call *call,
grpc_compression_level clevel) { grpc_compression_level clevel) {
call->compression_level = clevel; call->compression_level = clevel;
} }
@ -646,8 +647,19 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) { static void finish_message(grpc_call *call) {
/* TODO(ctiller): this could be a lot faster if coded directly */ /* TODO(ctiller): this could be a lot faster if coded directly */
grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( grpc_byte_buffer *byte_buffer;
call->incoming_message.slices, call->incoming_message.count); /* some aliases for readability */
gpr_slice *slices = call->incoming_message.slices;
const size_t nslices = call->incoming_message.count;
const grpc_compression_algorithm compression_algorithm =
grpc_compression_algorithm_for_level(call->compression_level);
if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) {
byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices,
compression_algorithm);
} else {
byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message); gpr_slice_buffer_reset_and_unref(&call->incoming_message);
grpc_bbq_push(&call->incoming_queue, byte_buffer); grpc_bbq_push(&call->incoming_queue, byte_buffer);
@ -667,6 +679,18 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
gpr_free(message); gpr_free(message);
return 0; return 0;
} }
/* sanity check: if message flags indicate a compressed message, the
* compression level should already be present in the call, as parsed off its
* corresponding metadata. */
if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) {
char *message = NULL;
gpr_asprintf(
&message, "Invalid compression algorithm (%s) for compressed message.",
grpc_compression_algorithm_name(
grpc_compression_algorithm_for_level(call->compression_level)));
cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1);
}
/* stash away parameters, and prepare for incoming slices */ /* stash away parameters, and prepare for incoming slices */
if (msg.length > grpc_channel_get_max_message_length(call->channel)) { if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
char *message = NULL; char *message = NULL;
@ -1135,11 +1159,8 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) {
if (user_data) { if (user_data) {
clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else { } else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), GPR_ASSERT(sizeof(clevel) == GPR_SLICE_LENGTH(md->value->slice));
GPR_SLICE_LENGTH(md->value->slice), memcpy(&clevel, GPR_SLICE_START_PTR(md->value->slice), sizeof(clevel));
&clevel)) {
clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
}
grpc_mdelem_set_user_data(md, destroy_compression, grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
} }
@ -1161,8 +1182,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { } else if (key ==
set_decode_compression_level(call, decode_compression(md)); grpc_channel_get_compresssion_level_string(call->channel)) {
set_compression_level(call, decode_compression(md));
} else { } else {
dest = &call->buffered_metadata[is_trailing]; dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {

@ -35,12 +35,13 @@
#include <string.h> #include <string.h>
#include "src/core/support/string.h"
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/support/string.h"
#include "src/core/transport/stream_op.h"
#include "src/core/transport/transport.h" #include "src/core/transport/transport.h"
#include "src/core/compression/message_compress.h"
grpc_chttp2_parse_error grpc_chttp2_data_parser_init( grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) { grpc_chttp2_data_parser *parser) {
@ -69,35 +70,6 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(
return GRPC_CHTTP2_PARSE_OK; return GRPC_CHTTP2_PARSE_OK;
} }
/** Performs any extra work needed after a frame has been assembled */
grpc_chttp2_parse_error parse_postprocessing(grpc_chttp2_data_parser *p) {
if (p->is_frame_compressed) { /* Decompress */
/* Reorganize the slices within p->incoming_sopb into a gpr_slice_buffer to
* be fed to the decompression function */
gpr_slice_buffer sb_in, sb_out;
grpc_stream_op_buffer *sopb = &p->incoming_sopb;
size_t i;
gpr_slice_buffer_init(&sb_in);
gpr_slice_buffer_init(&sb_out);
for (i = 0; i < sopb->nops; ++i) {
if (sopb->ops->type == GRPC_OP_SLICE) {
gpr_slice_buffer_add(&sb_in, sopb->ops->data.slice);
}
}
grpc_msg_decompress(GRPC_COMPRESS_GZIP /* XXX */, &sb_in, &sb_out);
/* copy uncompressed output back to p->incoming_sopb */
grpc_sopb_reset(sopb);
grpc_sopb_add_begin_message(sopb, sb_out.length, 0);
for (i = 0; i < sb_out.count; ++i) {
grpc_sopb_add_slice(sopb, sb_out.slices[i]);
}
gpr_slice_buffer_destroy(&sb_in);
gpr_slice_buffer_destroy(&sb_out);
}
return GRPC_CHTTP2_PARSE_OK;
}
grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
int is_last) { int is_last) {
@ -105,6 +77,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
gpr_uint8 *cur = beg; gpr_uint8 *cur = beg;
grpc_chttp2_data_parser *p = parser; grpc_chttp2_data_parser *p = parser;
gpr_uint32 message_flags = 0;
if (is_last && p->is_last_frame) { if (is_last && p->is_last_frame) {
state->end_of_stream = 1; state->end_of_stream = 1;
@ -160,17 +133,21 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
p->state = GRPC_CHTTP2_DATA_FRAME; p->state = GRPC_CHTTP2_DATA_FRAME;
++cur; ++cur;
state->need_flush_reads = 1; state->need_flush_reads = 1;
grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size, 0); if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
grpc_sopb_add_begin_message(&p->incoming_sopb, p->frame_size,
message_flags);
/* fallthrough */ /* fallthrough */
case GRPC_CHTTP2_DATA_FRAME: case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) { if (cur == end) {
return parse_postprocessing(p); return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) == p->frame_size) { } else if ((gpr_uint32)(end - cur) == p->frame_size) {
state->need_flush_reads = 1; state->need_flush_reads = 1;
grpc_sopb_add_slice(&p->incoming_sopb, grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg)); gpr_slice_sub(slice, cur - beg, end - beg));
p->state = GRPC_CHTTP2_DATA_FH_0; p->state = GRPC_CHTTP2_DATA_FH_0;
return parse_postprocessing(p); return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) { } else if ((gpr_uint32)(end - cur) > p->frame_size) {
state->need_flush_reads = 1; state->need_flush_reads = 1;
grpc_sopb_add_slice( grpc_sopb_add_slice(
@ -183,7 +160,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
grpc_sopb_add_slice(&p->incoming_sopb, grpc_sopb_add_slice(&p->incoming_sopb,
gpr_slice_sub(slice, cur - beg, end - beg)); gpr_slice_sub(slice, cur - beg, end - beg));
p->frame_size -= (end - cur); p->frame_size -= (end - cur);
return parse_postprocessing(p); return GRPC_CHTTP2_PARSE_OK;
} }
} }

@ -40,6 +40,7 @@
#include "src/core/surface/event_string.h" #include "src/core/surface/event_string.h"
#include "src/core/support/string.h" #include "src/core/support/string.h"
#include <grpc/byte_buffer.h> #include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
@ -144,7 +145,17 @@ static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) {
} }
int byte_buffer_eq_string(grpc_byte_buffer *bb, const char *str) { int byte_buffer_eq_string(grpc_byte_buffer *bb, const char *str) {
return byte_buffer_eq_slice(bb, gpr_slice_from_copied_string(str)); grpc_byte_buffer_reader reader;
grpc_byte_buffer* rbb;
int res;
grpc_byte_buffer_reader_init(&reader, bb);
rbb = grpc_raw_byte_buffer_from_reader(&reader);
res = byte_buffer_eq_slice(rbb, gpr_slice_from_copied_string(str));
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(rbb);
return res;
} }
static void verify_matches(expectation *e, grpc_event *ev) { static void verify_matches(expectation *e, grpc_event *ev) {

@ -84,6 +84,7 @@ END2END_TESTS = {
'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True), 'request_response_with_payload_and_call_creds': TestOptions(flaky=False, secure=True),
'request_with_large_metadata': default_test_options, 'request_with_large_metadata': default_test_options,
'request_with_payload': default_test_options, 'request_with_payload': default_test_options,
'request_with_compressed_payload': default_test_options,
'request_with_flags': default_test_options, 'request_with_flags': default_test_options,
'server_finishes_request': default_test_options, 'server_finishes_request': default_test_options,
'simple_delayed_request': default_test_options, 'simple_delayed_request': default_test_options,

@ -0,0 +1,251 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "test/core/end2end/end2end_tests.h"
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "test/core/end2end/cq_verifier.h"
#include "src/core/channel/channel_args.h"
enum { TIMEOUT = 200000 };
static void *tag(gpr_intptr t) { return (void *)t; }
static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
const char *test_name,
grpc_channel_args *client_args,
grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
f = config.create_fixture(client_args, server_args);
config.init_client(&f, client_args);
config.init_server(&f, server_args);
return f;
}
static gpr_timespec n_seconds_time(int n) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
}
static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
ev = grpc_completion_queue_next(cq, five_seconds_time());
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))
.type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->server);
f->server = NULL;
}
static void shutdown_client(grpc_end2end_test_fixture *f) {
if (!f->client) return;
grpc_channel_destroy(f->client);
f->client = NULL;
}
static void end_test(grpc_end2end_test_fixture *f) {
shutdown_server(f);
shutdown_client(f);
grpc_completion_queue_shutdown(f->server_cq);
drain_cq(f->server_cq);
grpc_completion_queue_destroy(f->server_cq);
grpc_completion_queue_shutdown(f->client_cq);
drain_cq(f->client_cq);
grpc_completion_queue_destroy(f->client_cq);
}
/* Client sends a request with payload, server reads then returns status. */
static void test_invoke_request_with_compressed_payload(
grpc_end2end_test_config config) {
grpc_call *c;
grpc_call *s;
gpr_slice request_payload_slice;
grpc_byte_buffer *request_payload;
gpr_timespec deadline = five_seconds_time();
grpc_channel_args *client_args;
grpc_channel_args *server_args;
grpc_end2end_test_fixture f;
cq_verifier *v_client;
cq_verifier *v_server;
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_byte_buffer *request_payload_recv = NULL;
grpc_call_details call_details;
grpc_status_code status;
char *details = NULL;
size_t details_capacity = 0;
int was_cancelled = 2;
char str[1024]; memset(&str[0], 1023, 'x'); str[1023] = '\0';
request_payload_slice = gpr_slice_from_copied_string(str);
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
client_args =
grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH);
server_args =
grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH);
f = begin_test(config, "test_invoke_request_with_compressed_payload",
client_args, server_args);
v_client = cq_verifier_create(f.client_cq);
v_server = cq_verifier_create(f.server_cq);
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",
"foo.test.google.fr", deadline);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message = request_payload;
op->flags = 0;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->data.recv_status_on_client.status_details_capacity = &details_capacity;
op->flags = 0;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1)));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.server_cq,
f.server_cq, tag(101)));
cq_expect_completion(v_server, tag(101), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message = &request_payload_recv;
op->flags = 0;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102)));
cq_expect_completion(v_server, tag(102), 1);
cq_verify(v_server);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = "xyz";
op->flags = 0;
op++;
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103)));
cq_expect_completion(v_server, tag(103), 1);
cq_verify(v_server);
cq_expect_completion(v_client, tag(1), 1);
cq_verify(v_client);
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == strcmp(details, "xyz"));
GPR_ASSERT(0 == strcmp(call_details.method, "/foo"));
GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr"));
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, str));
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_destroy(c);
grpc_call_destroy(s);
cq_verifier_destroy(v_client);
cq_verifier_destroy(v_server);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
grpc_channel_args_destroy(client_args);
grpc_channel_args_destroy(server_args);
end_test(&f);
config.tear_down_data(&f);
}
void grpc_end2end_tests(grpc_end2end_test_config config) {
test_invoke_request_with_compressed_payload(config);
}

@ -919,6 +919,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fake_security_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -1189,6 +1198,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -1436,6 +1454,14 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_uds_posix_request_with_compressed_payload_test",
"platforms": [
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -1676,6 +1702,14 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_with_poll_request_with_compressed_payload_test",
"platforms": [
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -1939,6 +1973,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_simple_ssl_fullstack_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -2186,6 +2229,14 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_simple_ssl_fullstack_with_poll_request_with_compressed_payload_test",
"platforms": [
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -2449,6 +2500,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_simple_ssl_with_oauth2_fullstack_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -2719,6 +2779,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -2989,6 +3058,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_one_byte_at_a_time_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -3259,6 +3337,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_with_grpc_trace_request_with_compressed_payload_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -3520,6 +3607,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_request_with_compressed_payload_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -3759,6 +3855,14 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_uds_posix_request_with_compressed_payload_unsecure_test",
"platforms": [
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -3991,6 +4095,14 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_fullstack_with_poll_request_with_compressed_payload_unsecure_test",
"platforms": [
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -4245,6 +4357,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_request_with_compressed_payload_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -4506,6 +4627,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_one_byte_at_a_time_request_with_compressed_payload_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",
@ -4767,6 +4897,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c",
"name": "chttp2_socket_pair_with_grpc_trace_request_with_compressed_payload_unsecure_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save