Merge pull request #15539 from markdroth/recv_trailing_metadata_ready

move recv_trailing_metadata into its own callback, don't use on_complete for recv_ops
pull/15690/head
Mark D. Roth 7 years ago committed by GitHub
commit 5fd74922ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 874
      src/core/ext/filters/client_channel/client_channel.cc
  2. 28
      src/core/ext/filters/deadline/deadline_filter.cc
  3. 10
      src/core/ext/filters/deadline/deadline_filter.h
  4. 19
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 35
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  6. 19
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  7. 52
      src/core/ext/transport/inproc/inproc_transport.cc
  8. 9
      src/core/lib/channel/connected_channel.cc
  9. 80
      src/core/lib/iomgr/call_combiner.h
  10. 5
      src/core/lib/iomgr/closure.h
  11. 63
      src/core/lib/surface/call.cc
  12. 29
      src/core/lib/transport/transport.cc
  13. 22
      src/core/lib/transport/transport.h
  14. 7
      src/core/lib/transport/transport_op_string.cc
  15. 24
      test/cpp/microbenchmarks/bm_call_create.cc

File diff suppressed because it is too large Load Diff

@ -128,21 +128,25 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
}
}
// Callback run when the call is complete.
static void on_complete(void* arg, grpc_error* error) {
// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
// Invoke the next callback.
GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
// Invoke the original callback.
GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
// Inject our own on_complete callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op_batch* op) {
deadline_state->next_on_complete = op->on_complete;
GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state,
// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
deadline_state->original_recv_trailing_metadata_ready =
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, deadline_state,
grpc_schedule_on_exec_ctx);
op->on_complete = &deadline_state->on_complete;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&deadline_state->recv_trailing_metadata_ready;
}
// Callback and associated state for starting the timer after call stack
@ -226,7 +230,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch(
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata) {
inject_on_complete_cb(deadline_state, op);
inject_recv_trailing_metadata_ready(deadline_state, op);
}
}
}
@ -323,7 +327,7 @@ static void server_start_transport_stream_op_batch(
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata) {
inject_on_complete_cb(&calld->base.deadline_state, op);
inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op);
}
}
// Chain to next filter.

@ -37,12 +37,12 @@ typedef struct grpc_deadline_state {
grpc_deadline_timer_state timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
// Closure to invoke when we receive trailing metadata.
// We use this to cancel the timer.
grpc_closure on_complete;
// The original on_complete closure, which we chain to after our own
// closure is invoked.
grpc_closure* next_on_complete;
grpc_closure recv_trailing_metadata_ready;
// The original recv_trailing_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* original_recv_trailing_metadata_ready;
} grpc_deadline_state;
//

@ -55,8 +55,8 @@ struct call_data {
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* original_recv_trailing_metadata_on_complete;
grpc_closure recv_trailing_metadata_on_complete;
grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure recv_trailing_metadata_ready;
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
@ -153,8 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
static void recv_trailing_metadata_on_complete(void* user_data,
grpc_error* error) {
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
@ -163,7 +162,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
} else {
GRPC_ERROR_REF(error);
}
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@ -312,8 +311,10 @@ static void hc_start_transport_stream_op_batch(
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
batch->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
batch->on_complete = &calld->recv_trailing_metadata_on_complete;
calld->original_recv_trailing_metadata_ready =
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&calld->recv_trailing_metadata_ready;
}
grpc_error* error = GRPC_ERROR_NONE;
@ -420,8 +421,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
recv_trailing_metadata_on_complete, elem,
GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
recv_trailing_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);

