More bugfixes to compress_filter.

Introduced grpc_metadata_batch_move and added grpc_compress_filter to grpc_server_create
pull/2135/head
David Garcia Quintas 10 years ago
parent a21e2c8f91
commit 20a3538ddc
  1. 36
      src/core/channel/compress_filter.c
  2. 5
      src/core/surface/server_create.c
  3. 6
      src/core/transport/stream_op.c
  4. 5
      src/core/transport/stream_op.h

@ -112,43 +112,51 @@ static int skip_compression(channel_data *channeld, call_data *calld) {
static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops, static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
grpc_call_element *elem) { grpc_call_element *elem) {
size_t i, j; size_t i;
grpc_stream_op_buffer new_send_ops;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data; channel_data *channeld = elem->channel_data;
int new_slices_added = 0; /* GPR_FALSE */
grpc_metadata_batch metadata;
grpc_sopb_init(&new_send_ops);
/* The following loop is akin to a selective reset + update */ /* The following loop is akin to a selective reset + update */
for (i = 0, j = 0; i < send_ops->nops; ++i) { for (i = 0; i < send_ops->nops; i++) {
grpc_stream_op *sop = &send_ops->ops[i]; grpc_stream_op *sop = &send_ops->ops[i];
switch (sop->type) { switch (sop->type) {
case GRPC_OP_BEGIN_MESSAGE: case GRPC_OP_BEGIN_MESSAGE:
sop->data.begin_message.length = calld->slices.length; grpc_sopb_add_begin_message(
sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; &new_send_ops, calld->slices.length,
sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
break; break;
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
grpc_metadata_batch_move(&metadata, &sop->data.metadata);
if (!calld->seen_initial_metadata) { if (!calld->seen_initial_metadata) {
grpc_metadata_batch_add_head( grpc_metadata_batch_add_head(
&(sop->data.metadata), &calld->compression_algorithm_storage, &metadata, &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms grpc_mdelem_ref(channeld->mdelem_compression_algorithms
[calld->compression_algorithm])); [calld->compression_algorithm]));
calld->seen_initial_metadata = 1; /* GPR_TRUE */ calld->seen_initial_metadata = 1; /* GPR_TRUE */
} }
grpc_sopb_add_metadata(&new_send_ops, metadata);
break; break;
case GRPC_OP_SLICE: case GRPC_OP_SLICE:
gpr_slice_unref(sop->data.slice); if (!new_slices_added) {
/* replace only up to the number of available compressed slices */ size_t j;
if (j < calld->slices.count) { for (j = 0; j < calld->slices.count; ++j) {
sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); grpc_sopb_add_slice(&new_send_ops,
gpr_slice_ref(calld->slices.slices[j]));
}
new_slices_added = 1; /* GPR_TRUE */
} }
break; break;
case GRPC_NO_OP: case GRPC_NO_OP:
break; break;
} }
} }
grpc_sopb_swap(send_ops, &new_send_ops);
/* in case compressed slices remain to be added to the output */ grpc_sopb_destroy(&new_send_ops);
while (j < calld->slices.count) {
grpc_sopb_add_slice(send_ops, gpr_slice_ref(calld->slices.slices[j++]));
}
} }
/* even if the filter isn't producing compressed output, it may need to update /* even if the filter isn't producing compressed output, it may need to update

@ -34,7 +34,10 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/surface/completion_queue.h" #include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h" #include "src/core/surface/server.h"
#include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args) { grpc_server *grpc_server_create(const grpc_channel_args *args) {
return grpc_server_create_from_filters(NULL, 0, args); const grpc_channel_filter *filters[] = {&grpc_compress_filter};
return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
args);
} }

@ -286,6 +286,12 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target,
} }
} }
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
grpc_metadata_batch *src) {
*dst = *src;
memset(src, 0, sizeof(grpc_metadata_batch));
}
void grpc_metadata_batch_filter(grpc_metadata_batch *batch, void grpc_metadata_batch_filter(grpc_metadata_batch *batch,
grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem), grpc_mdelem *elem),

@ -102,6 +102,11 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch);
void grpc_metadata_batch_merge(grpc_metadata_batch *target, void grpc_metadata_batch_merge(grpc_metadata_batch *target,
grpc_metadata_batch *add); grpc_metadata_batch *add);
/** Moves the metadata information from \a src to \a dst. Upon return, \a src is
* zeroed. */
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
grpc_metadata_batch *src);
/** Add \a storage to the beginning of \a batch. storage->md is /** Add \a storage to the beginning of \a batch. storage->md is
assumed to be valid. assumed to be valid.
\a storage is owned by the caller and must survive for the \a storage is owned by the caller and must survive for the

Loading…
Cancel
Save