Merge pull request #15709 from markdroth/recv_trailing_metadata_ready2

Second attempt: move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops
reviewable/pr13368/r11
Mark D. Roth 7 years ago committed by GitHub
commit 6c25d604c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 891
      src/core/ext/filters/client_channel/client_channel.cc
  2. 28
      src/core/ext/filters/deadline/deadline_filter.cc
  3. 10
      src/core/ext/filters/deadline/deadline_filter.h
  4. 19
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 35
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  6. 19
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  7. 52
      src/core/ext/transport/inproc/inproc_transport.cc
  8. 9
      src/core/lib/channel/connected_channel.cc
  9. 2
      src/core/lib/gprpp/inlined_vector.h
  10. 80
      src/core/lib/iomgr/call_combiner.h
  11. 5
      src/core/lib/iomgr/closure.h
  12. 81
      src/core/lib/surface/call.cc
  13. 29
      src/core/lib/transport/transport.cc
  14. 22
      src/core/lib/transport/transport.h
  15. 7
      src/core/lib/transport/transport_op_string.cc
  16. 2
      test/core/gprpp/inlined_vector_test.cc
  17. 24
      test/cpp/microbenchmarks/bm_call_create.cc

File diff suppressed because it is too large Load Diff

@ -128,21 +128,25 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
} }
} }
// Callback run when the call is complete. // Callback run when we receive trailing metadata.
static void on_complete(void* arg, grpc_error* error) { static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg); grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state); cancel_timer_if_needed(deadline_state);
// Invoke the next callback. // Invoke the original callback.
GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error)); GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
} }
// Inject our own on_complete callback into op. // Inject our own recv_trailing_metadata_ready callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state, static void inject_recv_trailing_metadata_ready(
grpc_transport_stream_op_batch* op) { grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
deadline_state->next_on_complete = op->on_complete; deadline_state->original_recv_trailing_metadata_ready =
GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
op->on_complete = &deadline_state->on_complete; op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&deadline_state->recv_trailing_metadata_ready;
} }
// Callback and associated state for starting the timer after call stack // Callback and associated state for starting the timer after call stack
@ -226,7 +230,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch(
// Make sure we know when the call is complete, so that we can cancel // Make sure we know when the call is complete, so that we can cancel
// the timer. // the timer.
if (op->recv_trailing_metadata) { if (op->recv_trailing_metadata) {
inject_on_complete_cb(deadline_state, op); inject_recv_trailing_metadata_ready(deadline_state, op);
} }
} }
} }
@ -322,7 +326,7 @@ static void server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the // the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side. // hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) { if (op->recv_trailing_metadata) {
inject_on_complete_cb(&calld->base.deadline_state, op); inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
} }
} }
// Chain to next filter. // Chain to next filter.

@ -37,12 +37,12 @@ typedef struct grpc_deadline_state {
grpc_deadline_timer_state timer_state; grpc_deadline_timer_state timer_state;
grpc_timer timer; grpc_timer timer;
grpc_closure timer_callback; grpc_closure timer_callback;
// Closure to invoke when the call is complete. // Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer. // We use this to cancel the timer.
grpc_closure on_complete; grpc_closure recv_trailing_metadata_ready;
// The original on_complete closure, which we chain to after our own // The original recv_trailing_metadata_ready closure, which we chain to
// closure is invoked. // after our own closure is invoked.
grpc_closure* next_on_complete; grpc_closure* original_recv_trailing_metadata_ready;
} grpc_deadline_state; } grpc_deadline_state;
// //

@ -55,8 +55,8 @@ struct call_data {
grpc_closure recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops. // State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata; grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* original_recv_trailing_metadata_on_complete; grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure recv_trailing_metadata_on_complete; grpc_closure recv_trailing_metadata_ready;
// State for handling send_message ops. // State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch; grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read; size_t send_message_bytes_read;
@ -153,8 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error); GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
} }
static void recv_trailing_metadata_on_complete(void* user_data, static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data); grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
@ -163,7 +162,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
} else { } else {
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
} }
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error); GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
} }
static void send_message_on_complete(void* arg, grpc_error* error) { static void send_message_on_complete(void* arg, grpc_error* error) {
@ -312,8 +311,10 @@ static void hc_start_transport_stream_op_batch(
/* substitute our callback for the higher callback */ /* substitute our callback for the higher callback */
calld->recv_trailing_metadata = calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata; batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata_on_complete = batch->on_complete; calld->original_recv_trailing_metadata_ready =
batch->on_complete = &calld->recv_trailing_metadata_on_complete; batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
} }
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
@ -420,8 +421,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem, recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete, GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_on_complete, elem, recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx); elem, grpc_schedule_on_exec_ctx);

