Additionally use arena for incoming metadata allocation

reviewable/pr10135/r1
Craig Tiller 8 years ago
parent d426caca81
commit 7d2c398276
  1. 26
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 65
      src/core/ext/transport/chttp2/transport/incoming_metadata.c
  3. 18
      src/core/ext/transport/chttp2/transport/incoming_metadata.h
  4. 18
      src/core/ext/transport/chttp2/transport/parsing.c
  5. 2
      src/core/lib/channel/connected_channel.c
  6. 4
      src/core/lib/transport/transport.c
  7. 3
      src/core/lib/transport/transport.h
  8. 2
      src/core/lib/transport/transport_impl.h

@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) {
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
const void *server_data, gpr_arena *arena) {
GPR_TIMER_BEGIN("init_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena);
grpc_chttp2_data_parser_init(&s->data_parser);
grpc_slice_buffer_init(&s->flow_controlled_buffer);
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@ -1630,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s->recv_trailing_metadata_finished != NULL) {
char status_string[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(status, status_string);
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
exec_ctx, &s->metadata_buffer[1],
grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS,
grpc_slice_from_copied_string(status_string)));
GRPC_LOG_IF_ERROR("add_status",
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
exec_ctx, &s->metadata_buffer[1],
grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_GRPC_STATUS,
grpc_slice_from_copied_string(status_string))));
if (msg != NULL) {
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
exec_ctx, &s->metadata_buffer[1],
grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg)));
GRPC_LOG_IF_ERROR(
"add_status_message",
grpc_chttp2_incoming_metadata_buffer_replace_or_add(
exec_ctx, &s->metadata_buffer[1],
grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg))));
}
s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);

@ -41,69 +41,48 @@
#include <grpc/support/log.h>
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer) {
buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) {
grpc_metadata_batch_init(&buffer->batch);
buffer->arena = arena;
buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
}
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) {
size_t i;
if (!buffer->published) {
for (i = 0; i < buffer->count; i++) {
GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
}
}
gpr_free(buffer->elems);
grpc_metadata_batch_destroy(exec_ctx, &buffer->batch);
}
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) {
GPR_ASSERT(!buffer->published);
if (buffer->capacity == buffer->count) {
buffer->capacity = GPR_MAX(8, 2 * buffer->capacity);
buffer->elems =
gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
}
buffer->elems[buffer->count++].md = elem;
grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem);
return grpc_metadata_batch_add_tail(
exec_ctx, &buffer->batch,
gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem);
}
void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) {
for (size_t i = 0; i < buffer->count; i++) {
if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) {
GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
buffer->elems[i].md = elem;
return;
for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL;
l = l->next) {
if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) {
GRPC_MDELEM_UNREF(exec_ctx, l->md);
l->md = elem;
return GRPC_ERROR_NONE;
}
}
grpc_chttp2_incoming_metadata_buffer_add(buffer, elem);
return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem);
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) {
GPR_ASSERT(!buffer->published);
buffer->deadline = deadline;
buffer->batch.deadline = deadline;
}
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch) {
GPR_ASSERT(!buffer->published);
buffer->published = 1;
if (buffer->count > 0) {
size_t i;
for (i = 0; i < buffer->count; i++) {
/* TODO(ctiller): do something better here */
if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish",
grpc_metadata_batch_link_tail(
exec_ctx, batch, &buffer->elems[i]))) {
GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md);
}
}
} else {
batch->list.head = batch->list.tail = NULL;
}
batch->deadline = buffer->deadline;
*batch = buffer->batch;
grpc_metadata_batch_init(&buffer->batch);
}

@ -37,28 +37,26 @@
#include "src/core/lib/transport/transport.h"
typedef struct {
grpc_linked_mdelem *elems;
size_t count;
size_t capacity;
gpr_timespec deadline;
int published;
gpr_arena *arena;
grpc_metadata_batch batch;
size_t size; // total size of metadata
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
void grpc_chttp2_incoming_metadata_buffer_init(
grpc_chttp2_incoming_metadata_buffer *buffer);
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena);
void grpc_chttp2_incoming_metadata_buffer_destroy(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer);
void grpc_chttp2_incoming_metadata_buffer_publish(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_metadata_batch *batch);
void grpc_chttp2_incoming_metadata_buffer_add(
grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem);
void grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem);
grpc_mdelem elem) GRPC_MUST_USE_RESULT;
grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer,
grpc_mdelem elem) GRPC_MUST_USE_RESULT;
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline);

@ -548,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md);
grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
exec_ctx, &s->metadata_buffer[0], md);
if (error != GRPC_ERROR_NONE) {
grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
}
}
}
@ -598,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp,
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
} else {
grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md);
grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add(
exec_ctx, &s->metadata_buffer[1], md);
if (error != GRPC_ERROR_NONE) {
grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
s->seen_error = true;
GRPC_MDELEM_UNREF(exec_ctx, md);
}
}
GPR_TIMER_END("on_trailing_header", 0);

@ -88,7 +88,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
&args->call_stack->refcount, args->server_transport_data, args->arena);
return r == 0 ? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE("transport stream initialization failed");
}

@ -162,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
const void *server_data) {
const void *server_data, gpr_arena *arena) {
return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
server_data);
server_data, arena);
}
void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,

@ -41,6 +41,7 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/byte_stream.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -229,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport);
int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_stream *stream,
grpc_stream_refcount *refcount,
const void *server_data);
const void *server_data, gpr_arena *arena);
void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
grpc_stream *stream, grpc_polling_entity *pollent);

@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_init_stream */
int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_stream_refcount *refcount,
const void *server_data);
const void *server_data, gpr_arena *arena);
/* implementation of grpc_transport_set_pollset */
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self,

Loading…
Cancel
Save