Move metadata batching to stream_op

pull/1312/head
Craig Tiller 10 years ago
parent 87d5b19da6
commit 205aee1c82
  1. 2
      src/core/channel/call_op_string.c
  2. 113
      src/core/channel/channel_stack.c
  3. 41
      src/core/channel/channel_stack.h
  4. 8
      src/core/channel/client_channel.c
  5. 10
      src/core/channel/http_client_filter.c
  6. 4
      src/core/channel/http_server_filter.c
  7. 2
      src/core/security/auth.c
  8. 8
      src/core/surface/call.c
  9. 2
      src/core/surface/call.h
  10. 2
      src/core/surface/server.c
  11. 114
      src/core/transport/stream_op.c
  12. 39
      src/core/transport/stream_op.h

@ -53,7 +53,7 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
GPR_HEXDUMP_PLAINTEXT));
}
static void put_metadata_list(gpr_strvec *b, grpc_call_op_metadata md) {
static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
grpc_linked_mdelem *m;
for (m = md.list.head; m; m = m->next) {
put_metadata(b, m->md);

@ -229,116 +229,3 @@ void grpc_call_element_recv_status(grpc_call_element *cur_elem,
const char *message) {
abort();
}
static void assert_valid_list(grpc_mdelem_list *list) {
grpc_linked_mdelem *l;
GPR_ASSERT((list->head == NULL) == (list->tail == NULL));
if (!list->head) return;
GPR_ASSERT(list->head->prev == NULL);
GPR_ASSERT(list->tail->next == NULL);
GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL));
for (l = list->head; l; l = l->next) {
GPR_ASSERT((l->prev == NULL) == (l == list->head));
GPR_ASSERT((l->next == NULL) == (l == list->tail));
if (l->next) GPR_ASSERT(l->next->prev == l);
if (l->prev) GPR_ASSERT(l->prev->next == l);
}
}
void grpc_call_op_metadata_init(grpc_call_op_metadata *comd) { abort(); }
void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd) { abort(); }
void grpc_call_op_metadata_merge(grpc_call_op_metadata *target,
grpc_call_op_metadata *add) {
abort();
}
void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_call_op_metadata_link_head(comd, storage);
}
static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = NULL;
storage->next = list->head;
if (list->head != NULL) {
list->head->prev = storage;
} else {
list->tail = storage;
}
list->head = storage;
assert_valid_list(list);
}
void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage) {
link_head(&comd->list, storage);
}
void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_call_op_metadata_link_tail(comd, storage);
}
static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = list->tail;
storage->next = NULL;
if (list->tail != NULL) {
list->tail->next = storage;
} else {
list->head = storage;
}
list->tail = storage;
assert_valid_list(list);
}
void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage) {
link_tail(&comd->list, storage);
}
void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),
void *user_data) {
grpc_linked_mdelem *l;
grpc_linked_mdelem *next;
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
for (l = comd->list.head; l; l = next) {
grpc_mdelem *orig = l->md;
grpc_mdelem *filt = filter(user_data, orig);
next = l->next;
if (filt == NULL) {
if (l->prev) {
l->prev->next = l->next;
}
if (l->next) {
l->next->prev = l->prev;
}
if (comd->list.head == l) {
comd->list.head = l->next;
}
if (comd->list.tail == l) {
comd->list.tail = l->prev;
}
assert_valid_list(&comd->list);
link_head(&comd->garbage, l);
} else if (filt != orig) {
grpc_mdelem_unref(orig);
l->md = filt;
}
}
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
}