@ -1149,12 +1149,10 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
} }
} }
/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so /* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got we should not complete this closure until we can prove that the write got
scheduled */ scheduled */
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1) #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low /* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */ bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16) #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@ -1206,10 +1204,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error); grpc_error_add_child(closure->error_data.error, error);
} }
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) { if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = nullptr;
}
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) || if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error); GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@ -1351,9 +1345,14 @@ static void perform_stream_op_locked(void* stream_op,
} }
grpc_closure* on_complete = op->on_complete; grpc_closure* on_complete = op->on_complete;
// TODO(roth): This is a hack needed because we use data inside of the
// closure itself to do the barrier calculation (i.e., to ensure that
// we don't schedule the closure until all ops in the batch have been
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) { if (on_complete == nullptr) {
on_complete = on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx); nullptr, grpc_schedule_on_exec_ctx);
} }
/* use final_data as a barrier until enqueue time; the inital counter is /* use final_data as a barrier until enqueue time; the inital counter is
@ -1361,12 +1360,6 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT; on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE; on_complete->error_data.error = GRPC_ERROR_NONE;
if (op->collect_stats) {
GPR_ASSERT(s->collecting_stats == nullptr);
s->collecting_stats = op_payload->collect_stats.collect_stats;
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
if (op->cancel_stream) { if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL(); GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error); grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@ -1600,8 +1593,11 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_trailing_metadata) { if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA(); GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
GPR_ASSERT(s->collecting_stats == nullptr);
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr); GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete); s->recv_trailing_metadata_finished =
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata = s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata; op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true; s->final_metadata_requested = true;
@ -1960,11 +1956,12 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
} }
if (s->read_closed && s->frame_storage.length == 0 && !pending_data && if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) { s->recv_trailing_metadata_finished != nullptr) {
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1], grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata); s->recv_trailing_metadata);
grpc_chttp2_complete_closure_step( null_then_run_closure(&s->recv_trailing_metadata_finished,
t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE, GRPC_ERROR_NONE);
"recv_trailing_metadata_finished");
} }
} }
} }

@ -925,6 +925,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
result = false; result = false;
} }
/* Check if every op that was asked for is done. */ /* Check if every op that was asked for is done. */
/* TODO(muxi): We should not consider the recv ops here, since they
* have their own callbacks. We should invoke a batch's on_complete
* as soon as all of the batch's send ops are complete, even if
* there are still recv ops pending. */
else if (curr_op->send_initial_metadata && else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) { !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because"); CRONET_LOG(GPR_DEBUG, "Because");
@ -1280,12 +1284,20 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state, op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) { OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) { grpc_error* error = GRPC_ERROR_NONE;
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
error = GRPC_ERROR_REF(stream_state->cancel_error);
} else if (stream_state->state_op_done[OP_FAILED]) {
error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
} else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata, &oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false; stream_state->rs.trailing_metadata_valid = false;
} }
GRPC_CLOSURE_SCHED(
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK; result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream && } else if (stream_op->cancel_stream &&
@ -1398,6 +1410,11 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED); GRPC_ERROR_CANCELLED);
} }
if (op->recv_trailing_metadata) {
GRPC_CLOSURE_SCHED(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_CANCELLED);
}
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED); GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return; return;
} }

