diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index f7fed7cae96..17abba06be9 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -63,11 +63,6 @@ typedef struct connected_channel_call_data { gpr_uint8 got_read_close; gpr_slice_buffer incoming_message; gpr_uint32 outgoing_buffer_length_estimate; - - grpc_linked_mdelem *incoming_metadata; - size_t incoming_metadata_count; - size_t incoming_metadata_capacity; - gpr_timespec deadline; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -120,26 +115,18 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand, static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_call_op *op) { call_data *calld = elem->call_data; - grpc_linked_mdelem *m; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { case GRPC_SEND_METADATA: - for (m = op->data.metadata.list.head; m; m = m->next) { - grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md); - } - if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { - grpc_sopb_add_deadline(&calld->outgoing_sopb, - op->data.metadata.deadline); - } + grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); break; case GRPC_SEND_START: grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); - grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb); end_bufferable_op(op, chand, calld, 0); break; case GRPC_SEND_MESSAGE: @@ -209,9 +196,6 @@ static void init_call_elem(grpc_call_element *elem, calld->got_read_close = 0; calld->outgoing_buffer_length_estimate = 0; calld->max_message_length = chand->max_message_length; - calld->incoming_metadata = NULL; - calld->incoming_metadata_capacity = 0; - calld->incoming_metadata_count = 0; gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), @@ -328,50 +312,16 @@ static void finish_message(channel_data *chand, call_data *calld) { grpc_call_next_op(elem, &call_op); } -static void metadata_done_cb(void *ptr, grpc_op_error error) { gpr_free(ptr); } - -static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) { - if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) { - calld->incoming_metadata_capacity = - GPR_MAX(8, 2 * calld->incoming_metadata_capacity); - calld->incoming_metadata = gpr_realloc( - calld->incoming_metadata, - sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity); - } - calld->incoming_metadata[calld->incoming_metadata_count++].md = elem; -} - -static void flush_metadata(grpc_call_element *elem) { +static void got_metadata(grpc_call_element *elem, grpc_metadata_batch metadata) { grpc_call_op op; - call_data *calld = elem->call_data; - size_t i; - - for (i = 1; i < calld->incoming_metadata_count; i++) { - calld->incoming_metadata[i].prev = &calld->incoming_metadata[i - 1]; - } - for (i = 0; i < calld->incoming_metadata_count - 1; i++) { - calld->incoming_metadata[i].next = &calld->incoming_metadata[i + 1]; - } - - calld->incoming_metadata[0].prev = - calld->incoming_metadata[calld->incoming_metadata_count - 1].next = NULL; - op.type = GRPC_RECV_METADATA; op.dir = GRPC_CALL_UP; op.flags = 0; - op.data.metadata.list.head = &calld->incoming_metadata[0]; - op.data.metadata.list.tail = - &calld->incoming_metadata[calld->incoming_metadata_count - 1]; - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = calld->deadline; - op.done_cb = metadata_done_cb; - op.user_data = calld->incoming_metadata; + op.data.metadata = metadata; + op.done_cb = do_nothing; + op.user_data = NULL; grpc_call_next_op(elem, &op); - - calld->incoming_metadata = NULL; - calld->incoming_metadata_count = 0; - calld->incoming_metadata_capacity = 0; } /* Handle incoming stream ops from the transport, translating them into @@ -393,20 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport, stream_op = ops + i; switch (stream_op->type) { case GRPC_OP_FLOW_CTL_CB: - gpr_log(GPR_ERROR, - "should not receive flow control ops from transport"); - abort(); + stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); break; case GRPC_NO_OP: break; case GRPC_OP_METADATA: - add_incoming_metadata(calld, stream_op->data.metadata); - break; - case GRPC_OP_DEADLINE: - calld->deadline = stream_op->data.deadline; - break; - case GRPC_OP_METADATA_BOUNDARY: - flush_metadata(elem); + got_metadata(elem, stream_op->data.metadata); break; case GRPC_OP_BEGIN_MESSAGE: /* can't begin a message when we're still reading a message */ diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 708bb06c7f3..8c66171b078 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -480,9 +480,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, curop++; break; case GRPC_OP_FLOW_CTL_CB: - case GRPC_OP_DEADLINE: case GRPC_OP_METADATA: - case GRPC_OP_METADATA_BOUNDARY: /* these just get copied as they don't impact the number of flow controlled bytes */ grpc_sopb_append(outops, op, 1); @@ -543,6 +541,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, gpr_uint32 curop = 0; gpr_uint32 unref_op; grpc_mdctx *mdctx = compressor->mdctx; + grpc_linked_mdelem *l; int need_unref = 0; GPR_ASSERT(stream_id != 0); @@ -566,19 +565,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, curop++; break; case GRPC_OP_METADATA: - /* Encode a metadata element; store the returned value, representing + /* Encode a metadata batch; store the returned values, representing a metadata element that needs to be unreffed back into the metadata slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got updated). After this loop, we'll do a batch unref of elements. */ - op->data.metadata = hpack_enc(compressor, op->data.metadata, &st); - need_unref |= op->data.metadata != NULL; - curop++; - break; - case GRPC_OP_DEADLINE: - deadline_enc(compressor, op->data.deadline, &st); - curop++; - break; - case GRPC_OP_METADATA_BOUNDARY: + need_unref |= op->data.metadata.garbage.head != NULL; + grpc_metadata_batch_assert_ok(&op->data.metadata); + for (l = op->data.metadata.list.head; l; l = l->next) { + l->md = hpack_enc(compressor, l->md, &st); + need_unref |= l->md != NULL; + } + if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) { + deadline_enc(compressor, op->data.metadata.deadline, &st); + } ensure_frame_type(&st, HEADER, 0); finish_frame(&st, 1, 0); st.last_was_header = 0; /* force a new header frame */ @@ -614,8 +613,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof, for (unref_op = 0; unref_op < curop; unref_op++) { op = &ops[unref_op]; if (op->type != GRPC_OP_METADATA) continue; - if (!op->data.metadata) continue; - grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata); + for (l = op->data.metadata.list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = op->data.metadata.garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } } grpc_mdctx_unlock(mdctx); } diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 110a4b544f3..63fbf67080b 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -292,6 +292,12 @@ struct stream { stream_link links[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT]; + /* incoming metadata */ + grpc_linked_mdelem *incoming_metadata; + size_t incoming_metadata_count; + size_t incoming_metadata_capacity; + gpr_timespec incoming_deadline; + /* sops from application */ grpc_stream_op_buffer outgoing_sopb; /* sops that have passed flow control to be written */ @@ -593,6 +599,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, s->cancelled = 0; s->allow_window_updates = 0; s->published_close = 0; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; + s->incoming_metadata = NULL; + s->incoming_deadline = gpr_inf_future; memset(&s->links, 0, sizeof(s->links)); memset(&s->included, 0, sizeof(s->included)); grpc_sopb_init(&s->outgoing_sopb); @@ -1057,6 +1067,14 @@ static void finalize_cancellations(transport *t) { } } +static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) { + if (s->incoming_metadata_capacity == s->incoming_metadata_count) { + s->incoming_metadata_capacity = GPR_MAX(8, 2 * s->incoming_metadata_capacity); + s->incoming_metadata = gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) * s->incoming_metadata_capacity); + } + s->incoming_metadata[s->incoming_metadata_count++].md = elem; +} + static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, grpc_status_code local_status, grpc_chttp2_error_code error_code, @@ -1076,9 +1094,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, stream_list_join(t, s, CANCELLED); gpr_ltoa(local_status, buffer); - grpc_sopb_add_metadata( - &s->parser.incoming_sopb, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + add_incoming_metadata(t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); stream_list_join(t, s, PENDING_CALLBACKS); } @@ -1254,11 +1270,10 @@ static void on_header(void *tp, grpc_mdelem *md) { } grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } - grpc_sopb_add_deadline(&s->parser.incoming_sopb, - gpr_time_add(gpr_now(), *cached_timeout)); + s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout); grpc_mdelem_unref(md); } else { - grpc_sopb_add_metadata(&s->parser.incoming_sopb, md); + add_incoming_metadata(t, s, md); } } @@ -1434,6 +1449,37 @@ static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) { return window_update < MAX_WINDOW - window; } +static void free_md(void *p, grpc_op_error result) { + gpr_free(p); +} + +static void add_metadata_batch(transport *t) { + grpc_metadata_batch b; + stream *s = t->incoming_stream; + size_t i; + + b.list.head = &s->incoming_metadata[0]; + b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1]; + b.garbage.head = b.garbage.tail = NULL; + b.deadline = s->incoming_deadline; + + for (i = 1; i < s->incoming_metadata_count; i++) { + s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1]; + s->incoming_metadata[i - 1].next = &s->incoming_metadata[i]; + } + s->incoming_metadata[0].prev = NULL; + s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL; + + grpc_sopb_add_metadata(&s->parser.incoming_sopb, b); + grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, s->incoming_metadata); + + /* reset */ + s->incoming_deadline = gpr_inf_future; + s->incoming_metadata = NULL; + s->incoming_metadata_count = 0; + s->incoming_metadata_capacity = 0; +} + static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { grpc_chttp2_parse_state st; size_t i; @@ -1448,8 +1494,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.metadata_boundary) { - grpc_sopb_add_metadata_boundary( - &t->incoming_stream->parser.incoming_sopb); + add_metadata_batch(t); stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); } if (st.ack_settings) { diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 38b42359316..1a8b4174ff1 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -79,14 +79,12 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { gpr_slice_unref(ops[i].data.slice); break; case GRPC_OP_METADATA: - grpc_mdelem_unref(ops[i].data.metadata); + /* grpc_mdelem_unref(ops[i].data.metadata); */ break; case GRPC_OP_FLOW_CTL_CB: ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); break; case GRPC_NO_OP: - case GRPC_OP_DEADLINE: - case GRPC_OP_METADATA_BOUNDARY: case GRPC_OP_BEGIN_MESSAGE: break; } @@ -126,22 +124,11 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, op->data.begin_message.flags = flags; } -void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_METADATA_BOUNDARY; -} - -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *md) { +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) { grpc_stream_op *op = add(sopb); + grpc_metadata_batch_assert_ok(&b); op->type = GRPC_OP_METADATA; - op->data.metadata = md; -} - -void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, - gpr_timespec deadline) { - grpc_stream_op *op = add(sopb); - op->type = GRPC_OP_DEADLINE; - op->data.deadline = deadline; + op->data.metadata = b; } void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { @@ -183,6 +170,7 @@ static void assert_valid_list(grpc_mdelem_list *list) { GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); for (l = list->head; l; l = l->next) { + GPR_ASSERT(l->md); GPR_ASSERT((l->prev == NULL) == (l == list->head)); GPR_ASSERT((l->next == NULL) == (l == list->tail)); if (l->next) GPR_ASSERT(l->next->prev == l); @@ -190,6 +178,11 @@ static void assert_valid_list(grpc_mdelem_list *list) { } } +void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) { + assert_valid_list(&comd->list); + assert_valid_list(&comd->garbage); +} + void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); } void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); } @@ -202,12 +195,14 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target, void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); storage->md = elem_to_add; grpc_metadata_batch_link_head(comd, storage); } static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { assert_valid_list(list); + GPR_ASSERT(storage->md); storage->prev = NULL; storage->next = list->head; if (list->head != NULL) { @@ -227,12 +222,14 @@ void grpc_metadata_batch_link_head(grpc_metadata_batch *comd, void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) { + GPR_ASSERT(elem_to_add); storage->md = elem_to_add; grpc_metadata_batch_link_tail(comd, storage); } static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { assert_valid_list(list); + GPR_ASSERT(storage->md); storage->prev = list->tail; storage->next = NULL; if (list->tail != NULL) { diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 81d08b60fbd..f6ce140bd80 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -50,8 +50,6 @@ typedef enum grpc_stream_op_code { Must be ignored by receivers */ GRPC_NO_OP, GRPC_OP_METADATA, - GRPC_OP_DEADLINE, - GRPC_OP_METADATA_BOUNDARY, /* Begin a message/metadata element/status - as defined by grpc_message_type. */ GRPC_OP_BEGIN_MESSAGE, @@ -115,6 +113,8 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *comd, grpc_mdelem *elem), void *user_data); +void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); + /* Represents a single operation performed on a stream/transport */ typedef struct grpc_stream_op { /* the operation to be applied */ @@ -123,8 +123,7 @@ typedef struct grpc_stream_op { associated op-code */ union { grpc_begin_message begin_message; - grpc_mdelem *metadata; - gpr_timespec deadline; + grpc_metadata_batch metadata; gpr_slice slice; grpc_flow_ctl_cb flow_ctl_cb; } data; @@ -157,9 +156,7 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); /* Append a GRPC_OP_BEGIN to a buffer */ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, gpr_uint32 flags); -void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *metadata); -void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, gpr_timespec deadline); -void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb); +void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); /* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ diff --git a/test/core/transport/chttp2/stream_encoder_test.c b/test/core/transport/chttp2/stream_encoder_test.c index 5c7801079fe..22a4a0a7d69 100644 --- a/test/core/transport/chttp2/stream_encoder_test.c +++ b/test/core/transport/chttp2/stream_encoder_test.c @@ -130,47 +130,65 @@ static void test_small_data_framing(void) { verify_sopb(10, 0, 5, "000005 0000 deadbeef 00000000ff"); } -static void add_sopb_header(const char *key, const char *value) { - grpc_sopb_add_metadata(&g_sopb, - grpc_mdelem_from_strings(g_mdctx, key, value)); +static void free_md(void *p, grpc_op_error err) { + gpr_free(p); +} + +static void add_sopb_headers(int n, ...) { + int i; + grpc_metadata_batch b; + va_list l; + grpc_linked_mdelem *e = gpr_malloc(sizeof(*e) * n); + + va_start(l, n); + for (i = 0; i < n; i++) { + char *key = va_arg(l, char *); + char *value = va_arg(l, char *); + if (i) { + e[i-1].next = &e[i]; + e[i].prev = &e[i-1]; + } + e[i].md = grpc_mdelem_from_strings(g_mdctx, key, value); + } + e[0].prev = NULL; + e[n-1].next = NULL; + va_end(l); + + grpc_sopb_add_metadata(&g_sopb, b); + grpc_sopb_add_flow_ctl_cb(&g_sopb, free_md, e); } static void test_basic_headers(void) { int i; - add_sopb_header("a", "a"); + add_sopb_headers(1, "a", "a"); verify_sopb(0, 0, 0, "000005 0104 deadbeef 40 0161 0161"); - add_sopb_header("a", "a"); + add_sopb_headers(1, "a", "a"); verify_sopb(0, 0, 0, "000001 0104 deadbeef be"); - add_sopb_header("a", "a"); + add_sopb_headers(1, "a", "a"); verify_sopb(0, 0, 0, "000001 0104 deadbeef be"); - add_sopb_header("a", "a"); - add_sopb_header("b", "c"); + add_sopb_headers(2, "a", "a", "b", "c"); verify_sopb(0, 0, 0, "000006 0104 deadbeef be 40 0162 0163"); - add_sopb_header("a", "a"); - add_sopb_header("b", "c"); + add_sopb_headers(2, "a", "a", "b", "c"); verify_sopb(0, 0, 0, "000002 0104 deadbeef bf be"); - add_sopb_header("a", "d"); + add_sopb_headers(1, "a", "d"); verify_sopb(0, 0, 0, "000004 0104 deadbeef 7f 00 0164"); /* flush out what's there to make a few values look very popular */ for (i = 0; i < 350; i++) { - add_sopb_header("a", "a"); - add_sopb_header("b", "c"); - add_sopb_header("a", "d"); + add_sopb_headers(3, "a", "a", "b", "c", "a", "d"); verify_sopb(0, 0, 0, "000003 0104 deadbeef c0 bf be"); } - add_sopb_header("a", "a"); - add_sopb_header("k", "v"); + add_sopb_headers(2, "a", "a", "k", "v"); verify_sopb(0, 0, 0, "000006 0104 deadbeef c0 00 016b 0176"); - add_sopb_header("a", "v"); + add_sopb_headers(1, "a", "v"); /* this could be 000004 0104 deadbeef 0f 30 0176 also */ verify_sopb(0, 0, 0, "000004 0104 deadbeef 0f 2f 0176"); } @@ -190,7 +208,7 @@ static void test_decode_table_overflow(void) { for (i = 0; i < 114; i++) { if (i > 0) { - add_sopb_header("aa", "ba"); + add_sopb_headers(1, "aa", "ba"); } encode_int_to_str(i, key); @@ -209,14 +227,14 @@ static void test_decode_table_overflow(void) { key[0], key[1], value[0], value[1]); } - add_sopb_header(key, value); + add_sopb_headers(1, key, value); verify_sopb(0, 0, 0, expect); gpr_free(expect); } /* if the above passes, then we must have just knocked this pair out of the decoder stack, and so we'll be forced to re-encode it */ - add_sopb_header("aa", "ba"); + add_sopb_headers(1, "aa", "ba"); verify_sopb(0, 0, 0, "000007 0104 deadbeef 40 026161 026261"); } @@ -260,7 +278,7 @@ static void test_decode_random_headers_inner(int max_len) { randstr(st.key, max_len); randstr(st.value, max_len); - add_sopb_header(st.key, st.value); + add_sopb_headers(1, st.key, st.value); gpr_slice_buffer_init(&output); GPR_ASSERT(0 == grpc_chttp2_preencode(g_sopb.ops, &g_sopb.nops, 0, &encops));