Progress towards transport using metadata batches

pull/1312/head
Craig Tiller 10 years ago
parent 4e99edb642
commit 9c1043e757
  1. 72
      src/core/channel/connected_channel.c
  2. 31
      src/core/transport/chttp2/stream_encoder.c
  3. 61
      src/core/transport/chttp2_transport.c
  4. 31
      src/core/transport/stream_op.c
  5. 11
      src/core/transport/stream_op.h
  6. 60
      test/core/transport/chttp2/stream_encoder_test.c

@ -63,11 +63,6 @@ typedef struct connected_channel_call_data {
gpr_uint8 got_read_close; gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message; gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate; gpr_uint32 outgoing_buffer_length_estimate;
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
gpr_timespec deadline;
} call_data; } call_data;
/* We perform a small hack to locate transport data alongside the connected /* We perform a small hack to locate transport data alongside the connected
@ -120,26 +115,18 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) { grpc_call_op *op) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_linked_mdelem *m;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op); GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) { switch (op->type) {
case GRPC_SEND_METADATA: case GRPC_SEND_METADATA:
for (m = op->data.metadata.list.head; m; m = m->next) { grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md);
}
if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
grpc_sopb_add_deadline(&calld->outgoing_sopb,
op->data.metadata.deadline);
}
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
op->user_data); op->user_data);
break; break;
case GRPC_SEND_START: case GRPC_SEND_START:
grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset);
grpc_sopb_add_metadata_boundary(&calld->outgoing_sopb);
end_bufferable_op(op, chand, calld, 0); end_bufferable_op(op, chand, calld, 0);
break; break;
case GRPC_SEND_MESSAGE: case GRPC_SEND_MESSAGE:
@ -209,9 +196,6 @@ static void init_call_elem(grpc_call_element *elem,
calld->got_read_close = 0; calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0; calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length; calld->max_message_length = chand->max_message_length;
calld->incoming_metadata = NULL;
calld->incoming_metadata_capacity = 0;
calld->incoming_metadata_count = 0;
gpr_slice_buffer_init(&calld->incoming_message); gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport, r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld), TRANSPORT_STREAM_FROM_CALL_DATA(calld),
@ -328,50 +312,16 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op); grpc_call_next_op(elem, &call_op);
} }
static void metadata_done_cb(void *ptr, grpc_op_error error) { gpr_free(ptr); } static void got_metadata(grpc_call_element *elem, grpc_metadata_batch metadata) {
static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) {
if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) {
calld->incoming_metadata_capacity =
GPR_MAX(8, 2 * calld->incoming_metadata_capacity);
calld->incoming_metadata = gpr_realloc(
calld->incoming_metadata,
sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity);
}
calld->incoming_metadata[calld->incoming_metadata_count++].md = elem;
}
static void flush_metadata(grpc_call_element *elem) {
grpc_call_op op; grpc_call_op op;
call_data *calld = elem->call_data;
size_t i;
for (i = 1; i < calld->incoming_metadata_count; i++) {
calld->incoming_metadata[i].prev = &calld->incoming_metadata[i - 1];
}
for (i = 0; i < calld->incoming_metadata_count - 1; i++) {
calld->incoming_metadata[i].next = &calld->incoming_metadata[i + 1];
}
calld->incoming_metadata[0].prev =
calld->incoming_metadata[calld->incoming_metadata_count - 1].next = NULL;
op.type = GRPC_RECV_METADATA; op.type = GRPC_RECV_METADATA;
op.dir = GRPC_CALL_UP; op.dir = GRPC_CALL_UP;
op.flags = 0; op.flags = 0;
op.data.metadata.list.head = &calld->incoming_metadata[0]; op.data.metadata = metadata;
op.data.metadata.list.tail = op.done_cb = do_nothing;
&calld->incoming_metadata[calld->incoming_metadata_count - 1]; op.user_data = NULL;
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = calld->deadline;
op.done_cb = metadata_done_cb;
op.user_data = calld->incoming_metadata;
grpc_call_next_op(elem, &op); grpc_call_next_op(elem, &op);
calld->incoming_metadata = NULL;
calld->incoming_metadata_count = 0;
calld->incoming_metadata_capacity = 0;
} }
/* Handle incoming stream ops from the transport, translating them into /* Handle incoming stream ops from the transport, translating them into
@ -393,20 +343,12 @@ static void recv_batch(void *user_data, grpc_transport *transport,
stream_op = ops + i; stream_op = ops + i;
switch (stream_op->type) { switch (stream_op->type) {
case GRPC_OP_FLOW_CTL_CB: case GRPC_OP_FLOW_CTL_CB:
gpr_log(GPR_ERROR, stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
"should not receive flow control ops from transport");
abort();
break; break;
case GRPC_NO_OP: case GRPC_NO_OP:
break; break;
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
add_incoming_metadata(calld, stream_op->data.metadata); got_metadata(elem, stream_op->data.metadata);
break;
case GRPC_OP_DEADLINE:
calld->deadline = stream_op->data.deadline;
break;
case GRPC_OP_METADATA_BOUNDARY:
flush_metadata(elem);
break; break;
case GRPC_OP_BEGIN_MESSAGE: case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */ /* can't begin a message when we're still reading a message */

