diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 3e30d822f4a..12e9f03b41e 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -86,10 +86,6 @@ char *grpc_call_op_string(grpc_call_op *op) { gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); put_metadata_list(&b, op->data.metadata); break; - case GRPC_SEND_START: - gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset); - gpr_strvec_add(&b, tmp); - break; case GRPC_SEND_MESSAGE: gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); break; @@ -115,12 +111,19 @@ char *grpc_call_op_string(grpc_call_op *op) { case GRPC_RECV_FINISH: gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); break; + case GRPC_RECV_SYNTHETIC_STATUS: + gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", op->data.synthetic_status.status, op->data.synthetic_status.message); + gpr_strvec_add(&b, tmp); + break; case GRPC_CANCEL_OP: gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); break; } gpr_asprintf(&tmp, " flags=0x%08x", op->flags); gpr_strvec_add(&b, tmp); + if (op->bind_pollset) { + gpr_strvec_add(&b, gpr_strdup("bind_pollset")); + } out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index cd3496b42bb..3a3a3a75b70 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -183,6 +183,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) { grpc_call_element *next_elem = elem + op->dir; + if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) { + grpc_metadata_batch_assert_ok(&op->data.metadata); + } next_elem->filter->call_op(next_elem, elem, op); } @@ -211,6 +214,7 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { cancel_op.done_cb = do_nothing; cancel_op.user_data = NULL; cancel_op.flags = 0; + cancel_op.bind_pollset = NULL; grpc_call_next_op(cur_elem, &cancel_op); } @@ -221,11 +225,19 @@ void grpc_call_element_send_finish(grpc_call_element *cur_elem) { finish_op.done_cb = do_nothing; finish_op.user_data = NULL; finish_op.flags = 0; + finish_op.bind_pollset = NULL; grpc_call_next_op(cur_elem, &finish_op); } void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message) { - abort(); + grpc_call_op op; + op.type = GRPC_RECV_SYNTHETIC_STATUS; + op.dir = GRPC_CALL_UP; + op.done_cb = do_nothing; + op.user_data = NULL; + op.data.synthetic_status.status = status; + op.data.synthetic_status.message = message; + grpc_call_next_op(cur_elem, &op); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index cc1eab7cf22..addc92b2727 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -62,8 +62,6 @@ typedef struct grpc_call_element grpc_call_element; typedef enum { /* send metadata to the channels peer */ GRPC_SEND_METADATA, - /* start a connection (corresponds to start_invoke/accept) */ - GRPC_SEND_START, /* send a message to the channels peer */ GRPC_SEND_MESSAGE, /* send a pre-formatted message to the channels peer */ @@ -80,6 +78,8 @@ typedef enum { GRPC_RECV_HALF_CLOSE, /* full close was received from the channels peer */ GRPC_RECV_FINISH, + /* a status has been sythesized locally */ + GRPC_RECV_SYNTHETIC_STATUS, /* the call has been abnormally terminated */ GRPC_CANCEL_OP } grpc_call_op_type; @@ -103,13 +103,16 @@ typedef struct { /* Argument data, matching up with grpc_call_op_type names */ union { - struct { - grpc_pollset *pollset; - } start; grpc_byte_buffer *message; grpc_metadata_batch metadata; + struct { + grpc_status_code status; + const char *message; + } synthetic_status; } data; + grpc_pollset *bind_pollset; + /* Must be called when processing of this call-op is complete. Signature chosen to match transport flow control callbacks */ void (*done_cb)(void *user_data, grpc_op_error error); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 018228b4e62..fb984f2878a 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -82,21 +82,16 @@ struct call_data { /* owning element */ grpc_call_element *elem; + gpr_uint8 got_first_send; + call_state state; - grpc_metadata_batch pending_metadata; - gpr_uint32 pending_metadata_flags; gpr_timespec deadline; union { struct { /* our child call stack */ grpc_child_call *child_call; } active; - struct { - void (*on_complete)(void *user_data, grpc_op_error error); - void *on_complete_user_data; - gpr_uint32 start_flags; - grpc_pollset *pollset; - } waiting; + grpc_call_op waiting_op; } s; }; @@ -121,19 +116,9 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); - grpc_call_op mop; GPR_ASSERT(calld->state == CALL_ACTIVE); - /* sending buffered metadata down the stack before the start call */ - mop.type = GRPC_SEND_METADATA; - mop.dir = GRPC_CALL_DOWN; - mop.flags = calld->pending_metadata_flags; - mop.data.metadata = calld->pending_metadata; - mop.done_cb = do_nothing; - mop.user_data = NULL; - child_elem->filter->call_op(child_elem, elem, &mop); - /* continue the start call down the stack, this nees to happen after metadata are flushed*/ child_elem->filter->call_op(child_elem, elem, op); @@ -177,10 +162,7 @@ static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { gpr_realloc(chand->waiting_children, chand->waiting_child_capacity * sizeof(call_data *)); } - calld->s.waiting.on_complete = op->done_cb; - calld->s.waiting.on_complete_user_data = op->user_data; - calld->s.waiting.start_flags = op->flags; - calld->s.waiting.pollset = op->data.start.pollset; + calld->s.waiting_op = *op; chand->waiting_children[chand->waiting_child_count++] = calld; gpr_mu_unlock(&chand->mu); @@ -233,7 +215,7 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); send_up_cancelled_ops(elem); - calld->s.waiting.on_complete(calld->s.waiting.on_complete_user_data, + calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR); return; /* early out */ case CALL_CREATED: @@ -257,12 +239,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, switch (op->type) { case GRPC_SEND_METADATA: - grpc_metadata_batch_merge(&calld->pending_metadata, &op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - break; - case GRPC_SEND_START: - /* filter out the start event to find which child to send on */ - start_rpc(elem, op); + if (!calld->got_first_send) { + /* filter out the start event to find which child to send on */ + calld->got_first_send = 1; + start_rpc(elem, op); + } else { + grpc_call_next_op(elem, op); + } break; case GRPC_CANCEL_OP: cancel_rpc(elem, op); @@ -365,12 +348,6 @@ static void channel_op(grpc_channel_element *elem, } } -static void error_bad_on_complete(void *arg, grpc_op_error error) { - gpr_log(GPR_ERROR, - "Waiting finished but not started? Bad on_complete callback"); - abort(); -} - /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, const void *server_transport_data) { @@ -381,17 +358,13 @@ static void init_call_elem(grpc_call_element *elem, calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->s.waiting.on_complete = error_bad_on_complete; - calld->s.waiting.on_complete_user_data = NULL; - grpc_metadata_batch_init(&calld->pending_metadata); + calld->got_first_send = 0; } /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; - /* if the metadata buffer is not flushed, destroy it here. */ - grpc_metadata_batch_destroy(&calld->pending_metadata); /* if the call got activated, we need to destroy the child stack also, and remove it from the in-flight requests tracked by the child_entry we picked */ @@ -498,13 +471,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { - call_ops[i].type = GRPC_SEND_START; - call_ops[i].dir = GRPC_CALL_DOWN; - call_ops[i].flags = waiting_children[i]->s.waiting.start_flags; - call_ops[i].done_cb = waiting_children[i]->s.waiting.on_complete; - call_ops[i].user_data = - waiting_children[i]->s.waiting.on_complete_user_data; - call_ops[i].data.start.pollset = waiting_children[i]->s.waiting.pollset; + call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 17abba06be9..84d1f3569df 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -119,14 +119,13 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (op->bind_pollset) { + grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); + } + switch (op->type) { case GRPC_SEND_METADATA: grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, - op->user_data); - break; - case GRPC_SEND_START: - grpc_transport_add_to_pollset(chand->transport, op->data.start.pollset); end_bufferable_op(op, chand, calld, 0); break; case GRPC_SEND_MESSAGE: diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 0e50f408b5f..77f204c7a71 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -188,8 +188,11 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_SEND_METADATA: /* If we haven't sent status 200 yet, we need to so so because it needs to come before any non : prefixed metadata. */ - grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, - grpc_mdelem_ref(channeld->status_ok)); + if (!calld->sent_status) { + calld->sent_status = 1; + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, + grpc_mdelem_ref(channeld->status_ok)); + } grpc_call_next_op(elem, op); break; default: diff --git a/src/core/security/auth.c b/src/core/security/auth.c index f61287ad3fa..0b9d566c90d 100644 --- a/src/core/security/auth.c +++ b/src/core/security/auth.c @@ -175,10 +175,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, calld->method = grpc_mdstr_ref(md->value); } } - grpc_call_next_op(elem, op); - break; - - case GRPC_SEND_START: if (calld->host != NULL) { grpc_security_status status; const char *call_host = grpc_mdstr_as_c_string(calld->host); @@ -200,7 +196,6 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, } send_security_metadata(elem, op); break; - default: /* pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 5facaa503d2..0d5fe225d9a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -94,6 +94,8 @@ typedef enum { /* Status came from the application layer overriding whatever the wire says */ STATUS_FROM_API_OVERRIDE = 0, + /* Status was created by some internal channel stack operation */ + STATUS_FROM_CORE, /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, @@ -363,6 +365,7 @@ static void request_more_data(grpc_call *call) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); } @@ -660,15 +663,9 @@ static void enact_send_action(grpc_call *call, send_action sa) { grpc_metadata_batch_link_head(&op.data.metadata, &call->send_initial_metadata[i]); } - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.start.pollset = grpc_cq_pollset(call->cq); op.done_cb = finish_start_step; op.user_data = call; + op.bind_pollset = grpc_cq_pollset(call->cq); grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: @@ -682,6 +679,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.data.message = data.send_message; op.done_cb = finish_write_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: @@ -694,6 +692,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { call, data.send_metadata.count, data.send_metadata.metadata); op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; op.data.metadata.deadline = call->send_deadline; + op.bind_pollset = NULL; /* send status */ /* TODO(ctiller): cache common status values */ data = call->request_data[GRPC_IOREQ_SEND_STATUS]; @@ -723,6 +722,7 @@ static void enact_send_action(grpc_call *call, send_action sa) { op.flags = 0; op.done_cb = finish_finish_step; op.user_data = call; + op.bind_pollset = NULL; grpc_call_execute_op(call, &op); break; } @@ -876,6 +876,7 @@ grpc_call_error grpc_call_cancel(grpc_call *c) { op.flags = 0; op.done_cb = do_nothing; op.user_data = NULL; + op.bind_pollset = NULL; elem = CALL_ELEM_FROM_CALL(c, 0); elem->filter->call_op(elem, NULL, &op); @@ -983,6 +984,14 @@ void grpc_call_recv_message(grpc_call_element *elem, unlock(call); } +void grpc_call_recv_synthetic_status(grpc_call_element *elem, grpc_status_code status, const char *message) { + grpc_call *call = CALL_FROM_TOP_ELEM(elem); + lock(call); + set_status_code(call, STATUS_FROM_CORE, status); + set_status_details(call, STATUS_FROM_CORE, grpc_mdstr_from_string(call->metadata_context, message)); + unlock(call); +} + int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_batch *md) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); @@ -990,6 +999,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_metadata_array *dest; grpc_metadata *mdusr; int is_trailing; + grpc_mdctx *mdctx = call->metadata_context; lock(call); is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; @@ -998,10 +1008,8 @@ int grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdstr *key = md->key; if (key == grpc_channel_get_status_string(call->channel)) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); - grpc_mdelem_unref(md); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - grpc_mdelem_unref(md); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { @@ -1022,6 +1030,7 @@ int grpc_call_recv_metadata(grpc_call_element *elem, sizeof(grpc_mdelem *) * call->owned_metadata_capacity); } call->owned_metadata[call->owned_metadata_count++] = md; + l->md = 0; } } if (gpr_time_cmp(md->deadline, gpr_inf_future) != 0) { @@ -1032,6 +1041,15 @@ int grpc_call_recv_metadata(grpc_call_element *elem, } unlock(call); + grpc_mdctx_lock(mdctx); + for (l = md->list.head; l; l = l->next) { + if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + for (l = md->garbage.head; l; l = l->next) { + grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + } + grpc_mdctx_unlock(mdctx); + return !is_trailing; } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 2848ccc5a13..16de197af11 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -113,6 +113,8 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); +void grpc_call_recv_synthetic_status(grpc_call_element *elem, grpc_status_code status, const char *message); + /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); diff --git a/src/core/surface/client.c b/src/core/surface/client.c index b987c2de5f6..719346689a8 100644 --- a/src/core/surface/client.c +++ b/src/core/surface/client.c @@ -61,6 +61,9 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, case GRPC_RECV_FINISH: grpc_call_stream_closed(elem); break; + case GRPC_RECV_SYNTHETIC_STATUS: + grpc_call_recv_synthetic_status(elem, op->data.synthetic_status.status, op->data.synthetic_status.message); + break; default: GPR_ASSERT(op->dir == GRPC_CALL_DOWN); grpc_call_next_op(elem, op); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 74e9628385c..b7b3d929589 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -51,14 +51,11 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, GRPC_CALL_LOG_OP(GPR_INFO, elem, op); switch (op->type) { - case GRPC_SEND_START: + case GRPC_SEND_METADATA: grpc_call_element_recv_status(elem, GRPC_STATUS_UNKNOWN, "Rpc sent on a lame channel."); grpc_call_stream_closed(elem); break; - case GRPC_SEND_METADATA: - abort(); - break; default: break; } diff --git a/src/core/transport/chttp2/stream_encoder.c b/src/core/transport/chttp2/stream_encoder.c index 8c66171b078..f50d1428abb 100644 --- a/src/core/transport/chttp2/stream_encoder.c +++ b/src/core/transport/chttp2/stream_encoder.c @@ -479,8 +479,9 @@ gpr_uint32 grpc_chttp2_preencode(grpc_stream_op *inops, size_t *inops_count, /* skip */ curop++; break; - case GRPC_OP_FLOW_CTL_CB: case GRPC_OP_METADATA: + grpc_metadata_batch_assert_ok(&op->data.metadata); + case GRPC_OP_FLOW_CTL_CB: /* these just get copied as they don't impact the number of flow controlled bytes */ grpc_sopb_append(outops, op, 1); @@ -527,6 +528,12 @@ exit_loop: *inops_count -= curop; memmove(inops, inops + curop, *inops_count * sizeof(grpc_stream_op)); + for (curop = 0; curop < *inops_count; curop++) { + if (inops[curop].type == GRPC_OP_METADATA) { + grpc_metadata_batch_assert_ok(&inops[curop].data.metadata); + } + } + return flow_controlled_bytes_taken; } diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index 1a8b4174ff1..9589599257f 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -33,11 +33,11 @@ #include "src/core/transport/stream_op.h" +#include + #include #include -#include - /* Exponential growth function: Given x, return a larger x. Currently we grow by 1.5 times upon reallocation. */ #define GROW(x) (3 * (x) / 2) @@ -91,19 +91,32 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops) { } } +static void assert_contained_metadata_ok(grpc_stream_op *ops, size_t nops) { + size_t i; + for (i = 0; i < nops; i++) { + if (ops[i].type == GRPC_OP_METADATA) { + grpc_metadata_batch_assert_ok(&ops[i].data.metadata); + } + } +} + static void expandto(grpc_stream_op_buffer *sopb, size_t new_capacity) { sopb->capacity = new_capacity; + assert_contained_metadata_ok(sopb->ops, sopb->nops); if (sopb->ops == sopb->inlined_ops) { sopb->ops = gpr_malloc(sizeof(grpc_stream_op) * new_capacity); memcpy(sopb->ops, sopb->inlined_ops, sopb->nops * sizeof(grpc_stream_op)); } else { sopb->ops = gpr_realloc(sopb->ops, sizeof(grpc_stream_op) * new_capacity); } + assert_contained_metadata_ok(sopb->ops, sopb->nops); } static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { grpc_stream_op *out; + assert_contained_metadata_ok(sopb->ops, sopb->nops); + if (sopb->nops == sopb->capacity) { expandto(sopb, GROW(sopb->capacity)); } @@ -114,6 +127,7 @@ static grpc_stream_op *add(grpc_stream_op_buffer *sopb) { void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb) { add(sopb)->type = GRPC_NO_OP; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, @@ -122,6 +136,7 @@ void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, op->type = GRPC_OP_BEGIN_MESSAGE; op->data.begin_message.length = length; op->data.begin_message.flags = flags; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) { @@ -129,12 +144,15 @@ void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch b) grpc_metadata_batch_assert_ok(&b); op->type = GRPC_OP_METADATA; op->data.metadata = b; + grpc_metadata_batch_assert_ok(&op->data.metadata); + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice) { grpc_stream_op *op = add(sopb); op->type = GRPC_OP_SLICE; op->data.slice = slice; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, @@ -144,6 +162,7 @@ void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, op->type = GRPC_OP_FLOW_CTL_CB; op->data.flow_ctl_cb.cb = cb; op->data.flow_ctl_cb.arg = arg; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, @@ -151,12 +170,15 @@ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t orig_nops = sopb->nops; size_t new_nops = orig_nops + nops; + assert_contained_metadata_ok(ops, nops); + assert_contained_metadata_ok(sopb->ops, sopb->nops); if (new_nops > sopb->capacity) { expandto(sopb, GPR_MAX(GROW(sopb->capacity), new_nops)); } memcpy(sopb->ops + orig_nops, ops, sizeof(grpc_stream_op) * nops); sopb->nops = new_nops; + assert_contained_metadata_ok(sopb->ops, sopb->nops); } @@ -183,13 +205,19 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd) { assert_valid_list(&comd->garbage); } -void grpc_metadata_batch_init(grpc_metadata_batch *comd) { abort(); } - -void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { abort(); } +void grpc_metadata_batch_init(grpc_metadata_batch *comd) { + comd->list.head = comd->list.tail = comd->garbage.head = comd->garbage.tail = NULL; + comd->deadline = gpr_inf_future; +} -void grpc_metadata_batch_merge(grpc_metadata_batch *target, - grpc_metadata_batch *add) { - abort(); +void grpc_metadata_batch_destroy(grpc_metadata_batch *comd) { + grpc_linked_mdelem *l; + for (l = comd->list.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } + for (l = comd->garbage.head; l; l = l->next) { + grpc_mdelem_unref(l->md); + } } void grpc_metadata_batch_add_head(grpc_metadata_batch *comd, @@ -246,6 +274,20 @@ void grpc_metadata_batch_link_tail(grpc_metadata_batch *comd, link_tail(&comd->list, storage); } +void grpc_metadata_batch_merge(grpc_metadata_batch *target, + grpc_metadata_batch *add) { + grpc_linked_mdelem *l; + grpc_linked_mdelem *next; + for (l = add->list.head; l; l = next) { + next = l->next; + link_tail(&target->list, l); + } + for (l = add->garbage.head; l; l = next) { + next = l->next; + link_tail(&target->garbage, l); + } +} + void grpc_metadata_batch_filter(grpc_metadata_batch *comd, grpc_mdelem *(*filter)(void *user_data, grpc_mdelem *elem),