Redesign of compression algorithm propagation based on metadata

pull/2135/head
David Garcia Quintas 10 years ago
parent 5927aec9b7
commit d16af0ea52
  1. 182
      src/core/channel/compress_filter.c
  2. 27
      src/core/surface/call.c
  3. 12
      src/core/surface/channel.c
  4. 3
      src/core/surface/channel.h
  5. 6
      test/core/end2end/tests/request_with_compressed_payload.c

@ -31,6 +31,7 @@
* *
*/ */
#include <assert.h>
#include <string.h> #include <string.h>
#include "src/core/channel/compress_filter.h" #include "src/core/channel/compress_filter.h"
@ -42,16 +43,16 @@
typedef struct call_data { typedef struct call_data {
gpr_slice_buffer slices; gpr_slice_buffer slices;
grpc_linked_mdelem compression_algorithm_storage;
int remaining_slice_bytes; int remaining_slice_bytes;
int no_compress; /**< whether to skip compression for this specific call */ grpc_compression_algorithm compression_algorithm;
gpr_uint8 has_compression_algorithm;
grpc_linked_mdelem compression_algorithm;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
grpc_compression_algorithm compress_algorithm; grpc_mdstr *mdstr_compression_algorithm_key;
grpc_mdelem *compress_algorithm_md; grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
grpc_mdelem *no_compression_md; grpc_compression_algorithm default_compression_algorithm;
} channel_data; } channel_data;
/** Compress \a slices in place using \a algorithm. Returns 1 if compression did /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
@ -70,14 +71,41 @@ static int compress_send_sb(grpc_compression_algorithm algorithm,
return did_compress; return did_compress;
} }
/** For each \a md element from the incoming metadata, filter out the entry for
* "grpc-compression-algorithm", using its value to populate the call data's
* compression_algorithm field. */
static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
if (md->key == channeld->mdstr_compression_algorithm_key) {
assert(GPR_SLICE_LENGTH(md->value->slice) ==
sizeof(grpc_compression_algorithm));
memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice),
sizeof(grpc_compression_algorithm));
calld->has_compression_algorithm = 1;
return NULL;
}
return md;
}
static int skip_compression(channel_data *channeld, call_data *calld) {
if (calld->has_compression_algorithm &&
(calld->compression_algorithm == GRPC_COMPRESS_NONE)) {
return 1;
}
/* no per-call compression override */
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
}
static void process_send_ops(grpc_call_element *elem, static void process_send_ops(grpc_call_element *elem,
grpc_stream_op_buffer *send_ops) { grpc_stream_op_buffer *send_ops) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
size_t i, j; size_t i, j;
int begin_message_index = -1; int did_compress = 0;
int metadata_op_index = -1;
grpc_mdelem *actual_compression_md;
/* buffer up slices until we've processed all the expected ones (as given by /* buffer up slices until we've processed all the expected ones (as given by
* GRPC_OP_BEGIN_MESSAGE) */ * GRPC_OP_BEGIN_MESSAGE) */
@ -85,73 +113,83 @@ static void process_send_ops(grpc_call_element *elem,
grpc_stream_op *sop = &send_ops->ops[i]; grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) { switch (sop->type) {
case GRPC_OP_BEGIN_MESSAGE: case GRPC_OP_BEGIN_MESSAGE:
begin_message_index = i;
calld->remaining_slice_bytes = sop->data.begin_message.length; calld->remaining_slice_bytes = sop->data.begin_message.length;
calld->no_compress = /* TODO(dgq): we may want to get rid of the flags mechanism to have
!!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS); * exceptions to compression: we can rely solely on metadata to set NONE
* as the compression algorithm */
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
calld->has_compression_algorithm = 1; /* GPR_TRUE */
calld->compression_algorithm = GRPC_COMPRESS_NONE;
}
break;
case GRPC_OP_METADATA:
/* Parse incoming request for compression. If any, it'll be available at
* calld->compression_algorithm */
grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter,
elem);
if (!calld->has_compression_algorithm) {
/* If no algorithm was found in the metadata and we aren't
* exceptionally skipping compression, fall back to the channel
* default */
calld->compression_algorithm =
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
break; break;
case GRPC_OP_SLICE: case GRPC_OP_SLICE:
if (calld->no_compress) continue; if (skip_compression(channeld, calld)) continue;
GPR_ASSERT(calld->remaining_slice_bytes > 0); GPR_ASSERT(calld->remaining_slice_bytes > 0);
/* add to calld->slices */ /* add to calld->slices */
gpr_slice_buffer_add(&calld->slices, sop->data.slice); gpr_slice_buffer_add(&calld->slices, sop->data.slice);
calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice);
if (calld->remaining_slice_bytes == 0) { if (calld->remaining_slice_bytes == 0) {
/* compress */ /* compress */
if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) { did_compress =
calld->no_compress = 1; /* GPR_TRUE */ compress_send_sb(calld->compression_algorithm, &calld->slices);
}
} }
break; break;
case GRPC_OP_METADATA:
/* Save the index of the first metadata op, to be processed after we
* know whether compression actually happened */
if (metadata_op_index < 0) metadata_op_index = i;
break;
case GRPC_NO_OP: case GRPC_NO_OP:
; /* fallthrough, ignore */ ; /* fallthrough */
} }
} }
if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */ /* We need to:
return; * - (OP_SLICE) If compression happened, replace the input slices with the
} * compressed ones.
* - (BEGIN_MESSAGE) Update the message info (size, flags).
/* update both the metadata and the begin_message's flags */ * - (OP_METADATA) Convey the compression configuration */
if (calld->no_compress) { for (i = 0, j = 0; i < send_ops->nops; ++i) {
/* either because the user requested the exception or because compressing grpc_stream_op *sop = &send_ops->ops[i];
* would have resulted in a larger output */ switch (sop->type) {
channeld->compress_algorithm = GRPC_COMPRESS_NONE; case GRPC_OP_BEGIN_MESSAGE:
actual_compression_md = channeld->no_compression_md; if (did_compress) {
/* reset the flag compression bit */
send_ops->ops[begin_message_index].data.begin_message.flags &=
~GRPC_WRITE_INTERNAL_COMPRESS;
} else { /* DID compress */
actual_compression_md = channeld->compress_algorithm_md;
/* at this point, calld->slices contains the *compressed* slices from
* send_ops->ops[*]->data.slice. We now replace these input slices with the
* compressed ones. */
for (i = 0, j = 0; i < send_ops->nops; ++i) {
grpc_stream_op *sop = &send_ops->ops[i];
GPR_ASSERT(j < calld->slices.count);
switch (sop->type) {
case GRPC_OP_SLICE:
gpr_slice_unref(sop->data.slice);
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
break;
case GRPC_OP_BEGIN_MESSAGE:
sop->data.begin_message.length = calld->slices.length; sop->data.begin_message.length = calld->slices.length;
sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
case GRPC_NO_OP: } else {
case GRPC_OP_METADATA: /* either because the user requested the exception or because compressing
; /* fallthrough, ignore */ * would have resulted in a larger output */
} calld->compression_algorithm = GRPC_COMPRESS_NONE;
/* reset the flag compression bit */
sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS;
}
break;
case GRPC_OP_METADATA:
grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
break;
case GRPC_OP_SLICE:
if (did_compress) {
GPR_ASSERT(j < calld->slices.count);
gpr_slice_unref(sop->data.slice);
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]);
}
break;
case GRPC_NO_OP:
; /* fallthrough */
} }
} }
grpc_metadata_batch_add_head(
&(send_ops->ops[metadata_op_index].data.metadata),
&calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md));
} }
/* Called either: /* Called either:
@ -189,6 +227,7 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */ /* initialize members */
gpr_slice_buffer_init(&calld->slices); gpr_slice_buffer_init(&calld->slices);
calld->has_compression_algorithm = 0;
if (initial_op) { if (initial_op) {
if (initial_op->send_ops && initial_op->send_ops->nops > 0) { if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
@ -209,17 +248,23 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx, const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) { int is_first, int is_last) {
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
grpc_compression_algorithm algo_idx;
const grpc_compression_level clevel = const grpc_compression_level clevel =
grpc_channel_args_get_compression_level(args); grpc_channel_args_get_compression_level(args);
const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE;
channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer( channeld->default_compression_algorithm =
mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel)); grpc_compression_algorithm_for_level(clevel);
channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel);
channeld->no_compression_md = grpc_mdelem_from_string_and_buffer( channeld->mdstr_compression_algorithm_key =
mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg, grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
sizeof(none_alg));
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
channeld->mdelem_compression_algorithms[algo_idx] =
grpc_mdelem_from_metadata_strings(
mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key),
grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx,
sizeof(algo_idx)));
}
/* The first and the last filters tend to be implemented differently to /* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down handle the case that there's no 'next' filter to call on the up or down
@ -231,8 +276,13 @@ static void init_channel_elem(grpc_channel_element *elem,
/* Destructor for channel data */ /* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) { static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
grpc_mdelem_unref(channeld->compress_algorithm_md); grpc_compression_algorithm algo_idx;
grpc_mdelem_unref(channeld->no_compression_md);
grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key);
for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT;
++algo_idx) {
grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]);
}
} }
const grpc_channel_filter grpc_compress_filter = { const grpc_channel_filter grpc_compress_filter = {

@ -209,8 +209,8 @@ struct grpc_call {
/* Received call statuses from various sources */ /* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT]; received_status status[STATUS_SOURCE_COUNT];
/** Compression level for the call */ /** Compression algorithm for the call */
grpc_compression_level compression_level; grpc_compression_algorithm compression_algorithm;
/* Contexts for various subsystems (security, tracing, ...). */ /* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT]; grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@ -395,9 +395,9 @@ static void set_status_code(grpc_call *call, status_source source,
} }
} }
static void set_compression_level(grpc_call *call, static void set_compression_algorithm(grpc_call *call,
grpc_compression_level clevel) { grpc_compression_algorithm algo) {
call->compression_level = clevel; call->compression_algorithm = algo;
} }
static void set_status_details(grpc_call *call, status_source source, static void set_status_details(grpc_call *call, status_source source,
@ -651,12 +651,10 @@ static void finish_message(grpc_call *call) {
/* some aliases for readability */ /* some aliases for readability */
gpr_slice *slices = call->incoming_message.slices; gpr_slice *slices = call->incoming_message.slices;
const size_t nslices = call->incoming_message.count; const size_t nslices = call->incoming_message.count;
const grpc_compression_algorithm compression_algorithm =
grpc_compression_algorithm_for_level(call->compression_level);
if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) { if (call->compression_algorithm > GRPC_COMPRESS_NONE) {
byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices, byte_buffer = grpc_raw_compressed_byte_buffer_create(
compression_algorithm); slices, nslices, call->compression_algorithm);
} else { } else {
byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
} }
@ -683,12 +681,11 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) {
* compression level should already be present in the call, as parsed off its * compression level should already be present in the call, as parsed off its
* corresponding metadata. */ * corresponding metadata. */
if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) { (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
char *message = NULL; char *message = NULL;
gpr_asprintf( gpr_asprintf(
&message, "Invalid compression algorithm (%s) for compressed message.", &message, "Invalid compression algorithm (%s) for compressed message.",
grpc_compression_algorithm_name( grpc_compression_algorithm_name(call->compression_algorithm));
grpc_compression_algorithm_for_level(call->compression_level)));
cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1); cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1);
} }
/* stash away parameters, and prepare for incoming slices */ /* stash away parameters, and prepare for incoming slices */
@ -1183,8 +1180,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key == } else if (key ==
grpc_channel_get_compresssion_level_string(call->channel)) { grpc_channel_get_compresssion_algorithm_string(call->channel)) {
set_compression_level(call, decode_compression(md)); set_compression_algorithm(call, decode_compression(md));
} else { } else {
dest = &call->buffered_metadata[is_trailing]; dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {

@ -64,7 +64,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context; grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_level_string; grpc_mdstr *grpc_compression_algorithm_string;
grpc_mdstr *grpc_message_string; grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string; grpc_mdstr *path_string;
grpc_mdstr *authority_string; grpc_mdstr *authority_string;
@ -99,8 +99,8 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1 + is_client); gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx; channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string = channel->grpc_compression_algorithm_string =
grpc_mdstr_from_string(mdctx, "grpc-compression-level"); grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE]; char buf[GPR_LTOA_MIN_BUFSIZE];
@ -202,7 +202,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdelem_unref(channel->grpc_status_elem[i]); grpc_mdelem_unref(channel->grpc_status_elem[i]);
} }
grpc_mdstr_unref(channel->grpc_status_string); grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_compression_level_string); grpc_mdstr_unref(channel->grpc_compression_algorithm_string);
grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string); grpc_mdstr_unref(channel->authority_string);
@ -261,8 +261,8 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string; return channel->grpc_status_string;
} }
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) {
return channel->grpc_compression_level_string; return channel->grpc_compression_algorithm_string;
} }
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {

@ -53,7 +53,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code); int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(
grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);

@ -251,10 +251,10 @@ static void request_with_payload_template(
config.tear_down_data(&f); config.tear_down_data(&f);
} }
static void test_invoke_request_with_excepcionally_uncompressed_payload( static void test_invoke_request_with_exceptionally_uncompressed_payload(
grpc_end2end_test_config config) { grpc_end2end_test_config config) {
request_with_payload_template( request_with_payload_template(
config, "test_invoke_request_with_excepcionally_uncompressed_payload", config, "test_invoke_request_with_exceptionally_uncompressed_payload",
GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE); GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE);
} }
@ -276,7 +276,7 @@ static void test_invoke_request_with_uncompressed_payload(
void grpc_end2end_tests(grpc_end2end_test_config config) { void grpc_end2end_tests(grpc_end2end_test_config config) {
test_invoke_request_with_excepcionally_uncompressed_payload(config); test_invoke_request_with_exceptionally_uncompressed_payload(config);
test_invoke_request_with_compressed_payload(config); test_invoke_request_with_compressed_payload(config);
test_invoke_request_with_uncompressed_payload(config); test_invoke_request_with_uncompressed_payload(config);
} }

Loading…
Cancel
Save