@ -480,9 +480,7 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count,
curop++; curop++;
break; break;
case GRPC_OP_FLOW_CTL_CB: case GRPC_OP_FLOW_CTL_CB:
case GRPC_OP_DEADLINE:
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
case GRPC_OP_METADATA_BOUNDARY:
/* these just get copied as they don't impact the number of flow /* these just get copied as they don't impact the number of flow
controlled bytes */ controlled bytes */
grpc_sopb_append(outops, op, 1); grpc_sopb_append(outops, op, 1);
@ -543,6 +541,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
gpr_uint32 curop = 0; gpr_uint32 curop = 0;
gpr_uint32 unref_op; gpr_uint32 unref_op;
grpc_mdctx *mdctx = compressor->mdctx; grpc_mdctx *mdctx = compressor->mdctx;
grpc_linked_mdelem *l;
int need_unref = 0; int need_unref = 0;
GPR_ASSERT(stream_id != 0); GPR_ASSERT(stream_id != 0);
@ -566,19 +565,19 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
curop++; curop++;
break; break;
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
/* Encode a metadata element; store the returned value, representing /* Encode a metadata batch; store the returned values, representing
a metadata element that needs to be unreffed back into the metadata a metadata element that needs to be unreffed back into the metadata
slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got slot. THIS MAY NOT BE THE SAME ELEMENT (if a decoder table slot got
updated). After this loop, we'll do a batch unref of elements. */ updated). After this loop, we'll do a batch unref of elements. */
op->data.metadata = hpack_enc(compressor, op->data.metadata, &st); need_unref |= op->data.metadata.garbage.head != NULL;
need_unref |= op->data.metadata != NULL; grpc_metadata_batch_assert_ok(&op->data.metadata);
curop++; for (l = op->data.metadata.list.head; l; l = l->next) {
break; l->md = hpack_enc(compressor, l->md, &st);
case GRPC_OP_DEADLINE: need_unref |= l->md != NULL;
deadline_enc(compressor, op->data.deadline, &st); }
curop++; if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
break; deadline_enc(compressor, op->data.metadata.deadline, &st);
case GRPC_OP_METADATA_BOUNDARY: }
ensure_frame_type(&st, HEADER, 0); ensure_frame_type(&st, HEADER, 0);
finish_frame(&st, 1, 0); finish_frame(&st, 1, 0);
st.last_was_header = 0; /* force a new header frame */ st.last_was_header = 0; /* force a new header frame */
@ -614,8 +613,12 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
for (unref_op = 0; unref_op < curop; unref_op++) { for (unref_op = 0; unref_op < curop; unref_op++) {
op = &ops[unref_op]; op = &ops[unref_op];
if (op->type != GRPC_OP_METADATA) continue; if (op->type != GRPC_OP_METADATA) continue;
if (!op->data.metadata) continue; for (l = op->data.metadata.list.head; l; l = l->next) {
grpc_mdctx_locked_mdelem_unref(mdctx, op->data.metadata); if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
for (l = op->data.metadata.garbage.head; l; l = l->next) {
grpc_mdctx_locked_mdelem_unref(mdctx, l->md);
}
} }
grpc_mdctx_unlock(mdctx); grpc_mdctx_unlock(mdctx);
} }

