Switching to batch oriented metadata passing

pull/1312/head
Craig Tiller 10 years ago
parent b12dc6b5bc
commit 6902ad2e9c
  1. 4
      BUILD
  2. 47
      Makefile
  3. 16
      build.json
  4. 9
      include/grpc/grpc.h
  5. 29
      src/core/channel/call_op_string.c
  6. 13
      src/core/channel/census_filter.c
  7. 135
      src/core/channel/channel_stack.c
  8. 43
      src/core/channel/channel_stack.h
  9. 51
      src/core/channel/client_channel.c
  10. 92
      src/core/channel/connected_channel.c
  11. 64
      src/core/channel/http_client_filter.c
  12. 240
      src/core/channel/http_server_filter.c
  13. 149
      src/core/channel/metadata_buffer.c
  14. 70
      src/core/channel/metadata_buffer.h
  15. 52
      src/core/security/auth.c
  16. 183
      src/core/surface/call.c
  17. 17
      src/core/surface/call.h
  18. 52
      src/core/surface/channel.c
  19. 12
      src/core/surface/client.c
  20. 21
      src/core/surface/lame_client.c
  21. 39
      src/core/surface/server.c
  22. 6
      templates/Makefile.template
  23. 201
      test/core/channel/metadata_buffer_test.c
  24. 19
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  25. 20
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload_legacy.c
  26. 6
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  27. 8
      test/core/end2end/tests/request_response_with_metadata_and_payload_legacy.c
  28. 12
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload_legacy.c
  29. 2
      test/core/surface/lame_client_test.c
  30. 1
      tools/run_tests/run_tests.py
  31. 9
      tools/run_tests/tests.json
  32. 12
      vsprojects/vs2010/Grpc.mak
  33. 3
      vsprojects/vs2010/grpc.vcxproj
  34. 6
      vsprojects/vs2010/grpc.vcxproj.filters
  35. 3
      vsprojects/vs2010/grpc_unsecure.vcxproj
  36. 6
      vsprojects/vs2010/grpc_unsecure.vcxproj.filters
  37. 10
      vsprojects/vs2013/Grpc.mak
  38. 3
      vsprojects/vs2013/grpc.vcxproj
  39. 6
      vsprojects/vs2013/grpc.vcxproj.filters
  40. 3
      vsprojects/vs2013/grpc_unsecure.vcxproj
  41. 6
      vsprojects/vs2013/grpc_unsecure.vcxproj.filters

@ -147,7 +147,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/metadata_buffer.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
@ -258,7 +257,6 @@ cc_library(
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/metadata_buffer.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
"src/core/compression/message_compress.c",
@ -378,7 +376,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/metadata_buffer.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
@ -469,7 +466,6 @@ cc_library(
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/metadata_buffer.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
"src/core/compression/message_compress.c",

File diff suppressed because one or more lines are too long

@ -98,7 +98,6 @@
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/metadata_buffer.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
@ -190,7 +189,6 @@
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/metadata_buffer.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
"src/core/compression/message_compress.c",
@ -1574,20 +1572,6 @@
"gpr"
]
},
{
"name": "metadata_buffer_test",
"build": "test",
"language": "c",
"src": [
"test/core/channel/metadata_buffer_test.c"
],
"deps": [
"grpc_test_util",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "multi_init_test",
"build": "test",

@ -186,6 +186,13 @@ typedef struct grpc_metadata {
const char *key;
const char *value;
size_t value_length;
/* The following fields are reserved for grpc internal use.
There is no need to initialize them, and they will be set to garbage during
calls to grpc. */
struct {
void *some_random_pointers[3];
} internal_data;
} grpc_metadata;
typedef enum grpc_completion_type {
@ -295,7 +302,7 @@ typedef struct grpc_op {
union {
struct {
size_t count;
const grpc_metadata *metadata;
grpc_metadata *metadata;
} send_initial_metadata;
grpc_byte_buffer *send_message;
struct {

@ -51,6 +51,18 @@ static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT));
}
static void put_metadata_list(gpr_strvec *b, grpc_call_op_metadata md) {
grpc_linked_mdelem *m;
for (m = md.list.head; m; m = m->next) {
put_metadata(b, m->md);
}
if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
char *tmp;
gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, md.deadline.tv_nsec);
gpr_strvec_add(b, tmp);
}
}
char *grpc_call_op_string(grpc_call_op *op) {
char *tmp;
char *out;
@ -69,12 +81,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
switch (op->type) {
case GRPC_SEND_METADATA:
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
put_metadata(&b, op->data.metadata);
break;
case GRPC_SEND_DEADLINE:
gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
op->data.deadline.tv_nsec);
gpr_strvec_add(&b, tmp);
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_SEND_START:
gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset);
@ -94,15 +101,7 @@ char *grpc_call_op_string(grpc_call_op *op) {
break;
case GRPC_RECV_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
put_metadata(&b, op->data.metadata);
break;
case GRPC_RECV_DEADLINE:
gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
op->data.deadline.tv_nsec);
gpr_strvec_add(&b, tmp);
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA"));
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));

@ -62,11 +62,14 @@ static void init_rpc_stats(census_rpc_stats* stats) {
static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
channel_data* chand) {
if (op->data.metadata->key == chand->path_str) {
gpr_log(GPR_DEBUG,
(const char*)GPR_SLICE_START_PTR(op->data.metadata->value->slice));
census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
op->data.metadata->value->slice));
grpc_linked_mdelem *m;
for (m = op->data.metadata.list.head; m; m = m->next) {
if (m->md->key == chand->path_str) {
gpr_log(GPR_DEBUG,
(const char*)GPR_SLICE_START_PTR(m->md->value->slice));
census_add_method_tag(calld->op_id, (const char*)GPR_SLICE_START_PTR(
m->md->value->slice));
}
}
}

@ -205,30 +205,6 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
static void do_nothing(void *user_data, grpc_op_error error) {}
void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
grpc_mdelem *mdelem) {
grpc_call_op metadata_op;
metadata_op.type = GRPC_RECV_METADATA;
metadata_op.dir = GRPC_CALL_UP;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
grpc_mdelem *mdelem) {
grpc_call_op metadata_op;
metadata_op.type = GRPC_SEND_METADATA;
metadata_op.dir = GRPC_CALL_DOWN;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
grpc_call_op cancel_op;
cancel_op.type = GRPC_CANCEL_OP;
@ -248,3 +224,114 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
finish_op.flags = 0;
grpc_call_next_op(cur_elem, &finish_op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message) {
abort();
}
static void assert_valid_list(grpc_mdelem_list *list) {
grpc_linked_mdelem *l;
GPR_ASSERT((list->head == NULL) == (list->tail == NULL));
if (!list->head) return;
GPR_ASSERT(list->head->prev == NULL);
GPR_ASSERT(list->tail->next == NULL);
GPR_ASSERT((list->head == list->tail) == (list->head->next == NULL));
for (l = list->head; l; l = l->next) {
GPR_ASSERT((l->prev == NULL) == (l == list->head));
GPR_ASSERT((l->next == NULL) == (l == list->tail));
if (l->next) GPR_ASSERT(l->next->prev == l);
if (l->prev) GPR_ASSERT(l->prev->next == l);
}
}
void grpc_call_op_metadata_init(grpc_call_op_metadata *comd) {
abort();
}
void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd) {
abort();
}
void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, grpc_call_op_metadata *add) {
abort();
}
void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_call_op_metadata_link_head(comd, storage);
}
static void link_head(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = NULL;
storage->next = list->head;
if (list->head != NULL) {
list->head->prev = storage;
} else {
list->tail = storage;
}
list->head = storage;
assert_valid_list(list);
}
void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage) {
link_head(&comd->list, storage);
}
void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add) {
storage->md = elem_to_add;
grpc_call_op_metadata_link_tail(comd, storage);
}
static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
assert_valid_list(list);
storage->prev = list->tail;
storage->next = NULL;
if (list->tail != NULL) {
list->tail->next = storage;
} else {
list->head = storage;
}
list->tail = storage;
assert_valid_list(list);
}
void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage) {
link_tail(&comd->list, storage);
}
void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem), void *user_data) {
grpc_linked_mdelem *l;
grpc_linked_mdelem *next;
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
for (l = comd->list.head; l; l = next) {
grpc_mdelem *orig = l->md;
grpc_mdelem *filt = filter(user_data, orig);
next = l->next;
if (filt == NULL) {
if (l->prev) {
l->prev->next = l->next;
}
if (l->next) {
l->next->prev = l->prev;
}
if (comd->list.head == l) {
comd->list.head = l->next;
}
if (comd->list.tail == l) {
comd->list.tail = l->prev;
}
assert_valid_list(&comd->list);
link_head(&comd->garbage, l);
} else if (filt != orig) {
grpc_mdelem_unref(orig);
l->md = filt;
}
}
assert_valid_list(&comd->list);
assert_valid_list(&comd->garbage);
}

