Propagation of compression level to the call.

Through the metadata's user data mechanism.
pull/2073/head
David Garcia Quintas 10 years ago
parent e29feb2adb
commit db94b276f5
  1. 32
      src/core/surface/call.c
  2. 9
      src/core/surface/channel.c
  3. 1
      src/core/surface/channel.h

@ -207,6 +207,9 @@ struct grpc_call {
/* Received call statuses from various sources */ /* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT]; received_status status[STATUS_SOURCE_COUNT];
/** Compression level for the call */
grpc_compression_level compression_level;
/* Contexts for various subsystems (security, tracing, ...). */ /* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT]; grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@ -389,6 +392,11 @@ static void set_status_code(grpc_call *call, status_source source,
} }
} }
static void set_decode_compression_level(grpc_call *call,
grpc_compression_level clevel) {
call->compression_level = clevel;
}
static void set_status_details(grpc_call *call, status_source source, static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) { grpc_mdstr *status) {
if (call->status[source].details != NULL) { if (call->status[source].details != NULL) {
@ -1110,6 +1118,28 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status; return status;
} }
/* just as for status above, we need to offset: metadata userdata can't hold a
* zero (null), which in this case is used to signal no compression */
#define COMPRESS_OFFSET 1
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
grpc_compression_level clevel;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
&clevel)) {
clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
}
grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
}
return clevel;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l; grpc_linked_mdelem *l;
grpc_metadata_array *dest; grpc_metadata_array *dest;
@ -1125,6 +1155,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
set_decode_compression_level(call, decode_compression(md));
} else { } else {
dest = &call->buffered_metadata[is_trailing]; dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {

@ -64,6 +64,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context; grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_level_string;
grpc_mdstr *grpc_message_string; grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string; grpc_mdstr *path_string;
grpc_mdstr *authority_string; grpc_mdstr *authority_string;
@ -98,6 +99,8 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1 + is_client); gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx; channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string =
grpc_mdstr_from_string(mdctx, "grpc-compression-level");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE]; char buf[GPR_LTOA_MIN_BUFSIZE];
@ -199,6 +202,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdelem_unref(channel->grpc_status_elem[i]); grpc_mdelem_unref(channel->grpc_status_elem[i]);
} }
grpc_mdstr_unref(channel->grpc_status_string); grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_compression_level_string);
grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string); grpc_mdstr_unref(channel->authority_string);
@ -257,6 +261,11 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string; return channel->grpc_status_string;
} }
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
return channel->grpc_compression_level_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]); return grpc_mdelem_ref(channel->grpc_status_elem[i]);

@ -53,6 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code); int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);

Loading…
Cancel
Save