@ -292,6 +292,12 @@ struct stream {
stream_link links[STREAM_LIST_COUNT]; stream_link links[STREAM_LIST_COUNT];
gpr_uint8 included[STREAM_LIST_COUNT]; gpr_uint8 included[STREAM_LIST_COUNT];
/* incoming metadata */
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
gpr_timespec incoming_deadline;
/* sops from application */ /* sops from application */
grpc_stream_op_buffer outgoing_sopb; grpc_stream_op_buffer outgoing_sopb;
/* sops that have passed flow control to be written */ /* sops that have passed flow control to be written */
@ -593,6 +599,10 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs,
s->cancelled = 0; s->cancelled = 0;
s->allow_window_updates = 0; s->allow_window_updates = 0;
s->published_close = 0; s->published_close = 0;
s->incoming_metadata_count = 0;
s->incoming_metadata_capacity = 0;
s->incoming_metadata = NULL;
s->incoming_deadline = gpr_inf_future;
memset(&s->links, 0, sizeof(s->links)); memset(&s->links, 0, sizeof(s->links));
memset(&s->included, 0, sizeof(s->included)); memset(&s->included, 0, sizeof(s->included));
grpc_sopb_init(&s->outgoing_sopb); grpc_sopb_init(&s->outgoing_sopb);
@ -1057,6 +1067,14 @@ static void finalize_cancellations(transport *t) {
} }
} }
static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
if (s->incoming_metadata_capacity == s->incoming_metadata_count) {
s->incoming_metadata_capacity = GPR_MAX(8, 2 * s->incoming_metadata_capacity);
s->incoming_metadata = gpr_realloc(s->incoming_metadata, sizeof(*s->incoming_metadata) * s->incoming_metadata_capacity);
}
s->incoming_metadata[s->incoming_metadata_count++].md = elem;
}
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status, grpc_status_code local_status,
grpc_chttp2_error_code error_code, grpc_chttp2_error_code error_code,
@ -1076,9 +1094,7 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
stream_list_join(t, s, CANCELLED); stream_list_join(t, s, CANCELLED);
gpr_ltoa(local_status, buffer); gpr_ltoa(local_status, buffer);
grpc_sopb_add_metadata( add_incoming_metadata(t, s, grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
&s->parser.incoming_sopb,
grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
stream_list_join(t, s, PENDING_CALLBACKS); stream_list_join(t, s, PENDING_CALLBACKS);
} }
@ -1254,11 +1270,10 @@ static void on_header(void *tp, grpc_mdelem *md) {
} }
grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
} }
grpc_sopb_add_deadline(&s->parser.incoming_sopb, s->incoming_deadline = gpr_time_add(gpr_now(), *cached_timeout);
gpr_time_add(gpr_now(), *cached_timeout));
grpc_mdelem_unref(md); grpc_mdelem_unref(md);
} else { } else {
grpc_sopb_add_metadata(&s->parser.incoming_sopb, md); add_incoming_metadata(t, s, md);
} }
} }
@ -1434,6 +1449,37 @@ static int is_window_update_legal(gpr_uint32 window_update, gpr_uint32 window) {
return window_update < MAX_WINDOW - window; return window_update < MAX_WINDOW - window;
} }
static void free_md(void *p, grpc_op_error result) {
gpr_free(p);
}
static void add_metadata_batch(transport *t) {
grpc_metadata_batch b;
stream *s = t->incoming_stream;
size_t i;
b.list.head = &s->incoming_metadata[0];
b.list.tail = &s->incoming_metadata[s->incoming_metadata_count - 1];
b.garbage.head = b.garbage.tail = NULL;
b.deadline = s->incoming_deadline;
for (i = 1; i < s->incoming_metadata_count; i++) {
s->incoming_metadata[i].prev = &s->incoming_metadata[i - 1];
s->incoming_metadata[i - 1].next = &s->incoming_metadata[i];
}
s->incoming_metadata[0].prev = NULL;
s->incoming_metadata[s->incoming_metadata_count - 1].next = NULL;
grpc_sopb_add_metadata(&s->parser.incoming_sopb, b);
grpc_sopb_add_flow_ctl_cb(&s->parser.incoming_sopb, free_md, s->incoming_metadata);
/* reset */
s->incoming_deadline = gpr_inf_future;
s->incoming_metadata = NULL;
s->incoming_metadata_count = 0;
s->incoming_metadata_capacity = 0;
}
static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
grpc_chttp2_parse_state st; grpc_chttp2_parse_state st;
size_t i; size_t i;
@ -1448,8 +1494,7 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
} }
if (st.metadata_boundary) { if (st.metadata_boundary) {
grpc_sopb_add_metadata_boundary( add_metadata_batch(t);
&t->incoming_stream->parser.incoming_sopb);
stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS); stream_list_join(t, t->incoming_stream, PENDING_CALLBACKS);
} }
if (st.ack_settings) { if (st.ack_settings) {

@ -79,14 +79,12 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) {
gpr_slice_unref(ops[i].data.slice); gpr_slice_unref(ops[i].data.slice);
break; break;
case GRPC_OP_METADATA: case GRPC_OP_METADATA:
grpc_mdelem_unref(ops[i].data.metadata); /* grpc_mdelem_unref(ops[i].data.metadata); */
break; break;
case GRPC_OP_FLOW_CTL_CB: case GRPC_OP_FLOW_CTL_CB:
ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR); ops[i].data.flow_ctl_cb.cb(ops[i].data.flow_ctl_cb.arg, GRPC_OP_ERROR);
break; break;
case GRPC_NO_OP: case GRPC_NO_OP:
case GRPC_OP_DEADLINE:
case GRPC_OP_METADATA_BOUNDARY:
case GRPC_OP_BEGIN_MESSAGE: case GRPC_OP_BEGIN_MESSAGE:
break; break;
} }
@ -126,22 +124,11 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
op->data.begin_message.flags = flags; op->data.begin_message.flags = flags;
} }
void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb) { void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_METADATA_BOUNDARY;
}
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *md) {
grpc_stream_op *op = add(sopb); grpc_stream_op *op = add(sopb);
grpc_metadata_batch_assert_ok(&b);
op->type = GRPC_OP_METADATA; op->type = GRPC_OP_METADATA;
op->data.metadata = md; op->data.metadata = b;
}
void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb,
gpr_timespec deadline) {
grpc_stream_op *op = add(sopb);
op->type = GRPC_OP_DEADLINE;
op->data.deadline = deadline;
} }
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) {
@ -183,6 +170,7 @@ static void assert_valid_list(grpc_mdelem_list *list) {
GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL)); GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL));
for (l = list->head; l; l = l->next) { for (l = list->head; l; l = l->next) {
GPR_ASSERT(l->md);
GPR_ASSERT((l->prev == NULL) == (l == list->head)); GPR_ASSERT((l->prev == NULL) == (l == list->head));
GPR_ASSERT((l->next == NULL) == (l == list->tail)); GPR_ASSERT((l->next == NULL) == (l == list->tail));
if (l->next) GPR_ASSERT(l->next->prev == l); if (l->next) GPR_ASSERT(l->next->prev == l);
@ -190,6 +178,11 @@ static void assert_valid_list(grpc_mdelem_list *list) {
} }
} }
void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) {
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
}
void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); } void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); }
void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); } void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); }
@ -202,12 +195,14 @@ void grpc_metadata_batch_merge(grpc_metadata_batch *target,
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage, grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) { grpc_mdelem *elem_to_add) {
GPR_ASSERT(elem_to_add);
storage->md = elem_to_add; storage->md = elem_to_add;
grpc_metadata_batch_link_head(comd, storage); grpc_metadata_batch_link_head(comd, storage);
} }
static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list); assert_valid_list(list);
GPR_ASSERT(storage->md);
storage->prev = NULL; storage->prev = NULL;
storage->next = list->head; storage->next = list->head;
if (list->head != NULL) { if (list->head != NULL) {
@ -227,12 +222,14 @@ void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd, void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage, grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) { grpc_mdelem *elem_to_add) {
GPR_ASSERT(elem_to_add);
storage->md = elem_to_add; storage->md = elem_to_add;
grpc_metadata_batch_link_tail(comd, storage); grpc_metadata_batch_link_tail(comd, storage);
} }
static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list); assert_valid_list(list);
GPR_ASSERT(storage->md);
storage->prev = list->tail; storage->prev = list->tail;
storage->next = NULL; storage->next = NULL;
if (list->tail != NULL) { if (list->tail != NULL) {

@ -50,8 +50,6 @@ typedef enum grpc_stream_op_code {
Must be ignored by receivers */ Must be ignored by receivers */
GRPC_NO_OP, GRPC_NO_OP,
GRPC_OP_METADATA, GRPC_OP_METADATA,
GRPC_OP_DEADLINE,
GRPC_OP_METADATA_BOUNDARY,
/* Begin a message/metadata element/status - as defined by /* Begin a message/metadata element/status - as defined by
grpc_message_type. */ grpc_message_type. */
GRPC_OP_BEGIN_MESSAGE, GRPC_OP_BEGIN_MESSAGE,
@ -115,6 +113,8 @@ void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
grpc_mdelem *elem), grpc_mdelem *elem),
void *user_data); void *user_data);
void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd);
/* Represents a single operation performed on a stream/transport */ /* Represents a single operation performed on a stream/transport */
typedef struct grpc_stream_op { typedef struct grpc_stream_op {
/* the operation to be applied */ /* the operation to be applied */
@ -123,8 +123,7 @@ typedef struct grpc_stream_op {
associated op-code */ associated op-code */
union { union {
grpc_begin_message begin_message; grpc_begin_message begin_message;
grpc_mdelem *metadata; grpc_metadata_batch metadata;
gpr_timespec deadline;
gpr_slice slice; gpr_slice slice;
grpc_flow_ctl_cb flow_ctl_cb; grpc_flow_ctl_cb flow_ctl_cb;
} data; } data;
@ -157,9 +156,7 @@ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
/* Append a GRPC_OP_BEGIN to a buffer */ /* Append a GRPC_OP_BEGIN to a buffer */
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
gpr_uint32 flags); gpr_uint32 flags);
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_mdelem *metadata); void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
void grpc_sopb_add_deadline(grpc_stream_op_buffer *sopb, gpr_timespec deadline);
void grpc_sopb_add_metadata_boundary(grpc_stream_op_buffer *sopb);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ /* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ /* Append a GRPC_OP_FLOW_CTL_CB to a buffer */