@ -62,8 +62,6 @@ typedef struct grpc_call_element grpc_call_element;
typedef enum {
/* send metadata to the channels peer */
GRPC_SEND_METADATA,
/* send a deadline */
GRPC_SEND_DEADLINE,
/* start a connection (corresponds to start_invoke/accept) */
GRPC_SEND_START,
/* send a message to the channels peer */
@ -76,10 +74,6 @@ typedef enum {
GRPC_REQUEST_DATA,
/* metadata was received from the channels peer */
GRPC_RECV_METADATA,
/* receive a deadline */
GRPC_RECV_DEADLINE,
/* the end of the first batch of metadata was received */
GRPC_RECV_END_OF_INITIAL_METADATA,
/* a message was received from the channels peer */
GRPC_RECV_MESSAGE,
/* half-close was received from the channels peer */
@ -95,6 +89,35 @@ typedef enum {
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
struct grpc_linked_mdelem *prev;
} grpc_linked_mdelem;
typedef struct grpc_mdelem_list {
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
typedef struct grpc_call_op_metadata {
grpc_mdelem_list list;
grpc_mdelem_list garbage;
gpr_timespec deadline;
} grpc_call_op_metadata;
void grpc_call_op_metadata_init(grpc_call_op_metadata *comd);
void grpc_call_op_metadata_destroy(grpc_call_op_metadata *comd);
void grpc_call_op_metadata_merge(grpc_call_op_metadata *target, grpc_call_op_metadata *add);
void grpc_call_op_metadata_link_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage);
void grpc_call_op_metadata_link_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage);
void grpc_call_op_metadata_add_head(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add);
void grpc_call_op_metadata_add_tail(grpc_call_op_metadata *comd, grpc_linked_mdelem *storage, grpc_mdelem *elem_to_add);
void grpc_call_op_metadata_filter(grpc_call_op_metadata *comd, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem), void *user_data);
/* A single filterable operation to be performed on a call */
typedef struct {
/* The type of operation we're performing */
@ -113,8 +136,7 @@ typedef struct {
grpc_pollset *pollset;
} start;
grpc_byte_buffer *message;
grpc_mdelem *metadata;
gpr_timespec deadline;
grpc_call_op_metadata metadata;
} data;
/* Must be called when processing of this call-op is complete.
@ -291,12 +313,9 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_call_op *op);
void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
grpc_mdelem *elem);
void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
grpc_mdelem *elem);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
void grpc_call_element_send_finish(grpc_call_element *cur_elem);
void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message);
extern int grpc_trace_channel;

@ -38,7 +38,6 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/child_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
@ -70,9 +69,6 @@ typedef struct {
int transport_setup_initiated;
grpc_channel_args *args;
/* metadata cache */
grpc_mdelem *cancel_status;
} channel_data;
typedef enum {
@ -87,7 +83,8 @@ struct call_data {
grpc_call_element *elem;
call_state state;
grpc_metadata_buffer pending_metadata;
grpc_call_op_metadata pending_metadata;
gpr_uint32 pending_metadata_flags;
gpr_timespec deadline;
union {
struct {
@ -124,22 +121,18 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
grpc_call_op mop;
GPR_ASSERT(calld->state == CALL_ACTIVE);
/* sending buffered metadata down the stack before the start call */
grpc_metadata_buffer_flush(&calld->pending_metadata, child_elem);
if (gpr_time_cmp(calld->deadline, gpr_inf_future) != 0) {
grpc_call_op dop;
dop.type = GRPC_SEND_DEADLINE;
dop.dir = GRPC_CALL_DOWN;
dop.flags = 0;
dop.data.deadline = calld->deadline;
dop.done_cb = do_nothing;
dop.user_data = NULL;
child_elem->filter->call_op(child_elem, elem, &dop);
}
mop.type = GRPC_SEND_METADATA;
mop.dir = GRPC_CALL_DOWN;
mop.flags = calld->pending_metadata_flags;
mop.data.metadata = calld->pending_metadata;
mop.done_cb = do_nothing;
mop.user_data = NULL;
child_elem->filter->call_op(child_elem, elem, &mop);
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
@ -212,15 +205,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
static void send_up_cancelled_ops(grpc_call_element *elem) {
grpc_call_op finish_op;
channel_data *chand = elem->channel_data;
/* send up a synthesized status */
finish_op.type = GRPC_RECV_METADATA;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.data.metadata = grpc_mdelem_ref(chand->cancel_status);
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
/* send up a finish */
finish_op.type = GRPC_RECV_FINISH;
finish_op.dir = GRPC_CALL_UP;
@ -271,10 +257,7 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_metadata_buffer_queue(&calld->pending_metadata, op);
break;
case GRPC_SEND_DEADLINE:
calld->deadline = op->data.deadline;
grpc_call_op_metadata_merge(&calld->pending_metadata, &op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_SEND_START:
@ -400,7 +383,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->s.waiting.on_complete = error_bad_on_complete;
calld->s.waiting.on_complete_user_data = NULL;
grpc_metadata_buffer_init(&calld->pending_metadata);
grpc_call_op_metadata_init(&calld->pending_metadata);
}
/* Destructor for call_data */
@ -408,7 +391,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
/* if the metadata buffer is not flushed, destroy it here. */
grpc_metadata_buffer_destroy(&calld->pending_metadata, GRPC_OP_OK);
grpc_call_op_metadata_destroy(&calld->pending_metadata);
/* if the call got activated, we need to destroy the child stack also, and
remove it from the in-flight requests tracked by the child_entry we
picked */
@ -423,7 +406,6 @@ static void init_channel_elem(grpc_channel_element *elem,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
channel_data *chand = elem->channel_data;
char temp[GPR_LTOA_MIN_BUFSIZE];
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
@ -437,10 +419,6 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->transport_setup = NULL;
chand->transport_setup_initiated = 0;
chand->args = grpc_channel_args_copy(args);
gpr_ltoa(GRPC_STATUS_CANCELLED, temp);
chand->cancel_status =
grpc_mdelem_from_strings(metadata_context, "grpc-status", temp);
}
/* Destructor for channel_data */
@ -455,7 +433,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
grpc_channel_args_destroy(chand->args);
grpc_mdelem_unref(chand->cancel_status);
gpr_mu_destroy(&chand->mu);
GPR_ASSERT(chand->waiting_child_count == 0);

@ -60,10 +60,14 @@ typedef struct connected_channel_call_data {
gpr_uint32 max_message_length;
gpr_uint32 incoming_message_length;
gpr_uint8 reading_message;
gpr_uint8 got_metadata_boundary;
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
grpc_linked_mdelem *incoming_metadata;
size_t incoming_metadata_count;
size_t incoming_metadata_capacity;
gpr_timespec deadline;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@ -116,18 +120,19 @@ static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
call_data *calld = elem->call_data;
grpc_linked_mdelem *m;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
op->user_data);
break;
case GRPC_SEND_DEADLINE:
grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.deadline);
for (m = op->data.metadata.list.head; m; m = m->next) {
grpc_sopb_add_metadata(&calld->outgoing_sopb, m->md);
}
if (gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future) != 0) {
grpc_sopb_add_deadline(&calld->outgoing_sopb, op->data.metadata.deadline);
}
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb,
op->user_data);
break;
@ -200,10 +205,12 @@ static void init_call_elem(grpc_call_element *elem,
grpc_sopb_init(&calld->outgoing_sopb);
calld->reading_message = 0;
calld->got_metadata_boundary = 0;
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
calld->incoming_metadata = NULL;
calld->incoming_metadata_capacity = 0;
calld->incoming_metadata_count = 0;
gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
@ -320,6 +327,49 @@ static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_next_op(elem, &call_op);
}
static void metadata_done_cb(void *ptr, grpc_op_error error) {
gpr_free(ptr);
}
static void add_incoming_metadata(call_data *calld, grpc_mdelem *elem) {
if (calld->incoming_metadata_count == calld->incoming_metadata_capacity) {
calld->incoming_metadata_capacity = GPR_MAX(8, 2 * calld->incoming_metadata_capacity);
calld->incoming_metadata = gpr_realloc(calld->incoming_metadata, sizeof(*calld->incoming_metadata) * calld->incoming_metadata_capacity);
}
calld->incoming_metadata[calld->incoming_metadata_count++].md = elem;
}
static void flush_metadata(grpc_call_element *elem) {
grpc_call_op op;
call_data *calld = elem->call_data;
size_t i;
for (i = 1; i < calld->incoming_metadata_count; i++) {
calld->incoming_metadata[i].prev = &calld->incoming_metadata[i-1];
}
for (i = 0; i < calld->incoming_metadata_count - 1; i++) {
calld->incoming_metadata[i].next = &calld->incoming_metadata[i+1];
}
calld->incoming_metadata[0].prev = calld->incoming_metadata[calld->incoming_metadata_count-1].next = NULL;
op.type = GRPC_RECV_METADATA;
op.dir = GRPC_CALL_UP;
op.flags = 0;
op.data.metadata.list.head = &calld->incoming_metadata[0];
op.data.metadata.list.tail = &calld->incoming_metadata[calld->incoming_metadata_count - 1];
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = calld->deadline;
op.done_cb = metadata_done_cb;
op.user_data = calld->incoming_metadata;
grpc_call_next_op(elem, &op);
calld->incoming_metadata = NULL;
calld->incoming_metadata_count = 0;
calld->incoming_metadata_capacity = 0;
}
/* Handle incoming stream ops from the transport, translating them into
call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
@ -346,33 +396,13 @@ static void recv_batch(void *user_data, grpc_transport *transport,
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
call_op.type = GRPC_RECV_METADATA;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
call_op.data.metadata = stream_op->data.metadata;
call_op.done_cb = do_nothing;
call_op.user_data = NULL;
grpc_call_next_op(elem, &call_op);
add_incoming_metadata(calld, stream_op->data.metadata);
break;
case GRPC_OP_DEADLINE:
call_op.type = GRPC_RECV_DEADLINE;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
call_op.data.deadline = stream_op->data.deadline;
call_op.done_cb = do_nothing;
call_op.user_data = NULL;
grpc_call_next_op(elem, &call_op);
calld->deadline = stream_op->data.deadline;
break;
case GRPC_OP_METADATA_BOUNDARY:
if (!calld->got_metadata_boundary) {
calld->got_metadata_boundary = 1;
call_op.type = GRPC_RECV_END_OF_INITIAL_METADATA;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
call_op.done_cb = do_nothing;
call_op.user_data = NULL;
grpc_call_next_op(elem, &call_op);
}
flush_metadata(elem);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */

@ -35,7 +35,10 @@
#include <grpc/support/log.h>
typedef struct call_data {
int sent_headers;
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
} call_data;
typedef struct channel_data {
@ -49,6 +52,18 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
if (md == channeld->status) {
return NULL;
} else if (md->key == channeld->status->key) {
grpc_call_element_send_cancel(elem);
return NULL;
}
return md;
}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
@ -61,42 +76,19 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
ignore_unused(calld);
switch (op->type) {
case GRPC_SEND_METADATA:
if (!calld->sent_headers) {
/* Send : prefixed headers, which have to be before any application
* layer headers. */
calld->sent_headers = 1;
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
}
grpc_call_next_op(elem, op);
break;
case GRPC_SEND_START:
if (!calld->sent_headers) {
/* Send : prefixed headers, if we haven't already */
calld->sent_headers = 1;
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->method));
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->scheme));
}
/* Send non : prefixed headers */
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->te_trailers));
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(channeld->content_type));
/* Send : prefixed headers, which have to be before any application
* layer headers. */
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->method, grpc_mdelem_ref(channeld->method));
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->scheme, grpc_mdelem_ref(channeld->scheme));
grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->te_trailers, grpc_mdelem_ref(channeld->te_trailers));
grpc_call_op_metadata_add_tail(&op->data.metadata, &calld->content_type, grpc_mdelem_ref(channeld->content_type));
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
if (op->data.metadata == channeld->status) {
grpc_mdelem_unref(op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (op->data.metadata->key == channeld->status->key) {
grpc_mdelem_unref(op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
grpc_call_element_send_cancel(elem);
} else {
grpc_call_next_op(elem, op);
}
grpc_call_op_metadata_filter(&op->data.metadata, client_filter, elem);
grpc_call_next_op(elem, op);
break;
default:
/* pass control up or down the stack depending on op->dir */
@ -125,14 +117,6 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
ignore_unused(channeld);
/* initialize members */
calld->sent_headers = 0;
}
/* Destructor for call_data */

@ -38,8 +38,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
typedef enum { NOT_RECEIVED, POST, GET } known_method_type;
typedef struct {
grpc_mdelem *path;
grpc_mdelem *content_type;
@ -47,16 +45,17 @@ typedef struct {
} gettable;
typedef struct call_data {
known_method_type seen_method;
gpr_uint8 got_initial_metadata;
gpr_uint8 seen_path;
gpr_uint8 seen_post;
gpr_uint8 sent_status;
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_mdelem *path;
grpc_linked_mdelem status;
} call_data;
typedef struct channel_data {
grpc_mdelem *te_trailers;
grpc_mdelem *method_get;
grpc_mdelem *method_post;
grpc_mdelem *http_scheme;
grpc_mdelem *https_scheme;
@ -78,38 +77,75 @@ typedef struct channel_data {
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
/* Handle 'GET': not technically grpc, so probably a web browser hitting
us */
static void payload_done(void *elem, grpc_op_error error) {
if (error == GRPC_OP_OK) {
grpc_call_element_send_finish(elem);
}
}
static void handle_get(grpc_call_element *elem) {
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
call_data *calld = elem->call_data;
grpc_call_op op;
size_t i;
for (i = 0; i < channeld->gettable_count; i++) {
if (channeld->gettables[i].path == calld->path) {
grpc_call_element_send_metadata(elem,
grpc_mdelem_ref(channeld->status_ok));
grpc_call_element_send_metadata(
elem, grpc_mdelem_ref(channeld->gettables[i].content_type));
op.type = GRPC_SEND_PREFORMATTED_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.message = channeld->gettables[i].content;
op.done_cb = payload_done;
op.user_data = elem;
grpc_call_next_op(elem, &op);
/* Check if it is one of the headers we care about. */
if (md == channeld->te_trailers ||
md == channeld->method_post ||
md == channeld->http_scheme ||
md == channeld->https_scheme ||
md == channeld->grpc_scheme ||
md == channeld->content_type) {
/* swallow it */
if (md == channeld->method_post) {
calld->seen_post = 1;
} else if (md->key == channeld->http_scheme->key) {
calld->seen_scheme = 1;
} else if (md == channeld->te_trailers) {
calld->seen_te_trailers = 1;
}
/* TODO(klempner): Track that we've seen all the headers we should
require */
return NULL;
} else if (md->key == channeld->content_type->key) {
if (strncmp(grpc_mdstr_as_c_string(md->value),
"application/grpc+", 17) == 0) {
/* Although the C implementation doesn't (currently) generate them,
any
custom +-suffix is explicitly valid. */
/* TODO(klempner): We should consider preallocating common values such
as +proto or +json, or at least stashing them if we see them. */
/* TODO(klempner): Should we be surfacing this to application code? */
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
gpr_log(GPR_INFO, "Unexpected content-type %s",
channeld->content_type->key);
}
return NULL;
} else if (md->key == channeld->te_trailers->key ||
md->key == channeld->method_post->key ||
md->key == channeld->http_scheme->key ||
md->key == channeld->content_type->key) {
gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value));
/* swallow it and error everything out. */
/* TODO(klempner): We ought to generate more descriptive error messages
on the wire here. */
grpc_call_element_send_cancel(elem);
return NULL;
} else if (md->key == channeld->path_key) {
if (calld->seen_path) {
gpr_log(GPR_ERROR, "Received :path twice");
return NULL;
}
calld->seen_path = 1;
return md;
} else if (md->key == channeld->host_key) {
/* translate host to :authority since :authority may be
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
return authority;
} else {
return md;
}
grpc_call_element_send_metadata(elem,
grpc_mdelem_ref(channeld->status_not_found));
grpc_call_element_send_finish(elem);
}
/* Called either:
@ -126,116 +162,37 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
switch (op->type) {
case GRPC_RECV_METADATA:
/* Check if it is one of the headers we care about. */
if (op->data.metadata == channeld->te_trailers ||
op->data.metadata == channeld->method_get ||
op->data.metadata == channeld->method_post ||
op->data.metadata == channeld->http_scheme ||
op->data.metadata == channeld->https_scheme ||
op->data.metadata == channeld->grpc_scheme ||
op->data.metadata == channeld->content_type) {
/* swallow it */
if (op->data.metadata == channeld->method_get) {
calld->seen_method = GET;
} else if (op->data.metadata == channeld->method_post) {
calld->seen_method = POST;
} else if (op->data.metadata->key == channeld->http_scheme->key) {
calld->seen_scheme = 1;
} else if (op->data.metadata == channeld->te_trailers) {
calld->seen_te_trailers = 1;
}
/* TODO(klempner): Track that we've seen all the headers we should
require */
grpc_mdelem_unref(op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (op->data.metadata->key == channeld->content_type->key) {
if (strncmp(grpc_mdstr_as_c_string(op->data.metadata->value),
"application/grpc+", 17) == 0) {
/* Although the C implementation doesn't (currently) generate them,
any
custom +-suffix is explicitly valid. */
/* TODO(klempner): We should consider preallocating common values such
as +proto or +json, or at least stashing them if we see them. */
/* TODO(klempner): Should we be surfacing this to application code? */
grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem);
if (!calld->got_initial_metadata) {
calld->got_initial_metadata = 1;
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_post && calld->seen_scheme &&
calld->seen_te_trailers && calld->seen_path) {
grpc_call_next_op(elem, op);
} else {
/* TODO(klempner): We're currently allowing this, but we shouldn't
see it without a proxy so log for now. */
gpr_log(GPR_INFO, "Unexpected content-type %s",
channeld->content_type->key);
}
grpc_mdelem_unref(op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (op->data.metadata->key == channeld->te_trailers->key ||
op->data.metadata->key == channeld->method_post->key ||
op->data.metadata->key == channeld->http_scheme->key ||
op->data.metadata->key == channeld->content_type->key) {
gpr_log(GPR_ERROR, "Invalid %s: header: '%s'",
grpc_mdstr_as_c_string(op->data.metadata->key),
grpc_mdstr_as_c_string(op->data.metadata->value));
/* swallow it and error everything out. */
/* TODO(klempner): We ought to generate more descriptive error messages
on the wire here. */
grpc_mdelem_unref(op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
grpc_call_element_send_cancel(elem);
} else if (op->data.metadata->key == channeld->path_key) {
if (calld->path != NULL) {
gpr_log(GPR_ERROR, "Received :path twice");
grpc_mdelem_unref(calld->path);
if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
if (!calld->seen_scheme) {
gpr_log(GPR_ERROR, "Missing :scheme header");
}
if (!calld->seen_te_trailers) {
gpr_log(GPR_ERROR, "Missing te trailers header");
}
/* Error this call out */
op->done_cb(op->user_data, GRPC_OP_OK);
grpc_call_element_send_cancel(elem);
}
calld->path = op->data.metadata;
op->done_cb(op->user_data, GRPC_OP_OK);
} else if (op->data.metadata->key == channeld->host_key) {
/* translate host to :authority since :authority may be
omitted */
grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
channeld->mdctx, grpc_mdstr_ref(channeld->authority_key),
grpc_mdstr_ref(op->data.metadata->value));
grpc_mdelem_unref(op->data.metadata);
op->data.metadata = authority;
/* pass the event up */
grpc_call_next_op(elem, op);
} else {
/* pass the event up */
grpc_call_next_op(elem, op);
}
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_method == POST && calld->seen_scheme &&
calld->seen_te_trailers && calld->path) {
grpc_call_element_recv_metadata(elem, calld->path);
calld->path = NULL;
grpc_call_next_op(elem, op);
} else if (calld->seen_method == GET) {
handle_get(elem);
} else {
if (calld->seen_method == NOT_RECEIVED) {
gpr_log(GPR_ERROR, "Missing :method header");
}
if (!calld->seen_scheme) {
gpr_log(GPR_ERROR, "Missing :scheme header");
}
if (!calld->seen_te_trailers) {
gpr_log(GPR_ERROR, "Missing te trailers header");
}
/* Error this call out */
op->done_cb(op->user_data, GRPC_OP_OK);
grpc_call_element_send_cancel(elem);
}
break;
case GRPC_SEND_START:
case GRPC_SEND_METADATA:
/* If we haven't sent status 200 yet, we need to so so because it needs to
come before any non : prefixed metadata. */
if (!calld->sent_status) {
calld->sent_status = 1;
/* status is reffed by grpc_call_element_send_metadata */
grpc_call_element_send_metadata(elem,
grpc_mdelem_ref(channeld->status_ok));
}
grpc_call_op_metadata_add_head(&op->data.metadata, &calld->status, grpc_mdelem_ref(channeld->status_ok));
grpc_call_next_op(elem, op);
break;
default:
@ -272,24 +229,11 @@ static void init_call_elem(grpc_call_element *elem,
ignore_unused(channeld);
/* initialize members */
calld->path = NULL;
calld->sent_status = 0;
calld->seen_scheme = 0;
calld->seen_method = NOT_RECEIVED;
calld->seen_te_trailers = 0;
memset(calld, 0, sizeof(*calld));
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
ignore_unused(channeld);
if (calld->path) {
grpc_mdelem_unref(calld->path);
}
}
/* Constructor for channel_data */
@ -314,7 +258,6 @@ static void init_channel_elem(grpc_channel_element *elem,
channeld->status_not_found =
grpc_mdelem_from_strings(mdctx, ":status", "404");
channeld->method_post = grpc_mdelem_from_strings(mdctx, ":method", "POST");
channeld->method_get = grpc_mdelem_from_strings(mdctx, ":method", "GET");
channeld->http_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "http");
channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
@ -369,7 +312,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_mdelem_unref(channeld->status_ok);
grpc_mdelem_unref(channeld->status_not_found);
grpc_mdelem_unref(channeld->method_post);
grpc_mdelem_unref(channeld->method_get);
grpc_mdelem_unref(channeld->http_scheme);
grpc_mdelem_unref(channeld->https_scheme);
grpc_mdelem_unref(channeld->grpc_scheme);

@ -1,149 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/metadata_buffer.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include <string.h>
#define INITIAL_ELEM_CAP 8
/* One queued call; we track offsets to string data in a shared buffer to
reduce allocations. See grpc_metadata_buffer_impl for the memory use
strategy */
typedef struct {
grpc_mdelem *md;
void (*cb)(void *user_data, grpc_op_error error);
void *user_data;
gpr_uint32 flags;
} qelem;
/* Memory layout:
grpc_metadata_buffer_impl
followed by an array of qelem */
struct grpc_metadata_buffer_impl {
/* number of elements in q */
size_t elems;
/* capacity of q */
size_t elem_cap;
};
#define ELEMS(buffer) ((qelem *)((buffer) + 1))
void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer) {
/* start buffer as NULL, indicating no elements */
*buffer = NULL;
}
void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
grpc_op_error error) {
size_t i;
qelem *qe;
if (*buffer) {
for (i = 0; i < (*buffer)->elems; i++) {
qe = &ELEMS(*buffer)[i];
grpc_mdelem_unref(qe->md);
qe->cb(qe->user_data, error);
}
gpr_free(*buffer);
}
}
void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer,
grpc_call_op *op) {
grpc_metadata_buffer_impl *impl = *buffer;
qelem *qe;
size_t bytes;
GPR_ASSERT(op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA);
if (!impl) {
/* this is the first element: allocate enough space to hold the
header object and the initial element capacity of qelems */
bytes =
sizeof(grpc_metadata_buffer_impl) + INITIAL_ELEM_CAP * sizeof(qelem);
impl = gpr_malloc(bytes);
/* initialize the header object */
impl->elems = 0;
impl->elem_cap = INITIAL_ELEM_CAP;
} else if (impl->elems == impl->elem_cap) {
/* more qelems than what we can deal with: grow by doubling size */
impl->elem_cap *= 2;
bytes = sizeof(grpc_metadata_buffer_impl) + impl->elem_cap * sizeof(qelem);
impl = gpr_realloc(impl, bytes);
}
/* append an element to the queue */
qe = &ELEMS(impl)[impl->elems];
impl->elems++;
qe->md = op->data.metadata;
qe->cb = op->done_cb;
qe->user_data = op->user_data;
qe->flags = op->flags;
/* header object may have changed location: store it back */
*buffer = impl;
}
void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
grpc_call_element *elem) {
grpc_metadata_buffer_impl *impl = *buffer;
grpc_call_op op;
qelem *qe;
size_t i;
if (!impl) {
/* nothing to send */
return;
}
/* construct call_op's, and push them down the stack */
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
for (i = 0; i < impl->elems; i++) {
qe = &ELEMS(impl)[i];
op.done_cb = qe->cb;
op.user_data = qe->user_data;
op.flags = qe->flags;
op.data.metadata = qe->md;
grpc_call_next_op(elem, &op);
}
/* free data structures and reset to NULL: we can only flush once */
gpr_free(impl);
*buffer = NULL;
}

@ -1,70 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
#define GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H
#include "src/core/channel/channel_stack.h"
/* Utility code to buffer GRPC_SEND_METADATA calls and pass them down the stack
all at once at some otherwise-determined time. Useful for implementing
filters that want to queue metadata until a START event chooses some
underlying filter stack to send an rpc on. */
/* Clients should declare a member of grpc_metadata_buffer. This may at some
point become a typedef for a struct, but for now a pointer suffices */
typedef struct grpc_metadata_buffer_impl grpc_metadata_buffer_impl;
typedef grpc_metadata_buffer_impl *grpc_metadata_buffer;
/* Initializes the metadata buffer. Allocates no memory. */
void grpc_metadata_buffer_init(grpc_metadata_buffer *buffer);
/* Destroy the metadata buffer. */
void grpc_metadata_buffer_destroy(grpc_metadata_buffer *buffer,
grpc_op_error error);
/* Append a call to the end of a metadata buffer: may allocate memory */
void grpc_metadata_buffer_queue(grpc_metadata_buffer *buffer, grpc_call_op *op);
/* Flush all queued operations from the metadata buffer to the element below
self */
void grpc_metadata_buffer_flush(grpc_metadata_buffer *buffer,
grpc_call_element *self);
/* Count the number of queued elements in the buffer. */
size_t grpc_metadata_buffer_count(const grpc_metadata_buffer *buffer);
/* Extract elements as a grpc_metadata*, for presentation to applications.
The returned buffer must be freed with
grpc_metadata_buffer_cleanup_elements.
Clears the metadata buffer (this is a one-shot operation) */
grpc_metadata *grpc_metadata_buffer_extract_elements(
grpc_metadata_buffer *buffer);
void grpc_metadata_buffer_cleanup_elements(void *elements, grpc_op_error error);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_METADATA_BUFFER_H */

@ -44,12 +44,15 @@
#include "src/core/security/credentials.h"
#include "src/core/surface/call.h"
#define MAX_CREDENTIAL_METADATA_COUNT 4
/* We can have a per-call credentials. */
typedef struct {
grpc_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
grpc_call_op op;
grpc_linked_mdelem md_links[MAX_CREDENTIAL_METADATA_COUNT];
} call_data;
/* We can have a per-channel credentials. */
@ -62,30 +65,8 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
static void do_nothing(void *ignored, grpc_op_error error) {}
static void bubbleup_error(grpc_call_element *elem, const char *error_msg) {
grpc_call_op finish_op;
channel_data *channeld = elem->channel_data;
char status[GPR_LTOA_MIN_BUFSIZE];
gpr_log(GPR_ERROR, "%s", error_msg);
finish_op.type = GRPC_RECV_METADATA;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.data.metadata = grpc_mdelem_from_metadata_strings(
channeld->md_ctx, grpc_mdstr_ref(channeld->error_msg_key),
grpc_mdstr_from_string(channeld->md_ctx, error_msg));
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
gpr_ltoa(GRPC_STATUS_UNAUTHENTICATED, status);
finish_op.data.metadata = grpc_mdelem_from_metadata_strings(
channeld->md_ctx, grpc_mdstr_ref(channeld->status_key),
grpc_mdstr_from_string(channeld->md_ctx, status));
grpc_call_next_op(elem, &finish_op);
grpc_call_element_recv_status(elem, GRPC_STATUS_UNAUTHENTICATED, error_msg);
grpc_call_element_send_cancel(elem);
}
@ -93,11 +74,14 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
size_t num_md,
grpc_credentials_status status) {
grpc_call_element *elem = (grpc_call_element *)user_data;
call_data *calld = elem->call_data;
grpc_call_op op = calld->op;
size_t i;
GPR_ASSERT(num_md <= MAX_CREDENTIAL_METADATA_COUNT);
for (i = 0; i < num_md; i++) {
grpc_call_element_send_metadata(elem, grpc_mdelem_ref(md_elems[i]));
grpc_call_op_metadata_add_tail(&op.data.metadata, &calld->md_links[i], grpc_mdelem_ref(md_elems[i]));
}
grpc_call_next_op(elem, &((call_data *)elem->call_data)->op);
grpc_call_next_op(elem, &op);
}
static char *build_service_url(const char *url_scheme, call_data *calld) {
@ -174,16 +158,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
grpc_linked_mdelem *l;
switch (op->type) {
case GRPC_SEND_METADATA:
/* Pointer comparison is OK for md_elems created from the same context. */
if (op->data.metadata->key == channeld->authority_string) {
if (calld->host != NULL) grpc_mdstr_unref(calld->host);
calld->host = grpc_mdstr_ref(op->data.metadata->value);
} else if (op->data.metadata->key == channeld->path_string) {
if (calld->method != NULL) grpc_mdstr_unref(calld->method);
calld->method = grpc_mdstr_ref(op->data.metadata->value);
for (l = op->data.metadata.list.head; l; l = l->next) {
grpc_mdelem *md = l->md;
/* Pointer comparison is OK for md_elems created from the same context. */
if (md->key == channeld->authority_string) {
if (calld->host != NULL) grpc_mdstr_unref(calld->host);
calld->host = grpc_mdstr_ref(md->value);
} else if (md->key == channeld->path_string) {
if (calld->method != NULL) grpc_mdstr_unref(calld->method);
calld->method = grpc_mdstr_ref(md->value);
}
}
grpc_call_next_op(elem, op);
break;

@ -33,7 +33,6 @@
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/metadata_buffer.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/support/string.h"
#include "src/core/surface/byte_buffer_queue.h"
@ -41,6 +40,7 @@
#include "src/core/surface/completion_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
@ -68,8 +68,10 @@ typedef struct {
} completed_request;
/* See request_set in grpc_call below for a description */
#define REQSET_EMPTY 255
#define REQSET_DONE 254
#define REQSET_EMPTY 'X'
#define REQSET_DONE 'Y'
#define MAX_SEND_INITIAL_METADATA_COUNT 3
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
@ -204,6 +206,12 @@ struct grpc_call {
/* Call refcount - to keep the call alive during asynchronous operations */
gpr_refcount internal_refcount;
grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
grpc_linked_mdelem status_link;
grpc_linked_mdelem details_link;
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@ -226,9 +234,11 @@ struct grpc_call {
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data) {
const void *server_transport_data, grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count, gpr_timespec send_deadline) {
size_t i;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_call *call =
@ -245,6 +255,12 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
call->send_initial_metadata[i].md = add_initial_metadata[i];
}
call->send_initial_metadata_count = add_initial_metadata_count;
call->send_deadline = send_deadline;
grpc_channel_internal_ref(channel);
call->metadata_context = grpc_channel_get_metadata_context(channel);
/* one ref is dropped in response to destroy, the other in
@ -252,6 +268,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
gpr_ref_init(&call->internal_refcount, 2);
grpc_call_stack_init(channel_stack, server_transport_data,
CALL_STACK_FROM_CALL(call));
if (gpr_time_cmp(send_deadline, gpr_inf_future) != 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
}
@ -587,15 +606,28 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
grpc_call_op op;
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = GRPC_WRITE_BUFFER_HINT;
op.data.metadata = elem;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, grpc_metadata *metadata) {
size_t i;
grpc_mdelem_list out;
if (count == 0) {
out.head = out.tail = NULL;
return out;
}
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_metadata *next_md = (i == count-1) ? NULL : &metadata[i+1];
grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i-1];
grpc_linked_mdelem *l = (grpc_linked_mdelem*)&md->internal_data;
assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(
call->metadata_context, md->key,
(const gpr_uint8 *)md->value, md->value_length);
l->next = next_md ? (grpc_linked_mdelem*)&next_md->internal_data : NULL;
l->prev = prev_md ? (grpc_linked_mdelem*)&prev_md->internal_data : NULL;
}
out.head = (grpc_linked_mdelem*)&(metadata[0].internal_data);
out.tail = (grpc_linked_mdelem*)&(metadata[count-1].internal_data);
return out;
}
static void enact_send_action(grpc_call *call, send_action sa) {
@ -614,13 +646,18 @@ static void enact_send_action(grpc_call *call, send_action sa) {
/* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata(call,
grpc_mdelem_from_string_and_buffer(
call->metadata_context, md->key,
(const gpr_uint8 *)md->value, md->value_length));
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata);
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
grpc_call_op_metadata_link_head(&op.data.metadata, &call->send_initial_metadata[i]);
}
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
op.type = GRPC_SEND_START;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
@ -645,32 +682,32 @@ static void enact_send_action(grpc_call *call, send_action sa) {
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
for (i = 0; i < data.send_metadata.count; i++) {
const grpc_metadata *md = &data.send_metadata.metadata[i];
send_metadata(call,
grpc_mdelem_from_string_and_buffer(
call->metadata_context, md->key,
(const gpr_uint8 *)md->value, md->value_length));
}
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.data.metadata.list = chain_metadata_from_app(call, data.send_metadata.count, data.send_metadata.metadata);
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
/* send status */
/* TODO(ctiller): cache common status values */
data = call->request_data[GRPC_IOREQ_SEND_STATUS];
gpr_ltoa(data.send_status.code, status_str);
send_metadata(
call,
grpc_mdelem_from_metadata_strings(
grpc_call_op_metadata_add_tail(&op.data.metadata, &call->status_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context, status_str)));
if (data.send_status.details) {
send_metadata(
call,
grpc_call_op_metadata_add_tail(&op.data.metadata, &call->details_link,
grpc_mdelem_from_metadata_strings(
call->metadata_context,
grpc_mdstr_ref(grpc_channel_get_message_string(call->channel)),
grpc_mdstr_from_string(call->metadata_context,
data.send_status.details)));
}
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
/* fallthrough: see choose_send_action for details */
case SEND_FINISH:
op.type = GRPC_SEND_FINISH;
@ -875,9 +912,7 @@ static void call_alarm(void *arg, int success) {
grpc_call_internal_unref(call, 1);
}
void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
}
@ -886,11 +921,15 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now());
}
static void set_read_state(grpc_call *call, read_state state) {
lock(call);
static void set_read_state_locked(grpc_call *call, read_state state) {
GPR_ASSERT(call->read_state < state);
call->read_state = state;
finish_read_ops(call);
}
static void set_read_state(grpc_call *call, read_state state) {
lock(call);
set_read_state_locked(call, state);
unlock(call);
}
@ -936,52 +975,60 @@ void grpc_call_recv_message(grpc_call_element *elem,
unlock(call);
}
void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
int grpc_call_recv_metadata(grpc_call_element *elem, grpc_call_op_metadata *md) {
grpc_call *call = CALL_FROM_TOP_ELEM(elem);
grpc_mdstr *key = md->key;
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
grpc_metadata *mdusr;
int is_trailing;
lock(call);
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
dest = &call->buffered_metadata[call->read_state >=
READ_STATE_GOT_INITIAL_METADATA];
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
dest->metadata =
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
mdusr->key = grpc_mdstr_as_c_string(md->key);
mdusr->value = grpc_mdstr_as_c_string(md->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = GPR_MAX(
call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
call->owned_metadata =
gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l; l = l->next) {
grpc_mdelem *md = l->md;
grpc_mdstr *key = md->key;
if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
grpc_mdelem_unref(md);
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
grpc_mdelem_unref(md);
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
dest->metadata =
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
}
mdusr = &dest->metadata[dest->count++];
mdusr->key = grpc_mdstr_as_c_string(md->key);
mdusr->value = grpc_mdstr_as_c_string(md->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = GPR_MAX(
call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
call->owned_metadata =
gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
}
call->owned_metadata[call->owned_metadata_count++] = md;
}
call->owned_metadata[call->owned_metadata_count++] = md;
}
if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) {
set_deadline_alarm(call, md->deadline);
}
if (!is_trailing) {
set_read_state_locked(call, READ_STATE_GOT_INITIAL_METADATA);
}
unlock(call);
return !is_trailing;
}
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
grpc_call *call = grpc_call_from_top_element(surface_element);
set_read_state(call, READ_STATE_GOT_INITIAL_METADATA);
}
/*
* BATCH API IMPLEMENTATION
*/

@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_SURFACE_CALL_H
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
/* Primitive operation types - grpc_op's get rewritten into these */
@ -67,7 +66,7 @@ typedef union {
} recv_status_details;
struct {
size_t count;
const grpc_metadata *metadata;
grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
@ -86,7 +85,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data);
const void *server_transport_data, grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count, gpr_timespec send_deadline);
void grpc_call_set_completion_queue(grpc_call *call, grpc_completion_queue *cq);
grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
@ -96,8 +96,9 @@ void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
void grpc_call_recv_metadata(grpc_call_element *surface_element,
grpc_mdelem *md);
/* receive metadata - returns 1 if this was initial metadata */
int grpc_call_recv_metadata(grpc_call_element *surface_element,
grpc_call_op_metadata *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
@ -108,12 +109,6 @@ grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
/* Called when it's known that the initial batch of metadata is complete */
void grpc_call_initial_metadata_complete(grpc_call_element *surface_element);
void grpc_call_set_deadline(grpc_call_element *surface_element,
gpr_timespec deadline);
grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */

@ -62,7 +62,7 @@ struct grpc_channel {
registered_call *registered_calls;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \
(((grpc_channel *)(channel_stack)) - 1)
#define CHANNEL_FROM_TOP_ELEM(top_elem) \
@ -91,44 +91,25 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_call *call;
grpc_call_op op;
grpc_mdelem *send_metadata[2];
if (!channel->is_client) {
gpr_log(GPR_ERROR, "Cannot create a call on the server.");
return NULL;
}
GPR_ASSERT(channel->is_client);
call = grpc_call_create(channel, cq, NULL);
send_metadata[0] = path_mdelem;
send_metadata[1] = authority_mdelem;
/* Add :path and :authority headers. */
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.metadata = path_mdelem;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
op.data.metadata = authority_mdelem;
grpc_call_execute_op(call, &op);
if (0 != gpr_time_cmp(deadline, gpr_inf_future)) {
op.type = GRPC_SEND_DEADLINE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.deadline = deadline;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
}
return grpc_call_create(channel, cq, NULL, send_metadata,
GPR_ARRAY_SIZE(send_metadata), deadline);
}
return call;
grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec absolute_deadline) {
return grpc_channel_create_call(channel, NULL, method, host,
absolute_deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@ -146,13 +127,6 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
deadline);
}
grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec absolute_deadline) {
return grpc_channel_create_call(channel, NULL, method, host,
absolute_deadline);
}
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
const char *host) {
registered_call *rc = gpr_malloc(sizeof(registered_call));

@ -52,15 +52,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_DEADLINE:
grpc_call_set_deadline(elem, op->data.deadline);
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
grpc_call_recv_metadata(elem, op->data.metadata);
break;
case GRPC_RECV_DEADLINE:
gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
grpc_call_recv_metadata(elem, &op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
@ -72,9 +65,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
case GRPC_RECV_FINISH:
grpc_call_stream_closed(elem);
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
grpc_call_initial_metadata_complete(elem);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);

@ -47,23 +47,20 @@ typedef struct {
} call_data;
typedef struct {
grpc_mdelem *status;
grpc_mdelem *message;
void *unused;
} channel_data;
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_START:
grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->status));
grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN, "Rpc sent on a lame channel.");
grpc_call_stream_closed(elem);
break;
case GRPC_SEND_METADATA:
grpc_mdelem_unref(op->data.metadata);
abort();
break;
default:
break;
@ -94,23 +91,11 @@ static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *channeld = elem->channel_data;
char status[12];
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
channeld->message = grpc_mdelem_from_strings(mdctx, "grpc-message",
"Rpc sent on a lame channel.");
gpr_ltoa(GRPC_STATUS_UNKNOWN, status);
channeld->status = grpc_mdelem_from_strings(mdctx, "grpc-status", status);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *channeld = elem->channel_data;
grpc_mdelem_unref(channeld->message);
grpc_mdelem_unref(channeld->status);
}
static const grpc_channel_filter lame_filter = {

@ -411,29 +411,32 @@ static void read_closed(grpc_call_element *elem) {
gpr_mu_unlock(&chand->server->mu);
}
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
if (md->key == chand->path_key) {
calld->path = grpc_mdstr_ref(md->value);
return NULL;
} else if (md->key == chand->authority_key) {
calld->host = grpc_mdstr_ref(md->value);
return NULL;
}
return md;
}
static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
grpc_call_op *op) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
md = op->data.metadata;
if (md->key == chand->path_key) {
calld->path = grpc_mdstr_ref(md->value);
grpc_mdelem_unref(md);
} else if (md->key == chand->authority_key) {
calld->host = grpc_mdstr_ref(md->value);
grpc_mdelem_unref(md);
} else {
grpc_call_recv_metadata(elem, md);
grpc_call_op_metadata_filter(&op->data.metadata, server_filter, elem);
if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
calld->deadline = op->data.metadata.deadline;
start_new_rpc(elem);
}
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
start_new_rpc(elem);
grpc_call_initial_metadata_complete(elem);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
@ -444,10 +447,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
case GRPC_RECV_FINISH:
stream_closed(elem);
break;
case GRPC_RECV_DEADLINE:
grpc_call_set_deadline(elem, op->data.deadline);
((call_data *)elem->call_data)->deadline = op->data.deadline;
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
grpc_call_next_op(elem, op);
@ -464,7 +463,7 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_ACCEPT_CALL:
/* create a call */
grpc_call_create(chand->channel, NULL,
op->data.accept_call.transport_server_data);
op->data.accept_call.transport_server_data, NULL, 0, gpr_inf_future);
break;
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the

