|
|
|
@ -110,13 +110,17 @@ static int skip_compression(channel_data *channeld, call_data *calld) { |
|
|
|
|
return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
|
|
|
|
|
* the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length, |
|
|
|
|
* flags indicating compression is in effect) and replaces \a send_ops with it. |
|
|
|
|
* */ |
|
|
|
|
static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, |
|
|
|
|
grpc_call_element *elem) { |
|
|
|
|
size_t i; |
|
|
|
|
grpc_stream_op_buffer new_send_ops; |
|
|
|
|
call_data *calld = elem->call_data; |
|
|
|
|
int new_slices_added = 0; /* GPR_FALSE */ |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch metadata; |
|
|
|
|
grpc_stream_op_buffer new_send_ops; |
|
|
|
|
grpc_sopb_init(&new_send_ops); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < send_ops->nops; i++) { |
|
|
|
@ -128,6 +132,9 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, |
|
|
|
|
sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS); |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_SLICE: |
|
|
|
|
/* Once we reach the slices section of the original buffer, simply add
|
|
|
|
|
* all the new (compressed) slices. We obviously want to do this only |
|
|
|
|
* once, hence the "new_slices_added" guard. */ |
|
|
|
|
if (!new_slices_added) { |
|
|
|
|
size_t j; |
|
|
|
|
for (j = 0; j < calld->slices.count; ++j) { |
|
|
|
@ -138,8 +145,9 @@ static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
case GRPC_OP_METADATA: |
|
|
|
|
grpc_sopb_add_metadata(&new_send_ops, sop->data.metadata); |
|
|
|
|
memset(&(sop->data.metadata), 0, sizeof(grpc_metadata_batch)); |
|
|
|
|
/* move the metadata to the new buffer. */ |
|
|
|
|
grpc_metadata_batch_move(&metadata, &sop->data.metadata); |
|
|
|
|
grpc_sopb_add_metadata(&new_send_ops, metadata); |
|
|
|
|
break; |
|
|
|
|
case GRPC_NO_OP: |
|
|
|
|
break; |
|
|
|
@ -156,12 +164,12 @@ static void process_send_ops(grpc_call_element *elem, |
|
|
|
|
size_t i; |
|
|
|
|
int did_compress = 0; |
|
|
|
|
|
|
|
|
|
/* buffer up slices until we've processed all the expected ones (as given by
|
|
|
|
|
* GRPC_OP_BEGIN_MESSAGE) */ |
|
|
|
|
for (i = 0; i < send_ops->nops; ++i) { |
|
|
|
|
grpc_stream_op *sop = &send_ops->ops[i]; |
|
|
|
|
switch (sop->type) { |
|
|
|
|
case GRPC_OP_BEGIN_MESSAGE: |
|
|
|
|
/* buffer up slices until we've processed all the expected ones (as
|
|
|
|
|
* given by GRPC_OP_BEGIN_MESSAGE) */ |
|
|
|
|
calld->remaining_slice_bytes = sop->data.begin_message.length; |
|
|
|
|
if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { |
|
|
|
|
calld->has_compression_algorithm = 1; /* GPR_TRUE */ |
|
|
|
@ -192,12 +200,10 @@ static void process_send_ops(grpc_call_element *elem, |
|
|
|
|
case GRPC_OP_SLICE: |
|
|
|
|
if (skip_compression(channeld, calld)) continue; |
|
|
|
|
GPR_ASSERT(calld->remaining_slice_bytes > 0); |
|
|
|
|
/* We need to copy the input because gpr_slice_buffer_add takes
|
|
|
|
|
* ownership. However, we don't own sop->data.slice, the caller does. */ |
|
|
|
|
/* Increase input ref count, gpr_slice_buffer_add takes ownership. */ |
|
|
|
|
gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice)); |
|
|
|
|
calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); |
|
|
|
|
if (calld->remaining_slice_bytes == 0) { |
|
|
|
|
/* compress */ |
|
|
|
|
did_compress = |
|
|
|
|
compress_send_sb(calld->compression_algorithm, &calld->slices); |
|
|
|
|
} |
|
|
|
|