@ -130,47 +130,65 @@ static void test_small_data_framing(void) {
verify_sopb(10, 0, 5, "000005 0000 deadbeef 00000000ff"); verify_sopb(10, 0, 5, "000005 0000 deadbeef 00000000ff");
} }
static void add_sopb_header(const char *key, const char *value) { static void free_md(void *p, grpc_op_error err) {
grpc_sopb_add_metadata(&g_sopb, gpr_free(p);
grpc_mdelem_from_strings(g_mdctx, key, value)); }
static void add_sopb_headers(int n, ...) {
int i;
grpc_metadata_batch b;
va_list l;
grpc_linked_mdelem *e = gpr_malloc(sizeof(*e) * n);
va_start(l, n);
for (i = 0; i < n; i++) {
char *key = va_arg(l, char *);
char *value = va_arg(l, char *);
if (i) {
e[i-1].next = &e[i];
e[i].prev = &e[i-1];
}
e[i].md = grpc_mdelem_from_strings(g_mdctx, key, value);
}
e[0].prev = NULL;
e[n-1].next = NULL;
va_end(l);
grpc_sopb_add_metadata(&g_sopb, b);
grpc_sopb_add_flow_ctl_cb(&g_sopb, free_md, e);
} }
static void test_basic_headers(void) { static void test_basic_headers(void) {
int i; int i;
add_sopb_header("a", "a"); add_sopb_headers(1, "a", "a");
verify_sopb(0, 0, 0, "000005 0104 deadbeef 40 0161 0161"); verify_sopb(0, 0, 0, "000005 0104 deadbeef 40 0161 0161");
add_sopb_header("a", "a"); add_sopb_headers(1, "a", "a");
verify_sopb(0, 0, 0, "000001 0104 deadbeef be"); verify_sopb(0, 0, 0, "000001 0104 deadbeef be");
add_sopb_header("a", "a"); add_sopb_headers(1, "a", "a");
verify_sopb(0, 0, 0, "000001 0104 deadbeef be"); verify_sopb(0, 0, 0, "000001 0104 deadbeef be");
add_sopb_header("a", "a"); add_sopb_headers(2, "a", "a", "b", "c");
add_sopb_header("b", "c");
verify_sopb(0, 0, 0, "000006 0104 deadbeef be 40 0162 0163"); verify_sopb(0, 0, 0, "000006 0104 deadbeef be 40 0162 0163");
add_sopb_header("a", "a"); add_sopb_headers(2, "a", "a", "b", "c");
add_sopb_header("b", "c");
verify_sopb(0, 0, 0, "000002 0104 deadbeef bf be"); verify_sopb(0, 0, 0, "000002 0104 deadbeef bf be");
add_sopb_header("a", "d"); add_sopb_headers(1, "a", "d");
verify_sopb(0, 0, 0, "000004 0104 deadbeef 7f 00 0164"); verify_sopb(0, 0, 0, "000004 0104 deadbeef 7f 00 0164");
/* flush out what's there to make a few values look very popular */ /* flush out what's there to make a few values look very popular */
for (i = 0; i < 350; i++) { for (i = 0; i < 350; i++) {
add_sopb_header("a", "a"); add_sopb_headers(3, "a", "a", "b", "c", "a", "d");
add_sopb_header("b", "c");
add_sopb_header("a", "d");
verify_sopb(0, 0, 0, "000003 0104 deadbeef c0 bf be"); verify_sopb(0, 0, 0, "000003 0104 deadbeef c0 bf be");
} }
add_sopb_header("a", "a"); add_sopb_headers(2, "a", "a", "k", "v");
add_sopb_header("k", "v");
verify_sopb(0, 0, 0, "000006 0104 deadbeef c0 00 016b 0176"); verify_sopb(0, 0, 0, "000006 0104 deadbeef c0 00 016b 0176");
add_sopb_header("a", "v"); add_sopb_headers(1, "a", "v");
/* this could be 000004 0104 deadbeef 0f 30 0176 also */ /* this could be 000004 0104 deadbeef 0f 30 0176 also */
verify_sopb(0, 0, 0, "000004 0104 deadbeef 0f 2f 0176"); verify_sopb(0, 0, 0, "000004 0104 deadbeef 0f 2f 0176");
} }
@ -190,7 +208,7 @@ static void test_decode_table_overflow(void) {
for (i = 0; i < 114; i++) { for (i = 0; i < 114; i++) {
if (i > 0) { if (i > 0) {
add_sopb_header("aa", "ba"); add_sopb_headers(1, "aa", "ba");
} }
encode_int_to_str(i, key); encode_int_to_str(i, key);
@ -209,14 +227,14 @@ static void test_decode_table_overflow(void) {
key[0], key[1], value[0], value[1]); key[0], key[1], value[0], value[1]);
} }
add_sopb_header(key, value); add_sopb_headers(1, key, value);
verify_sopb(0, 0, 0, expect); verify_sopb(0, 0, 0, expect);
gpr_free(expect); gpr_free(expect);
} }
/* if the above passes, then we must have just knocked this pair out of the /* if the above passes, then we must have just knocked this pair out of the
decoder stack, and so we'll be forced to re-encode it */ decoder stack, and so we'll be forced to re-encode it */
add_sopb_header("aa", "ba"); add_sopb_headers(1, "aa", "ba");
verify_sopb(0, 0, 0, "000007 0104 deadbeef 40 026161 026261"); verify_sopb(0, 0, 0, "000007 0104 deadbeef 40 026161 026261");
} }
@ -260,7 +278,7 @@ static void test_decode_random_headers_inner(int max_len) {
randstr(st.key, max_len); randstr(st.key, max_len);
randstr(st.value, max_len); randstr(st.value, max_len);
add_sopb_header(st.key, st.value); add_sopb_headers(1, st.key, st.value);
gpr_slice_buffer_init(&output); gpr_slice_buffer_init(&output);
GPR_ASSERT(0 == GPR_ASSERT(0 ==
grpc_chttp2_preencode(g_sopb.ops, &g_sopb.nops, 0, &encops)); grpc_chttp2_preencode(g_sopb.ops, &g_sopb.nops, 0, &encops));

Loading…
Cancel
Save