@ -89,45 +89,6 @@ typedef enum {
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
struct grpc_linked_mdelem *prev;
} grpc_linked_mdelem;
typedef struct grpc_mdelem_list {
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
typedef struct grpc_call_op_metadata {
grpc_mdelem_list list;
grpc_mdelem_list garbage;
gpr_timespec deadline;
} grpc_call_op_metadata;
void grpc_call_op_metadata_init(grpc_call_op_metadata *comd);
void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd);
void grpc_call_op_metadata_merge(grpc_call_op_metadata *target,
grpc_call_op_metadata *add);
void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage);
void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage);
void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add);
void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add);
void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),
void *user_data);
/* A single filterable operation to be performed on a call */
typedef struct {
/* The type of operation we're performing */
@ -146,7 +107,7 @@ typedef struct {
grpc_pollset *pollset;
} start;
grpc_byte_buffer *message;
grpc_call_op_metadata metadata;
grpc_metadata_batch metadata;
} data;
/* Must be called when processing of this call-op is complete.

@ -83,7 +83,7 @@ struct call_data {
grpc_call_element *elem;
call_state state;
grpc_call_op_metadata pending_metadata;
grpc_metadata_batch pending_metadata;
gpr_uint32 pending_metadata_flags;
gpr_timespec deadline;
union {
@ -257,7 +257,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_call_op_metadata_merge(&calld->pending_metadata, &op->data.metadata);
grpc_metadata_batch_merge(&calld->pending_metadata, &op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_SEND_START:
@ -383,7 +383,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->s.waiting.on_complete = error_bad_on_complete;
calld->s.waiting.on_complete_user_data = NULL;
grpc_call_op_metadata_init(&calld->pending_metadata);
grpc_metadata_batch_init(&calld->pending_metadata);
}
/* Destructor for call_data */
@ -391,7 +391,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
/* if the metadata buffer is not flushed, destroy it here. */
grpc_call_op_metadata_destroy(&calld->pending_metadata);
grpc_metadata_batch_destroy(&calld->pending_metadata);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */

@ -80,18 +80,18 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_SEND_METADATA:
/* Send : prefixed headers, which have to be before any application
* layer headers. */
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->method,
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->scheme,
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
grpc_mdelem_ref(channeld->scheme));
grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->te_trailers,
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers,
grpc_mdelem_ref(channeld->te_trailers));
grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->content_type,
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
grpc_call_op_metadata_filter(&op->data.metadata, client_filter, elem);
grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
grpc_call_next_op(elem, op);
break;
default:

@ -158,7 +158,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_RECV_METADATA:
grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem);
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
if (!calld->got_initial_metadata) {
calld->got_initial_metadata = 1;
/* Have we seen the required http2 transport headers?
@ -188,7 +188,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_SEND_METADATA:
/* If we haven't sent status 200 yet, we need to so so because it needs to
come before any non : prefixed metadata. */
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->status,
grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
grpc_mdelem_ref(channeld->status_ok));
grpc_call_next_op(elem, op);
break;

