Fixes to surface/call.c to consider the channel's default compression level.

pull/6481/head
David Garcia Quintas 9 years ago
parent b0dd253a60
commit 749367fd00
  1. 40
      src/core/lib/surface/call.c

@ -178,8 +178,8 @@ struct grpc_call {
/* Call stats: only valid after trailing metadata received */ /* Call stats: only valid after trailing metadata received */
grpc_transport_stream_stats stats; grpc_transport_stream_stats stats;
/* Compression algorithm for the call */ /* Compression algorithm for *incoming* data */
grpc_compression_algorithm compression_algorithm; grpc_compression_algorithm incoming_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */ /* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer; uint32_t encodings_accepted_by_peer;
@ -406,16 +406,16 @@ static void set_status_code(grpc_call *call, status_source source,
/* TODO(ctiller): what to do about the flush that was previously here */ /* TODO(ctiller): what to do about the flush that was previously here */
} }
static void set_compression_algorithm(grpc_call *call, static void set_incoming_compression_algorithm(
grpc_compression_algorithm algo) { grpc_call *call, grpc_compression_algorithm algo) {
call->compression_algorithm = algo; call->incoming_compression_algorithm = algo;
} }
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) { grpc_call *call) {
grpc_compression_algorithm algorithm; grpc_compression_algorithm algorithm;
gpr_mu_lock(&call->mu); gpr_mu_lock(&call->mu);
algorithm = call->compression_algorithm; algorithm = call->incoming_compression_algorithm;
gpr_mu_unlock(&call->mu); gpr_mu_unlock(&call->mu);
return algorithm; return algorithm;
} }
@ -968,9 +968,9 @@ static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
if (elem == NULL) { if (elem == NULL) {
return NULL; return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
GPR_TIMER_BEGIN("compression_algorithm", 0); GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_compression_algorithm(call, decode_compression(elem)); set_incoming_compression_algorithm(call, decode_compression(elem));
GPR_TIMER_END("compression_algorithm", 0); GPR_TIMER_END("incoming_compression_algorithm", 0);
return NULL; return NULL;
} else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
@ -1133,9 +1133,9 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
} else { } else {
call->test_only_last_message_flags = call->receiving_stream->flags; call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->compression_algorithm > GRPC_COMPRESS_NONE)) { (call->incoming_compression_algorithm > GRPC_COMPRESS_NONE)) {
*call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
NULL, 0, call->compression_algorithm); NULL, 0, call->incoming_compression_algorithm);
} else { } else {
*call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
} }
@ -1323,17 +1323,25 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_metadata compression_md; grpc_metadata compression_md;
memset(&compression_md, 0, sizeof(grpc_metadata)); memset(&compression_md, 0, sizeof(grpc_metadata));
size_t additional_metadata_count = 0; size_t additional_metadata_count = 0;
if (op->data.send_initial_metadata.maybe_compression_level.is_set && grpc_compression_level effective_compression_level;
op->data.send_initial_metadata.maybe_compression_level bool level_set = false;
.compression_level > GRPC_COMPRESS_LEVEL_NONE) { if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
if (call->is_client) { if (call->is_client) {
error = GRPC_CALL_ERROR_NOT_ON_CLIENT; error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
goto done_with_error; goto done_with_error;
} }
effective_compression_level =
op->data.send_initial_metadata.maybe_compression_level
.compression_level;
level_set = true;
} else {
level_set = grpc_channel_default_compression_level(
call->channel, &effective_compression_level);
}
if (level_set) {
const grpc_compression_algorithm calgo = const grpc_compression_algorithm calgo =
compression_algorithm_for_level_locked( compression_algorithm_for_level_locked(
call, op->data.send_initial_metadata.maybe_compression_level call, effective_compression_level);
.compression_level);
char *calgo_name; char *calgo_name;
grpc_compression_algorithm_name(calgo, &calgo_name); grpc_compression_algorithm_name(calgo, &calgo_name);
compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY;

Loading…
Cancel
Save