From d16af0ea52a932ced7562e21d5ec8f57eafa51c5 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 22 Jun 2015 22:39:21 -0700 Subject: [PATCH] Redesign of compression algorithm propagation based on metadata --- src/core/channel/compress_filter.c | 182 +++++++++++------- src/core/surface/call.c | 27 ++- src/core/surface/channel.c | 12 +- src/core/surface/channel.h | 3 +- .../tests/request_with_compressed_payload.c | 6 +- 5 files changed, 139 insertions(+), 91 deletions(-) diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 918cb2dd791..ad42bbb61c8 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -31,6 +31,7 @@ * */ +#include #include #include "src/core/channel/compress_filter.h" @@ -42,16 +43,16 @@ typedef struct call_data { gpr_slice_buffer slices; + grpc_linked_mdelem compression_algorithm_storage; int remaining_slice_bytes; - int no_compress; /**< whether to skip compression for this specific call */ - - grpc_linked_mdelem compression_algorithm; + grpc_compression_algorithm compression_algorithm; + gpr_uint8 has_compression_algorithm; } call_data; typedef struct channel_data { - grpc_compression_algorithm compress_algorithm; - grpc_mdelem *compress_algorithm_md; - grpc_mdelem *no_compression_md; + grpc_mdstr *mdstr_compression_algorithm_key; + grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; + grpc_compression_algorithm default_compression_algorithm; } channel_data; /** Compress \a slices in place using \a algorithm. Returns 1 if compression did @@ -70,14 +71,41 @@ static int compress_send_sb(grpc_compression_algorithm algorithm, return did_compress; } +/** For each \a md element from the incoming metadata, filter out the entry for + * "grpc-compression-algorithm", using its value to populate the call data's + * compression_algorithm field. */ +static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + if (md->key == channeld->mdstr_compression_algorithm_key) { + assert(GPR_SLICE_LENGTH(md->value->slice) == + sizeof(grpc_compression_algorithm)); + memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice), + sizeof(grpc_compression_algorithm)); + calld->has_compression_algorithm = 1; + return NULL; + } + + return md; +} + +static int skip_compression(channel_data *channeld, call_data *calld) { + if (calld->has_compression_algorithm && + (calld->compression_algorithm == GRPC_COMPRESS_NONE)) { + return 1; + } + /* no per-call compression override */ + return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; +} + static void process_send_ops(grpc_call_element *elem, grpc_stream_op_buffer *send_ops) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i, j; - int begin_message_index = -1; - int metadata_op_index = -1; - grpc_mdelem *actual_compression_md; + int did_compress = 0; /* buffer up slices until we've processed all the expected ones (as given by * GRPC_OP_BEGIN_MESSAGE) */ @@ -85,73 +113,83 @@ static void process_send_ops(grpc_call_element *elem, grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { case GRPC_OP_BEGIN_MESSAGE: - begin_message_index = i; calld->remaining_slice_bytes = sop->data.begin_message.length; - calld->no_compress = - !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS); + /* TODO(dgq): we may want to get rid of the flags mechanism to have + * exceptions to compression: we can rely solely on metadata to set NONE + * as the compression algorithm */ + if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + break; + case GRPC_OP_METADATA: + /* Parse incoming request for compression. If any, it'll be available at + * calld->compression_algorithm */ + grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter, + elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = + channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } break; case GRPC_OP_SLICE: - if (calld->no_compress) continue; + if (skip_compression(channeld, calld)) continue; GPR_ASSERT(calld->remaining_slice_bytes > 0); /* add to calld->slices */ gpr_slice_buffer_add(&calld->slices, sop->data.slice); calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); if (calld->remaining_slice_bytes == 0) { /* compress */ - if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) { - calld->no_compress = 1; /* GPR_TRUE */ - } + did_compress = + compress_send_sb(calld->compression_algorithm, &calld->slices); } break; - case GRPC_OP_METADATA: - /* Save the index of the first metadata op, to be processed after we - * know whether compression actually happened */ - if (metadata_op_index < 0) metadata_op_index = i; - break; case GRPC_NO_OP: - ; /* fallthrough, ignore */ + ; /* fallthrough */ } } - if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */ - return; - } - - /* update both the metadata and the begin_message's flags */ - if (calld->no_compress) { - /* either because the user requested the exception or because compressing - * would have resulted in a larger output */ - channeld->compress_algorithm = GRPC_COMPRESS_NONE; - actual_compression_md = channeld->no_compression_md; - /* reset the flag compression bit */ - send_ops->ops[begin_message_index].data.begin_message.flags &= - ~GRPC_WRITE_INTERNAL_COMPRESS; - } else { /* DID compress */ - actual_compression_md = channeld->compress_algorithm_md; - /* at this point, calld->slices contains the *compressed* slices from - * send_ops->ops[*]->data.slice. We now replace these input slices with the - * compressed ones. */ - for (i = 0, j = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - GPR_ASSERT(j < calld->slices.count); - switch (sop->type) { - case GRPC_OP_SLICE: - gpr_slice_unref(sop->data.slice); - sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); - break; - case GRPC_OP_BEGIN_MESSAGE: + /* We need to: + * - (OP_SLICE) If compression happened, replace the input slices with the + * compressed ones. + * - (BEGIN_MESSAGE) Update the message info (size, flags). + * - (OP_METADATA) Convey the compression configuration */ + for (i = 0, j = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + if (did_compress) { sop->data.begin_message.length = calld->slices.length; sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; - case GRPC_NO_OP: - case GRPC_OP_METADATA: - ; /* fallthrough, ignore */ - } + } else { + /* either because the user requested the exception or because compressing + * would have resulted in a larger output */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + /* reset the flag compression bit */ + sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; + } + break; + case GRPC_OP_METADATA: + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->compression_algorithm_storage, + grpc_mdelem_ref(channeld->mdelem_compression_algorithms + [calld->compression_algorithm])); + break; + case GRPC_OP_SLICE: + if (did_compress) { + GPR_ASSERT(j < calld->slices.count); + gpr_slice_unref(sop->data.slice); + sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + } + break; + case GRPC_NO_OP: + ; /* fallthrough */ } } - - grpc_metadata_batch_add_head( - &(send_ops->ops[metadata_op_index].data.metadata), - &calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md)); } /* Called either: @@ -189,6 +227,7 @@ static void init_call_elem(grpc_call_element *elem, /* initialize members */ gpr_slice_buffer_init(&calld->slices); + calld->has_compression_algorithm = 0; if (initial_op) { if (initial_op->send_ops && initial_op->send_ops->nops > 0) { @@ -209,17 +248,23 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *channeld = elem->channel_data; + grpc_compression_algorithm algo_idx; const grpc_compression_level clevel = grpc_channel_args_get_compression_level(args); - const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE; - channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer( - mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel)); - channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel); + channeld->default_compression_algorithm = + grpc_compression_algorithm_for_level(clevel); - channeld->no_compression_md = grpc_mdelem_from_string_and_buffer( - mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg, - sizeof(none_alg)); + channeld->mdstr_compression_algorithm_key = + grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); + + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + channeld->mdelem_compression_algorithms[algo_idx] = + grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key), + grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx, + sizeof(algo_idx))); + } /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down @@ -231,8 +276,13 @@ static void init_channel_elem(grpc_channel_element *elem, /* Destructor for channel data */ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; - grpc_mdelem_unref(channeld->compress_algorithm_md); - grpc_mdelem_unref(channeld->no_compression_md); + grpc_compression_algorithm algo_idx; + + grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; + ++algo_idx) { + grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); + } } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 41257419c0f..2b2d92cac74 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -209,8 +209,8 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /** Compression level for the call */ - grpc_compression_level compression_level; + /** Compression algorithm for the call */ + grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -395,9 +395,9 @@ static void set_status_code(grpc_call *call, status_source source, } } -static void set_compression_level(grpc_call *call, - grpc_compression_level clevel) { - call->compression_level = clevel; +static void set_compression_algorithm(grpc_call *call, + grpc_compression_algorithm algo) { + call->compression_algorithm = algo; } static void set_status_details(grpc_call *call, status_source source, @@ -651,12 +651,10 @@ static void finish_message(grpc_call *call) { /* some aliases for readability */ gpr_slice *slices = call->incoming_message.slices; const size_t nslices = call->incoming_message.count; - const grpc_compression_algorithm compression_algorithm = - grpc_compression_algorithm_for_level(call->compression_level); - if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) { - byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices, - compression_algorithm); + if (call->compression_algorithm > GRPC_COMPRESS_NONE) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); } else { byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); } @@ -683,12 +681,11 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { * compression level should already be present in the call, as parsed off its * corresponding metadata. */ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) { + (call->compression_algorithm == GRPC_COMPRESS_NONE)) { char *message = NULL; gpr_asprintf( &message, "Invalid compression algorithm (%s) for compressed message.", - grpc_compression_algorithm_name( - grpc_compression_algorithm_for_level(call->compression_level))); + grpc_compression_algorithm_name(call->compression_algorithm)); cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1); } /* stash away parameters, and prepare for incoming slices */ @@ -1183,8 +1180,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); } else if (key == - grpc_channel_get_compresssion_level_string(call->channel)) { - set_compression_level(call, decode_compression(md)); + grpc_channel_get_compresssion_algorithm_string(call->channel)) { + set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 6353a83b4f1..e841a5d493e 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -64,7 +64,7 @@ struct grpc_channel { grpc_mdctx *metadata_context; /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; - grpc_mdstr *grpc_compression_level_string; + grpc_mdstr *grpc_compression_algorithm_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -99,8 +99,8 @@ grpc_channel *grpc_channel_create_from_filters( gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); - channel->grpc_compression_level_string = - grpc_mdstr_from_string(mdctx, "grpc-compression-level"); + channel->grpc_compression_algorithm_string = + grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -202,7 +202,7 @@ static void destroy_channel(void *p, int ok) { grpc_mdelem_unref(channel->grpc_status_elem[i]); } grpc_mdstr_unref(channel->grpc_status_string); - grpc_mdstr_unref(channel->grpc_compression_level_string); + grpc_mdstr_unref(channel->grpc_compression_algorithm_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); @@ -261,8 +261,8 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { - return channel->grpc_compression_level_string; +grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) { + return channel->grpc_compression_algorithm_string; } grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index f838129148f..f4df06a0c3a 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_compresssion_algorithm_string( + grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index fe41780702d..8cc4cb7fdab 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -251,10 +251,10 @@ static void request_with_payload_template( config.tear_down_data(&f); } -static void test_invoke_request_with_excepcionally_uncompressed_payload( +static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( - config, "test_invoke_request_with_excepcionally_uncompressed_payload", + config, "test_invoke_request_with_exceptionally_uncompressed_payload", GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE); } @@ -276,7 +276,7 @@ static void test_invoke_request_with_uncompressed_payload( void grpc_end2end_tests(grpc_end2end_test_config config) { - test_invoke_request_with_excepcionally_uncompressed_payload(config); + test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); test_invoke_request_with_uncompressed_payload(config); }