@ -120,7 +120,6 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next; struct inproc_stream* stream_list_next;
} inproc_stream; } inproc_stream;
static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error); static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error); static void op_state_machine(void* arg, grpc_error* error);
@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) { const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op); int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op); int is_stm = static_cast<int>(op == s->send_trailing_md_op);
// TODO(vjpai): We should not consider the recv ops here, since they
// have their own callbacks. We should invoke a batch's on_complete
// as soon as all of the batch's send ops are complete, even if there
// are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op); int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op); int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op); int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr; s->send_trailing_md_op = nullptr;
} }
if (s->recv_trailing_md_op) { if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p", INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error); s, error);
complete_if_batch_end_locked( complete_if_batch_end_locked(
@ -638,6 +646,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
} }
s->trailing_md_sent = true; s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready", s);
GRPC_CLOSURE_SCHED(
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
INPROC_LOG(GPR_INFO, INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s); "op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
} }
if (s->recv_trailing_md_op && s->t->is_client && other && if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) { other->send_message_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready %p", s,
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE); maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
} }
if (s->to_read_trailing_md_filled) { if (s->to_read_trailing_md_filled) {
@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO, INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p", "op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err); s, new_err);
GRPC_CLOSURE_SCHED(
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err)); GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr; s->recv_trailing_md_op = nullptr;
@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing // couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance // md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked( complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op, s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete"); "cancel_stream scheduling trailing-md-on-complete");
@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret; return ret;
} }
static void do_nothing(void* arg, grpc_error* error) {}
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) { grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op); INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
} }
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete; grpc_closure* on_complete = op->on_complete;
// TODO(roth): This is a hack needed because we use data inside of the
// closure itself to do the barrier calculation (i.e., to ensure that
// we don't schedule the closure until all ops in the batch have been
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) { if (on_complete == nullptr) {
on_complete = &do_nothing_closure; on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
nullptr, grpc_schedule_on_exec_ctx);
} }
if (op->cancel_stream) { if (op->cancel_stream) {
@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
if (op->recv_trailing_metadata) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
} }
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s, INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error); error);
@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/******************************************************************************* /*******************************************************************************
* GLOBAL INIT AND DESTROY * GLOBAL INIT AND DESTROY
*/ */
static void do_nothing(void* arg, grpc_error* error) {}
void grpc_inproc_transport_init(void) { void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0); g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path"); grpc_slice key_tmp = grpc_slice_from_static_string(":path");

@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches. callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready; callback_state recv_initial_metadata_ready;
callback_state recv_message_ready; callback_state recv_message_ready;
callback_state recv_trailing_metadata_ready;
} call_data; } call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) { static void run_in_call_combiner(void* arg, grpc_error* error) {
@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready", intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready); &batch->payload->recv_message.recv_message_ready);
} }
if (batch->recv_trailing_metadata) {
callback_state* state = &calld->recv_trailing_metadata_ready;
intercept_callback(
calld, state, false, "recv_trailing_metadata_ready",
&batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
}
if (batch->cancel_stream) { if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any // There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into // given time, so we can't just pick out a fixed index into
@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state))); static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)", intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete); &batch->on_complete);
} else { } else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch); callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete); intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
} }

@ -99,6 +99,8 @@ class InlinedVector {
void push_back(T&& value) { emplace_back(std::move(value)); } void push_back(T&& value) { emplace_back(std::move(value)); }
size_t size() const { return size_; } size_t size() const { return size_; }
bool empty() const { return size_ == 0; }
size_t capacity() const { return capacity_; } size_t capacity() const { return capacity_; }
void clear() { void clear() {

@ -26,6 +26,7 @@
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h" #include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a // A simple, lock-free mechanism for serializing activity related to a
@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error); grpc_error* error);
namespace grpc_core {
// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
// returned to the surface, at which point the surface will yield the
// call combiner. So when we are running in the call combiner and have
// more than one callback to return to the surface, we need to re-enter
// the call combiner for all but one of those callbacks.
class CallCombinerClosureList {
public:
CallCombinerClosureList() {}
// Adds a closure to the list. The closure must eventually result in
// the call combiner being yielded.
void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
closures_.emplace_back(closure, error, reason);
}
// Runs all closures in the call combiner and yields the call combiner.
//
// All but one of the closures in the list will be scheduled via
// GRPC_CALL_COMBINER_START(), and the remaining closure will be
// scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
// yielding the call combiner. If the list is empty, then the call
// combiner will be yielded immediately.
void RunClosures(grpc_call_combiner* call_combiner) {
if (closures_.empty()) {
GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
return;
}
for (size_t i = 1; i < closures_.size(); ++i) {
auto& closure = closures_[i];
GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
closure.reason);
}
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"CallCombinerClosureList executing closure while already "
"holding call_combiner %p: closure=%p error=%s reason=%s",
call_combiner, closures_[0].closure,
grpc_error_string(closures_[0].error), closures_[0].reason);
}
// This will release the call combiner.
GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
closures_.clear();
}
// Runs all closures in the call combiner, but does NOT yield the call
// combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
for (size_t i = 0; i < closures_.size(); ++i) {
auto& closure = closures_[i];
GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
closure.reason);
}
closures_.clear();
}
size_t size() const { return closures_.size(); }
private:
struct CallCombinerClosure {
grpc_closure* closure;
grpc_error* error;
const char* reason;
CallCombinerClosure(grpc_closure* closure, grpc_error* error,
const char* reason)
: closure(closure), error(error), reason(reason) {}
};
// There are generally a maximum of 6 closures to run in the call
// combiner, one for each pending op.
InlinedVector<CallCombinerClosure, 6> closures_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */

@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) { if (c->scheduled) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], " "Closure already scheduled. (closure: %p, created: [%s:%d], "
"previously scheduled at: [%s: %d] run?: %s", "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
"run?: %s",
c, c->file_created, c->line_created, c->file_initiated, c, c->file_created, c->line_created, c->file_initiated,
c->line_initiated, c->run ? "true" : "false"); c->line_initiated, file, line, c->run ? "true" : "false");
abort(); abort();
} }
c->scheduled = true; c->scheduled = true;

@ -233,6 +233,7 @@ struct grpc_call {
grpc_closure receiving_slice_ready; grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready; grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready; grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags; uint32_t test_only_last_message_flags;
grpc_closure release_call; grpc_closure release_call;
@ -270,8 +271,17 @@ struct grpc_call {
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression"); grpc_core::TraceFlag grpc_compression_trace(false, "compression");
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1)) /* Given a size, round up to the next multiple of sizeof(void*) */
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1) #define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
(((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
#define CALL_STACK_FROM_CALL(call) \
(grpc_call_stack*)((char*)(call) + \
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
(grpc_call*)(((char*)(call_stack)) - \
ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_ELEM_FROM_CALL(call, idx) \ #define CALL_ELEM_FROM_CALL(call, idx) \
grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx) grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \ #define CALL_FROM_TOP_ELEM(top_elem) \
@ -342,8 +352,9 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
size_t initial_size = grpc_channel_get_call_size_estimate(args->channel); size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size); GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
gpr_arena* arena = gpr_arena_create(initial_size); gpr_arena* arena = gpr_arena_create(initial_size);
call = static_cast<grpc_call*>(gpr_arena_alloc( call = static_cast<grpc_call*>(
arena, sizeof(grpc_call) + channel_stack->call_stack_size)); gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1); gpr_ref_init(&call->ext_ref, 1);
call->arena = arena; call->arena = arena;
grpc_call_combiner_init(&call->call_combiner); grpc_call_combiner_init(&call->call_combiner);
@ -1209,7 +1220,6 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) { if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy( grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
} }
if (bctl->op.send_message) { if (bctl->op.send_message) {
@ -1217,14 +1227,9 @@ static void post_batch_completion(batch_control* bctl) {
} }
if (bctl->op.send_trailing_metadata) { if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy( grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
} }
if (bctl->op.recv_trailing_metadata) { if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
/* propagate cancellation to any interested children */ /* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1); gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call); parent_call* pc = get_parent_call(call);
@ -1246,7 +1251,6 @@ static void post_batch_completion(batch_control* bctl) {
} }
gpr_mu_unlock(&pc->child_list_mu); gpr_mu_unlock(&pc->child_list_mu);
} }
if (call->is_client) { if (call->is_client) {
get_final_status(call, set_status_value_directly, get_final_status(call, set_status_value_directly,
call->final_op.client.status, call->final_op.client.status,
@ -1256,7 +1260,6 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value, get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr); call->final_op.server.cancelled, nullptr, nullptr);
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE; error = GRPC_ERROR_NONE;
} }
@ -1538,6 +1541,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl); finish_batch_step(bctl);
} }
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
}
finish_batch_step(bctl);
}
static void finish_batch(void* bctlp, grpc_error* error) { static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp); batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call; grpc_call* call = bctl->call;
@ -1558,7 +1574,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i; size_t i;
const grpc_op* op; const grpc_op* op;
batch_control* bctl; batch_control* bctl;
int num_completion_callbacks_needed = 1; bool has_send_ops = false;
int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK; grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload; grpc_transport_stream_op_batch_payload* stream_op_payload;
@ -1664,6 +1681,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string = stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string; &call->peer_string;
} }
has_send_ops = true;
break; break;
} }
case GRPC_OP_SEND_MESSAGE: { case GRPC_OP_SEND_MESSAGE: {
@ -1693,6 +1711,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags); &op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset( stream_op_payload->send_message.send_message.reset(
call->sending_stream.get()); call->sending_stream.get());
has_send_ops = true;
break; break;
} }
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@ -1713,6 +1732,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true; call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata = stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
has_send_ops = true;
break; break;
} }
case GRPC_OP_SEND_STATUS_FROM_SERVER: { case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@ -1777,6 +1797,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
} }
stream_op_payload->send_trailing_metadata.send_trailing_metadata = stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
has_send_ops = true;
break; break;
} }
case GRPC_OP_RECV_INITIAL_METADATA: { case GRPC_OP_RECV_INITIAL_METADATA: {
@ -1804,7 +1825,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string = stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string; &call->peer_string;
} }
num_completion_callbacks_needed++; ++num_recv_ops;
break; break;
} }
case GRPC_OP_RECV_MESSAGE: { case GRPC_OP_RECV_MESSAGE: {
@ -1826,7 +1847,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready = stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready; &call->receiving_stream_ready;
num_completion_callbacks_needed++; ++num_recv_ops;
break; break;
} }
case GRPC_OP_RECV_STATUS_ON_CLIENT: { case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@ -1852,11 +1873,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string = call->final_op.client.error_string =
op->data.recv_status_on_client.error_string; op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true; stream_op->recv_trailing_metadata = true;
stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op_payload->collect_stats.collect_stats = stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats; &call->final_info.stats.transport_stream_stats;
GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
receiving_trailing_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&call->receiving_trailing_metadata_ready;
++num_recv_ops;
break; break;
} }
case GRPC_OP_RECV_CLOSE_ON_SERVER: { case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@ -1877,11 +1903,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled = call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled; op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true; stream_op->recv_trailing_metadata = true;
stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op_payload->collect_stats.collect_stats = stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats; &call->final_info.stats.transport_stream_stats;
GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
receiving_trailing_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&call->receiving_trailing_metadata_ready;
++num_recv_ops;
break; break;
} }
} }
@ -1891,13 +1922,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) { if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag)); GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
} }
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, if (has_send_ops) {
grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
stream_op->on_complete = &bctl->finish_batch; grpc_schedule_on_exec_ctx);
gpr_atm_rel_store(&call->any_ops_sent_atm, 1); stream_op->on_complete = &bctl->finish_batch;
}
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch); execute_batch(call, stream_op, &bctl->start_batch);
done: done:

@ -212,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) { if (batch->send_message) {
batch->payload->send_message.send_message.reset(); batch->payload->send_message.send_message.reset();
} }
if (batch->recv_message) { if (batch->cancel_stream) {
GRPC_CALL_COMBINER_START( GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
call_combiner, batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error), "failing recv_message_ready");
} }
// Construct a list of closures to execute.
grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) { if (batch->recv_initial_metadata) {
GRPC_CALL_COMBINER_START( closures.Add(
call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready, batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
} }
GRPC_CLOSURE_SCHED(batch->on_complete, error); if (batch->recv_message) {
if (batch->cancel_stream) { closures.Add(batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error); GRPC_ERROR_REF(error), "failing recv_message_ready");
}
if (batch->recv_trailing_metadata) {
closures.Add(
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
}
if (batch->on_complete != nullptr) {
closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
"failing on_complete");
} }
// Execute closures.
closures.RunClosures(call_combiner);
GRPC_ERROR_UNREF(error);
} }
typedef struct { typedef struct {

@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport /* Transport stream op: a set of operations to perform on a transport
against a single stream */ against a single stream */
typedef struct grpc_transport_stream_op_batch { typedef struct grpc_transport_stream_op_batch {
/** Should be enqueued when all requested operations (excluding recv_message /** Should be scheduled when all of the non-recv operations in the batch
and recv_initial_metadata which have their own closures) in a given batch are complete.
have been completed. */
The recv ops (recv_initial_metadata, recv_message, and
recv_trailing_metadata) each have their own callbacks. If a batch
contains both recv ops and non-recv ops, on_complete should be
scheduled as soon as the non-recv ops are complete, regardless of
whether or not the recv ops are complete. If a batch contains
only recv ops, on_complete can be null. */
grpc_closure* on_complete; grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */ /** Values for the stream op (fields set are determined by flags above) */
@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch {
*/ */
bool recv_trailing_metadata : 1; bool recv_trailing_metadata : 1;
/** Collect any stats into provided buffer, zero internal stat counters */
bool collect_stats : 1;
/** Cancel this stream with the provided error */ /** Cancel this stream with the provided error */
bool cancel_stream : 1; bool cancel_stream : 1;
@ -219,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload {
struct { struct {
grpc_metadata_batch* recv_trailing_metadata; grpc_metadata_batch* recv_trailing_metadata;
} recv_trailing_metadata;
struct {
grpc_transport_stream_stats* collect_stats; grpc_transport_stream_stats* collect_stats;
} collect_stats; /** Should be enqueued when initial metadata is ready to be processed. */
grpc_closure* recv_trailing_metadata_ready;
} recv_trailing_metadata;
/** Forcefully close this stream. /** Forcefully close this stream.
The HTTP2 semantics should be: The HTTP2 semantics should be:

@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp); gpr_strvec_add(&b, tmp);
} }
if (op->collect_stats) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_asprintf(&tmp, "COLLECT_STATS:%p",
op->payload->collect_stats.collect_stats);
gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, nullptr); out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b); gpr_strvec_destroy(&b);

@ -27,10 +27,12 @@ namespace testing {
TEST(InlinedVectorTest, CreateAndIterate) { TEST(InlinedVectorTest, CreateAndIterate) {
const int kNumElements = 9; const int kNumElements = 9;
InlinedVector<int, 2> v; InlinedVector<int, 2> v;
EXPECT_TRUE(v.empty());
for (int i = 0; i < kNumElements; ++i) { for (int i = 0; i < kNumElements; ++i) {
v.push_back(i); v.push_back(i);
} }
EXPECT_EQ(static_cast<size_t>(kNumElements), v.size()); EXPECT_EQ(static_cast<size_t>(kNumElements), v.size());
EXPECT_FALSE(v.empty());
for (int i = 0; i < kNumElements; ++i) { for (int i = 0; i < kNumElements; ++i) {
EXPECT_EQ(i, v[i]); EXPECT_EQ(i, v[i]);
EXPECT_EQ(i, &v[i] - &v[0]); // Ensure contiguous allocation. EXPECT_EQ(i, &v[i] - &v[0]); // Ensure contiguous allocation.

@ -621,18 +621,26 @@ typedef struct {
static void StartTransportStreamOp(grpc_call_element* elem, static void StartTransportStreamOp(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) { grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data); call_data* calld = static_cast<call_data*>(elem->call_data);
// Construct list of closures to return.
grpc_core::CallCombinerClosureList closures;
if (op->recv_initial_metadata) { if (op->recv_initial_metadata) {
GRPC_CALL_COMBINER_START( closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
calld->call_combiner, GRPC_ERROR_NONE, "recv_initial_metadata");
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE, "recv_initial_metadata");
} }
if (op->recv_message) { if (op->recv_message) {
GRPC_CALL_COMBINER_START(calld->call_combiner, closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
op->payload->recv_message.recv_message_ready, "recv_message");
GRPC_ERROR_NONE, "recv_message");
} }
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE); if (op->recv_trailing_metadata) {
closures.Add(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_NONE, "recv_trailing_metadata");
}
if (op->on_complete != nullptr) {
closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
}
// Execute closures.
closures.RunClosures(calld->call_combiner);
} }
static void StartTransportOp(grpc_channel_element* elem, static void StartTransportOp(grpc_channel_element* elem,

Loading…
Cancel
Save