@ -150,9 +150,9 @@ CC_asan = clang
CXX_asan = clang++
LD_asan = clang
LDXX_asan = clang++
CPPFLAGS_asan = -O1 -fsanitize=address -fno-omit-frame-pointer
CPPFLAGS_asan = -O0 -fsanitize=address -fno-omit-frame-pointer
LDFLAGS_asan = -fsanitize=address
DEFINES_asan = NDEBUG GRPC_TEST_SLOWDOWN_BUILD_FACTOR=5
DEFINES_asan = GRPC_TEST_SLOWDOWN_BUILD_FACTOR=5
VALID_CONFIG_msan = 1
REQUIRE_CUSTOM_LIBRARIES_msan = 1
@ -160,7 +160,7 @@ CC_msan = clang
CXX_msan = clang++-libc++
LD_msan = clang
LDXX_msan = clang++-libc++
CPPFLAGS_msan = -O1 -fsanitize=memory -fsanitize-memory-track-origins -fno-omit-frame-pointer -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1
CPPFLAGS_msan = -O0 -fsanitize=memory -fsanitize-memory-track-origins -fno-omit-frame-pointer -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1
OPENSSL_CFLAGS_msan = -DPURIFY
LDFLAGS_msan = -fsanitize=memory -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1
DEFINES_msan = NDEBUG GRPC_TEST_SLOWDOWN_BUILD_FACTOR=20

