diff --git a/include/grpc/compression.h b/include/grpc/compression.h index 630fa1656ae..207898f6054 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -34,6 +34,9 @@ #ifndef GRPC_COMPRESSION_H #define GRPC_COMPRESSION_H +/** To be used in channel arguments */ +#define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level" + /* The various compression algorithms supported by GRPC */ typedef enum { GRPC_COMPRESS_NONE = 0, @@ -43,7 +46,17 @@ typedef enum { GRPC_COMPRESS_ALGORITHMS_COUNT } grpc_compression_algorithm; +typedef enum { + GRPC_COMPRESS_LEVEL_NONE = 0, + GRPC_COMPRESS_LEVEL_LOW, + GRPC_COMPRESS_LEVEL_MED, + GRPC_COMPRESS_LEVEL_HIGH +} grpc_compression_level; + const char *grpc_compression_algorithm_name( grpc_compression_algorithm algorithm); +grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level); + #endif /* GRPC_COMPRESSION_H */ diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 36ead843d26..06b034dee75 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -31,6 +31,7 @@ * */ +#include #include const char *grpc_compression_algorithm_name( @@ -47,3 +48,20 @@ const char *grpc_compression_algorithm_name( } return "error"; } + +/* TODO(dgq): Add the ability to specify parameters to the individual + * compression algorithms */ +grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level) { + switch (level) { + case GRPC_COMPRESS_NONE: + return GRPC_COMPRESS_NONE; + case GRPC_COMPRESS_LEVEL_LOW: + case GRPC_COMPRESS_LEVEL_MED: + case GRPC_COMPRESS_LEVEL_HIGH: + return GRPC_COMPRESS_DEFLATE; + default: + /* we shouldn't be making it here */ + abort(); + } +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5cdd7cd0f6c..afed030532c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -208,6 +208,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; + /** Compression level for the call */ + grpc_compression_level compression_level; + /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -391,6 +394,11 @@ static void set_status_code(grpc_call *call, status_source source, } } +static void set_decode_compression_level(grpc_call *call, + grpc_compression_level clevel) { + call->compression_level = clevel; +} + static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { @@ -1116,6 +1124,28 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { return status; } +/* just as for status above, we need to offset: metadata userdata can't hold a + * zero (null), which in this case is used to signal no compression */ +#define COMPRESS_OFFSET 1 +static void destroy_compression(void *ignored) {} + +static gpr_uint32 decode_compression(grpc_mdelem *md) { + grpc_compression_level clevel; + void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + if (user_data) { + clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + } else { + if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), + GPR_SLICE_LENGTH(md->value->slice), + &clevel)) { + clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + } + grpc_mdelem_set_user_data(md, destroy_compression, + (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); + } + return clevel; +} + static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_linked_mdelem *l; grpc_metadata_array *dest; @@ -1131,6 +1161,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { + set_decode_compression_level(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 9175ad0572a..1fe3bf357bb 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -64,6 +64,7 @@ struct grpc_channel { grpc_mdctx *metadata_context; /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; + grpc_mdstr *grpc_compression_level_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -98,6 +99,8 @@ grpc_channel *grpc_channel_create_from_filters( gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); + channel->grpc_compression_level_string = + grpc_mdstr_from_string(mdctx, "grpc-compression-level"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -199,6 +202,7 @@ static void destroy_channel(void *p, int ok) { grpc_mdelem_unref(channel->grpc_status_elem[i]); } grpc_mdstr_unref(channel->grpc_status_string); + grpc_mdstr_unref(channel->grpc_compression_level_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); @@ -257,6 +261,11 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } +grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { + return channel->grpc_compression_level_string; +} + + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return grpc_mdelem_ref(channel->grpc_status_elem[i]); diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 6d1ed879006..f838129148f 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,6 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);