@ -1149,12 +1149,10 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
}
}
/* Flag that this closure barrier wants stats to be updated before finishing */
#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
we should not complete this closure until we can prove that the write got
scheduled */
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
/* First bit of the reference count, stored in the high order bits (with the low
bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@ -1206,10 +1204,6 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
grpc_error_add_child(closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = nullptr;
}
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@ -1351,9 +1345,14 @@ static void perform_stream_op_locked(void* stream_op,
}
grpc_closure* on_complete = op->on_complete;
// TODO(roth): This is a hack needed because we use data inside of the
// closure itself to do the barrier calculation (i.e., to ensure that
// we don't schedule the closure until all ops in the batch have been
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
on_complete =
GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
nullptr, grpc_schedule_on_exec_ctx);
}
/* use final_data as a barrier until enqueue time; the inital counter is
@ -1361,12 +1360,6 @@ static void perform_stream_op_locked(void* stream_op,
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
if (op->collect_stats) {
GPR_ASSERT(s->collecting_stats == nullptr);
s->collecting_stats = op_payload->collect_stats.collect_stats;
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
if (op->cancel_stream) {
GRPC_STATS_INC_HTTP2_OP_CANCEL();
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@ -1600,8 +1593,11 @@ static void perform_stream_op_locked(void* stream_op,
if (op->recv_trailing_metadata) {
GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
GPR_ASSERT(s->collecting_stats == nullptr);
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
s->recv_trailing_metadata_finished =
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
@ -1960,11 +1956,12 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
s->recv_trailing_metadata_finished != nullptr) {
grpc_transport_move_stats(&s->stats, s->collecting_stats);
s->collecting_stats = nullptr;
grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
s->recv_trailing_metadata);
grpc_chttp2_complete_closure_step(
t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
"recv_trailing_metadata_finished");
null_then_run_closure(&s->recv_trailing_metadata_finished,
GRPC_ERROR_NONE);
}
}
}

@ -925,6 +925,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
result = false;
}
/* Check if every op that was asked for is done. */
/* TODO(muxi): We should not consider the recv ops here, since they
* have their own callbacks. We should invoke a batch's on_complete
* as soon as all of the batch's send ops are complete, even if
* there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
@ -1280,12 +1284,20 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
op_can_be_run(stream_op, s, &oas->state,
OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_error* error = GRPC_ERROR_NONE;
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
error = GRPC_ERROR_REF(stream_state->cancel_error);
} else if (stream_state->state_op_done[OP_FAILED]) {
error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
} else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.trailing_metadata,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
GRPC_CLOSURE_SCHED(
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_op->cancel_stream &&
@ -1398,6 +1410,11 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_trailing_metadata) {
GRPC_CLOSURE_SCHED(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_CANCELLED);
}
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
return;
}

@ -120,7 +120,6 @@ typedef struct inproc_stream {
struct inproc_stream* stream_list_next;
} inproc_stream;
static grpc_closure do_nothing_closure;
static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
static void op_state_machine(void* arg, grpc_error* error);
@ -373,6 +372,10 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
const char* msg) {
int is_sm = static_cast<int>(op == s->send_message_op);
int is_stm = static_cast<int>(op == s->send_trailing_md_op);
// TODO(vjpai): We should not consider the recv ops here, since they
// have their own callbacks. We should invoke a batch's on_complete
// as soon as all of the batch's send ops are complete, even if there
// are still recv ops pending.
int is_rim = static_cast<int>(op == s->recv_initial_md_op);
int is_rm = static_cast<int>(op == s->recv_message_op);
int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@ -496,6 +499,11 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->send_trailing_md_op = nullptr;
}
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@ -638,6 +646,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
s->trailing_md_sent = true;
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready", s);
GRPC_CLOSURE_SCHED(
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
@ -711,6 +725,12 @@ static void op_state_machine(void* arg, grpc_error* error) {
}
if (s->recv_trailing_md_op && s->t->is_client && other &&
other->send_message_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready %p", s,
GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
}
if (s->to_read_trailing_md_filled) {
@ -766,6 +786,10 @@ static void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
GRPC_CLOSURE_SCHED(
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
@ -859,6 +883,9 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@ -873,6 +900,8 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
return ret;
}
static void do_nothing(void* arg, grpc_error* error) {}
static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@ -892,8 +921,14 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
// TODO(roth): This is a hack needed because we use data inside of the
// closure itself to do the barrier calculation (i.e., to ensure that
// we don't schedule the closure until all ops in the batch have been
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
on_complete = &do_nothing_closure;
on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
@ -1026,6 +1061,15 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_trailing_metadata) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
@ -1129,12 +1173,8 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
static void do_nothing(void* arg, grpc_error* error) {}
void grpc_inproc_transport_init(void) {
grpc_core::ExecCtx exec_ctx;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");

@ -51,6 +51,7 @@ typedef struct connected_channel_call_data {
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;
callback_state recv_message_ready;
callback_state recv_trailing_metadata_ready;
} call_data;
static void run_in_call_combiner(void* arg, grpc_error* error) {
@ -111,6 +112,12 @@ static void con_start_transport_stream_op_batch(
intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
if (batch->recv_trailing_metadata) {
callback_state* state = &calld->recv_trailing_metadata_ready;
intercept_callback(
calld, state, false, "recv_trailing_metadata_ready",
&batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
}
if (batch->cancel_stream) {
// There can be more than one cancellation batch in flight at any
// given time, so we can't just pick out a fixed index into
@ -121,7 +128,7 @@ static void con_start_transport_stream_op_batch(
static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
intercept_callback(calld, state, true, "on_complete (cancel_stream)",
&batch->on_complete);
} else {
} else if (batch->on_complete != nullptr) {
callback_state* state = get_state_for_batch(calld, batch);
intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
}

@ -26,6 +26,7 @@
#include <grpc/support/atm.h>
#include "src/core/lib/gpr/mpscq.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/closure.h"
// A simple, lock-free mechanism for serializing activity related to a
@ -109,4 +110,83 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
namespace grpc_core {
// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
// returned to the surface, at which point the surface will yield the
// call combiner. So when we are running in the call combiner and have
// more than one callback to return to the surface, we need to re-enter
// the call combiner for all but one of those callbacks.
class CallCombinerClosureList {
public:
CallCombinerClosureList() {}
// Adds a closure to the list. The closure must eventually result in
// the call combiner being yielded.
void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
closures_.emplace_back(closure, error, reason);
}
// Runs all closures in the call combiner and yields the call combiner.
//
// All but one of the closures in the list will be scheduled via
// GRPC_CALL_COMBINER_START(), and the remaining closure will be
// scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
// yielding the call combiner. If the list is empty, then the call
// combiner will be yielded immediately.
void RunClosures(grpc_call_combiner* call_combiner) {
for (size_t i = 1; i < closures_.size(); ++i) {
auto& closure = closures_[i];
GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
closure.reason);
}
if (closures_.size() > 0) {
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"CallCombinerClosureList executing closure while already "
"holding call_combiner %p: closure=%p error=%s reason=%s",
call_combiner, closures_[0].closure,
grpc_error_string(closures_[0].error), closures_[0].reason);
}
// This will release the call combiner.
GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
} else {
GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
}
closures_.clear();
}
// Runs all closures in the call combiner, but does NOT yield the call
// combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
for (size_t i = 0; i < closures_.size(); ++i) {
auto& closure = closures_[i];
GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
closure.reason);
}
closures_.clear();
}
size_t size() const { return closures_.size(); }
private:
struct CallCombinerClosure {
grpc_closure* closure;
grpc_error* error;
const char* reason;
CallCombinerClosure(grpc_closure* closure, grpc_error* error,
const char* reason)
: closure(closure), error(error), reason(reason) {}
};
// There are generally a maximum of 6 closures to run in the call
// combiner, one for each pending op.
InlinedVector<CallCombinerClosure, 6> closures_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */

@ -283,9 +283,10 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
"previously scheduled at: [%s: %d] run?: %s",
"previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
"run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
c->line_initiated, c->run ? "true" : "false");
c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;

@ -233,6 +233,7 @@ struct grpc_call {
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
grpc_closure release_call;
@ -1209,7 +1210,6 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
}
if (bctl->op.send_message) {
@ -1217,14 +1217,9 @@ static void post_batch_completion(batch_control* bctl) {
}
if (bctl->op.send_trailing_metadata) {
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
if (bctl->op.recv_trailing_metadata) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
/* propagate cancellation to any interested children */
gpr_atm_rel_store(&call->received_final_op_atm, 1);
parent_call* pc = get_parent_call(call);
@ -1246,7 +1241,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
if (call->is_client) {
get_final_status(call, set_status_value_directly,
call->final_op.client.status,
@ -1256,7 +1250,6 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@ -1538,6 +1531,19 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
finish_batch_step(bctl);
}
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
recv_trailing_filter(call, md);
}
finish_batch_step(bctl);
}
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
@ -1558,7 +1564,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
size_t i;
const grpc_op* op;
batch_control* bctl;
int num_completion_callbacks_needed = 1;
bool has_send_ops = false;
int num_recv_ops = 0;
grpc_call_error error = GRPC_CALL_OK;
grpc_transport_stream_op_batch* stream_op;
grpc_transport_stream_op_batch_payload* stream_op_payload;
@ -1664,6 +1671,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->send_initial_metadata.peer_string =
&call->peer_string;
}
has_send_ops = true;
break;
}
case GRPC_OP_SEND_MESSAGE: {
@ -1693,6 +1701,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
has_send_ops = true;
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@ -1713,6 +1722,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->sent_final_op = true;
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
has_send_ops = true;
break;
}
case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@ -1777,6 +1787,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
}
stream_op_payload->send_trailing_metadata.send_trailing_metadata =
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
has_send_ops = true;
break;
}
case GRPC_OP_RECV_INITIAL_METADATA: {
@ -1804,7 +1815,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
stream_op_payload->recv_initial_metadata.peer_string =
&call->peer_string;
}
num_completion_callbacks_needed++;
++num_recv_ops;
break;
}
case GRPC_OP_RECV_MESSAGE: {
@ -1826,7 +1837,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
++num_recv_ops;
break;
}
case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@ -1852,11 +1863,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.client.error_string =
op->data.recv_status_on_client.error_string;
stream_op->recv_trailing_metadata = true;
stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op_payload->collect_stats.collect_stats =
stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
receiving_trailing_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&call->receiving_trailing_metadata_ready;
++num_recv_ops;
break;
}
case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@ -1877,11 +1893,16 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->final_op.server.cancelled =
op->data.recv_close_on_server.cancelled;
stream_op->recv_trailing_metadata = true;
stream_op->collect_stats = true;
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
stream_op_payload->collect_stats.collect_stats =
stream_op_payload->recv_trailing_metadata.collect_stats =
&call->final_info.stats.transport_stream_stats;
GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
receiving_trailing_metadata_ready, bctl,
grpc_schedule_on_exec_ctx);
stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
&call->receiving_trailing_metadata_ready;
++num_recv_ops;
break;
}
}
@ -1891,13 +1912,15 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
if (!is_notify_tag_closure) {
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
if (has_send_ops) {
GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
}
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
execute_batch(call, stream_op, &bctl->start_batch);
done:

@ -212,21 +212,32 @@ void grpc_transport_stream_op_batch_finish_with_failure(
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}
if (batch->recv_message) {
GRPC_CALL_COMBINER_START(
call_combiner, batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error), "failing recv_message_ready");
if (batch->cancel_stream) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
}
// Construct a list of closures to execute.
grpc_core::CallCombinerClosureList closures;
if (batch->recv_initial_metadata) {
GRPC_CALL_COMBINER_START(
call_combiner,
closures.Add(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
GRPC_CLOSURE_SCHED(batch->on_complete, error);
if (batch->cancel_stream) {
GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
if (batch->recv_message) {
closures.Add(batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error), "failing recv_message_ready");
}
if (batch->recv_trailing_metadata) {
closures.Add(
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
}
if (batch->on_complete != nullptr) {
closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
"failing on_complete");
}
// Execute closures.
closures.RunClosures(call_combiner);
GRPC_ERROR_UNREF(error);
}
typedef struct {

@ -122,9 +122,15 @@ typedef struct grpc_transport_stream_op_batch_payload
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op_batch {
/** Should be enqueued when all requested operations (excluding recv_message
and recv_initial_metadata which have their own closures) in a given batch
have been completed. */
/** Should be scheduled when all of the non-recv operations in the batch
are complete.
The recv ops (recv_initial_metadata, recv_message, and
recv_trailing_metadata) each have their own callbacks. If a batch
contains both recv ops and non-recv ops, on_complete should be
scheduled as soon as the non-recv ops are complete, regardless of
whether or not the recv ops are complete. If a batch contains
only recv ops, on_complete can be null. */
grpc_closure* on_complete;
/** Values for the stream op (fields set are determined by flags above) */
@ -149,9 +155,6 @@ typedef struct grpc_transport_stream_op_batch {
*/
bool recv_trailing_metadata : 1;
/** Collect any stats into provided buffer, zero internal stat counters */
bool collect_stats : 1;
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
@ -219,11 +222,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_trailing_metadata;
} recv_trailing_metadata;
struct {
grpc_transport_stream_stats* collect_stats;
} collect_stats;
/** Should be enqueued when initial metadata is ready to be processed. */
grpc_closure* recv_trailing_metadata_ready;
} recv_trailing_metadata;
/** Forcefully close this stream.
The HTTP2 semantics should be:

@ -120,13 +120,6 @@ char* grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
if (op->collect_stats) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_asprintf(&tmp, "COLLECT_STATS:%p",
op->payload->collect_stats.collect_stats);
gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, nullptr);
gpr_strvec_destroy(&b);

@ -621,18 +621,26 @@ typedef struct {
static void StartTransportStreamOp(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// Construct list of closures to return.
grpc_core::CallCombinerClosureList closures;
if (op->recv_initial_metadata) {
GRPC_CALL_COMBINER_START(
calld->call_combiner,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE, "recv_initial_metadata");
closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE, "recv_initial_metadata");
}
if (op->recv_message) {
GRPC_CALL_COMBINER_START(calld->call_combiner,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE, "recv_message");
closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
"recv_message");
}
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
if (op->recv_trailing_metadata) {
closures.Add(
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_NONE, "recv_trailing_metadata");
}
if (op->on_complete != nullptr) {
closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
}
// Execute closures.
closures.RunClosures(calld->call_combiner);
}
static void StartTransportOp(grpc_channel_element* elem,

Loading…
Cancel
Save