@ -1,201 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/metadata_buffer.h"
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
#include <string.h>
#include <stdio.h>
/* construct a buffer with some prefix followed by an integer converted to
a string */
static gpr_slice construct_buffer(size_t prefix_length, size_t index) {
gpr_slice buffer = gpr_slice_malloc(prefix_length + GPR_LTOA_MIN_BUFSIZE);
memset(GPR_SLICE_START_PTR(buffer), 'a', prefix_length);
GPR_SLICE_SET_LENGTH(
buffer,
prefix_length +
gpr_ltoa(index, (char *)GPR_SLICE_START_PTR(buffer) + prefix_length));
return buffer;
}
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
/* we need a fake channel & call stack, which is defined here */
/* a fake channel needs to track some information about the test */
typedef struct {
size_t key_prefix_len;
size_t value_prefix_len;
} channel_data;
static void fail_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
abort();
}
/* verify that the metadata passed on during flush is the same as we expect */
static void expect_call_op(grpc_call_element *elem,
grpc_call_element *from_elem, grpc_call_op *op) {
size_t *n = elem->call_data;
channel_data *cd = elem->channel_data;
gpr_slice key = construct_buffer(cd->key_prefix_len, *n);
gpr_slice value = construct_buffer(cd->value_prefix_len, *n);
GPR_ASSERT(op->type == GRPC_SEND_METADATA);
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
GPR_ASSERT(op->flags == *n);
GPR_ASSERT(op->done_cb == do_nothing);
GPR_ASSERT(op->user_data == (void *)(gpr_uintptr) * n);
GPR_ASSERT(0 == gpr_slice_cmp(op->data.metadata->key->slice, key));
GPR_ASSERT(0 == gpr_slice_cmp(op->data.metadata->value->slice, value));
++*n;
gpr_slice_unref(key);
gpr_slice_unref(value);
grpc_mdelem_unref(op->data.metadata);
}
static void fail_channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem,
grpc_channel_op *op) {
abort();
}
static void init_call_elem(grpc_call_element *elem,
const void *transport_server_data) {
*(size_t *)elem->call_data = 0;
}
static void destroy_call_elem(grpc_call_element *elem) {}
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {}
static void destroy_channel_elem(grpc_channel_element *elem) {}
static const grpc_channel_filter top_filter = {
fail_call_op, fail_channel_op, sizeof(size_t),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "top_filter"};
static const grpc_channel_filter bottom_filter = {
expect_call_op, fail_channel_op, sizeof(size_t),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "bottom_filter"};
static const grpc_channel_filter *filters[2] = {&top_filter, &bottom_filter};
/* run a test with differently sized keys, and values, some number of times. */
static void test_case(size_t key_prefix_len, size_t value_prefix_len,
size_t num_calls) {
size_t i;
size_t got_calls;
grpc_metadata_buffer buffer;
grpc_channel_stack *stk;
grpc_call_stack *call;
grpc_mdctx *mdctx;
gpr_log(GPR_INFO, "Test %d calls, {key,value}_prefix_len = {%d, %d}",
(int)num_calls, (int)key_prefix_len, (int)value_prefix_len);
mdctx = grpc_mdctx_create();
grpc_metadata_buffer_init(&buffer);
/* queue metadata elements */
for (i = 0; i < num_calls; i++) {
grpc_call_op op;
gpr_slice key = construct_buffer(key_prefix_len, i);
gpr_slice value = construct_buffer(value_prefix_len, i);
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = i;
op.data.metadata = grpc_mdelem_from_slices(mdctx, key, value);
op.done_cb = do_nothing;
op.user_data = (void *)(gpr_uintptr) i;
grpc_metadata_buffer_queue(&buffer, &op);
}
/* construct a test channel, call stack */
stk = gpr_malloc(grpc_channel_stack_size(filters, 2));
grpc_channel_stack_init(filters, 2, NULL, mdctx, stk);
for (i = 0; i < 2; i++) {
channel_data *cd =
(channel_data *)grpc_channel_stack_element(stk, i)->channel_data;
cd->key_prefix_len = key_prefix_len;
cd->value_prefix_len = value_prefix_len;
}
call = gpr_malloc(stk->call_stack_size);
grpc_call_stack_init(stk, NULL, call);
/* flush out metadata, verifying each element (see expect_call_op) */
grpc_metadata_buffer_flush(&buffer, grpc_call_stack_element(call, 0));
/* verify expect_call_op was called an appropriate number of times */
got_calls = *(size_t *)grpc_call_stack_element(call, 1)->call_data;
GPR_ASSERT(num_calls == got_calls);
/* clean up the things */
grpc_call_stack_destroy(call);
gpr_free(call);
grpc_channel_stack_destroy(stk);
gpr_free(stk);
grpc_metadata_buffer_destroy(&buffer, GRPC_OP_OK);
grpc_mdctx_unref(mdctx);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_case(0, 0, 0);
test_case(0, 0, 1);
test_case(0, 0, 2);
test_case(0, 0, 10000);
test_case(10, 10, 1);
test_case(10, 10, 2);
test_case(10, 10, 10000);
test_case(100, 100, 1);
test_case(100, 100, 2);
test_case(100, 100, 10000);
return 0;
}