@ -79,7 +79,7 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t i;
GPR_ASSERT(num_md <= MAX_CREDENTIAL_METADATA_COUNT);
for (i = 0; i < num_md; i++) {
grpc_call_op_metadata_add_tail(&op.data.metadata, &calld->md_links[i],
grpc_metadata_batch_add_tail(&op.data.metadata, &calld->md_links[i],
grpc_mdelem_ref(md_elems[i]));
}
grpc_call_next_op(elem, &op);

@ -657,7 +657,7 @@ static void enact_send_action(grpc_call *call, send_action sa) {
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
grpc_call_op_metadata_link_head(&op.data.metadata,
grpc_metadata_batch_link_head(&op.data.metadata,
&call->send_initial_metadata[i]);
}
op.done_cb = do_nothing;
@ -698,14 +698,14 @@ static void enact_send_action(grpc_call *call, send_action sa) {
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
grpc_call_op_metadata_add_tail(
grpc_metadata_batch_add_tail(
&op.data.metadata, &call->status_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
grpc_call_op_metadata_add_tail(
grpc_metadata_batch_add_tail(
&op.data.metadata, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
@ -984,7 +984,7 @@ void grpc_call_recv_message(grpc_call_element *elem,
}
int grpc_call_recv_metadata(grpc_call_element *elem,
grpc_call_op_metadata *md) {
grpc_metadata_batch *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_linked_mdelem *l;
grpc_metadata_array *dest;

@ -100,7 +100,7 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
the completion queue/surface layer */
/* receive metadata - returns 1 if this was initial metadata */
int grpc_call_recv_metadata(grpc_call_element *surface_element,
grpc_call_op_metadata *md);
grpc_metadata_batch *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);

@ -431,7 +431,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem);
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
calld->deadline = op->data.metadata.deadline;
start_new_rpc(elem);

@ -171,3 +171,117 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops);
sopb->nops = new_nops;
}
static void assert_valid_list(grpc_mdelem_list *list) {
grpc_linked_mdelem *l;
GPR_ASSERT((list->head == NULL) == (list->tail == NULL));
if (!list->head) return;
GPR_ASSERT(list->head->prev == NULL);
GPR_ASSERT(list->tail->next == NULL);
GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL));
for (l = list->head; l; l = l->next) {
GPR_ASSERT((l->prev == NULL) == (l == list->head));
GPR_ASSERT((l->next == NULL) == (l == list->tail));
if (l->next) GPR_ASSERT(l->next->prev == l);
if (l->prev) GPR_ASSERT(l->prev->next == l);
}
}
void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); }
void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); }
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
grpc_metadata_batch *add) {
abort();
}
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_metadata_batch_link_head(comd, storage);
}
static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = NULL;
storage->next = list->head;
if (list->head != NULL) {
list->head->prev = storage;
} else {
list->tail = storage;
}
list->head = storage;
assert_valid_list(list);
}
void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage) {
link_head(&comd->list, storage);
}
void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_metadata_batch_link_tail(comd, storage);
}
static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = list->tail;
storage->next = NULL;
if (list->tail != NULL) {
list->tail->next = storage;
} else {
list->head = storage;
}
list->tail = storage;
assert_valid_list(list);
}
void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage) {
link_tail(&comd->list, storage);
}
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),
void *user_data) {
grpc_linked_mdelem *l;
grpc_linked_mdelem *next;
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
for (l = comd->list.head; l; l = next) {
grpc_mdelem *orig = l->md;
grpc_mdelem *filt = filter(user_data, orig);
next = l->next;
if (filt == NULL) {
if (l->prev) {
l->prev->next = l->next;
}
if (l->next) {
l->next->prev = l->prev;
}
if (comd->list.head == l) {
comd->list.head = l->next;
}
if (comd->list.tail == l) {
comd->list.tail = l->prev;
}
assert_valid_list(&comd->list);
link_head(&comd->garbage, l);
} else if (filt != orig) {
grpc_mdelem_unref(orig);
l->md = filt;
}
}
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
}

@ -76,6 +76,45 @@ typedef struct grpc_flow_ctl_cb {
void *arg;
} grpc_flow_ctl_cb;
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
struct grpc_linked_mdelem *prev;
} grpc_linked_mdelem;
typedef struct grpc_mdelem_list {
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
typedef struct grpc_metadata_batch {
grpc_mdelem_list list;
grpc_mdelem_list garbage;
gpr_timespec deadline;
} grpc_metadata_batch;
void grpc_metadata_batch_init(grpc_metadata_batch *comd);
void grpc_metadata_batch_destroy(grpc_metadata_batch *comd);
void grpc_metadata_batch_merge(grpc_metadata_batch *target,
grpc_metadata_batch *add);
void grpc_metadata_batch_link_head(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage);
void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage);
void grpc_metadata_batch_add_head(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add);
void grpc_metadata_batch_add_tail(grpc_metadata_batch *comd,
grpc_linked_mdelem *storage,
grpc_mdelem *elem_to_add);
void grpc_metadata_batch_filter(grpc_metadata_batch *comd,
grpc_mdelem *(*filter)(void *user_data,
grpc_mdelem *elem),
void *user_data);
/* Represents a single operation performed on a stream/transport */
typedef struct grpc_stream_op {
/* the operation to be applied */

Loading…
Cancel
Save