diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 34ea97e23eb..ea6775a8d85 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -817,7 +817,6 @@ typedef struct { // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; - grpc_closure recv_trailing_metadata_ready; // For intercepting on_complete. grpc_closure on_complete; } subchannel_batch_data; @@ -1193,24 +1192,35 @@ static void pending_batches_fail(grpc_call_element* elem, grpc_error* error, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, num_batches, grpc_error_string(error)); } - grpc_core::CallCombinerClosureList closures; + grpc_transport_stream_op_batch* + batches[GPR_ARRAY_SIZE(calld->pending_batches)]; + size_t num_batches = 0; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { pending_batch* pending = &calld->pending_batches[i]; grpc_transport_stream_op_batch* batch = pending->batch; if (batch != nullptr) { - batch->handler_private.extra_arg = calld; - GRPC_CLOSURE_INIT(&batch->handler_private.closure, - fail_pending_batch_in_call_combiner, batch, - grpc_schedule_on_exec_ctx); - closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error), - "pending_batches_fail"); + batches[num_batches++] = batch; pending_batch_clear(calld, pending); } } + for (size_t i = yield_call_combiner ? 1 : 0; i < num_batches; ++i) { + grpc_transport_stream_op_batch* batch = batches[i]; + batch->handler_private.extra_arg = calld; + GRPC_CLOSURE_INIT(&batch->handler_private.closure, + fail_pending_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(calld->call_combiner, + &batch->handler_private.closure, + GRPC_ERROR_REF(error), "pending_batches_fail"); + } if (yield_call_combiner) { - closures.RunClosures(calld->call_combiner); - } else { - closures.RunClosuresWithoutYielding(calld->call_combiner); + if (num_batches > 0) { + // Note: This will release the call combiner. 
+ grpc_transport_stream_op_batch_finish_with_failure(
+ batches[0], GRPC_ERROR_REF(error), calld->call_combiner);
+ } else {
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "pending_batches_fail");
+ }
 }
 GRPC_ERROR_UNREF(error);
}
@@ -1245,22 +1255,30 @@ static void pending_batches_resume(grpc_call_element* elem) {
 " pending batches on subchannel_call=%p",
 chand, calld, num_batches, calld->subchannel_call);
 }
- grpc_core::CallCombinerClosureList closures;
+ grpc_transport_stream_op_batch*
+ batches[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_batches = 0;
 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
 pending_batch* pending = &calld->pending_batches[i];
 grpc_transport_stream_op_batch* batch = pending->batch;
 if (batch != nullptr) {
- batch->handler_private.extra_arg = calld->subchannel_call;
- GRPC_CLOSURE_INIT(&batch->handler_private.closure,
- resume_pending_batch_in_call_combiner, batch,
- grpc_schedule_on_exec_ctx);
- closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "pending_batches_resume");
+ batches[num_batches++] = batch;
 pending_batch_clear(calld, pending);
 }
 }
+ for (size_t i = 1; i < num_batches; ++i) {
+ grpc_transport_stream_op_batch* batch = batches[i];
+ batch->handler_private.extra_arg = calld->subchannel_call;
+ GRPC_CLOSURE_INIT(&batch->handler_private.closure,
+ resume_pending_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(calld->call_combiner,
+ &batch->handler_private.closure, GRPC_ERROR_NONE,
+ "pending_batches_resume");
+ }
+ GPR_ASSERT(num_batches > 0);
 // Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
+ grpc_subchannel_call_process_op(calld->subchannel_call, batches[0]);
}
 static void maybe_clear_pending_batch(grpc_call_element* elem,
@@ -1275,10 +1293,7 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
 nullptr) &&
 (!batch->recv_message ||
- batch->payload->recv_message.recv_message_ready == nullptr) &&
- (!batch->recv_trailing_metadata ||
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
- nullptr)) {
+ batch->payload->recv_message.recv_message_ready == nullptr)) {
 if (grpc_client_channel_trace.enabled()) {
 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
 calld);
@@ -1287,27 +1302,75 @@ static void maybe_clear_pending_batch(grpc_call_element* elem,
 }
}
-// Returns a pointer to the first pending batch for which predicate(batch)
-// returns true, or null if not found.
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- grpc_transport_stream_op_batch* batch = pending->batch;
- if (batch != nullptr && predicate(batch)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
- calld, log_message, i);
- }
- return pending;
- }
+// Returns true if all ops in the pending batch have been completed.
+static bool pending_batch_is_completed( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->completed_send_initial_metadata) { + return false; + } + if (pending->batch->send_message && + retry_state->completed_send_message_count < + calld->send_messages->size()) { + return false; + } + if (pending->batch->send_trailing_metadata && + !retry_state->completed_send_trailing_metadata) { + return false; + } + if (pending->batch->recv_initial_metadata && + !retry_state->completed_recv_initial_metadata) { + return false; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count < + retry_state->started_recv_message_count) { + return false; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->completed_recv_trailing_metadata) { + return false; } - return nullptr; + return true; +} + +// Returns true if any op in the batch was not yet started. +static bool pending_batch_is_unstarted( + pending_batch* pending, call_data* calld, + subchannel_call_retry_state* retry_state) { + if (pending->batch == nullptr || pending->batch->on_complete == nullptr) { + return false; + } + if (pending->batch->send_initial_metadata && + !retry_state->started_send_initial_metadata) { + return true; + } + if (pending->batch->send_message && + retry_state->started_send_message_count < calld->send_messages->size()) { + return true; + } + if (pending->batch->send_trailing_metadata && + !retry_state->started_send_trailing_metadata) { + return true; + } + if (pending->batch->recv_initial_metadata && + !retry_state->started_recv_initial_metadata) { + return true; + } + if (pending->batch->recv_message && + retry_state->completed_recv_message_count == + retry_state->started_recv_message_count) { + return true; + } + if (pending->batch->recv_trailing_metadata && + !retry_state->started_recv_trailing_metadata) { + return true; + } + return false; } // @@ -1494,13 +1557,8 @@ static bool maybe_retry(grpc_call_element* elem, // subchannel_batch_data // -// Creates a subchannel_batch_data object on the call's arena with the -// specified refcount. If set_on_complete is true, the batch's -// on_complete callback will be set to point to on_complete(); -// otherwise, the batch's on_complete callback will be null. 
static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
- int refcount,
- bool set_on_complete) {
+ int refcount) {
 call_data* calld = static_cast<call_data*>(elem->call_data);
 subchannel_call_retry_state* retry_state =
 static_cast<subchannel_call_retry_state*>(
 grpc_connected_subchannel_call_get_parent_data(
@@ -1513,11 +1571,9 @@ static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
 GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
 batch_data->batch.payload = &retry_state->batch_payload;
 gpr_ref_init(&batch_data->refs, refcount);
- if (set_on_complete) {
- GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.on_complete = &batch_data->on_complete;
- }
+ GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
+ grpc_schedule_on_exec_ctx);
+ batch_data->batch.on_complete = &batch_data->on_complete;
 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
 return batch_data;
}
@@ -1550,14 +1606,26 @@ static void batch_data_unref(subchannel_batch_data* batch_data) {
static void invoke_recv_initial_metadata_callback(void* arg,
 grpc_error* error) {
 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
 // Find pending batch.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_initial_metadata_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_initial_metadata &&
- batch->payload->recv_initial_metadata
- .recv_initial_metadata_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_initial_metadata &&
+ batch->payload->recv_initial_metadata.recv_initial_metadata_ready !=
+ nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_initial_metadata_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
 GPR_ASSERT(pending != nullptr);
 // Return metadata.
 grpc_metadata_batch_move(
@@ -1593,19 +1661,10 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
 static_cast<subchannel_call_retry_state*>(
 grpc_connected_subchannel_call_get_parent_data(
 batch_data->subchannel_call));
- retry_state->completed_recv_initial_metadata = true;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_initial_metadata op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(
- calld->call_combiner,
- "recv_initial_metadata_ready after retry dispatched");
- return;
- }
 // If we got an error or a Trailers-Only response and have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
 if (GPR_UNLIKELY((batch_data->trailing_metadata_available ||
 error != GRPC_ERROR_NONE) &&
 !retry_state->completed_recv_trailing_metadata)) {
@@ -1630,9 +1689,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
 }
 // Received valid initial metadata, so commit the call.
 retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
 // Manually invoking a callback function; it does not take ownership of error.
 invoke_recv_initial_metadata_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
@@ -1642,13 +1701,25 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// Invokes recv_message_ready for a subchannel batch.
static void invoke_recv_message_callback(void* arg, grpc_error* error) {
 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
+ channel_data* chand =
+ static_cast<channel_data*>(batch_data->elem->channel_data);
+ call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
 // Find pending op.
- pending_batch* pending = pending_batch_find(
- batch_data->elem, "invoking recv_message_ready for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_message &&
- batch->payload->recv_message.recv_message_ready != nullptr;
- });
+ pending_batch* pending = nullptr;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ grpc_transport_stream_op_batch* batch = calld->pending_batches[i].batch;
+ if (batch != nullptr && batch->recv_message &&
+ batch->payload->recv_message.recv_message_ready != nullptr) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: invoking recv_message_ready for "
+ "pending batch at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ pending = &calld->pending_batches[i];
+ break;
+ }
+ }
 GPR_ASSERT(pending != nullptr);
 // Return payload.
 *pending->batch->payload->recv_message.recv_message =
@@ -1680,18 +1751,10 @@ static void recv_message_ready(void* arg, grpc_error* error) {
 static_cast<subchannel_call_retry_state*>(
 grpc_connected_subchannel_call_get_parent_data(
 batch_data->subchannel_call));
- ++retry_state->completed_recv_message_count;
- // If a retry was already dispatched, then we're not going to use the
- // result of this recv_message op, so do nothing.
- if (retry_state->retry_dispatched) {
- GRPC_CALL_COMBINER_STOP(calld->call_combiner,
- "recv_message_ready after retry dispatched");
- return;
- }
 // If we got an error or the payload was nullptr and we have not yet gotten
- // the recv_trailing_metadata_ready callback, then defer propagating this
- // callback back to the surface. We can evaluate whether to retry when
- // recv_trailing_metadata comes back.
+ // the recv_trailing_metadata on_complete callback, then defer
+ // propagating this callback back to the surface. We can evaluate whether
+ // to retry when recv_trailing_metadata comes back.
 if (GPR_UNLIKELY(
 (batch_data->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
 !retry_state->completed_recv_trailing_metadata)) {
@@ -1714,268 +1777,133 @@ static void recv_message_ready(void* arg, grpc_error* error) {
 }
 // Received a valid message, so commit the call.
 retry_commit(elem, retry_state);
- // Invoke the callback to return the result to the surface.
 // Manually invoking a callback function; it does not take ownership of error.
 invoke_recv_message_callback(batch_data, error);
+ GRPC_ERROR_UNREF(error);
}
//
-// recv_trailing_metadata handling
+// list of closures to execute in call combiner
//
-// Sets *status and *server_pushback_md based on batch_data and error.
-static void get_call_status(subchannel_batch_data* batch_data,
- grpc_error* error, grpc_status_code* status,
- grpc_mdelem** server_pushback_md) {
- grpc_call_element* elem = batch_data->elem;
+// Represents a closure that needs to run in the call combiner as part of
+// starting or completing a batch.
+typedef struct {
+ grpc_closure* closure;
+ grpc_error* error;
+ const char* reason;
+ bool free_reason = false;
+} closure_to_execute;
+
+static void execute_closures_in_call_combiner(grpc_call_element* elem,
+ const char* caller,
+ closure_to_execute* closures,
+ size_t num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 call_data* calld = static_cast<call_data*>(elem->call_data);
- if (error != GRPC_ERROR_NONE) {
- grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
- nullptr);
+ // Note that the call combiner will be yielded for each closure that
+ // we schedule. We're already running in the call combiner, so one of
+ // the closures can be scheduled directly, but the others will
+ // have to re-enter the call combiner.
+ if (num_closures > 0) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: %s starting closure: %s", chand,
+ calld, caller, closures[0].reason);
+ }
+ GRPC_CLOSURE_SCHED(closures[0].closure, closures[0].error);
+ if (closures[0].free_reason) {
+ gpr_free(const_cast<char*>(closures[0].reason));
+ }
+ for (size_t i = 1; i < num_closures; ++i) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: %s starting closure in call combiner: %s",
+ chand, calld, caller, closures[i].reason);
+ }
+ GRPC_CALL_COMBINER_START(calld->call_combiner, closures[i].closure,
+ closures[i].error, closures[i].reason);
+ if (closures[i].free_reason) {
+ gpr_free(const_cast<char*>(closures[i].reason));
+ }
+ }
 } else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
- GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
- *status =
- grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
- *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: no closures to run for %s", chand,
+ calld, caller);
 }
+ GRPC_CALL_COMBINER_STOP(calld->call_combiner, "no closures to run");
 }
- GRPC_ERROR_UNREF(error);
}
-// Adds recv_trailing_metadata_ready closure to closures.
-static void add_closure_for_recv_trailing_metadata_ready(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- // Find pending batch.
- pending_batch* pending = pending_batch_find(
- elem, "invoking recv_trailing_metadata for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_trailing_metadata &&
- batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready != nullptr;
- });
- // If we generated the recv_trailing_metadata op internally via
- // start_internal_recv_trailing_metadata(), then there will be no
- // pending batch.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
+//
+// on_complete callback handling
+//
+
+// Updates retry_state to reflect the ops completed in batch_data.
+static void update_retry_state_for_completed_batch( + subchannel_batch_data* batch_data, + subchannel_call_retry_state* retry_state) { + if (batch_data->batch.send_initial_metadata) { + retry_state->completed_send_initial_metadata = true; + } + if (batch_data->batch.send_message) { + ++retry_state->completed_send_message_count; + } + if (batch_data->batch.send_trailing_metadata) { + retry_state->completed_send_trailing_metadata = true; + } + if (batch_data->batch.recv_initial_metadata) { + retry_state->completed_recv_initial_metadata = true; + } + if (batch_data->batch.recv_message) { + ++retry_state->completed_recv_message_count; + } + if (batch_data->batch.recv_trailing_metadata) { + retry_state->completed_recv_trailing_metadata = true; } - // Return metadata. - grpc_metadata_batch_move( - &batch_data->recv_trailing_metadata, - pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); - // Add closure. - closures->Add(pending->batch->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - error, "recv_trailing_metadata_ready for pending batch"); - // Update bookkeeping. - pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - nullptr; - maybe_clear_pending_batch(elem, pending); } // Adds any necessary closures for deferred recv_initial_metadata and -// recv_message callbacks to closures. +// recv_message callbacks to closures, updating *num_closures as needed. static void add_closures_for_deferred_recv_callbacks( subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state, - grpc_core::CallCombinerClosureList* closures) { + closure_to_execute* closures, size_t* num_closures) { if (batch_data->batch.recv_trailing_metadata) { // Add closure for deferred recv_initial_metadata_ready. if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch != nullptr)) { - GRPC_CLOSURE_INIT(&batch_data->recv_initial_metadata_ready, - invoke_recv_initial_metadata_callback, - retry_state->recv_initial_metadata_ready_deferred_batch, - grpc_schedule_on_exec_ctx); - closures->Add(&batch_data->recv_initial_metadata_ready, - retry_state->recv_initial_metadata_error, - "resuming recv_initial_metadata_ready"); + closure_to_execute* closure = &closures[(*num_closures)++]; + closure->closure = GRPC_CLOSURE_INIT( + &batch_data->recv_initial_metadata_ready, + invoke_recv_initial_metadata_callback, + retry_state->recv_initial_metadata_ready_deferred_batch, + grpc_schedule_on_exec_ctx); + closure->error = retry_state->recv_initial_metadata_error; + closure->reason = "resuming recv_initial_metadata_ready"; retry_state->recv_initial_metadata_ready_deferred_batch = nullptr; } // Add closure for deferred recv_message_ready. 
 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
 nullptr)) {
- GRPC_CLOSURE_INIT(&batch_data->recv_message_ready,
- invoke_recv_message_callback,
- retry_state->recv_message_ready_deferred_batch,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->recv_message_ready,
- retry_state->recv_message_error,
- "resuming recv_message_ready");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->recv_message_ready, invoke_recv_message_callback,
+ retry_state->recv_message_ready_deferred_batch,
+ grpc_schedule_on_exec_ctx);
+ closure->error = retry_state->recv_message_error;
+ closure->reason = "resuming recv_message_ready";
 retry_state->recv_message_ready_deferred_batch = nullptr;
 }
 }
}
-// Returns true if any op in the batch was not yet started.
-// Only looks at send ops, since recv ops are always started immediately.
-static bool pending_batch_is_unstarted(
- pending_batch* pending, call_data* calld,
- subchannel_call_retry_state* retry_state) {
- if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
- return false;
- }
- if (pending->batch->send_initial_metadata &&
- !retry_state->started_send_initial_metadata) {
- return true;
- }
- if (pending->batch->send_message &&
- retry_state->started_send_message_count < calld->send_messages->size()) {
- return true;
- }
- if (pending->batch->send_trailing_metadata &&
- !retry_state->started_send_trailing_metadata) {
- return true;
- }
- return false;
-}
-
-// For any pending batch containing an op that has not yet been started,
-// adds the pending batch's completion closures to closures.
-static void add_closures_to_fail_unstarted_pending_batches(
- grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
- pending_batch* pending = &calld->pending_batches[i];
- if (pending_batch_is_unstarted(pending, calld, retry_state)) {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: failing unstarted pending batch at index "
- "%" PRIuPTR,
- chand, calld, i);
- }
- closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
- "failing on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
- }
- }
- GRPC_ERROR_UNREF(error);
-}
-
-// Runs necessary closures upon completion of a call attempt.
-static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
- grpc_error* error) {
- grpc_call_element* elem = batch_data->elem;
- call_data* calld = static_cast<call_data*>(elem->call_data);
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- // Construct list of closures to execute.
- grpc_core::CallCombinerClosureList closures;
- // First, add closure for recv_trailing_metadata_ready.
- add_closure_for_recv_trailing_metadata_ready(
- elem, batch_data, GRPC_ERROR_REF(error), &closures);
- // If there are deferred recv_initial_metadata_ready or recv_message_ready
- // callbacks, add them to closures.
- add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
- // Add closures to fail any pending batches that have not yet been started.
- add_closures_to_fail_unstarted_pending_batches(
- elem, retry_state, GRPC_ERROR_REF(error), &closures);
- // Don't need batch_data anymore.
- batch_data_unref(batch_data);
- // Schedule all of the closures identified above.
- // Note: This will release the call combiner.
- closures.RunClosures(calld->call_combiner);
- GRPC_ERROR_UNREF(error);
-}
-
-// Intercepts recv_trailing_metadata_ready callback for retries.
-// Commits the call and returns the trailing metadata up the stack.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
- subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
- grpc_call_element* elem = batch_data->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
- chand, calld, grpc_error_string(error));
- }
- subchannel_call_retry_state* retry_state =
- static_cast<subchannel_call_retry_state*>(
- grpc_connected_subchannel_call_get_parent_data(
- batch_data->subchannel_call));
- retry_state->completed_recv_trailing_metadata = true;
- // Get the call's status and check for server pushback metadata.
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_mdelem* server_pushback_md = nullptr;
- get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
- &server_pushback_md);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
- calld, grpc_status_code_to_string(status));
- }
- // Check if we should retry.
- if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
- // Unref batch_data for deferred recv_initial_metadata_ready or
- // recv_message_ready callbacks, if any.
- if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
- }
- if (retry_state->recv_message_ready_deferred_batch != nullptr) {
- batch_data_unref(batch_data);
- GRPC_ERROR_UNREF(retry_state->recv_message_error);
- }
- batch_data_unref(batch_data);
- return;
- }
- // Not retrying, so commit the call.
- retry_commit(elem, retry_state);
- // Run any necessary closures.
- run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
-}
-
-//
-// on_complete callback handling
-//
-
-// Adds the on_complete closure for the pending batch completed in
-// batch_data to closures.
-static void add_closure_for_completed_pending_batch(
- grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state, grpc_error* error,
- grpc_core::CallCombinerClosureList* closures) {
- pending_batch* pending = pending_batch_find(
- elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
- // Match the pending batch with the same set of send ops as the
- // subchannel batch we've just completed.
- return batch->on_complete != nullptr &&
- batch_data->batch.send_initial_metadata ==
- batch->send_initial_metadata &&
- batch_data->batch.send_message == batch->send_message &&
- batch_data->batch.send_trailing_metadata ==
- batch->send_trailing_metadata;
- });
- // If batch_data is a replay batch, then there will be no pending
- // batch to complete.
- if (pending == nullptr) {
- GRPC_ERROR_UNREF(error);
- return;
- }
- // Add closure.
- closures->Add(pending->batch->on_complete, error,
- "on_complete for pending batch");
- pending->batch->on_complete = nullptr;
- maybe_clear_pending_batch(elem, pending);
-}
-
// If there are any cached ops to replay or pending ops to start on the
// subchannel call, adds a closure to closures to invoke
-// start_retriable_subchannel_batches().
+// start_retriable_subchannel_batches(), updating *num_closures as needed.
static void add_closures_for_replay_or_pending_send_ops(
 grpc_call_element* elem, subchannel_batch_data* batch_data,
- subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ subchannel_call_retry_state* retry_state, closure_to_execute* closures,
+ size_t* num_closures) {
 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 call_data* calld = static_cast<call_data*>(elem->call_data);
 bool have_pending_send_message_ops =
@@ -2001,12 +1929,93 @@ static void add_closures_for_replay_or_pending_send_ops(
 "chand=%p calld=%p: starting next batch for pending send op(s)",
 chand, calld);
 }
- GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
- start_retriable_subchannel_batches, elem,
- grpc_schedule_on_exec_ctx);
- closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
- "starting next batch for send_* op(s)");
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = GRPC_CLOSURE_INIT(
+ &batch_data->batch.handler_private.closure,
+ start_retriable_subchannel_batches, elem, grpc_schedule_on_exec_ctx);
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason = "starting next batch for send_* op(s)";
+ }
+}
+
+// For any pending batch completed in batch_data, adds the necessary
+// completion closures to closures, updating *num_closures as needed.
+static void add_closures_for_completed_pending_batches(
+ grpc_call_element* elem, subchannel_batch_data* batch_data,
+ subchannel_call_retry_state* retry_state, grpc_error* error,
+ closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_completed(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: pending batch completed at index %" PRIuPTR,
+ chand, calld, i);
+ }
+ // Copy the trailing metadata to return it to the surface.
+ if (batch_data->batch.recv_trailing_metadata) {
+ grpc_metadata_batch_move(&batch_data->recv_trailing_metadata,
+ pending->batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata);
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
 }
+ GRPC_ERROR_UNREF(error);
+}
+
+// For any pending batch containing an op that has not yet been started,
+// adds the pending batch's completion closures to closures, updating
+// *num_closures as needed.
+static void add_closures_to_fail_unstarted_pending_batches(
+ grpc_call_element* elem, subchannel_call_retry_state* retry_state,
+ grpc_error* error, closure_to_execute* closures, size_t* num_closures) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
+ pending_batch* pending = &calld->pending_batches[i];
+ if (pending_batch_is_unstarted(pending, calld, retry_state)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: failing unstarted pending batch at index "
+ "%" PRIuPTR,
+ chand, calld, i);
+ }
+ if (pending->batch->recv_initial_metadata) {
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason =
+ "failing recv_initial_metadata_ready for pending batch";
+ pending->batch->payload->recv_initial_metadata
+ .recv_initial_metadata_ready = nullptr;
+ }
+ if (pending->batch->recv_message) {
+ *pending->batch->payload->recv_message.recv_message = nullptr;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure =
+ pending->batch->payload->recv_message.recv_message_ready;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing recv_message_ready for pending batch";
+ pending->batch->payload->recv_message.recv_message_ready = nullptr;
+ }
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = pending->batch->on_complete;
+ closure->error = GRPC_ERROR_REF(error);
+ closure->reason = "failing on_complete for pending batch";
+ pending->batch->on_complete = nullptr;
+ maybe_clear_pending_batch(elem, pending);
+ }
+ }
+ GRPC_ERROR_UNREF(error);
}
// Callback used to intercept on_complete from subchannel calls.
@@ -2026,49 +2035,136 @@ static void on_complete(void* arg, grpc_error* error) {
 static_cast<subchannel_call_retry_state*>(
 grpc_connected_subchannel_call_get_parent_data(
 batch_data->subchannel_call));
+ // If we have previously completed recv_trailing_metadata, then the
+ // call is finished.
+ bool call_finished = retry_state->completed_recv_trailing_metadata;
+ // Record whether we were already committed before receiving this callback.
+ const bool previously_committed = calld->retry_committed;
 // Update bookkeeping in retry_state.
- if (batch_data->batch.send_initial_metadata) {
- retry_state->completed_send_initial_metadata = true;
- }
- if (batch_data->batch.send_message) {
- ++retry_state->completed_send_message_count;
- }
- if (batch_data->batch.send_trailing_metadata) {
- retry_state->completed_send_trailing_metadata = true;
+ update_retry_state_for_completed_batch(batch_data, retry_state);
+ if (call_finished) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: call already finished", chand,
+ calld);
+ }
+ } else {
+ // Check if this batch finished the call, and if so, get its status.
+ // The call is finished if either (a) this callback was invoked with
+ // an error or (b) we receive status.
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_mdelem* server_pushback_md = nullptr;
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { // Case (a).
+ call_finished = true;
+ grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
+ nullptr);
+ } else if (batch_data->batch.recv_trailing_metadata) { // Case (b).
+ call_finished = true; + grpc_metadata_batch* md_batch = + batch_data->batch.payload->recv_trailing_metadata + .recv_trailing_metadata; + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); + if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) { + server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md; + } + } + // If the call just finished, check if we should retry. + if (call_finished) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand, + calld, grpc_status_code_to_string(status)); + } + if (maybe_retry(elem, batch_data, status, server_pushback_md)) { + // Unref batch_data for deferred recv_initial_metadata_ready or + // recv_message_ready callbacks, if any. + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_initial_metadata_ready_deferred_batch != + nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error); + } + if (batch_data->batch.recv_trailing_metadata && + retry_state->recv_message_ready_deferred_batch != nullptr) { + batch_data_unref(batch_data); + GRPC_ERROR_UNREF(retry_state->recv_message_error); + } + // Track number of pending subchannel send batches and determine if + // this was the last one. + bool last_callback_complete = false; + if (batch_data->batch.send_initial_metadata || + batch_data->batch.send_message || + batch_data->batch.send_trailing_metadata) { + --calld->num_pending_retriable_subchannel_send_batches; + last_callback_complete = + calld->num_pending_retriable_subchannel_send_batches == 0; + } + batch_data_unref(batch_data); + // If we just completed the last subchannel send batch, unref the + // call stack. + if (last_callback_complete) { + GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches"); + } + return; + } + // Not retrying, so commit the call. + retry_commit(elem, retry_state); + } } - // If the call is committed, free cached data for send ops that we've just - // completed. - if (calld->retry_committed) { + // If we were already committed before receiving this callback, free + // cached data for send ops that we've just completed. (If the call has + // just now finished, the call to retry_commit() above will have freed all + // cached send ops, so we don't need to do it here.) + if (previously_committed) { free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state); } + // Call not being retried. // Construct list of closures to execute. - grpc_core::CallCombinerClosureList closures; - // If a retry was already dispatched, that means we saw - // recv_trailing_metadata before this, so we do nothing here. - // Otherwise, invoke the callback to return the result to the surface. - if (!retry_state->retry_dispatched) { - // Add closure for the completed pending batch, if any. - add_closure_for_completed_pending_batch(elem, batch_data, retry_state, - GRPC_ERROR_REF(error), &closures); - // If needed, add a callback to start any replay or pending send ops on - // the subchannel call. 
- if (!retry_state->completed_recv_trailing_metadata) {
- add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
- &closures);
- }
+ // Max number of closures is number of pending batches plus one for
+ // each of:
+ // - recv_initial_metadata_ready (either deferred or unstarted)
+ // - recv_message_ready (either deferred or unstarted)
+ // - starting a new batch for pending send ops
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches) + 3];
+ size_t num_closures = 0;
+ // If there are deferred recv_initial_metadata_ready or recv_message_ready
+ // callbacks, add them to closures.
+ add_closures_for_deferred_recv_callbacks(batch_data, retry_state, closures,
+ &num_closures);
+ // Find pending batches whose ops are now complete and add their
+ // on_complete callbacks to closures.
+ add_closures_for_completed_pending_batches(elem, batch_data, retry_state,
+ GRPC_ERROR_REF(error), closures,
+ &num_closures);
+ // Add closures to handle any pending batches that have not yet been started.
+ // If the call is finished, we fail these batches; otherwise, we add a
+ // callback to start_retriable_subchannel_batches() to start them on
+ // the subchannel call.
+ if (call_finished) {
+ add_closures_to_fail_unstarted_pending_batches(
+ elem, retry_state, GRPC_ERROR_REF(error), closures, &num_closures);
+ } else {
+ add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
+ closures, &num_closures);
 }
 // Track number of pending subchannel send batches and determine if this
 // was the last one.
- --calld->num_pending_retriable_subchannel_send_batches;
- const bool last_send_batch_complete =
- calld->num_pending_retriable_subchannel_send_batches == 0;
+ bool last_callback_complete = false;
+ if (batch_data->batch.send_initial_metadata ||
+ batch_data->batch.send_message ||
+ batch_data->batch.send_trailing_metadata) {
+ --calld->num_pending_retriable_subchannel_send_batches;
+ last_callback_complete =
+ calld->num_pending_retriable_subchannel_send_batches == 0;
+ }
 // Don't need batch_data anymore.
 batch_data_unref(batch_data);
 // Schedule all of the closures identified above.
 // Note: This yields the call combiner.
- closures.RunClosures(calld->call_combiner);
- // If this was the last subchannel send batch, unref the call stack.
- if (last_send_batch_complete) {
+ execute_closures_in_call_combiner(elem, "on_complete", closures,
+ num_closures);
+ // If we just completed the last subchannel send batch, unref the call stack.
+ if (last_callback_complete) {
 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
 }
}
@@ -2089,22 +2185,27 @@ static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
// Adds a closure to closures that will execute batch in the call combiner.
static void add_closure_for_subchannel_batch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
- grpc_core::CallCombinerClosureList* closures) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
+ call_data* calld, grpc_transport_stream_op_batch* batch,
+ closure_to_execute* closures, size_t* num_closures) {
 batch->handler_private.extra_arg = calld->subchannel_call;
 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
 start_batch_in_call_combiner, batch,
 grpc_schedule_on_exec_ctx);
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch->handler_private.closure;
+ closure->error = GRPC_ERROR_NONE;
+ // If the tracer is enabled, we log a more detailed message, which
+ // requires dynamic allocation. This will be freed in
+ // execute_closures_in_call_combiner().
 if (grpc_client_channel_trace.enabled()) {
 char* batch_str = grpc_transport_stream_op_batch_string(batch);
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
- calld, batch_str);
+ gpr_asprintf(const_cast<char**>(&closure->reason),
+ "starting batch in call combiner: %s", batch_str);
 gpr_free(batch_str);
+ closure->free_reason = true;
+ } else {
+ closure->reason = "start_subchannel_batch";
 }
- closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
- "start_subchannel_batch");
}
// Adds retriable send_initial_metadata op to batch_data.
@@ -2240,13 +2341,9 @@ static void add_retriable_recv_trailing_metadata_op(
 grpc_metadata_batch_init(&batch_data->recv_trailing_metadata);
 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
 &batch_data->recv_trailing_metadata;
- batch_data->batch.payload->recv_trailing_metadata.collect_stats =
+ batch_data->batch.collect_stats = true;
+ batch_data->batch.payload->collect_stats.collect_stats =
 &batch_data->collect_stats;
- GRPC_CLOSURE_INIT(&batch_data->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, batch_data,
- grpc_schedule_on_exec_ctx);
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata_ready = &batch_data->recv_trailing_metadata_ready;
}
// Helper function used to start a recv_trailing_metadata batch. This
@@ -2267,11 +2364,9 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
 grpc_connected_subchannel_call_get_parent_data(
 calld->subchannel_call));
 // Create batch_data with 2 refs, since this batch will be unreffed twice:
- // once for the recv_trailing_metadata_ready callback when the subchannel
- // batch returns, and again when we actually get a recv_trailing_metadata
- // op from the surface.
- subchannel_batch_data* batch_data =
- batch_data_create(elem, 2, false /* set_on_complete */);
+ // once when the subchannel batch returns, and again when we actually get
+ // a recv_trailing_metadata op from the surface.
+ subchannel_batch_data* batch_data = batch_data_create(elem, 2);
 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
 retry_state->recv_trailing_metadata_internal_batch = batch_data;
 // Note: This will release the call combiner.
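Reviewer note: the two-ref handoff in start_internal_recv_trailing_metadata() above is the subtle part of this hunk. The sketch below is a minimal, self-contained illustration of that ownership pattern; BatchData, batch_unref(), and the std::atomic refcount are hypothetical stand-ins for subchannel_batch_data, batch_data_unref(), and gpr_refcount, not code from this patch.

```cpp
#include <atomic>
#include <cstdio>

// Hypothetical stand-in for subchannel_batch_data: one object with two
// independent owners, freed by whichever owner drops the last ref.
struct BatchData {
  std::atomic<int> refs;
  explicit BatchData(int initial_refs) : refs(initial_refs) {}
};

// Mirrors the role of batch_data_unref().
void batch_unref(BatchData* b, const char* who) {
  if (b->refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    std::printf("%s: last ref dropped, destroying batch\n", who);
    delete b;
  } else {
    std::printf("%s: ref dropped, batch still alive\n", who);
  }
}

int main() {
  // Like batch_data_create(elem, 2): one ref is owned by the transport's
  // completion path, the other by the surface's eventual
  // recv_trailing_metadata op. The two unrefs can happen in either order.
  BatchData* b = new BatchData(2);
  batch_unref(b, "subchannel batch returned");
  batch_unref(b, "surface recv_trailing_metadata op arrived");
  return 0;
}
```

The point of starting at 2 is that neither path can know whether it runs first or last, so neither may free the batch unconditionally.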
@@ -2296,7 +2391,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
 "send_initial_metadata op",
 chand, calld);
 }
- replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
 add_retriable_send_initial_metadata_op(calld, retry_state,
 replay_batch_data);
 }
@@ -2313,8 +2408,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
 chand, calld);
 }
 if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
 }
 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
 }
@@ -2333,8 +2427,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
 chand, calld);
 }
 if (replay_batch_data == nullptr) {
- replay_batch_data =
- batch_data_create(elem, 1, true /* set_on_complete */);
+ replay_batch_data = batch_data_create(elem, 1);
 }
 add_retriable_send_trailing_metadata_op(calld, retry_state,
 replay_batch_data);
@@ -2346,7 +2439,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
// *num_closures as needed.
static void add_subchannel_batches_for_pending_batches(
 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
- grpc_core::CallCombinerClosureList* closures) {
+ closure_to_execute* closures, size_t* num_closures) {
 call_data* calld = static_cast<call_data*>(elem->call_data);
 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
 pending_batch* pending = &calld->pending_batches[i];
@@ -2402,11 +2495,13 @@ static void add_subchannel_batches_for_pending_batches(
 if (retry_state->completed_recv_trailing_metadata) {
 subchannel_batch_data* batch_data =
 retry_state->recv_trailing_metadata_internal_batch;
+ closure_to_execute* closure = &closures[(*num_closures)++];
+ closure->closure = &batch_data->on_complete;
 // Batches containing recv_trailing_metadata always succeed.
- closures->Add(
- &batch_data->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
- "re-executing recv_trailing_metadata_ready to propagate "
- "internally triggered result");
+ closure->error = GRPC_ERROR_NONE;
+ closure->reason =
+ "re-executing on_complete for recv_trailing_metadata "
+ "to propagate internally triggered result";
 } else {
 batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
 }
@@ -2418,19 +2513,14 @@ static void add_subchannel_batches_for_pending_batches(
 if (calld->method_params == nullptr ||
 calld->method_params->retry_policy() == nullptr ||
 calld->retry_committed) {
- add_closure_for_subchannel_batch(elem, batch, closures);
+ add_closure_for_subchannel_batch(calld, batch, closures, num_closures);
 pending_batch_clear(calld, pending);
 continue;
 }
 // Create batch with the right number of callbacks.
- const bool has_send_ops = batch->send_initial_metadata ||
- batch->send_message ||
- batch->send_trailing_metadata;
- const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
- batch->recv_message +
- batch->recv_trailing_metadata;
- subchannel_batch_data* batch_data = batch_data_create(
- elem, num_callbacks, has_send_ops /* set_on_complete */);
+ const int num_callbacks =
+ 1 + batch->recv_initial_metadata + batch->recv_message;
+ subchannel_batch_data* batch_data = batch_data_create(elem, num_callbacks);
 // Cache send ops if needed.
 maybe_cache_send_ops_for_batch(calld, pending);
 // send_initial_metadata.
@@ -2457,9 +2547,11 @@ static void add_subchannel_batches_for_pending_batches(
 }
 // recv_trailing_metadata.
 if (batch->recv_trailing_metadata) {
+ GPR_ASSERT(batch->collect_stats);
 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
 }
- add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
+ add_closure_for_subchannel_batch(calld, &batch_data->batch, closures,
+ num_closures);
 // Track number of pending subchannel send batches.
 // If this is the first one, take a ref to the call stack.
 if (batch->send_initial_metadata || batch->send_message ||
@@ -2487,13 +2579,15 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
 grpc_connected_subchannel_call_get_parent_data(
 calld->subchannel_call));
 // Construct list of closures to execute, one for each pending batch.
- grpc_core::CallCombinerClosureList closures;
+ // We can start up to 6 batches.
+ closure_to_execute closures[GPR_ARRAY_SIZE(calld->pending_batches)];
+ size_t num_closures = 0;
 // Replay previously-returned send_* ops if needed.
 subchannel_batch_data* replay_batch_data =
 maybe_create_subchannel_batch_for_replay(elem, retry_state);
 if (replay_batch_data != nullptr) {
- add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
- &closures);
+ add_closure_for_subchannel_batch(calld, &replay_batch_data->batch, closures,
+ &num_closures);
 // Track number of pending subchannel send batches.
 // If this is the first one, take a ref to the call stack.
 if (calld->num_pending_retriable_subchannel_send_batches == 0) {
@@ -2502,16 +2596,17 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
 ++calld->num_pending_retriable_subchannel_send_batches;
 }
 // Now add pending batches.
- add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
+ add_subchannel_batches_for_pending_batches(elem, retry_state, closures,
+ &num_closures);
 // Start batches on subchannel call.
 if (grpc_client_channel_trace.enabled()) {
 gpr_log(GPR_INFO,
 "chand=%p calld=%p: starting %" PRIuPTR
 " retriable batches on subchannel_call=%p",
- chand, calld, closures.size(), calld->subchannel_call);
+ chand, calld, num_closures, calld->subchannel_call);
 }
- // Note: This will yield the call combiner.
- closures.RunClosures(calld->call_combiner);
+ execute_closures_in_call_combiner(elem, "start_retriable_subchannel_batches",
+ closures, num_closures);
}
//
diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc
index d23ad67ad51..e0a41a36376 100644
--- a/src/core/ext/filters/deadline/deadline_filter.cc
+++ b/src/core/ext/filters/deadline/deadline_filter.cc
@@ -128,25 +128,21 @@ static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
 }
}
-// Callback run when we receive trailing metadata.
-static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
+// Callback run when the call is complete.
+static void on_complete(void* arg, grpc_error* error) {
 grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
 cancel_timer_if_needed(deadline_state);
- // Invoke the original callback.
- GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
- GRPC_ERROR_REF(error));
+ // Invoke the next callback.
+ GRPC_CLOSURE_RUN(deadline_state->next_on_complete, GRPC_ERROR_REF(error));
}
-// Inject our own recv_trailing_metadata_ready callback into op.
-static void inject_recv_trailing_metadata_ready( - grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) { - deadline_state->original_recv_trailing_metadata_ready = - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, deadline_state, +// Inject our own on_complete callback into op. +static void inject_on_complete_cb(grpc_deadline_state* deadline_state, + grpc_transport_stream_op_batch* op) { + deadline_state->next_on_complete = op->on_complete; + GRPC_CLOSURE_INIT(&deadline_state->on_complete, on_complete, deadline_state, grpc_schedule_on_exec_ctx); - op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &deadline_state->recv_trailing_metadata_ready; + op->on_complete = &deadline_state->on_complete; } // Callback and associated state for starting the timer after call stack @@ -230,7 +226,7 @@ void grpc_deadline_state_client_start_transport_stream_op_batch( // Make sure we know when the call is complete, so that we can cancel // the timer. if (op->recv_trailing_metadata) { - inject_recv_trailing_metadata_ready(deadline_state, op); + inject_on_complete_cb(deadline_state, op); } } } @@ -326,7 +322,7 @@ static void server_start_transport_stream_op_batch( // the client never sends trailing metadata, because this is the // hook that tells us when the call is complete on the server side. if (op->recv_trailing_metadata) { - inject_recv_trailing_metadata_ready(&calld->base.deadline_state, op); + inject_on_complete_cb(&calld->base.deadline_state, op); } } // Chain to next filter. diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index 1d797f445a7..13207cbd6fb 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -37,12 +37,12 @@ typedef struct grpc_deadline_state { grpc_deadline_timer_state timer_state; grpc_timer timer; grpc_closure timer_callback; - // Closure to invoke when we receive trailing metadata. + // Closure to invoke when the call is complete. // We use this to cancel the timer. - grpc_closure recv_trailing_metadata_ready; - // The original recv_trailing_metadata_ready closure, which we chain to - // after our own closure is invoked. - grpc_closure* original_recv_trailing_metadata_ready; + grpc_closure on_complete; + // The original on_complete closure, which we chain to after our own + // closure is invoked. + grpc_closure* next_on_complete; } grpc_deadline_state; // diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 1678051beb1..ae94ce47b9e 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -55,8 +55,8 @@ struct call_data { grpc_closure recv_initial_metadata_ready; // State for handling recv_trailing_metadata ops. grpc_metadata_batch* recv_trailing_metadata; - grpc_closure* original_recv_trailing_metadata_ready; - grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata_on_complete; + grpc_closure recv_trailing_metadata_on_complete; // State for handling send_message ops. 
 grpc_transport_stream_op_batch* send_message_batch;
 size_t send_message_bytes_read;
@@ -153,7 +153,8 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
 GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
}
-static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
+static void recv_trailing_metadata_on_complete(void* user_data,
+ grpc_error* error) {
 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
 call_data* calld = static_cast<call_data*>(elem->call_data);
 if (error == GRPC_ERROR_NONE) {
@@ -162,7 +163,7 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
 } else {
 GRPC_ERROR_REF(error);
 }
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_on_complete, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
@@ -311,10 +312,8 @@ static void hc_start_transport_stream_op_batch(
 /* substitute our callback for the higher callback */
 calld->recv_trailing_metadata =
 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata_ready =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready;
+ calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
+ batch->on_complete = &calld->recv_trailing_metadata_on_complete;
 }
 grpc_error* error = GRPC_ERROR_NONE;
@@ -421,8 +420,8 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
 GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
 recv_initial_metadata_ready, elem,
 grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
- recv_trailing_metadata_ready, elem,
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
+ recv_trailing_metadata_on_complete, elem,
 grpc_schedule_on_exec_ctx);
 GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
 elem, grpc_schedule_on_exec_ctx);
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 0d6b72c66e4..a8090d18a65 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1149,10 +1149,12 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
 }
}
+/* Flag that this closure barrier wants stats to be updated before finishing */
+#define CLOSURE_BARRIER_STATS_BIT (1 << 0)
/* Flag that this closure barrier may be covering a write in a pollset, and so
 we should not complete this closure until we can prove that the write got
 scheduled */
-#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
+#define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 1)
/* First bit of the reference count, stored in the high order bits (with the
 low bits being used for flags defined above) */
#define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
@@ -1204,6 +1206,10 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
 grpc_error_add_child(closure->error_data.error, error);
 }
 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
+ if (closure->next_data.scratch & CLOSURE_BARRIER_STATS_BIT) {
+ grpc_transport_move_stats(&s->stats, s->collecting_stats);
+ s->collecting_stats = nullptr;
+ }
 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
       GRPC_CLOSURE_RUN(closure, closure->error_data.error);
@@ -1345,14 +1351,9 @@ static void perform_stream_op_locked(void* stream_op,
   }
 
   grpc_closure* on_complete = op->on_complete;
-  // TODO(roth): This is a hack needed because we use data inside of the
-  // closure itself to do the barrier calculation (i.e., to ensure that
-  // we don't schedule the closure until all ops in the batch have been
-  // completed).  This can go away once we move to a new C++ closure API
-  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
-                                    nullptr, grpc_schedule_on_exec_ctx);
+    on_complete =
+        GRPC_CLOSURE_CREATE(do_nothing, nullptr, grpc_schedule_on_exec_ctx);
   }
 
   /* use final_data as a barrier until enqueue time; the inital counter is
@@ -1360,6 +1361,12 @@ static void perform_stream_op_locked(void* stream_op,
   on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
   on_complete->error_data.error = GRPC_ERROR_NONE;
 
+  if (op->collect_stats) {
+    GPR_ASSERT(s->collecting_stats == nullptr);
+    s->collecting_stats = op_payload->collect_stats.collect_stats;
+    on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
+  }
+
   if (op->cancel_stream) {
     GRPC_STATS_INC_HTTP2_OP_CANCEL();
     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
@@ -1593,11 +1600,8 @@ static void perform_stream_op_locked(void* stream_op,
 
   if (op->recv_trailing_metadata) {
     GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
-    GPR_ASSERT(s->collecting_stats == nullptr);
-    s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
-    s->recv_trailing_metadata_finished =
-        op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+    s->recv_trailing_metadata_finished = add_closure_barrier(on_complete);
     s->recv_trailing_metadata =
         op_payload->recv_trailing_metadata.recv_trailing_metadata;
     s->final_metadata_requested = true;
@@ -1956,12 +1960,11 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
     }
     if (s->read_closed && s->frame_storage.length == 0 && !pending_data &&
         s->recv_trailing_metadata_finished != nullptr) {
-      grpc_transport_move_stats(&s->stats, s->collecting_stats);
-      s->collecting_stats = nullptr;
       grpc_chttp2_incoming_metadata_buffer_publish(&s->metadata_buffer[1],
                                                    s->recv_trailing_metadata);
-      null_then_run_closure(&s->recv_trailing_metadata_finished,
-                            GRPC_ERROR_NONE);
+      grpc_chttp2_complete_closure_step(
+          t, s, &s->recv_trailing_metadata_finished, GRPC_ERROR_NONE,
+          "recv_trailing_metadata_finished");
     }
   }
 }
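The chttp2 hunks above restore the closure-barrier scheme that lets a single on_complete closure serve a whole batch: the closure's next_data.scratch word keeps flag bits (CLOSURE_BARRIER_STATS_BIT, CLOSURE_BARRIER_MAY_COVER_WRITE) in its low bits and a reference count starting at CLOSURE_BARRIER_FIRST_REF_BIT in its high bits. Each pending sub-operation takes a reference via add_closure_barrier(), and the grpc_chttp2_complete_closure_step() call that drops the count below the first ref bit performs any flag-gated work (moving stats) and runs the closure. A compilable toy version of just the counting arithmetic, with illustrative names rather than the gRPC API:

// Toy model of the chttp2 closure barrier; BarrierClosure, kStatsBit, and
// kFirstRefBit are illustrative stand-ins for the real gRPC machinery.
#include <cstdint>
#include <cstdio>

// A closure whose scratch field doubles as flags (low bits) and a
// reference count (high bits), mirroring the defines in the patch above.
struct BarrierClosure {
  void (*cb)(void* arg);
  void* arg;
  uint32_t scratch;  // flags | refcount
};

constexpr uint32_t kStatsBit = 1u << 0;      // like CLOSURE_BARRIER_STATS_BIT
constexpr uint32_t kFirstRefBit = 1u << 16;  // like CLOSURE_BARRIER_FIRST_REF_BIT

// Take a new ref on the barrier; each pending sub-operation holds one.
BarrierClosure* add_closure_barrier(BarrierClosure* c) {
  c->scratch += kFirstRefBit;
  return c;
}

// Drop one ref; when only flag bits remain, run side effects gated on the
// flags (here: "collect stats") and then fire the callback.
void complete_closure_step(BarrierClosure* c) {
  c->scratch -= kFirstRefBit;
  if (c->scratch < kFirstRefBit) {
    if (c->scratch & kStatsBit) {
      printf("collecting stats before completion\n");
    }
    c->cb(c->arg);
  }
}

static void on_batch_complete(void* arg) {
  printf("batch %s done\n", static_cast<const char*>(arg));
}

int main() {
  // Batch enqueue: the barrier starts with one ref held by the enqueue path.
  BarrierClosure done{on_batch_complete, (void*)"A", kFirstRefBit};
  done.scratch |= kStatsBit;     // this batch wants stats collected
  add_closure_barrier(&done);    // e.g. recv_trailing_metadata takes a ref
  complete_closure_step(&done);  // enqueue path drops its initial ref
  complete_closure_step(&done);  // trailing metadata arrives: runs the closure
  return 0;
}

Packing flags and count into one word is what lets perform_stream_op_locked() initialize the barrier with a plain assignment (scratch = CLOSURE_BARRIER_FIRST_REF_BIT) and simply OR in CLOSURE_BARRIER_STATS_BIT when op->collect_stats is set.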
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4a252d972d5..420c2d13e1d 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -925,10 +925,6 @@ static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
     result = false;
   }
   /* Check if every op that was asked for is done. */
-  /* TODO(muxi): We should not consider the recv ops here, since they
-   * have their own callbacks.  We should invoke a batch's on_complete
-   * as soon as all of the batch's send ops are complete, even if
-   * there are still recv ops pending. */
   else if (curr_op->send_initial_metadata &&
            !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
     CRONET_LOG(GPR_DEBUG, "Because");
@@ -1284,20 +1280,12 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
              op_can_be_run(stream_op, s, &oas->state,
                            OP_RECV_TRAILING_METADATA)) {
     CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
-    grpc_error* error = GRPC_ERROR_NONE;
-    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
-      error = GRPC_ERROR_REF(stream_state->cancel_error);
-    } else if (stream_state->state_op_done[OP_FAILED]) {
-      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
-    } else if (oas->s->state.rs.trailing_metadata_valid) {
+    if (oas->s->state.rs.trailing_metadata_valid) {
       grpc_chttp2_incoming_metadata_buffer_publish(
           &oas->s->state.rs.trailing_metadata,
           stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
       stream_state->rs.trailing_metadata_valid = false;
     }
-    GRPC_CLOSURE_SCHED(
-        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        error);
     stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
     result = ACTION_TAKEN_NO_CALLBACK;
   } else if (stream_op->cancel_stream &&
@@ -1410,11 +1398,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                          GRPC_ERROR_CANCELLED);
     }
-    if (op->recv_trailing_metadata) {
-      GRPC_CLOSURE_SCHED(
-          op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-          GRPC_ERROR_CANCELLED);
-    }
     GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
     return;
   }
diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc
index b0ca7f8207e..2c3bff5c1e5 100644
--- a/src/core/ext/transport/inproc/inproc_transport.cc
+++ b/src/core/ext/transport/inproc/inproc_transport.cc
@@ -120,6 +120,7 @@ typedef struct inproc_stream {
   struct inproc_stream* stream_list_next;
 } inproc_stream;
 
+static grpc_closure do_nothing_closure;
 static bool cancel_stream_locked(inproc_stream* s, grpc_error* error);
 static void op_state_machine(void* arg, grpc_error* error);
 
@@ -372,10 +373,6 @@ static void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
                                          const char* msg) {
   int is_sm = static_cast<int>(op == s->send_message_op);
   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
-  // TODO(vjpai): We should not consider the recv ops here, since they
-  // have their own callbacks.  We should invoke a batch's on_complete
-  // as soon as all of the batch's send ops are complete, even if there
-  // are still recv ops pending.
   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
   int is_rm = static_cast<int>(op == s->recv_message_op);
   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
@@ -499,11 +496,6 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
     s->send_trailing_md_op = nullptr;
   }
   if (s->recv_trailing_md_op) {
-    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
-               s, error);
-    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                           .recv_trailing_metadata_ready,
-                       GRPC_ERROR_REF(error));
     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
                s, error);
     complete_if_batch_end_locked(
@@ -646,12 +638,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
     }
     s->trailing_md_sent = true;
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
-      INPROC_LOG(GPR_INFO,
-                 "op_state_machine %p scheduling trailing-metadata-ready", s);
-      GRPC_CLOSURE_SCHED(
-          s->recv_trailing_md_op->payload->recv_trailing_metadata
-              .recv_trailing_metadata_ready,
-          GRPC_ERROR_NONE);
       INPROC_LOG(GPR_INFO,
                  "op_state_machine %p scheduling trailing-md-on-complete", s);
       GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
@@ -725,12 +711,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
   }
   if (s->recv_trailing_md_op && s->t->is_client && other &&
       other->send_message_op) {
-    INPROC_LOG(GPR_INFO,
-               "op_state_machine %p scheduling trailing-metadata-ready %p", s,
-               GRPC_ERROR_NONE);
-    GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                           .recv_trailing_metadata_ready,
-                       GRPC_ERROR_NONE);
     maybe_schedule_op_closure_locked(other, GRPC_ERROR_NONE);
   }
   if (s->to_read_trailing_md_filled) {
@@ -786,10 +766,6 @@ static void op_state_machine(void* arg, grpc_error* error) {
       INPROC_LOG(GPR_INFO,
                  "op_state_machine %p scheduling trailing-md-on-complete %p", s,
                  new_err);
-      GRPC_CLOSURE_SCHED(
-          s->recv_trailing_md_op->payload->recv_trailing_metadata
-              .recv_trailing_metadata_ready,
-          GRPC_ERROR_REF(new_err));
      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
                          GRPC_ERROR_REF(new_err));
       s->recv_trailing_md_op = nullptr;
@@ -883,9 +859,6 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
     // couldn't complete that because we hadn't yet sent out trailing
     // md, now's the chance
     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
-      GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
-                             .recv_trailing_metadata_ready,
-                         GRPC_ERROR_REF(s->cancel_self_error));
       complete_if_batch_end_locked(
           s, s->cancel_self_error, s->recv_trailing_md_op,
           "cancel_stream scheduling trailing-md-on-complete");
@@ -900,8 +873,6 @@ static bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
   return ret;
 }
 
-static void do_nothing(void* arg, grpc_error* error) {}
-
 static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
                               grpc_transport_stream_op_batch* op) {
   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
@@ -921,14 +892,8 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
   }
   grpc_error* error = GRPC_ERROR_NONE;
   grpc_closure* on_complete = op->on_complete;
-  // TODO(roth): This is a hack needed because we use data inside of the
-  // closure itself to do the barrier calculation (i.e., to ensure that
-  // we don't schedule the closure until all ops in the batch have been
-  // completed).  This can go away once we move to a new C++ closure API
-  // that provides the ability to create a barrier closure.
   if (on_complete == nullptr) {
-    on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
-                                    nullptr, grpc_schedule_on_exec_ctx);
+    on_complete = &do_nothing_closure;
   }
 
   if (op->cancel_stream) {
@@ -1061,15 +1026,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
       GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
                          GRPC_ERROR_REF(error));
     }
-    if (op->recv_trailing_metadata) {
-      INPROC_LOG(
-          GPR_INFO,
-          "perform_stream_op error %p scheduling trailing-metadata-ready %p",
-          s, error);
-      GRPC_CLOSURE_SCHED(
-          op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-          GRPC_ERROR_REF(error));
-    }
   }
   INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
              error);
@@ -1173,8 +1129,12 @@ static grpc_endpoint* get_endpoint(grpc_transport* t) { return nullptr; }
 
 /*******************************************************************************
  * GLOBAL INIT AND DESTROY
  */
+static void do_nothing(void* arg, grpc_error* error) {}
+
 void grpc_inproc_transport_init(void) {
   grpc_core::ExecCtx exec_ctx;
+  GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, nullptr,
+                    grpc_schedule_on_exec_ctx);
   g_empty_slice = grpc_slice_from_static_buffer(nullptr, 0);
 
   grpc_slice key_tmp = grpc_slice_from_static_string(":path");
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index e2ea334dedf..ddd30294020 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -51,7 +51,6 @@ typedef struct connected_channel_call_data {
   callback_state on_complete[6];  // Max number of pending batches.
   callback_state recv_initial_metadata_ready;
   callback_state recv_message_ready;
-  callback_state recv_trailing_metadata_ready;
 } call_data;
 
 static void run_in_call_combiner(void* arg, grpc_error* error) {
@@ -112,12 +111,6 @@ static void con_start_transport_stream_op_batch(
     intercept_callback(calld, state, false, "recv_message_ready",
                        &batch->payload->recv_message.recv_message_ready);
   }
-  if (batch->recv_trailing_metadata) {
-    callback_state* state = &calld->recv_trailing_metadata_ready;
-    intercept_callback(
-        calld, state, false, "recv_trailing_metadata_ready",
-        &batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
-  }
   if (batch->cancel_stream) {
     // There can be more than one cancellation batch in flight at any
     // given time, so we can't just pick out a fixed index into
@@ -128,7 +121,7 @@ static void con_start_transport_stream_op_batch(
         static_cast<callback_state*>(gpr_malloc(sizeof(*state)));
     intercept_callback(calld, state, true, "on_complete (cancel_stream)",
                        &batch->on_complete);
-  } else if (batch->on_complete != nullptr) {
+  } else {
     callback_state* state = get_state_for_batch(calld, batch);
     intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
   }
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index 0d2586e507d..f36f6cb7066 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -99,8 +99,6 @@ class InlinedVector {
   void push_back(T&& value) { emplace_back(std::move(value)); }
 
   size_t size() const { return size_; }
-  bool empty() const { return size_ == 0; }
-
   size_t capacity() const { return capacity_; }
 
   void clear() {
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
index 641fa180820..0ccd08ea577 100644
--- a/src/core/lib/iomgr/call_combiner.h
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -26,7 +26,6 @@
 
 #include <grpc/support/atm.h>
 #include "src/core/lib/gpr/mpscq.h"
-#include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/iomgr/closure.h"
 
 // A simple, lock-free mechanism for serializing activity related to a
@@ -110,83 +109,4 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
                                grpc_error* error);
 
-namespace grpc_core {
-
-// Helper for running a list of closures in a call combiner.
-//
-// Each callback running in the call combiner will eventually be
-// returned to the surface, at which point the surface will yield the
-// call combiner.  So when we are running in the call combiner and have
-// more than one callback to return to the surface, we need to re-enter
-// the call combiner for all but one of those callbacks.
-class CallCombinerClosureList {
- public:
-  CallCombinerClosureList() {}
-
-  // Adds a closure to the list.  The closure must eventually result in
-  // the call combiner being yielded.
-  void Add(grpc_closure* closure, grpc_error* error, const char* reason) {
-    closures_.emplace_back(closure, error, reason);
-  }
-
-  // Runs all closures in the call combiner and yields the call combiner.
-  //
-  // All but one of the closures in the list will be scheduled via
-  // GRPC_CALL_COMBINER_START(), and the remaining closure will be
-  // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
-  // yielding the call combiner.  If the list is empty, then the call
-  // combiner will be yielded immediately.
-  void RunClosures(grpc_call_combiner* call_combiner) {
-    if (closures_.empty()) {
-      GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
-      return;
-    }
-    for (size_t i = 1; i < closures_.size(); ++i) {
-      auto& closure = closures_[i];
-      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
-                               closure.reason);
-    }
-    if (grpc_call_combiner_trace.enabled()) {
-      gpr_log(GPR_INFO,
-              "CallCombinerClosureList executing closure while already "
-              "holding call_combiner %p: closure=%p error=%s reason=%s",
-              call_combiner, closures_[0].closure,
-              grpc_error_string(closures_[0].error), closures_[0].reason);
-    }
-    // This will release the call combiner.
-    GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
-    closures_.clear();
-  }
-
-  // Runs all closures in the call combiner, but does NOT yield the call
-  // combiner.  All closures will be scheduled via GRPC_CALL_COMBINER_START().
-  void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
-    for (size_t i = 0; i < closures_.size(); ++i) {
-      auto& closure = closures_[i];
-      GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,
-                               closure.reason);
-    }
-    closures_.clear();
-  }
-
-  size_t size() const { return closures_.size(); }
-
- private:
-  struct CallCombinerClosure {
-    grpc_closure* closure;
-    grpc_error* error;
-    const char* reason;
-
-    CallCombinerClosure(grpc_closure* closure, grpc_error* error,
-                        const char* reason)
-        : closure(closure), error(error), reason(reason) {}
-  };
-
-  // There are generally a maximum of 6 closures to run in the call
-  // combiner, one for each pending op.
-  InlinedVector<CallCombinerClosure, 6> closures_;
-};
-
-}  // namespace grpc_core
-
 #endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index f14c723844e..34a494485d9 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -283,10 +283,9 @@ inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
   if (c->scheduled) {
     gpr_log(GPR_ERROR,
             "Closure already scheduled. (closure: %p, created: [%s:%d], "
-            "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
-            "run?: %s",
+            "previously scheduled at: [%s: %d] run?: %s",
             c, c->file_created, c->line_created, c->file_initiated,
-            c->line_initiated, file, line, c->run ? "true" : "false");
+            c->line_initiated, c->run ? "true" : "false");
     abort();
   }
   c->scheduled = true;
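In the call.cc diff that follows, note the replacement of the ROUND_UP_TO_ALIGNMENT_SIZE-based macros with plain pointer arithmetic: grpc_call and its channel stack are carved out of one arena allocation, with the stack placed immediately after the call object, so (call) + 1 is the stack and ((grpc_call*)stack) - 1 recovers the call. A standalone sketch of that trailing-allocation layout (Call and CallStack are hypothetical stand-ins); the caveat in the comment is why the padded version existed at all:

// Sketch of the "stack follows the struct in one allocation" layout restored
// below; Call and CallStack are illustrative stand-ins, not gRPC types.
#include <cstdio>
#include <cstdlib>
#include <new>

struct CallStack { int elems[4]; };
struct Call { int refs; double deadline; };

// Equivalent in spirit to CALL_STACK_FROM_CALL(call): the stack lives right
// after the call object. Note: (call + 1) is only safe while CallStack's
// alignment requirement does not exceed Call's -- the rounded-up version
// that the patch removes guaranteed that in the general case.
CallStack* call_stack_from_call(Call* call) {
  return reinterpret_cast<CallStack*>(call + 1);
}
Call* call_from_call_stack(CallStack* stack) {
  return reinterpret_cast<Call*>(stack) - 1;
}

int main() {
  // One block, as with the arena: the call object plus its trailing stack.
  void* mem = malloc(sizeof(Call) + sizeof(CallStack));
  Call* call = new (mem) Call{1, 30.0};
  CallStack* stack = new (call_stack_from_call(call)) CallStack{};
  printf("round trip ok: %d\n", call_from_call_stack(stack) == call);
  free(mem);
}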
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 8b224b6e7bf..1cf8ea94e75 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -233,7 +233,6 @@ struct grpc_call {
   grpc_closure receiving_slice_ready;
   grpc_closure receiving_stream_ready;
   grpc_closure receiving_initial_metadata_ready;
-  grpc_closure receiving_trailing_metadata_ready;
   uint32_t test_only_last_message_flags;
   grpc_closure release_call;
 
@@ -271,17 +270,8 @@ struct grpc_call {
 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
 
-/* Given a size, round up to the next multiple of sizeof(void*) */
-#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \
-  (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u))
-
-#define CALL_STACK_FROM_CALL(call)                        \
-  (grpc_call_stack*)((char*)(call) +                      \
-                     ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-#define CALL_FROM_CALL_STACK(call_stack)                  \
-  (grpc_call*)(((char*)(call_stack)) -                    \
-               ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
-
+#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack*)((call) + 1))
+#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call*)(call_stack)) - 1)
 #define CALL_ELEM_FROM_CALL(call, idx) \
   grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
 #define CALL_FROM_TOP_ELEM(top_elem) \
@@ -352,9 +342,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
   size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
   GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
   gpr_arena* arena = gpr_arena_create(initial_size);
-  call = static_cast<grpc_call*>(
-      gpr_arena_alloc(arena, ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
-                                 channel_stack->call_stack_size));
+  call = static_cast<grpc_call*>(gpr_arena_alloc(
+      arena, sizeof(grpc_call) + channel_stack->call_stack_size));
   gpr_ref_init(&call->ext_ref, 1);
   call->arena = arena;
   grpc_call_combiner_init(&call->call_combiner);
@@ -1220,6 +1209,7 @@ static void post_batch_completion(batch_control* bctl) {
 
   if (bctl->op.send_initial_metadata) {
     grpc_metadata_batch_destroy(
+        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
   }
   if (bctl->op.send_message) {
@@ -1227,9 +1217,14 @@ static void post_batch_completion(batch_control* bctl) {
   }
   if (bctl->op.send_trailing_metadata) {
     grpc_metadata_batch_destroy(
+        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
   }
   if (bctl->op.recv_trailing_metadata) {
+    grpc_metadata_batch* md =
+        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
+    recv_trailing_filter(call, md);
+
     /* propagate cancellation to any interested children */
     gpr_atm_rel_store(&call->received_final_op_atm, 1);
     parent_call* pc = get_parent_call(call);
@@ -1251,6 +1246,7 @@ static void post_batch_completion(batch_control* bctl) {
     }
     gpr_mu_unlock(&pc->child_list_mu);
   }
+
   if (call->is_client) {
     get_final_status(call, set_status_value_directly,
                      call->final_op.client.status,
@@ -1260,6 +1256,7 @@ static void post_batch_completion(batch_control* bctl) {
     get_final_status(call, set_cancelled_value,
                      call->final_op.server.cancelled, nullptr, nullptr);
   }
+
   GRPC_ERROR_UNREF(error);
   error = GRPC_ERROR_NONE;
 }
@@ -1541,19 +1538,6 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
   finish_batch_step(bctl);
 }
 
-static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
-  batch_control* bctl = static_cast<batch_control*>(bctlp);
-  grpc_call* call = bctl->call;
-  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
-  add_batch_error(bctl, GRPC_ERROR_REF(error), false);
-  if (error == GRPC_ERROR_NONE) {
-    grpc_metadata_batch* md =
-        &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-    recv_trailing_filter(call, md);
-  }
-  finish_batch_step(bctl);
-}
-
 static void finish_batch(void* bctlp, grpc_error* error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
@@ -1574,8 +1558,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
   size_t i;
   const grpc_op* op;
   batch_control* bctl;
-  bool has_send_ops = false;
-  int num_recv_ops = 0;
+  int num_completion_callbacks_needed = 1;
   grpc_call_error error = GRPC_CALL_OK;
   grpc_transport_stream_op_batch* stream_op;
   grpc_transport_stream_op_batch_payload* stream_op_payload;
@@ -1681,7 +1664,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
           stream_op_payload->send_initial_metadata.peer_string =
               &call->peer_string;
         }
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_MESSAGE: {
@@ -1711,7 +1693,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
         stream_op_payload->send_message.send_message.reset(
             call->sending_stream.get());
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@@ -1732,7 +1713,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         call->sent_final_op = true;
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
-        has_send_ops = true;
         break;
       }
       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
@@ -1797,7 +1777,6 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         }
         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
             &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
-        has_send_ops = true;
         break;
       }
      case GRPC_OP_RECV_INITIAL_METADATA: {
@@ -1825,7 +1804,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
           stream_op_payload->recv_initial_metadata.peer_string =
               &call->peer_string;
         }
-        ++num_recv_ops;
+        num_completion_callbacks_needed++;
         break;
       }
       case GRPC_OP_RECV_MESSAGE: {
@@ -1847,7 +1826,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                           grpc_schedule_on_exec_ctx);
         stream_op_payload->recv_message.recv_message_ready =
             &call->receiving_stream_ready;
-        ++num_recv_ops;
+        num_completion_callbacks_needed++;
         break;
       }
       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
@@ -1873,16 +1852,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         call->final_op.client.error_string =
             op->data.recv_status_on_client.error_string;
         stream_op->recv_trailing_metadata = true;
+        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->recv_trailing_metadata.collect_stats =
+        stream_op_payload->collect_stats.collect_stats =
             &call->final_info.stats.transport_stream_stats;
-        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
-                          receiving_trailing_metadata_ready, bctl,
-                          grpc_schedule_on_exec_ctx);
-        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-            &call->receiving_trailing_metadata_ready;
-        ++num_recv_ops;
         break;
       }
       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
@@ -1903,16 +1877,11 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         call->final_op.server.cancelled =
             op->data.recv_close_on_server.cancelled;
         stream_op->recv_trailing_metadata = true;
+        stream_op->collect_stats = true;
         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
             &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-        stream_op_payload->recv_trailing_metadata.collect_stats =
+        stream_op_payload->collect_stats.collect_stats =
            &call->final_info.stats.transport_stream_stats;
-        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
-                          receiving_trailing_metadata_ready, bctl,
-                          grpc_schedule_on_exec_ctx);
-        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
-            &call->receiving_trailing_metadata_ready;
-        ++num_recv_ops;
         break;
       }
     }
   }
 
@@ -1922,15 +1891,13 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
   if (!is_notify_tag_closure) {
     GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
   }
-  gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
-
-  if (has_send_ops) {
-    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
-                      grpc_schedule_on_exec_ctx);
-    stream_op->on_complete = &bctl->finish_batch;
-  }
+  gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
+  GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
+                    grpc_schedule_on_exec_ctx);
+  stream_op->on_complete = &bctl->finish_batch;
 
   gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
+
   execute_batch(call, stream_op, &bctl->start_batch);
 
 done:
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index cbdb77c8441..039d603394f 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -212,32 +212,21 @@ void grpc_transport_stream_op_batch_finish_with_failure(
   if (batch->send_message) {
     batch->payload->send_message.send_message.reset();
   }
-  if (batch->cancel_stream) {
-    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
+  if (batch->recv_message) {
+    GRPC_CALL_COMBINER_START(
+        call_combiner, batch->payload->recv_message.recv_message_ready,
+        GRPC_ERROR_REF(error), "failing recv_message_ready");
   }
-  // Construct a list of closures to execute.
-  grpc_core::CallCombinerClosureList closures;
   if (batch->recv_initial_metadata) {
-    closures.Add(
+    GRPC_CALL_COMBINER_START(
+        call_combiner,
         batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
         GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
   }
-  if (batch->recv_message) {
-    closures.Add(batch->payload->recv_message.recv_message_ready,
-                 GRPC_ERROR_REF(error), "failing recv_message_ready");
-  }
-  if (batch->recv_trailing_metadata) {
-    closures.Add(
-        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        GRPC_ERROR_REF(error), "failing recv_trailing_metadata_ready");
-  }
-  if (batch->on_complete != nullptr) {
-    closures.Add(batch->on_complete, GRPC_ERROR_REF(error),
-                 "failing on_complete");
+  GRPC_CLOSURE_SCHED(batch->on_complete, error);
+  if (batch->cancel_stream) {
+    GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
   }
-  // Execute closures.
-  closures.RunClosures(call_combiner);
-  GRPC_ERROR_UNREF(error);
 }
 
 typedef struct {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 585b9dfae91..b2e252d939e 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -122,15 +122,9 @@ typedef struct grpc_transport_stream_op_batch_payload
 /* Transport stream op: a set of operations to perform on a transport
    against a single stream */
 typedef struct grpc_transport_stream_op_batch {
-  /** Should be scheduled when all of the non-recv operations in the batch
-      are complete.
-
-      The recv ops (recv_initial_metadata, recv_message, and
-      recv_trailing_metadata) each have their own callbacks.  If a batch
-      contains both recv ops and non-recv ops, on_complete should be
-      scheduled as soon as the non-recv ops are complete, regardless of
-      whether or not the recv ops are complete.  If a batch contains
-      only recv ops, on_complete can be null. */
+  /** Should be enqueued when all requested operations (excluding recv_message
+      and recv_initial_metadata which have their own closures) in a given batch
+      have been completed. */
   grpc_closure* on_complete;
 
   /** Values for the stream op (fields set are determined by flags above) */
@@ -155,6 +149,9 @@ typedef struct grpc_transport_stream_op_batch {
    */
   bool recv_trailing_metadata : 1;
 
+  /** Collect any stats into provided buffer, zero internal stat counters */
+  bool collect_stats : 1;
+
   /** Cancel this stream with the provided error */
   bool cancel_stream : 1;
 
@@ -222,11 +219,12 @@ struct grpc_transport_stream_op_batch_payload {
 
   struct {
     grpc_metadata_batch* recv_trailing_metadata;
-    grpc_transport_stream_stats* collect_stats;
-    /** Should be enqueued when initial metadata is ready to be processed. */
-    grpc_closure* recv_trailing_metadata_ready;
   } recv_trailing_metadata;
 
+  struct {
+    grpc_transport_stream_stats* collect_stats;
+  } collect_stats;
+
   /** Forcefully close this stream.
      The HTTP2 semantics should be:
      - server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc
index 8c7db642a54..25ab492f3a4 100644
--- a/src/core/lib/transport/transport_op_string.cc
+++ b/src/core/lib/transport/transport_op_string.cc
@@ -120,6 +120,13 @@ char* grpc_transport_stream_op_batch_string(
     gpr_strvec_add(&b, tmp);
   }
 
+  if (op->collect_stats) {
+    gpr_strvec_add(&b, gpr_strdup(" "));
+    gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+                 op->payload->collect_stats.collect_stats);
+    gpr_strvec_add(&b, tmp);
+  }
+
   out = gpr_strvec_flatten(&b, nullptr);
   gpr_strvec_destroy(&b);
 
diff --git a/test/core/gprpp/inlined_vector_test.cc b/test/core/gprpp/inlined_vector_test.cc
index 41f4338f8a4..ae349477187 100644
--- a/test/core/gprpp/inlined_vector_test.cc
+++ b/test/core/gprpp/inlined_vector_test.cc
@@ -27,12 +27,10 @@ namespace testing {
 TEST(InlinedVectorTest, CreateAndIterate) {
   const int kNumElements = 9;
   InlinedVector<int, 2> v;
-  EXPECT_TRUE(v.empty());
   for (int i = 0; i < kNumElements; ++i) {
     v.push_back(i);
   }
   EXPECT_EQ(static_cast<size_t>(kNumElements), v.size());
-  EXPECT_FALSE(v.empty());
   for (int i = 0; i < kNumElements; ++i) {
     EXPECT_EQ(i, v[i]);
     EXPECT_EQ(i, &v[i] - &v[0]);  // Ensure contiguous allocation.
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index dd1610dc3d3..831b29c506c 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -621,26 +621,18 @@ typedef struct {
 static void StartTransportStreamOp(grpc_call_element* elem,
                                    grpc_transport_stream_op_batch* op) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  // Construct list of closures to return.
-  grpc_core::CallCombinerClosureList closures;
   if (op->recv_initial_metadata) {
-    closures.Add(op->payload->recv_initial_metadata.recv_initial_metadata_ready,
-                 GRPC_ERROR_NONE, "recv_initial_metadata");
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner,
+        op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+        GRPC_ERROR_NONE, "recv_initial_metadata");
   }
   if (op->recv_message) {
-    closures.Add(op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE,
-                 "recv_message");
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             op->payload->recv_message.recv_message_ready,
+                             GRPC_ERROR_NONE, "recv_message");
   }
-  if (op->recv_trailing_metadata) {
-    closures.Add(
-        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
-        GRPC_ERROR_NONE, "recv_trailing_metadata");
-  }
-  if (op->on_complete != nullptr) {
-    closures.Add(op->on_complete, GRPC_ERROR_NONE, "on_complete");
-  }
-  // Execute closures.
-  closures.RunClosures(calld->call_combiner);
+  GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
 }
 
 static void StartTransportOp(grpc_channel_element* elem,
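A pattern worth noting across this patch (grpc_transport_stream_op_batch_finish_with_failure() and the benchmark's StartTransportStreamOp() above, pending_batches_fail()/pending_batches_resume() earlier): when several completion closures must be handed back to the surface while holding the call combiner, all but one are re-queued with GRPC_CALL_COMBINER_START(), and the final one is run directly — running it is what eventually yields the combiner. That is the discipline the deleted CallCombinerClosureList wrapped. A deliberately simplified toy model of the queueing half of that discipline (CallCombiner here is an illustrative stand-in, not gRPC's lock-free implementation):

// Toy model: all closures but one are queued on the combiner; the last is
// run in place, after which the combiner hands itself to each queued closure.
#include <cstdio>
#include <queue>

struct Closure {
  void (*cb)(void*);
  void* arg;
};

struct CallCombiner {
  std::queue<Closure*> pending;
  // Loosely modeling GRPC_CALL_COMBINER_START: queue a closure to run once
  // the current holder gives the combiner back.
  void Start(Closure* c, const char* why) {
    printf("queueing %s\n", why);
    pending.push(c);
  }
  // Loosely modeling the yield: drain the queue, giving the combiner to each
  // queued closure in turn.
  void Yield() {
    while (!pending.empty()) {
      Closure* c = pending.front();
      pending.pop();
      c->cb(c->arg);
    }
  }
};

static void notify(void* name) {
  printf("surface sees %s\n", static_cast<const char*>(name));
}

int main() {
  CallCombiner combiner;
  Closure recv_im{notify, (void*)"recv_initial_metadata_ready"};
  Closure recv_msg{notify, (void*)"recv_message_ready"};
  Closure on_complete{notify, (void*)"on_complete"};
  // All but one closure re-enter the combiner...
  combiner.Start(&recv_im, "recv_initial_metadata");
  combiner.Start(&recv_msg, "recv_message");
  // ...while the last one runs in place; finishing it yields the combiner.
  on_complete.cb(on_complete.arg);
  combiner.Yield();
}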