@ -115,14 +115,23 @@ static void test_request_response_with_metadata_and_payload(
grpc_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {
{"key1-bin", "\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13},
{"key2-bin", "\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
14}};
{"key1-bin",
"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc",
13,
{{NULL, NULL, NULL}}},
{"key2-bin",
"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
14,
{{NULL, NULL, NULL}}}};
grpc_metadata meta_s[2] = {
{"key3-bin",
"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15},
"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee",
15,
{{NULL, NULL, NULL}}},
{"key4-bin",
"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff", 16}};
"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff",
16,
{{NULL, NULL, NULL}}}};
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -116,17 +116,25 @@ static void test_request_response_with_metadata_and_payload(
gpr_timespec deadline = five_seconds_time();
/* staggered lengths to ensure we hit various branches in base64 encode/decode
*/
grpc_metadata meta1 = {
"key1-bin", "\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13};
grpc_metadata meta1 = {"key1-bin",
"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc",
13,
{{NULL, NULL, NULL}}};
grpc_metadata meta2 = {
"key2-bin", "\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
14};
"key2-bin",
"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
14,
{{NULL, NULL, NULL}}};
grpc_metadata meta3 = {
"key3-bin",
"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15};
"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee",
15,
{{NULL, NULL, NULL}}};
grpc_metadata meta4 = {
"key4-bin",
"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff", 16};
"\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff",
16,
{{NULL, NULL, NULL}}};
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -114,8 +114,10 @@ static void test_request_response_with_metadata_and_payload(
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {{"key1", "val1", 4}, {"key2", "val2", 4}};
grpc_metadata meta_s[2] = {{"key3", "val3", 4}, {"key4", "val4", 4}};
grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}},
{"key2", "val2", 4, {{NULL, NULL, NULL}}}};
grpc_metadata meta_s[2] = {{"key3", "val3", 4, {{NULL, NULL, NULL}}},
{"key4", "val4", 4, {{NULL, NULL, NULL}}}};
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -114,10 +114,10 @@ static void test_request_response_with_metadata_and_payload(
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta1 = {"key1", "val1", 4};
grpc_metadata meta2 = {"key2", "val2", 4};
grpc_metadata meta3 = {"key3", "val3", 4};
grpc_metadata meta4 = {"key4", "val4", 4};
grpc_metadata meta1 = {"key1", "val1", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta2 = {"key2", "val2", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta3 = {"key3", "val3", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta4 = {"key4", "val4", 4, {{NULL, NULL, NULL}}};
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -114,12 +114,12 @@ static void test_request_response_with_metadata_and_payload(
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta1 = {"key1", "val1", 4};
grpc_metadata meta2 = {"key2", "val2", 4};
grpc_metadata meta3 = {"key3", "val3", 4};
grpc_metadata meta4 = {"key4", "val4", 4};
grpc_metadata meta5 = {"key5", "val5", 4};
grpc_metadata meta6 = {"key6", "val6", 4};
grpc_metadata meta1 = {"key1", "val1", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta2 = {"key2", "val2", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta3 = {"key3", "val3", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta4 = {"key4", "val4", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta5 = {"key5", "val5", 4, {{NULL, NULL, NULL}}};
grpc_metadata meta6 = {"key6", "val6", 4, {{NULL, NULL, NULL}}};
grpc_end2end_test_fixture f = begin_test(config, __FUNCTION__, NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -42,7 +42,7 @@ static void *tag(gpr_intptr x) { return (void *)x; }
int main(int argc, char **argv) {
grpc_channel *chan;
grpc_call *call;
grpc_metadata md = {"a", "b", 1};
grpc_metadata md = {"a", "b", 1, {{NULL, NULL, NULL}}};
grpc_completion_queue *cq;
cq_verifier *cqv;

@ -444,6 +444,7 @@ if forever:
previous_success = success
success = _build_and_run(check_cancelled=have_files_changed,
newline_on_success=False,
travis=args.travis,
cache=test_cache) == 0
if not previous_success and success:
jobset.message('SUCCESS',

@ -468,15 +468,6 @@
"posix"
]
},
{
"flaky": false,
"language": "c",
"name": "metadata_buffer_test",
"platforms": [
"windows",
"posix"
]
},
{
"flaky": false,
"language": "c",

@ -53,10 +53,10 @@ grpc_test_util:
$(OUT_DIR):
mkdir $(OUT_DIR)
buildtests: alarm_heap_test.exe alarm_list_test.exe alarm_test.exe alpn_test.exe bin_encoder_test.exe census_hash_table_test.exe census_statistics_multiple_writers_circular_buffer_test.exe census_statistics_multiple_writers_test.exe census_statistics_performance_test.exe census_statistics_quick_test.exe census_statistics_small_log_test.exe census_stats_store_test.exe census_stub_test.exe census_trace_store_test.exe census_window_stats_test.exe chttp2_status_conversion_test.exe chttp2_stream_encoder_test.exe chttp2_stream_map_test.exe chttp2_transport_end2end_test.exe dualstack_socket_test.exe echo_test.exe fd_posix_test.exe fling_stream_test.exe fling_test.exe gpr_cancellable_test.exe gpr_cmdline_test.exe gpr_env_test.exe gpr_file_test.exe gpr_histogram_test.exe gpr_host_port_test.exe gpr_log_test.exe gpr_slice_buffer_test.exe gpr_slice_test.exe gpr_string_test.exe gpr_sync_test.exe gpr_thd_test.exe gpr_time_test.exe gpr_tls_test.exe gpr_useful_test.exe grpc_base64_test.exe grpc_byte_buffer_reader_test.exe grpc_channel_stack_test.exe grpc_completion_queue_test.exe grpc_credentials_test.exe grpc_json_token_test.exe grpc_stream_op_test.exe hpack_parser_test.exe hpack_table_test.exe httpcli_format_request_test.exe httpcli_parser_test.exe httpcli_test.exe json_rewrite_test.exe json_test.exe lame_client_test.exe message_compress_test.exe metadata_buffer_test.exe multi_init_test.exe murmur_hash_test.exe no_server_test.exe poll_kick_posix_test.exe resolve_address_test.exe secure_endpoint_test.exe sockaddr_utils_test.exe tcp_client_posix_test.exe tcp_posix_test.exe tcp_server_posix_test.exe time_averaged_stats_test.exe time_test.exe timeout_encoding_test.exe timers_test.exe transport_metadata_test.exe transport_security_test.exe
buildtests: alarm_heap_test.exe alarm_list_test.exe alarm_test.exe alpn_test.exe bin_encoder_test.exe census_hash_table_test.exe census_statistics_multiple_writers_circular_buffer_test.exe census_statistics_multiple_writers_test.exe census_statistics_performance_test.exe census_statistics_quick_test.exe census_statistics_small_log_test.exe census_stats_store_test.exe census_stub_test.exe census_trace_store_test.exe census_window_stats_test.exe chttp2_status_conversion_test.exe chttp2_stream_encoder_test.exe chttp2_stream_map_test.exe chttp2_transport_end2end_test.exe dualstack_socket_test.exe echo_test.exe fd_posix_test.exe fling_stream_test.exe fling_test.exe gpr_cancellable_test.exe gpr_cmdline_test.exe gpr_env_test.exe gpr_file_test.exe gpr_histogram_test.exe gpr_host_port_test.exe gpr_log_test.exe gpr_slice_buffer_test.exe gpr_slice_test.exe gpr_string_test.exe gpr_sync_test.exe gpr_thd_test.exe gpr_time_test.exe gpr_tls_test.exe gpr_useful_test.exe grpc_base64_test.exe grpc_byte_buffer_reader_test.exe grpc_channel_stack_test.exe grpc_completion_queue_test.exe grpc_credentials_test.exe grpc_json_token_test.exe grpc_stream_op_test.exe hpack_parser_test.exe hpack_table_test.exe httpcli_format_request_test.exe httpcli_parser_test.exe httpcli_test.exe json_rewrite_test.exe json_test.exe lame_client_test.exe message_compress_test.exe multi_init_test.exe murmur_hash_test.exe no_server_test.exe poll_kick_posix_test.exe resolve_address_test.exe secure_endpoint_test.exe sockaddr_utils_test.exe tcp_client_posix_test.exe tcp_posix_test.exe tcp_server_posix_test.exe time_averaged_stats_test.exe time_test.exe timeout_encoding_test.exe timers_test.exe transport_metadata_test.exe transport_security_test.exe
echo All tests built.
test: alarm_heap_test alarm_list_test alarm_test alpn_test bin_encoder_test census_hash_table_test census_statistics_multiple_writers_circular_buffer_test census_statistics_multiple_writers_test census_statistics_performance_test census_statistics_quick_test census_statistics_small_log_test census_stats_store_test census_stub_test census_trace_store_test census_window_stats_test chttp2_status_conversion_test chttp2_stream_encoder_test chttp2_stream_map_test chttp2_transport_end2end_test dualstack_socket_test echo_test fd_posix_test fling_stream_test fling_test gpr_cancellable_test gpr_cmdline_test gpr_env_test gpr_file_test gpr_histogram_test gpr_host_port_test gpr_log_test gpr_slice_buffer_test gpr_slice_test gpr_string_test gpr_sync_test gpr_thd_test gpr_time_test gpr_tls_test gpr_useful_test grpc_base64_test grpc_byte_buffer_reader_test grpc_channel_stack_test grpc_completion_queue_test grpc_credentials_test grpc_json_token_test grpc_stream_op_test hpack_parser_test hpack_table_test httpcli_format_request_test httpcli_parser_test httpcli_test json_rewrite_test json_test lame_client_test message_compress_test metadata_buffer_test multi_init_test murmur_hash_test no_server_test poll_kick_posix_test resolve_address_test secure_endpoint_test sockaddr_utils_test tcp_client_posix_test tcp_posix_test tcp_server_posix_test time_averaged_stats_test time_test timeout_encoding_test timers_test transport_metadata_test transport_security_test
test: alarm_heap_test alarm_list_test alarm_test alpn_test bin_encoder_test census_hash_table_test census_statistics_multiple_writers_circular_buffer_test census_statistics_multiple_writers_test census_statistics_performance_test census_statistics_quick_test census_statistics_small_log_test census_stats_store_test census_stub_test census_trace_store_test census_window_stats_test chttp2_status_conversion_test chttp2_stream_encoder_test chttp2_stream_map_test chttp2_transport_end2end_test dualstack_socket_test echo_test fd_posix_test fling_stream_test fling_test gpr_cancellable_test gpr_cmdline_test gpr_env_test gpr_file_test gpr_histogram_test gpr_host_port_test gpr_log_test gpr_slice_buffer_test gpr_slice_test gpr_string_test gpr_sync_test gpr_thd_test gpr_time_test gpr_tls_test gpr_useful_test grpc_base64_test grpc_byte_buffer_reader_test grpc_channel_stack_test grpc_completion_queue_test grpc_credentials_test grpc_json_token_test grpc_stream_op_test hpack_parser_test hpack_table_test httpcli_format_request_test httpcli_parser_test httpcli_test json_rewrite_test json_test lame_client_test message_compress_test multi_init_test murmur_hash_test no_server_test poll_kick_posix_test resolve_address_test secure_endpoint_test sockaddr_utils_test tcp_client_posix_test tcp_posix_test tcp_server_posix_test time_averaged_stats_test time_test timeout_encoding_test timers_test transport_metadata_test transport_security_test
echo All tests ran.
test_gpr: gpr_cancellable_test gpr_cmdline_test gpr_env_test gpr_file_test gpr_histogram_test gpr_host_port_test gpr_log_test gpr_slice_buffer_test gpr_slice_test gpr_string_test gpr_sync_test gpr_thd_test gpr_time_test gpr_tls_test gpr_useful_test
@ -590,14 +590,6 @@ message_compress_test: message_compress_test.exe
echo Running message_compress_test
$(OUT_DIR)\message_compress_test.exe
metadata_buffer_test.exe: grpc_test_util
echo Building metadata_buffer_test
$(CC) $(CFLAGS) /Fo:$(OUT_DIR)\ ..\..\test\core\channel\metadata_buffer_test.c
$(LINK) $(LFLAGS) /OUT:"$(OUT_DIR)\metadata_buffer_test.exe" Debug\grpc_test_util.lib Debug\grpc.lib Debug\gpr_test_util.lib Debug\gpr.lib $(LIBS) $(OUT_DIR)\metadata_buffer_test.obj
metadata_buffer_test: metadata_buffer_test.exe
echo Running metadata_buffer_test
$(OUT_DIR)\metadata_buffer_test.exe
multi_init_test.exe: grpc_test_util
echo Building multi_init_test
$(CC) $(CFLAGS) /Fo:$(OUT_DIR)\ ..\..\test\core\surface\multi_init_test.c

@ -109,7 +109,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
@ -254,8 +253,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\compression\algorithm.c">

@ -97,9 +97,6 @@
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -455,9 +452,6 @@
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -93,7 +93,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
@ -198,8 +197,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\compression\algorithm.c">

@ -37,9 +37,6 @@
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -347,9 +344,6 @@
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -55,7 +55,7 @@ $(OUT_DIR):
buildtests: buildtests_c buildtests_cxx
buildtests_c: alarm_heap_test.exe alarm_list_test.exe alarm_test.exe alpn_test.exe bin_encoder_test.exe census_hash_table_test.exe census_statistics_multiple_writers_circular_buffer_test.exe census_statistics_multiple_writers_test.exe census_statistics_performance_test.exe census_statistics_quick_test.exe census_statistics_small_log_test.exe census_stub_test.exe census_window_stats_test.exe chttp2_status_conversion_test.exe chttp2_stream_encoder_test.exe chttp2_stream_map_test.exe chttp2_transport_end2end_test.exe echo_client.exe echo_server.exe echo_test.exe fd_posix_test.exe fling_client.exe fling_server.exe fling_stream_test.exe fling_test.exe gpr_cancellable_test.exe gpr_cmdline_test.exe gpr_env_test.exe gpr_file_test.exe gpr_histogram_test.exe gpr_host_port_test.exe gpr_log_test.exe gpr_slice_buffer_test.exe gpr_slice_test.exe gpr_string_test.exe gpr_sync_test.exe gpr_thd_test.exe gpr_time_test.exe gpr_tls_test.exe gpr_useful_test.exe grpc_base64_test.exe grpc_byte_buffer_reader_test.exe grpc_channel_stack_test.exe grpc_completion_queue_test.exe grpc_credentials_test.exe grpc_json_token_test.exe grpc_stream_op_test.exe hpack_parser_test.exe hpack_table_test.exe httpcli_format_request_test.exe httpcli_parser_test.exe httpcli_test.exe json_rewrite.exe json_rewrite_test.exe json_test.exe lame_client_test.exe message_compress_test.exe metadata_buffer_test.exe multi_init_test.exe murmur_hash_test.exe no_server_test.exe poll_kick_posix_test.exe resolve_address_test.exe secure_endpoint_test.exe sockaddr_utils_test.exe tcp_client_posix_test.exe tcp_posix_test.exe tcp_server_posix_test.exe time_averaged_stats_test.exe time_test.exe timeout_encoding_test.exe timers_test.exe transport_metadata_test.exe transport_security_test.exe
buildtests_c: alarm_heap_test.exe alarm_list_test.exe alarm_test.exe alpn_test.exe bin_encoder_test.exe census_hash_table_test.exe census_statistics_multiple_writers_circular_buffer_test.exe census_statistics_multiple_writers_test.exe census_statistics_performance_test.exe census_statistics_quick_test.exe census_statistics_small_log_test.exe census_stub_test.exe census_window_stats_test.exe chttp2_status_conversion_test.exe chttp2_stream_encoder_test.exe chttp2_stream_map_test.exe chttp2_transport_end2end_test.exe echo_client.exe echo_server.exe echo_test.exe fd_posix_test.exe fling_client.exe fling_server.exe fling_stream_test.exe fling_test.exe gpr_cancellable_test.exe gpr_cmdline_test.exe gpr_env_test.exe gpr_file_test.exe gpr_histogram_test.exe gpr_host_port_test.exe gpr_log_test.exe gpr_slice_buffer_test.exe gpr_slice_test.exe gpr_string_test.exe gpr_sync_test.exe gpr_thd_test.exe gpr_time_test.exe gpr_tls_test.exe gpr_useful_test.exe grpc_base64_test.exe grpc_byte_buffer_reader_test.exe grpc_channel_stack_test.exe grpc_completion_queue_test.exe grpc_credentials_test.exe grpc_json_token_test.exe grpc_stream_op_test.exe hpack_parser_test.exe hpack_table_test.exe httpcli_format_request_test.exe httpcli_parser_test.exe httpcli_test.exe json_rewrite.exe json_rewrite_test.exe json_test.exe lame_client_test.exe message_compress_test.exe multi_init_test.exe murmur_hash_test.exe no_server_test.exe poll_kick_posix_test.exe resolve_address_test.exe secure_endpoint_test.exe sockaddr_utils_test.exe tcp_client_posix_test.exe tcp_posix_test.exe tcp_server_posix_test.exe time_averaged_stats_test.exe time_test.exe timeout_encoding_test.exe timers_test.exe transport_metadata_test.exe transport_security_test.exe
echo All tests built.
buildtests_cxx:
@ -581,14 +581,6 @@ message_compress_test: message_compress_test.exe
echo Running message_compress_test
$(OUT_DIR)\message_compress_test.exe
metadata_buffer_test.exe: grpc_test_util $(OUT_DIR)
echo Building metadata_buffer_test
$(CC) $(CFLAGS) /Fo:$(OUT_DIR)\ ..\..\test\core\channel\metadata_buffer_test.c
$(LINK) $(LFLAGS) /OUT:"$(OUT_DIR)\metadata_buffer_test.exe" Debug\grpc_test_util.lib Debug\grpc.lib Debug\gpr_test_util.lib Debug\gpr.lib $(LIBS) $(OUT_DIR)\metadata_buffer_test.obj
metadata_buffer_test: metadata_buffer_test.exe
echo Running metadata_buffer_test
$(OUT_DIR)\metadata_buffer_test.exe
multi_init_test.exe: grpc_test_util $(OUT_DIR)
echo Building multi_init_test
$(CC) $(CFLAGS) /Fo:$(OUT_DIR)\ ..\..\test\core\surface\multi_init_test.c

@ -111,7 +111,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
@ -256,8 +255,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\compression\algorithm.c">

@ -97,9 +97,6 @@
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -455,9 +452,6 @@
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -95,7 +95,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
@ -200,8 +199,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\compression\algorithm.c">

@ -37,9 +37,6 @@
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\metadata_buffer.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -347,9 +344,6 @@
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\metadata_buffer.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

Loading…
Cancel
Save