diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8812b2b1ad6..a8395d8d504 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1163,7 +1163,7 @@ void ChannelData::ExternalConnectivityWatcher::Notify( chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false); // Report new state to the user. *state_ = state; - GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE); // Hop back into the combiner to clean up. // Not needed in state SHUTDOWN, because the tracker will // automatically remove all watchers in that case. @@ -1180,7 +1180,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() { MemoryOrder::RELAXED)) { return; // Already done. } - GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED); + ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED); // Hop back into the combiner to clean up. chand_->combiner_->Run( GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr), @@ -1826,8 +1826,9 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { grpc_error* error = chand->DoPingLocked(op); if (error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error)); - GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error); + ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, + GRPC_ERROR_REF(error)); + ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error); } op->bind_pollset = nullptr; op->send_ping.on_initiate = nullptr; @@ -1869,7 +1870,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) { } } GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op"); - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); } void ChannelData::StartTransportOp(grpc_channel_element* elem, @@ -2058,7 +2059,7 @@ void CallData::Destroy(grpc_call_element* elem, then_schedule_closure = nullptr; } calld->~CallData(); - GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE); } void CallData::StartTransportStreamOpBatch( @@ -3679,7 +3680,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) { void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) { GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&pick_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error); } void CallData::PickDone(void* arg, grpc_error* error) { diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index b5bca9887c9..aa90c3dd2a5 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -300,7 +300,8 @@ void HealthCheckClient::CallState::StartCall() { // Schedule instead of running directly, since we must not be // holding health_check_client_->mu_ when CallEnded() is called. call_->Ref(DEBUG_LOCATION, "call_end_closure").release(); - GRPC_CLOSURE_SCHED( + ExecCtx::Run( + DEBUG_LOCATION, GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 64ed7746761..28ec64e8605 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -125,7 +125,7 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error* error) { is_shutdown_ = true; } // Invoke callback. - GRPC_CLOSURE_SCHED(on_handshake_done_, error); + ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error); } // Callback invoked when finished writing HTTP CONNECT request. @@ -222,7 +222,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) { goto done; } // Success. Invoke handshake-done callback. - GRPC_CLOSURE_SCHED(handshaker->on_handshake_done_, error); + ExecCtx::Run(DEBUG_LOCATION, handshaker->on_handshake_done_, error); done: // Set shutdown to true so that subsequent calls to // http_connect_handshaker_shutdown() do nothing. @@ -260,7 +260,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, gpr_mu_lock(&mu_); is_shutdown_ = true; gpr_mu_unlock(&mu_); - GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, GRPC_ERROR_NONE); return; } // Get headers from channel args. diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc index f5acec4fda7..7fb6cde600c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc @@ -81,10 +81,10 @@ class GrpcPolledFdLibuv : public GrpcPolledFd { uv_poll_stop(handle_); uv_close(reinterpret_cast(handle_), ares_uv_poll_close_cb); if (read_closure_ != nullptr) { - GRPC_CLOSURE_SCHED(read_closure_, GRPC_ERROR_CANCELLED); + ExecCtx::Run(DEBUG_LOCATION, read_closure_, GRPC_ERROR_CANCELLED); } if (write_closure_ != nullptr) { - GRPC_CLOSURE_SCHED(write_closure_, GRPC_ERROR_CANCELLED); + ExecCtx::Run(DEBUG_LOCATION, write_closure_, GRPC_ERROR_CANCELLED); } } @@ -135,13 +135,13 @@ static void ares_uv_poll_cb_locked(void* arg, grpc_error* error) { } if (events & UV_READABLE) { GPR_ASSERT(polled_fd->read_closure_ != nullptr); - GRPC_CLOSURE_SCHED(polled_fd->read_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, polled_fd->read_closure_, error); polled_fd->read_closure_ = nullptr; polled_fd->poll_events_ &= ~UV_READABLE; } if (events & UV_WRITABLE) { GPR_ASSERT(polled_fd->write_closure_ != nullptr); - GRPC_CLOSURE_SCHED(polled_fd->write_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, polled_fd->write_closure_, error); polled_fd->write_closure_ = nullptr; polled_fd->poll_events_ &= ~UV_WRITABLE; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index 437645345ba..c25891ac442 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -128,12 +128,12 @@ class GrpcPolledFdWindows { } void ScheduleAndNullReadClosure(grpc_error* error) { - GRPC_CLOSURE_SCHED(read_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, read_closure_, error); read_closure_ = nullptr; } void ScheduleAndNullWriteClosure(grpc_error* error) { - GRPC_CLOSURE_SCHED(write_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, write_closure_, error); write_closure_ = nullptr; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 381cbf63b9c..69257bd60cc 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -148,7 +148,7 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) { // TODO(apolcyn): allow c-ares to return a service config // with no addresses along side it } - GRPC_CLOSURE_SCHED(r->on_done, r->error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, r->error); } static grpc_ares_hostbyname_request* create_hostbyname_request_locked( @@ -447,7 +447,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked( return; error_cleanup: - GRPC_CLOSURE_SCHED(r->on_done, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error); } static bool inner_resolve_as_ip_literal_locked( @@ -714,7 +714,8 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { sizeof(grpc_resolved_address)); } } - GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, + GRPC_ERROR_REF(error)); GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb"); grpc_core::Delete(r); } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 1c245ae427a..c61616aafda 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -734,9 +734,10 @@ void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { old_refs = RefMutate(-static_cast(1), 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, + GRPC_CLOSURE_CREATE(subchannel_destroy, this, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } } diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index a99d9cb595b..1ea170ec595 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -199,7 +199,7 @@ grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem, grpc_core::New(elem, deadline); GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE); } } diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index c95c687d103..b926be7018c 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -492,7 +492,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem, initialization is done. */ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_age_timer_after_init"); - GRPC_CLOSURE_SCHED(&chand->start_max_age_timer_after_init, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + &chand->start_max_age_timer_after_init, + GRPC_ERROR_NONE); } /* Initialize the number of calls as 1, so that the max_idle_timer will not @@ -501,8 +503,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem, if (chand->max_connection_idle != GRPC_MILLIS_INF_FUTURE) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_idle_timer_after_init"); - GRPC_CLOSURE_SCHED(&chand->start_max_idle_timer_after_init, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + &chand->start_max_idle_timer_after_init, + GRPC_ERROR_NONE); } return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 6b6d299b6e2..18a2df8a2d0 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -150,7 +150,7 @@ static void on_handshake_done(void* arg, grpc_error* error) { } grpc_closure* notify = c->notify; c->notify = nullptr; - GRPC_CLOSURE_SCHED(notify, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error); c->handshake_mgr.reset(); gpr_mu_unlock(&c->mu); chttp2_connector_unref(reinterpret_cast(c)); @@ -182,7 +182,7 @@ static void connected(void* arg, grpc_error* error) { c->result->reset(); grpc_closure* notify = c->notify; c->notify = nullptr; - GRPC_CLOSURE_SCHED(notify, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error); if (c->endpoint != nullptr) { grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error)); } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index 4761af78bcf..f37551fce2d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -266,7 +266,8 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) { // may do a synchronous unref. grpc_core::ExecCtx::Get()->Flush(); if (destroy_done != nullptr) { - GRPC_CLOSURE_SCHED(destroy_done, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_done, + GRPC_ERROR_REF(error)); grpc_core::ExecCtx::Get()->Flush(); } grpc_channel_args_destroy(state->args); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0936ee854cc..e6f37b89ef0 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -593,7 +593,8 @@ static void close_transport_locked(grpc_chttp2_transport* t, grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error)); } if (t->notify_on_receive_settings != nullptr) { - GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings, + GRPC_ERROR_CANCELLED); t->notify_on_receive_settings = nullptr; } GRPC_ERROR_UNREF(error); @@ -706,7 +707,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() { } GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream"); - GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE); } static int init_stream(grpc_transport* gt, grpc_stream* gs, @@ -1177,7 +1178,7 @@ static grpc_closure* add_closure_barrier(grpc_closure* closure) { static void null_then_sched_closure(grpc_closure** closure) { grpc_closure* c = *closure; *closure = nullptr; - GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE); } void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, @@ -1220,7 +1221,8 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t, !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) { // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running // closures earlier than when it is safe to do so. - GRPC_CLOSURE_SCHED(closure, closure->error_data.error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, + closure->error_data.error); } else { grpc_closure_list_append(&t->run_after_write, closure, closure->error_data.error); @@ -1678,8 +1680,10 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, + GRPC_ERROR_REF(t->closed_with_error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -2933,7 +2937,7 @@ static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s); } else { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error)); s->on_next = nullptr; GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_NONE; @@ -2991,10 +2995,11 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete, + GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, - GRPC_ERROR_REF(s->byte_stream_error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete, + GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; @@ -3003,8 +3008,8 @@ void Chttp2IncomingByteStream::NextLocked(void* arg, if (bs->remaining_bytes_ != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, - GRPC_ERROR_REF(s->byte_stream_error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete, + GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; @@ -3092,7 +3097,8 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { void Chttp2IncomingByteStream::PublishError(grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next, + GRPC_ERROR_REF(error)); stream_->on_next = nullptr; GRPC_ERROR_UNREF(stream_->byte_stream_error); stream_->byte_stream_error = GRPC_ERROR_REF(error); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 20ea966c047..f2ff4d37b77 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -291,7 +291,7 @@ grpc_error* grpc_chttp2_data_parser_parse(void* /*parser*/, GPR_ASSERT(s->frame_storage.length == 0); grpc_slice_ref_internal(slice); grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_NONE); s->on_next = nullptr; s->unprocessed_incoming_frames_decompressed = false; } else { diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.cc b/src/core/ext/transport/chttp2/transport/frame_settings.cc index 4d739e85daa..60432d26f26 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.cc +++ b/src/core/ext/transport/chttp2/transport/frame_settings.cc @@ -135,8 +135,9 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t, t->num_pending_induced_frames++; grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create()); if (t->notify_on_receive_settings != nullptr) { - GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + t->notify_on_receive_settings, + GRPC_ERROR_NONE); t->notify_on_receive_settings = nullptr; } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 60959d3a495..8cc4becfff9 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -1150,22 +1150,26 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_callback_received[OP_FAILED]) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } else { grpc_chttp2_incoming_metadata_buffer_publish( &oas->s->state.rs.initial_metadata, stream_op->payload->recv_initial_metadata.recv_initial_metadata); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_NONE); } @@ -1176,30 +1180,34 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->state_callback_received[OP_FAILED]) { CRONET_LOG(GPR_DEBUG, "Stream failed."); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.read_stream_closed == true) { /* No more data will be received */ CRONET_LOG(GPR_DEBUG, "read stream closed"); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->flush_read) { CRONET_LOG(GPR_DEBUG, "flush read"); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; @@ -1240,7 +1248,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); stream_op->payload->recv_message.recv_message->reset( stream_state->rs.sbs.get()); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; @@ -1296,8 +1305,9 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); stream_op->payload->recv_message.recv_message->reset( stream_state->rs.sbs.get()); - GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; /* Do an extra read to trigger on_succeeded() callback in case connection @@ -1328,7 +1338,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { stream_op->payload->recv_trailing_metadata.recv_trailing_metadata); stream_state->rs.trailing_metadata_valid = false; } - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, error); stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; @@ -1352,13 +1363,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { if (stream_op->on_complete) { - GRPC_CLOSURE_SCHED(stream_op->on_complete, - GRPC_ERROR_REF(stream_state->cancel_error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete, + GRPC_ERROR_REF(stream_state->cancel_error)); } } else if (stream_state->state_callback_received[OP_FAILED]) { if (stream_op->on_complete) { - GRPC_CLOSURE_SCHED( - stream_op->on_complete, + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, stream_op->on_complete, make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); } } else { @@ -1366,7 +1377,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { * callback */ if (stream_op->on_complete) { - GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete, + GRPC_ERROR_NONE); } } oas->state.state_op_done[OP_ON_COMPLETE] = true; @@ -1433,20 +1445,24 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, /* Cronet does not support :authority header field. We cancel the call when this field is present in metadata */ if (op->recv_initial_metadata) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_CANCELLED); } if (op->recv_message) { - GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, - GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + op->payload->recv_message.recv_message_ready, + GRPC_ERROR_CANCELLED); } if (op->recv_trailing_metadata) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, GRPC_ERROR_CANCELLED); } - GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, + GRPC_ERROR_CANCELLED); return; } stream_obj* s = reinterpret_cast(gs); @@ -1458,7 +1474,8 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs, grpc_closure* then_schedule_closure) { stream_obj* s = reinterpret_cast(gs); s->~stream_obj(); - GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, + GRPC_ERROR_NONE); } static void destroy_transport(grpc_transport* gt) {} diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 9cb79f5cbe7..1271e79abf7 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -204,7 +204,8 @@ struct inproc_stream { t->unref(); if (closure_at_destroy) { - GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_at_destroy, + GRPC_ERROR_NONE); } } @@ -390,13 +391,15 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error, if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error); - GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, + GRPC_ERROR_REF(error)); } } void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) { if (s && s->ops_needed && !s->op_closure_scheduled) { - GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure, + GRPC_ERROR_REF(error)); s->op_closure_scheduled = true; s->ops_needed = false; } @@ -471,9 +474,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) { INPROC_LOG(GPR_INFO, "fail_helper %p scheduling initial-metadata-ready %p %p", s, error, err); - GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - err); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + err); // Last use of err so no need to REF and then UNREF it complete_if_batch_end_locked( @@ -484,7 +489,8 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) { if (s->recv_message_op) { INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s, error); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); complete_if_batch_end_locked( @@ -508,9 +514,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) { if (s->recv_trailing_md_op) { INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p", s, error); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); complete_if_batch_end_locked( @@ -564,7 +572,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { receiver->recv_stream.get()); INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready", receiver); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, receiver->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( @@ -656,14 +665,16 @@ void op_state_machine(void* arg, grpc_error* error) { if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-metadata-ready", s); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, s->recv_trailing_md_op->payload->recv_trailing_metadata .recv_trailing_metadata_ready, GRPC_ERROR_NONE); INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-on-complete", s); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, - GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); s->recv_trailing_md_op = nullptr; needs_close = true; } @@ -708,9 +719,11 @@ void op_state_machine(void* arg, grpc_error* error) { INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - GRPC_ERROR_REF(new_err)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); complete_if_batch_end_locked( s, new_err, s->recv_initial_md_op, "op_state_machine scheduling recv-initial-metadata-on-complete"); @@ -748,7 +761,8 @@ void op_state_machine(void* arg, grpc_error* error) { // satisfied *s->recv_message_op->payload->recv_message.recv_message = nullptr; INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( @@ -785,12 +799,14 @@ void op_state_machine(void* arg, grpc_error* error) { INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-on-complete %p", s, new_err); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, s->recv_trailing_md_op->payload->recv_trailing_metadata .recv_trailing_metadata_ready, GRPC_ERROR_REF(new_err)); - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(new_err)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + s->recv_trailing_md_op->on_complete, + GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = nullptr; needs_close = true; } else { @@ -810,7 +826,8 @@ void op_state_machine(void* arg, grpc_error* error) { // recv_message_op INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s); *s->recv_message_op->payload->recv_message.recv_message = nullptr; - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); complete_if_batch_end_locked( @@ -883,9 +900,11 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) { // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata - .recv_trailing_metadata_ready, - GRPC_ERROR_REF(s->cancel_self_error)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + s->recv_trailing_md_op->payload->recv_trailing_metadata + .recv_trailing_metadata_ready, + GRPC_ERROR_REF(s->cancel_self_error)); complete_if_batch_end_locked( s, s->cancel_self_error, s->recv_trailing_md_op, "cancel_stream scheduling trailing-md-on-complete"); @@ -1026,7 +1045,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, (op->recv_message && other && other->send_message_op != nullptr) || (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { if (!s->op_closure_scheduled) { - GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure, + GRPC_ERROR_NONE); s->op_closure_scheduled = true; } } else { @@ -1054,7 +1074,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GPR_INFO, "perform_stream_op error %p scheduling initial-metadata-ready %p", s, error); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -1063,22 +1084,24 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs, GPR_INFO, "perform_stream_op error %p scheduling recv message-ready %p", s, error); - GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + op->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); } if (op->recv_trailing_metadata) { INPROC_LOG( GPR_INFO, "perform_stream_op error %p scheduling trailing-metadata-ready %p", s, error); - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, op->payload->recv_trailing_metadata.recv_trailing_metadata_ready, GRPC_ERROR_REF(error)); } } INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s, error); - GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error)); } if (needs_close) { close_other_side_locked(s, "perform_stream_op:other_side"); @@ -1121,7 +1144,7 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) { t->accept_stream_data = op->set_accept_stream_user_data; } if (op->on_consumed) { - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); } bool do_close = false; diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index ad0bc875fd2..2bcc60abfa9 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -169,7 +169,7 @@ bool HandshakeManager::CallNextHandshakerLocked(grpc_error* error) { // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. grpc_timer_cancel(&deadline_timer_); - GRPC_CLOSURE_SCHED(&on_handshake_done_, error); + ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error); is_shutdown_ = true; } else { auto handshaker = handshakers_[index_]; diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 7319409c66f..7edcdcc8427 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -88,7 +88,7 @@ static void next_address(internal_request* req, grpc_error* due_to_error); static void finish(internal_request* req, grpc_error* error) { grpc_polling_entity_del_from_pollset_set(req->pollent, req->context->pollset_set); - GRPC_CLOSURE_SCHED(req->on_done, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != nullptr) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index fef467e8afa..5d8d177fe67 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -100,7 +100,7 @@ class grpc_httpcli_ssl_channel_security_connector final error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); } - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); tsi_peer_destruct(&peer); } diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index bfbbb7f385d..4092e5d8e08 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -92,9 +92,9 @@ void CallCombiner::TsanClosure(void* arg, grpc_error* error) { void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) { #ifdef GRPC_TSAN_ENABLED original_closure_ = closure; - GRPC_CLOSURE_SCHED(&tsan_closure_, error); + ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error); #else - GRPC_CLOSURE_SCHED(closure, error); + ExecCtx::Run(DEBUG_LOCATION, closure, error); #endif } @@ -199,7 +199,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) { "for pre-existing cancellation", this, closure); } - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error)); + ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error)); break; } else { if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) { @@ -217,7 +217,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) { "call_combiner=%p: scheduling old cancel callback=%p", this, closure); } - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } break; } @@ -244,7 +244,7 @@ void CallCombiner::Cancel(grpc_error* error) { "call_combiner=%p: scheduling notify_on_cancel callback=%p", this, notify_on_cancel); } - GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error)); + ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error)); } break; } diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index c90e565860a..c440241d0da 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -31,6 +31,7 @@ #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/dynamic_annotations.h" +#include "src/core/lib/iomgr/exec_ctx.h" // A simple, lock-free mechanism for serializing activity related to a // single call. This is similar to a combiner but is more lightweight. @@ -156,8 +157,8 @@ class CallCombinerClosureList { // // All but one of the closures in the list will be scheduled via // GRPC_CALL_COMBINER_START(), and the remaining closure will be - // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in - // yielding the call combiner. If the list is empty, then the call + // scheduled via ExecCtx::Run(DEBUG_LOCATION,), which will eventually result + // in yielding the call combiner. If the list is empty, then the call // combiner will be yielded immediately. void RunClosures(CallCombiner* call_combiner) { if (closures_.empty()) { @@ -177,7 +178,7 @@ class CallCombinerClosureList { grpc_error_string(closures_[0].error), closures_[0].reason); } // This will release the call combiner. - GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error); + ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error); closures_.clear(); } diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index c3af2ffa4cb..180a61b841e 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -132,7 +132,7 @@ static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) { grpc_closure* cb = ep->read_cb; ep->read_cb = nullptr; ep->read_slices = nullptr; - GRPC_CLOSURE_SCHED(cb, error); + ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { @@ -145,7 +145,7 @@ static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { grpc_closure* cb = ep->write_cb; ep->write_cb = nullptr; ep->write_slices = nullptr; - GRPC_CLOSURE_SCHED(cb, error); + ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void ReadAction(void* arg, grpc_error* error) { diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index f8f9aa2192c..e6935518980 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -420,7 +420,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, close(fd->fd); } - GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error)); grpc_iomgr_unregister_object(&fd->iomgr_object); fork_fd_list_remove_grpc_fd(fd); @@ -623,7 +623,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr && pollset->begin_refs == 0) { GPR_TIMER_MARK("pollset_finish_shutdown", 0); - GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure, + GRPC_ERROR_NONE); pollset->shutdown_closure = nullptr; } } diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 927ed57e762..e44b7a4b4d1 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -393,7 +393,8 @@ static void unref_by(grpc_fd* fd, int n) { #endif gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - GRPC_CLOSURE_SCHED( + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } else { @@ -487,7 +488,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE); if (pollable_obj) { gpr_mu_unlock(&pollable_obj->owner_orphan_mu); @@ -662,7 +663,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr && pollset->containing_pollset_set_count == 0) { GPR_TIMER_MARK("pollset_finish_shutdown", 0); - GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure, + GRPC_ERROR_NONE); pollset->shutdown_closure = nullptr; pollset->already_shutdown = true; } diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index c1d84888dbc..3daa1bd8f97 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -436,7 +436,7 @@ static void close_fd_locked(grpc_fd* fd) { if (!fd->released) { close(fd->fd); } - GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE); } static int fd_wrapped_fd(grpc_fd* fd) { @@ -497,17 +497,18 @@ static grpc_error* fd_shutdown_error(grpc_fd* fd) { static void notify_on_locked(grpc_fd* fd, grpc_closure** st, grpc_closure* closure) { if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) { - GRPC_CLOSURE_SCHED( - closure, grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, closure, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd)); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -529,7 +530,7 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) { return 0; } else { /* waiting ==> queue closure */ - GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -574,7 +575,7 @@ static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) { if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) { gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); } - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED); } static void fd_set_readable(grpc_fd* fd) { @@ -896,7 +897,8 @@ static void finish_shutdown(grpc_pollset* pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done, + GRPC_ERROR_NONE); } static void work_combine_error(grpc_error** composite, grpc_error* error) { diff --git a/src/core/lib/iomgr/lockfree_event.cc b/src/core/lib/iomgr/lockfree_event.cc index c7082345fd2..4b28af4ba4b 100644 --- a/src/core/lib/iomgr/lockfree_event.cc +++ b/src/core/lib/iomgr/lockfree_event.cc @@ -23,6 +23,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/exec_ctx.h" extern grpc_core::DebugOnlyTraceFlag grpc_polling_trace; @@ -124,7 +125,7 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) { closure when transitioning out of CLOSURE_NO_READY state (i.e there is no other code that needs to 'happen-after' this) */ if (gpr_atm_no_barrier_cas(&state_, kClosureReady, kClosureNotReady)) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); return; /* Successful. Return */ } @@ -137,9 +138,9 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) { schedule the closure with the shutdown error */ if ((curr & kShutdownBit) > 0) { grpc_error* shutdown_err = (grpc_error*)(curr & ~kShutdownBit); - GRPC_CLOSURE_SCHED(closure, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "FD Shutdown", &shutdown_err, 1)); + ExecCtx::Run(DEBUG_LOCATION, closure, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return; } @@ -189,9 +190,9 @@ bool LockfreeEvent::SetShutdown(grpc_error* shutdown_err) { happens-after on that edge), and a release to pair with anything loading the shutdown state. */ if (gpr_atm_full_cas(&state_, curr, new_state)) { - GRPC_CLOSURE_SCHED((grpc_closure*)curr, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "FD Shutdown", &shutdown_err, 1)); + ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); return true; } @@ -239,7 +240,7 @@ void LockfreeEvent::SetReady() { spurious set_ready; release pairs with this or the acquire in notify_on (or set_shutdown) */ else if (gpr_atm_full_cas(&state_, curr, kClosureNotReady)) { - GRPC_CLOSURE_SCHED((grpc_closure*)curr, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr, GRPC_ERROR_NONE); return; } /* else the state changed again (only possible by either a racing diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc index 3ec2d3d573b..98c8e64e2f0 100644 --- a/src/core/lib/iomgr/pollset_custom.cc +++ b/src/core/lib/iomgr/pollset_custom.cc @@ -55,7 +55,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { static void pollset_shutdown(grpc_pollset* /*pollset*/, grpc_closure* closure) { GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } static void pollset_destroy(grpc_pollset* pollset) { diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc index e9a808d8ad9..22ed6540e3b 100644 --- a/src/core/lib/iomgr/pollset_windows.cc +++ b/src/core/lib/iomgr/pollset_windows.cc @@ -98,7 +98,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } else { pollset->on_shutdown = closure; } @@ -146,7 +146,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - GRPC_CLOSURE_SCHED(pollset->on_shutdown, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown, GRPC_ERROR_NONE); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/lib/iomgr/resolve_address_custom.cc b/src/core/lib/iomgr/resolve_address_custom.cc index 2b3452cc1d7..5c081309c7d 100644 --- a/src/core/lib/iomgr/resolve_address_custom.cc +++ b/src/core/lib/iomgr/resolve_address_custom.cc @@ -79,7 +79,7 @@ void grpc_custom_resolve_callback(grpc_custom_resolver* r, return; } if (r->on_done) { - GRPC_CLOSURE_SCHED(r->on_done, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error); } gpr_free(r->host); gpr_free(r->port); @@ -161,7 +161,7 @@ static void resolve_address_impl(const char* name, const char* default_port, GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_done, err); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, err); return; } r = (grpc_custom_resolver*)gpr_malloc(sizeof(grpc_custom_resolver)); diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 2de8768afe1..1d2a79f41d8 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -152,8 +152,9 @@ typedef struct { * grpc_blocking_resolve_address */ static void do_request_thread(void* rp, grpc_error* /*error*/) { request* r = static_cast(rp); - GRPC_CLOSURE_SCHED(r->on_done, grpc_blocking_resolve_address( - r->name, r->default_port, r->addrs_out)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, r->on_done, + grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out)); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index cd52f13f1ef..7cc2998900d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -138,7 +138,7 @@ static void do_request_thread(void* rp, grpc_error* error) { } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_SCHED(r->on_done, error); + ExecCtx::Run(DEBUG_LOCATION, r->on_done, error); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 661783eeedd..91c8b8cb151 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -424,7 +424,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE); return true; } @@ -506,7 +506,7 @@ static bool ru_post_reclaimer(grpc_resource_user* resource_user, resource_user->new_reclaimers[destructive] = nullptr; GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr); if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED); return false; } resource_user->reclaimers[destructive] = closure; @@ -549,8 +549,10 @@ static void ru_shutdown(void* ru, grpc_error* /*error*/) { } grpc_resource_user* resource_user = static_cast(ru); gpr_mu_lock(&resource_user->mu); - GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); resource_user->reclaimers[0] = nullptr; resource_user->reclaimers[1] = nullptr; rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); @@ -572,8 +574,10 @@ static void ru_destroy(void* ru, grpc_error* /*error*/) { for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, static_cast(i)); } - GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; rq_step_sched(resource_user->resource_quota); @@ -719,7 +723,7 @@ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota, gpr_atm_no_barrier_store(&resource_quota->last_size, (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE); } size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) { @@ -994,8 +998,8 @@ bool grpc_resource_user_alloc_slices( size_t count, grpc_slice_buffer* dest) { if (GPR_UNLIKELY( gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) { - GRPC_CLOSURE_SCHED( - &slice_allocator->on_allocated, + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, &slice_allocator->on_allocated, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown")); return false; } diff --git a/src/core/lib/iomgr/socket_windows.cc b/src/core/lib/iomgr/socket_windows.cc index c87cfa8e831..abc8d937fcb 100644 --- a/src/core/lib/iomgr/socket_windows.cc +++ b/src/core/lib/iomgr/socket_windows.cc @@ -124,7 +124,7 @@ static void socket_notify_on_iocp(grpc_winsocket* socket, grpc_closure* closure, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } else { info->closure = closure; } @@ -145,7 +145,7 @@ void grpc_socket_become_ready(grpc_winsocket* socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - GRPC_CLOSURE_SCHED(info->closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, info->closure, GRPC_ERROR_NONE); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index fcad5edd222..a4596246e6a 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -96,7 +96,7 @@ static void OnAlarm(void* arg, grpc_error* error) { } else { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); - GRPC_CLOSURE_SCHED(closure, error); + ExecCtx::Run(DEBUG_LOCATION, closure, error); } } @@ -137,7 +137,7 @@ static void OnOpen(void* arg, grpc_error* error) { GRPC_ERROR_REF(error); } gpr_mu_unlock(&connect->mu); - GRPC_CLOSURE_SCHED(closure, error); + ExecCtx::Run(DEBUG_LOCATION, closure, error); } } diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc index 28da83366df..0295d449ae2 100644 --- a/src/core/lib/iomgr/tcp_client_custom.cc +++ b/src/core/lib/iomgr/tcp_client_custom.cc @@ -96,7 +96,7 @@ static void custom_connect_callback_internal(grpc_custom_socket* socket, grpc_core::ExecCtx::Get()->Flush(); custom_tcp_connect_cleanup(connect); } - GRPC_CLOSURE_SCHED(closure, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); } static void custom_connect_callback(grpc_custom_socket* socket, diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index e48555b6f9f..b74bd1f665f 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -241,7 +241,7 @@ finish: grpc_channel_args_destroy(ac->channel_args); gpr_free(ac); } - GRPC_CLOSURE_SCHED(closure, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); } grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, @@ -298,12 +298,13 @@ void grpc_tcp_client_create_from_prepared_fd( char* addr_str = grpc_sockaddr_to_uri(addr); *ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str); gpr_free(addr_str); - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); return; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error"); - GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect")); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, + GRPC_OS_ERROR(errno, "connect")); return; } @@ -344,7 +345,7 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep, *ep = nullptr; if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr, &fdobj)) != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(closure, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); return; } grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj, diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc index 66699533d7a..047a9f36c0e 100644 --- a/src/core/lib/iomgr/tcp_client_windows.cc +++ b/src/core/lib/iomgr/tcp_client_windows.cc @@ -117,7 +117,7 @@ static void on_connect(void* acp, grpc_error* error) { async_connect_unlock_and_cleanup(ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - GRPC_CLOSURE_SCHED(on_done, error); + ExecCtx::Run(DEBUG_LOCATION, on_done, error); } /* Tries to issue one async connection, then schedules both an IOCP @@ -225,7 +225,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - GRPC_CLOSURE_SCHED(on_done, final_error); + ExecCtx::Run(DEBUG_LOCATION, on_done, final_error); } grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect}; diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index d594f01d89a..9141c65cb18 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -140,7 +140,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) { TCP_UNREF(tcp, "read"); tcp->read_slices = nullptr; tcp->read_cb = nullptr; - GRPC_CLOSURE_SCHED(cb, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void custom_read_callback(grpc_custom_socket* socket, size_t nread, @@ -220,7 +220,7 @@ static void custom_write_callback(grpc_custom_socket* socket, gpr_log(GPR_INFO, "write complete on %p: error=%s", tcp->socket, str); } TCP_UNREF(tcp, "write"); - GRPC_CLOSURE_SCHED(cb, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, @@ -241,8 +241,9 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, } if (tcp->shutting_down) { - GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "TCP socket is shutting down")); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, cb, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP socket is shutting down")); return; } @@ -252,7 +253,7 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, if (tcp->write_slices->count == 0) { // No slices means we don't have to do anything, // and libuv doesn't like empty writes - GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE); return; } tcp->write_cb = cb; @@ -289,10 +290,10 @@ static void endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) { gpr_log(GPR_INFO, "TCP %p shutdown why=%s", tcp->socket, str); } tcp->shutting_down = true; - // GRPC_CLOSURE_SCHED(tcp->read_cb, GRPC_ERROR_REF(why)); - // GRPC_CLOSURE_SCHED(tcp->write_cb, GRPC_ERROR_REF(why)); - // tcp->read_cb = nullptr; - // tcp->write_cb = nullptr; + // grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->read_cb, + // GRPC_ERROR_REF(why)); + // grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->write_cb, + // GRPC_ERROR_REF(why)); tcp->read_cb = nullptr; tcp->write_cb = nullptr; grpc_resource_user_shutdown(tcp->resource_user); grpc_custom_socket_vtable->shutdown(tcp->socket); } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index f783a4fd6cf..51f87e03faa 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_SCHED(cb, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } #define MAX_READ_IOVEC 4 @@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, * right thing (i.e calls tcp_do_read() which either reads the available * bytes or calls notify_on_read() to be notified when new bytes become * available */ - GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->read_done_closure, + GRPC_ERROR_NONE); } } @@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; - GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "write"); return; } @@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { - GRPC_CLOSURE_SCHED( - cb, grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) - : GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), + tcp) + : GRPC_ERROR_NONE); tcp_shutdown_buffer_list(tcp); return; } @@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_SCHED(cb, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } } diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc index 90ed555f38a..d5fd1919ca2 100644 --- a/src/core/lib/iomgr/tcp_server_custom.cc +++ b/src/core/lib/iomgr/tcp_server_custom.cc @@ -124,7 +124,8 @@ static void tcp_server_shutdown_starting_add(grpc_tcp_server* s, static void finish_shutdown(grpc_tcp_server* s) { GPR_ASSERT(s->shutdown); if (s->shutdown_complete != nullptr) { - GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, + GRPC_ERROR_NONE); } while (s->head) { diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index b1637e9981a..eb4d3cbdef6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -108,7 +108,8 @@ static void finish_shutdown(grpc_tcp_server* s) { GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != nullptr) { - GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, + GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index 0024f807ed3..0f485d720f5 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -135,10 +135,11 @@ static void destroy_server(void* arg, grpc_error* error) { static void finish_shutdown_locked(grpc_tcp_server* s) { if (s->shutdown_complete != NULL) { - GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, GRPC_ERROR_NONE); } - GRPC_CLOSURE_SCHED( + ExecCtx::Run( + DEBUG_LOCATION, GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 32d0bb36ea7..3e9c94a54fb 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -233,7 +233,7 @@ static void on_read(void* tcpp, grpc_error* error) { tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); - GRPC_CLOSURE_SCHED(cb, error); + ExecCtx::Run(DEBUG_LOCATION, cb, error); } #define DEFAULT_TARGET_READ_SIZE 8192 @@ -254,9 +254,9 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, } if (tcp->shutting_down) { - GRPC_CLOSURE_SCHED( - cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "TCP socket is shutting down", &tcp->shutdown_error, 1)); + ExecCtx::Run(DEBUG_LOCATION, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } @@ -289,7 +289,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transferred = bytes_read; - GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE); return; } @@ -302,8 +302,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - GRPC_CLOSURE_SCHED(&tcp->on_read, - GRPC_WSA_ERROR(info->wsa_error, "WSARecv")); + ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, + GRPC_WSA_ERROR(info->wsa_error, "WSARecv")); return; } } @@ -338,7 +338,7 @@ static void on_write(void* tcpp, grpc_error* error) { } TCP_UNREF(tcp, "write"); - GRPC_CLOSURE_SCHED(cb, error); + ExecCtx::Run(DEBUG_LOCATION, cb, error); } /* Initiates a write. */ @@ -366,9 +366,9 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, } if (tcp->shutting_down) { - GRPC_CLOSURE_SCHED( - cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "TCP socket is shutting down", &tcp->shutdown_error, 1)); + ExecCtx::Run(DEBUG_LOCATION, cb, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "TCP socket is shutting down", &tcp->shutdown_error, 1)); return; } @@ -399,7 +399,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_error* error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - GRPC_CLOSURE_SCHED(cb, error); + ExecCtx::Run(DEBUG_LOCATION, cb, error); if (allocated) gpr_free(allocated); return; } @@ -417,7 +417,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend")); + ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_WSA_ERROR(wsa_error, "WSASend")); return; } } diff --git a/src/core/lib/iomgr/timer_custom.cc b/src/core/lib/iomgr/timer_custom.cc index 0af30ca5497..c550d441229 100644 --- a/src/core/lib/iomgr/timer_custom.cc +++ b/src/core/lib/iomgr/timer_custom.cc @@ -37,7 +37,7 @@ void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* /*error*/) { grpc_timer* timer = t->original; GPR_ASSERT(timer->pending); timer->pending = 0; - GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE); custom_timer_impl->stop(t); gpr_free(t); } @@ -48,7 +48,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); grpc_millis now = grpc_core::ExecCtx::Get()->Now(); if (deadline <= grpc_core::ExecCtx::Get()->Now()) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); timer->pending = false; return; } else { @@ -69,7 +69,8 @@ static void timer_cancel(grpc_timer* timer) { grpc_custom_timer* tw = (grpc_custom_timer*)timer->custom_timer; if (timer->pending) { timer->pending = 0; - GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, + GRPC_ERROR_CANCELLED); custom_timer_impl->stop(tw); gpr_free(tw); } diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 00c18789481..dc3016ad8ec 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -369,9 +369,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, if (!g_shared_mutables.initialized) { timer->pending = false; - GRPC_CLOSURE_SCHED(timer->closure, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Attempt to create timer before initialization")); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, timer->closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Attempt to create timer before initialization")); return; } @@ -380,7 +381,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline, grpc_millis now = grpc_core::ExecCtx::Get()->Now(); if (deadline <= now) { timer->pending = false; - GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); /* early out */ return; @@ -471,7 +472,8 @@ static void timer_cancel(grpc_timer* timer) { if (timer->pending) { REMOVE_FROM_HASH_TABLE(timer); - GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, + GRPC_ERROR_CANCELLED); timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -564,7 +566,8 @@ static size_t pop_timers(timer_shard* shard, grpc_millis now, gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { REMOVE_FROM_HASH_TABLE(timer); - GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, + GRPC_ERROR_REF(error)); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 498369ba47e..8bac6dd6a2d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -243,7 +243,8 @@ void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) { static void finish_shutdown(grpc_udp_server* s) { if (s->shutdown_complete != nullptr) { - GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, + GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/security/credentials/composite/composite_credentials.cc b/src/core/lib/security/credentials/composite/composite_credentials.cc index 586bbed778e..1e46798ff2c 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.cc +++ b/src/core/lib/security/credentials/composite/composite_credentials.cc @@ -80,7 +80,8 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error) { } // We're done! } - GRPC_CLOSURE_SCHED(ctx->on_request_metadata, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, ctx->on_request_metadata, + GRPC_ERROR_REF(error)); gpr_free(ctx); } diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index f6a04db3c1a..77aed2c1b9d 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -94,7 +94,8 @@ bool grpc_md_only_test_credentials::get_request_metadata( grpc_error** /*error*/) { grpc_credentials_mdelem_array_add(md_array, md_); if (is_async_) { - GRPC_CLOSURE_SCHED(on_request_metadata, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_request_metadata, + GRPC_ERROR_NONE); return false; } return true; diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index f72679257c5..1c6ea9388b0 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -256,7 +256,8 @@ void grpc_oauth2_token_fetcher_credentials::on_http_response( new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Error occurred when fetching oauth2 token.", &error, 1); } - GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, new_error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + pending_request->on_request_metadata, new_error); grpc_polling_entity_del_from_pollset_set( pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_)); grpc_oauth2_pending_get_request_metadata* prev = pending_request; @@ -332,8 +333,9 @@ void grpc_oauth2_token_fetcher_credentials::cancel_get_request_metadata( pending_requests_ = pending_request->next; } // Invoke the callback immediately with an error. - GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, - GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + pending_request->on_request_metadata, + GRPC_ERROR_REF(error)); gpr_free(pending_request); break; } diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.cc b/src/core/lib/security/credentials/plugin/plugin_credentials.cc index c1966ae3753..d74f77282a2 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.cc +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.cc @@ -131,7 +131,7 @@ static void plugin_md_request_metadata_ready(void* request, if (!r->cancelled) { grpc_error* error = process_plugin_result(r, md, num_md, status, error_details); - GRPC_CLOSURE_SCHED(r->on_request_metadata, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_request_metadata, error); } else if (GRPC_TRACE_FLAG_ENABLED(grpc_plugin_credentials_trace)) { gpr_log(GPR_INFO, "plugin_credentials[%p]: request %p: plugin was previously " @@ -228,8 +228,9 @@ void grpc_plugin_credentials::cancel_get_request_metadata( pending_request); } pending_request->cancelled = true; - GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, - GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + pending_request->on_request_metadata, + GRPC_ERROR_REF(error)); pending_request_remove_locked(pending_request); break; } diff --git a/src/core/lib/security/security_connector/alts/alts_security_connector.cc b/src/core/lib/security/security_connector/alts/alts_security_connector.cc index a20cf448f8d..e4e33256364 100644 --- a/src/core/lib/security/security_connector/alts/alts_security_connector.cc +++ b/src/core/lib/security/security_connector/alts/alts_security_connector.cc @@ -59,7 +59,7 @@ void alts_check_peer(tsi_peer peer, ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Could not get ALTS auth context from TSI peer"); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); } class grpc_alts_channel_security_connector final diff --git a/src/core/lib/security/security_connector/fake/fake_security_connector.cc b/src/core/lib/security/security_connector/fake/fake_security_connector.cc index 39006348f7f..d4203d1dfe8 100644 --- a/src/core/lib/security/security_connector/fake/fake_security_connector.cc +++ b/src/core/lib/security/security_connector/fake/fake_security_connector.cc @@ -245,7 +245,7 @@ static void fake_check_peer( auth_context->get(), GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); tsi_peer_destruct(&peer); } diff --git a/src/core/lib/security/security_connector/local/local_security_connector.cc b/src/core/lib/security/security_connector/local/local_security_connector.cc index d56fc450a9c..20d0888320b 100644 --- a/src/core/lib/security/security_connector/local/local_security_connector.cc +++ b/src/core/lib/security/security_connector/local/local_security_connector.cc @@ -100,7 +100,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/, if (!is_endpoint_local) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Endpoint is neither UDS or TCP loopback address."); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); return; } /* Create an auth context which is necessary to pass the santiy check in @@ -112,7 +112,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/, error = *auth_context != nullptr ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Could not create local auth context"); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); } class grpc_local_channel_security_connector final diff --git a/src/core/lib/security/security_connector/ssl/ssl_security_connector.cc b/src/core/lib/security/security_connector/ssl/ssl_security_connector.cc index 8e3558f102a..b28cee3c13b 100644 --- a/src/core/lib/security/security_connector/ssl/ssl_security_connector.cc +++ b/src/core/lib/security/security_connector/ssl/ssl_security_connector.cc @@ -167,7 +167,7 @@ class grpc_ssl_channel_security_connector final } } } - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -302,7 +302,7 @@ class grpc_ssl_server_security_connector grpc_closure* on_peer_checked) override { grpc_error* error = ssl_check_peer(nullptr, &peer, auth_context); tsi_peer_destruct(&peer); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); } int cmp(const grpc_security_connector* other) const override { diff --git a/src/core/lib/security/security_connector/tls/spiffe_security_connector.cc b/src/core/lib/security/security_connector/tls/spiffe_security_connector.cc index 8fedff78266..8dc2937c152 100644 --- a/src/core/lib/security/security_connector/tls/spiffe_security_connector.cc +++ b/src/core/lib/security/security_connector/tls/spiffe_security_connector.cc @@ -172,7 +172,7 @@ void SpiffeChannelSecurityConnector::check_peer( : target_name_.get(); grpc_error* error = grpc_ssl_check_alpn(&peer); if (error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); tsi_peer_destruct(&peer); return; } @@ -213,7 +213,7 @@ void SpiffeChannelSecurityConnector::check_peer( error = ProcessServerAuthorizationCheckResult(check_arg_); } } - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -342,7 +342,7 @@ void SpiffeChannelSecurityConnector::ServerAuthorizationCheckDone( grpc_error* error = ProcessServerAuthorizationCheckResult(arg); SpiffeChannelSecurityConnector* connector = static_cast(arg->cb_user_data); - GRPC_CLOSURE_SCHED(connector->on_peer_checked_, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, connector->on_peer_checked_, error); } grpc_error* @@ -446,7 +446,7 @@ void SpiffeServerSecurityConnector::check_peer( *auth_context = grpc_ssl_peer_to_auth_context( &peer, GRPC_TLS_SPIFFE_TRANSPORT_SECURITY_TYPE); tsi_peer_destruct(&peer); - GRPC_CLOSURE_SCHED(on_peer_checked, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error); } int SpiffeServerSecurityConnector::cmp( diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index 0aac7d8d780..705d1675615 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -165,7 +165,7 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) { } } ep->read_buffer = nullptr; - GRPC_CLOSURE_SCHED(ep->read_cb, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error); SECURE_ENDPOINT_UNREF(ep, "read"); } @@ -363,9 +363,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer); - GRPC_CLOSURE_SCHED( - cb, grpc_set_tsi_error_result( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, cb, + grpc_set_tsi_error_result( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); return; } diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index 74062be682f..7beb216063f 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -199,7 +199,7 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error* error) { is_shutdown_ = true; } // Invoke callback. - GRPC_CLOSURE_SCHED(on_handshake_done_, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error); } void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) { @@ -259,7 +259,7 @@ void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) { args_->args = grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); grpc_channel_args_destroy(tmp_args); // Invoke callback. - GRPC_CLOSURE_SCHED(on_handshake_done_, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, GRPC_ERROR_NONE); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. is_shutdown_ = true; @@ -449,9 +449,9 @@ class FailHandshaker : public Handshaker { void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, grpc_closure* on_handshake_done, HandshakerArgs* /*args*/) override { - GRPC_CLOSURE_SCHED(on_handshake_done, - GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to create security handshaker")); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, + GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to create security handshaker")); } private: diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index d7b33f99509..ebf0d1a92fa 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -159,7 +159,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem, calld->recv_trailing_metadata_error, "continue recv_trailing_metadata_ready"); } - GRPC_CLOSURE_SCHED(closure, error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error); } // Called from application code. diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 605d69a2ebf..258d5555e0e 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1228,8 +1228,9 @@ static void post_batch_completion(batch_control* bctl) { * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead * of GRPC_CLOSURE_RUN. */ - GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, + (grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs error */ @@ -1574,7 +1575,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, static_cast( gpr_malloc(sizeof(grpc_cq_completion)))); } else { - GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag, + GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 0f882851966..9b29b54ab25 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -131,7 +131,7 @@ grpc_error* non_polling_poller_work(grpc_pollset* pollset, npp->root = w.next; if (&w == npp->root) { if (npp->shutdown) { - GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE); } npp->root = nullptr; } @@ -166,7 +166,7 @@ void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) { GPR_ASSERT(closure != nullptr); p->shutdown = closure; if (p->root == nullptr) { - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } else { non_polling_worker* w = p->root; do { diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index e2a0cfd2dd3..a9f5c9c7791 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -111,18 +111,16 @@ static void lame_start_transport_op(grpc_channel_element* elem, } } if (op->send_ping.on_initiate != nullptr) { - GRPC_CLOSURE_SCHED( - op->send_ping.on_initiate, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); + ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } if (op->send_ping.on_ack != nullptr) { - GRPC_CLOSURE_SCHED( - op->send_ping.on_ack, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); + ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != nullptr) { - GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); } } @@ -136,7 +134,7 @@ static grpc_error* lame_init_call_elem(grpc_call_element* elem, static void lame_destroy_call_elem(grpc_call_element* /*elem*/, const grpc_call_final_info* /*final_info*/, grpc_closure* then_schedule_closure) { - GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE); } static grpc_error* lame_init_channel_elem(grpc_channel_element* elem, diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index eed32a9a23d..f3848fd4ffa 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -381,7 +381,8 @@ static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } } @@ -527,7 +528,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) { &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_REF(error)); return; } @@ -588,7 +590,8 @@ static void finish_start_new_rpc( gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); return; } @@ -843,7 +846,8 @@ static void got_initial_metadata(void* ptr, grpc_error* error) { if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) { GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) { /* zombied call will be destroyed when it's removed from the pending queue... later */ @@ -1448,7 +1452,8 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, + GRPC_ERROR_NONE); } else { publish_call(server, calld, cq_idx, rc); } diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index b51fdfdb13d..d8b8022aae1 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -67,7 +67,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier { } else { GRPC_CLOSURE_INIT(&closure_, SendNotification, this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE); + ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 0a82f7cb8ca..f691074cf67 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -52,7 +52,8 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount) { there. */ grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE); } else { - GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy, + GRPC_ERROR_NONE); } } @@ -218,7 +219,8 @@ struct made_transport_op { static void destroy_made_transport_op(void* arg, grpc_error* error) { made_transport_op* op = static_cast(arg); - GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error)); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete, + GRPC_ERROR_REF(error)); grpc_core::Delete(op); }