src/core changes

reviewable/pr20892/r1
Yash Tibrewal 5 years ago
parent 336b476d32
commit 5d18d4450a
  1. 15
      src/core/ext/filters/client_channel/client_channel.cc
  2. 3
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 6
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  4. 8
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc
  5. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  6. 7
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  7. 7
      src/core/ext/filters/client_channel/subchannel.cc
  8. 2
      src/core/ext/filters/deadline/deadline_filter.cc
  9. 9
      src/core/ext/filters/max_age/max_age_filter.cc
  10. 4
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  11. 3
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  12. 32
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 2
      src/core/ext/transport/chttp2/transport/frame_data.cc
  14. 5
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  15. 71
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  16. 87
      src/core/ext/transport/inproc/inproc_transport.cc
  17. 2
      src/core/lib/channel/handshaker.cc
  18. 2
      src/core/lib/http/httpcli.cc
  19. 2
      src/core/lib/http/httpcli_security_connector.cc
  20. 10
      src/core/lib/iomgr/call_combiner.cc
  21. 7
      src/core/lib/iomgr/call_combiner.h
  22. 4
      src/core/lib/iomgr/endpoint_cfstream.cc
  23. 5
      src/core/lib/iomgr/ev_epoll1_linux.cc
  24. 8
      src/core/lib/iomgr/ev_epollex_linux.cc
  25. 20
      src/core/lib/iomgr/ev_poll_posix.cc
  26. 17
      src/core/lib/iomgr/lockfree_event.cc
  27. 2
      src/core/lib/iomgr/pollset_custom.cc
  28. 4
      src/core/lib/iomgr/pollset_windows.cc
  29. 4
      src/core/lib/iomgr/resolve_address_custom.cc
  30. 5
      src/core/lib/iomgr/resolve_address_posix.cc
  31. 2
      src/core/lib/iomgr/resolve_address_windows.cc
  32. 22
      src/core/lib/iomgr/resource_quota.cc
  33. 4
      src/core/lib/iomgr/socket_windows.cc
  34. 4
      src/core/lib/iomgr/tcp_client_cfstream.cc
  35. 2
      src/core/lib/iomgr/tcp_client_custom.cc
  36. 9
      src/core/lib/iomgr/tcp_client_posix.cc
  37. 4
      src/core/lib/iomgr/tcp_client_windows.cc
  38. 19
      src/core/lib/iomgr/tcp_custom.cc
  39. 20
      src/core/lib/iomgr/tcp_posix.cc
  40. 3
      src/core/lib/iomgr/tcp_server_custom.cc
  41. 3
      src/core/lib/iomgr/tcp_server_posix.cc
  42. 5
      src/core/lib/iomgr/tcp_server_windows.cc
  43. 26
      src/core/lib/iomgr/tcp_windows.cc
  44. 7
      src/core/lib/iomgr/timer_custom.cc
  45. 15
      src/core/lib/iomgr/timer_generic.cc
  46. 3
      src/core/lib/iomgr/udp_server.cc
  47. 3
      src/core/lib/security/credentials/composite/composite_credentials.cc
  48. 3
      src/core/lib/security/credentials/fake/fake_credentials.cc
  49. 8
      src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
  50. 7
      src/core/lib/security/credentials/plugin/plugin_credentials.cc
  51. 2
      src/core/lib/security/security_connector/alts/alts_security_connector.cc
  52. 2
      src/core/lib/security/security_connector/fake/fake_security_connector.cc
  53. 4
      src/core/lib/security/security_connector/local/local_security_connector.cc
  54. 4
      src/core/lib/security/security_connector/ssl/ssl_security_connector.cc
  55. 8
      src/core/lib/security/security_connector/tls/spiffe_security_connector.cc
  56. 9
      src/core/lib/security/transport/secure_endpoint.cc
  57. 10
      src/core/lib/security/transport/security_handshaker.cc
  58. 2
      src/core/lib/security/transport/server_auth_filter.cc
  59. 8
      src/core/lib/surface/call.cc
  60. 4
      src/core/lib/surface/completion_queue.cc
  61. 14
      src/core/lib/surface/lame_client.cc
  62. 15
      src/core/lib/surface/server.cc
  63. 2
      src/core/lib/transport/connectivity_state.cc
  64. 6
      src/core/lib/transport/transport.cc

@ -1163,7 +1163,7 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
// Report new state to the user.
*state_ = state;
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
// Hop back into the combiner to clean up.
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
@ -1180,7 +1180,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
MemoryOrder::RELAXED)) {
return; // Already done.
}
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
@ -1826,8 +1826,9 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
grpc_error* error = chand->DoPingLocked(op);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
GRPC_ERROR_REF(error));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
}
op->bind_pollset = nullptr;
op->send_ping.on_initiate = nullptr;
@ -1869,7 +1870,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
}
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
@ -2058,7 +2059,7 @@ void CallData::Destroy(grpc_call_element* elem,
then_schedule_closure = nullptr;
}
calld->~CallData();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
void CallData::StartTransportStreamOpBatch(
@ -3679,7 +3680,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&pick_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}
void CallData::PickDone(void* arg, grpc_error* error) {

@ -300,7 +300,8 @@ void HealthCheckClient::CallState::StartCall() {
// Schedule instead of running directly, since we must not be
// holding health_check_client_->mu_ when CallEnded() is called.
call_->Ref(DEBUG_LOCATION, "call_end_closure").release();
GRPC_CLOSURE_SCHED(
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);

@ -125,7 +125,7 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error* error) {
is_shutdown_ = true;
}
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
}
// Callback invoked when finished writing HTTP CONNECT request.
@ -222,7 +222,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
goto done;
}
// Success. Invoke handshake-done callback.
GRPC_CLOSURE_SCHED(handshaker->on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, handshaker->on_handshake_done_, error);
done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
@ -260,7 +260,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
gpr_mu_lock(&mu_);
is_shutdown_ = true;
gpr_mu_unlock(&mu_);
GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, GRPC_ERROR_NONE);
return;
}
// Get headers from channel args.

@ -81,10 +81,10 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
uv_poll_stop(handle_);
uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
if (read_closure_ != nullptr) {
GRPC_CLOSURE_SCHED(read_closure_, GRPC_ERROR_CANCELLED);
ExecCtx::Run(DEBUG_LOCATION, read_closure_, GRPC_ERROR_CANCELLED);
}
if (write_closure_ != nullptr) {
GRPC_CLOSURE_SCHED(write_closure_, GRPC_ERROR_CANCELLED);
ExecCtx::Run(DEBUG_LOCATION, write_closure_, GRPC_ERROR_CANCELLED);
}
}
@ -135,13 +135,13 @@ static void ares_uv_poll_cb_locked(void* arg, grpc_error* error) {
}
if (events & UV_READABLE) {
GPR_ASSERT(polled_fd->read_closure_ != nullptr);
GRPC_CLOSURE_SCHED(polled_fd->read_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, polled_fd->read_closure_, error);
polled_fd->read_closure_ = nullptr;
polled_fd->poll_events_ &= ~UV_READABLE;
}
if (events & UV_WRITABLE) {
GPR_ASSERT(polled_fd->write_closure_ != nullptr);
GRPC_CLOSURE_SCHED(polled_fd->write_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, polled_fd->write_closure_, error);
polled_fd->write_closure_ = nullptr;
polled_fd->poll_events_ &= ~UV_WRITABLE;
}

@ -128,12 +128,12 @@ class GrpcPolledFdWindows {
}
void ScheduleAndNullReadClosure(grpc_error* error) {
GRPC_CLOSURE_SCHED(read_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, read_closure_, error);
read_closure_ = nullptr;
}
void ScheduleAndNullWriteClosure(grpc_error* error) {
GRPC_CLOSURE_SCHED(write_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, write_closure_, error);
write_closure_ = nullptr;
}

@ -148,7 +148,7 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) {
// TODO(apolcyn): allow c-ares to return a service config
// with no addresses alongside it
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, r->error);
}
static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
@ -447,7 +447,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
return;
error_cleanup:
GRPC_CLOSURE_SCHED(r->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
}
static bool inner_resolve_as_ip_literal_locked(
@ -714,7 +714,8 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
sizeof(grpc_resolved_address));
}
}
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done,
GRPC_ERROR_REF(error));
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
grpc_core::Delete(r);
}

@ -734,9 +734,10 @@ void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
old_refs = RefMutate(-static_cast<gpr_atm>(1),
1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(subchannel_destroy, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
}

@ -199,7 +199,7 @@ grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
grpc_core::New<start_timer_after_init_state>(elem, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE);
}
}

@ -492,7 +492,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
initialization is done. */
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_age_timer_after_init");
GRPC_CLOSURE_SCHED(&chand->start_max_age_timer_after_init, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
&chand->start_max_age_timer_after_init,
GRPC_ERROR_NONE);
}
/* Initialize the number of calls as 1, so that the max_idle_timer will not
@ -501,8 +503,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
if (chand->max_connection_idle != GRPC_MILLIS_INF_FUTURE) {
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_idle_timer_after_init");
GRPC_CLOSURE_SCHED(&chand->start_max_idle_timer_after_init,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
&chand->start_max_idle_timer_after_init,
GRPC_ERROR_NONE);
}
return GRPC_ERROR_NONE;
}

@ -150,7 +150,7 @@ static void on_handshake_done(void* arg, grpc_error* error) {
}
grpc_closure* notify = c->notify;
c->notify = nullptr;
GRPC_CLOSURE_SCHED(notify, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
c->handshake_mgr.reset();
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(reinterpret_cast<grpc_connector*>(c));
@ -182,7 +182,7 @@ static void connected(void* arg, grpc_error* error) {
c->result->reset();
grpc_closure* notify = c->notify;
c->notify = nullptr;
GRPC_CLOSURE_SCHED(notify, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
if (c->endpoint != nullptr) {
grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
}

@ -266,7 +266,8 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) {
// may do a synchronous unref.
grpc_core::ExecCtx::Get()->Flush();
if (destroy_done != nullptr) {
GRPC_CLOSURE_SCHED(destroy_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_done,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Get()->Flush();
}
grpc_channel_args_destroy(state->args);

@ -593,7 +593,8 @@ static void close_transport_locked(grpc_chttp2_transport* t,
grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
}
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
GRPC_ERROR_CANCELLED);
t->notify_on_receive_settings = nullptr;
}
GRPC_ERROR_UNREF(error);
@ -706,7 +707,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
}
static int init_stream(grpc_transport* gt, grpc_stream* gs,
@ -1177,7 +1178,7 @@ static grpc_closure* add_closure_barrier(grpc_closure* closure) {
static void null_then_sched_closure(grpc_closure** closure) {
grpc_closure* c = *closure;
*closure = nullptr;
GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
}
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
@ -1220,7 +1221,8 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
// Scheduling via ExecCtx::Run (rather than running the closure inline) avoids
// running closures earlier than when it is safe to do so.
GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
closure->error_data.error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
@ -1678,8 +1680,10 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack) {
if (t->closed_with_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
GRPC_ERROR_REF(t->closed_with_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
GRPC_ERROR_REF(t->closed_with_error));
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
@ -2933,7 +2937,7 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
s->on_next = nullptr;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
@ -2991,10 +2995,11 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
@ -3003,8 +3008,8 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
if (bs->remaining_bytes_ != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
@ -3092,7 +3097,8 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
GRPC_ERROR_REF(error));
stream_->on_next = nullptr;
GRPC_ERROR_UNREF(stream_->byte_stream_error);
stream_->byte_stream_error = GRPC_ERROR_REF(error);

@ -291,7 +291,7 @@ grpc_error* grpc_chttp2_data_parser_parse(void* /*parser*/,
GPR_ASSERT(s->frame_storage.length == 0);
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_NONE);
s->on_next = nullptr;
s->unprocessed_incoming_frames_decompressed = false;
} else {

@ -135,8 +135,9 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
t->num_pending_induced_frames++;
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(t->notify_on_receive_settings,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
GRPC_ERROR_NONE);
t->notify_on_receive_settings = nullptr;
}
}

@ -1150,22 +1150,26 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata,
stream_op->payload->recv_initial_metadata.recv_initial_metadata);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@ -1176,30 +1180,34 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@ -1240,7 +1248,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
@ -1296,8 +1305,9 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
/* Do an extra read to trigger on_succeeded() callback in case connection
@ -1328,7 +1338,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
@ -1352,13 +1363,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
}
} else if (stream_state->state_callback_received[OP_FAILED]) {
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(
stream_op->on_complete,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->on_complete,
make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
}
} else {
@ -1366,7 +1377,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
* callback
*/
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
GRPC_ERROR_NONE);
}
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
@ -1433,20 +1445,24 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
if (op->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message) {
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_trailing_metadata) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_CANCELLED);
}
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
GRPC_ERROR_CANCELLED);
return;
}
stream_obj* s = reinterpret_cast<stream_obj*>(gs);
@ -1458,7 +1474,8 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
stream_obj* s = reinterpret_cast<stream_obj*>(gs);
s->~stream_obj();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_transport* gt) {}

@ -204,7 +204,8 @@ struct inproc_stream {
t->unref();
if (closure_at_destroy) {
GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_at_destroy,
GRPC_ERROR_NONE);
}
}
@ -390,13 +391,15 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
GRPC_ERROR_REF(error));
}
}
void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
s->ops_needed = false;
}
@ -471,9 +474,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
err);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
err);
// Last use of err so no need to REF and then UNREF it
complete_if_batch_end_locked(
@ -484,7 +489,8 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
if (s->recv_message_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
complete_if_batch_end_locked(
@ -508,9 +514,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@ -564,7 +572,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
receiver->recv_stream.get());
INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
receiver->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -656,14 +665,16 @@ void op_state_machine(void* arg, grpc_error* error) {
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready", s);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = nullptr;
needs_close = true;
}
@ -708,9 +719,11 @@ void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
complete_if_batch_end_locked(
s, new_err, s->recv_initial_md_op,
"op_state_machine scheduling recv-initial-metadata-on-complete");
@ -748,7 +761,8 @@ void op_state_machine(void* arg, grpc_error* error) {
// satisfied
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -785,12 +799,14 @@ void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
needs_close = true;
} else {
@ -810,7 +826,8 @@ void op_state_machine(void* arg, grpc_error* error) {
// recv_message_op
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -883,9 +900,11 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@ -1026,7 +1045,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
(op->recv_message && other && other->send_message_op != nullptr) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_NONE);
s->op_closure_scheduled = true;
}
} else {
@ -1054,7 +1074,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GPR_INFO,
"perform_stream_op error %p scheduling initial-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
@ -1063,22 +1084,24 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GPR_INFO,
"perform_stream_op error %p scheduling recv message-ready %p", s,
error);
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_trailing_metadata) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
}
if (needs_close) {
close_other_side_locked(s, "perform_stream_op:other_side");
@ -1121,7 +1144,7 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
t->accept_stream_data = op->set_accept_stream_user_data;
}
if (op->on_consumed) {
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
bool do_close = false;

@ -169,7 +169,7 @@ bool HandshakeManager::CallNextHandshakerLocked(grpc_error* error) {
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(&deadline_timer_);
GRPC_CLOSURE_SCHED(&on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error);
is_shutdown_ = true;
} else {
auto handshaker = handshakers_[index_];

@ -88,7 +88,7 @@ static void next_address(internal_request* req, grpc_error* due_to_error);
static void finish(internal_request* req, grpc_error* error) {
grpc_polling_entity_del_from_pollset_set(req->pollent,
req->context->pollset_set);
GRPC_CLOSURE_SCHED(req->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != nullptr) {
grpc_resolved_addresses_destroy(req->addresses);

@ -100,7 +100,7 @@ class grpc_httpcli_ssl_channel_security_connector final
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}

@ -92,9 +92,9 @@ void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
#ifdef GRPC_TSAN_ENABLED
original_closure_ = closure;
GRPC_CLOSURE_SCHED(&tsan_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
#else
GRPC_CLOSURE_SCHED(closure, error);
ExecCtx::Run(DEBUG_LOCATION, closure, error);
#endif
}
@ -199,7 +199,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
"for pre-existing cancellation",
this, closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
break;
} else {
if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) {
@ -217,7 +217,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
"call_combiner=%p: scheduling old cancel callback=%p", this,
closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
}
break;
}
@ -244,7 +244,7 @@ void CallCombiner::Cancel(grpc_error* error) {
"call_combiner=%p: scheduling notify_on_cancel callback=%p",
this, notify_on_cancel);
}
GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
}
break;
}

@ -31,6 +31,7 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/dynamic_annotations.h"
#include "src/core/lib/iomgr/exec_ctx.h"
// A simple, lock-free mechanism for serializing activity related to a
// single call. This is similar to a combiner but is more lightweight.
@ -156,8 +157,8 @@ class CallCombinerClosureList {
//
// All but one of the closures in the list will be scheduled via
// GRPC_CALL_COMBINER_START(), and the remaining closure will be
// scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
// yielding the call combiner. If the list is empty, then the call
// scheduled via ExecCtx::Run(DEBUG_LOCATION,), which will eventually result
// in yielding the call combiner. If the list is empty, then the call
// combiner will be yielded immediately.
void RunClosures(CallCombiner* call_combiner) {
if (closures_.empty()) {
@ -177,7 +178,7 @@ class CallCombinerClosureList {
grpc_error_string(closures_[0].error), closures_[0].reason);
}
// This will release the call combiner.
GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error);
closures_.clear();
}

@ -132,7 +132,7 @@ static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
grpc_closure* cb = ep->read_cb;
ep->read_cb = nullptr;
ep->read_slices = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
@ -145,7 +145,7 @@ static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
grpc_closure* cb = ep->write_cb;
ep->write_cb = nullptr;
ep->write_slices = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void ReadAction(void* arg, grpc_error* error) {

@ -420,7 +420,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
close(fd->fd);
}
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
fork_fd_list_remove_grpc_fd(fd);
@ -623,7 +623,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
pollset->begin_refs == 0) {
GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
GRPC_ERROR_NONE);
pollset->shutdown_closure = nullptr;
}
}

@ -393,7 +393,8 @@ static void unref_by(grpc_fd* fd, int n) {
#endif
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
} else {
@ -487,7 +488,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
if (pollable_obj) {
gpr_mu_unlock(&pollable_obj->owner_orphan_mu);
@ -662,7 +663,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
pollset->containing_pollset_set_count == 0) {
GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
GRPC_ERROR_NONE);
pollset->shutdown_closure = nullptr;
pollset->already_shutdown = true;
}

@ -436,7 +436,7 @@ static void close_fd_locked(grpc_fd* fd) {
if (!fd->released) {
close(fd->fd);
}
GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd* fd) {
@ -497,17 +497,18 @@ static grpc_error* fd_shutdown_error(grpc_fd* fd) {
static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
grpc_closure* closure) {
if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
GRPC_CLOSURE_SCHED(
closure, grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, closure,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@ -529,7 +530,7 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
return 0;
} else {
/* waiting ==> queue closure */
GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -574,7 +575,7 @@ static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
}
static void fd_set_readable(grpc_fd* fd) {
@ -896,7 +897,8 @@ static void finish_shutdown(grpc_pollset* pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error** composite, grpc_error* error) {

@ -23,6 +23,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::DebugOnlyTraceFlag grpc_polling_trace;
@ -124,7 +125,7 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(&state_, kClosureReady, kClosureNotReady)) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */
}
@ -137,9 +138,9 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
schedule the closure with the shutdown error */
if ((curr & kShutdownBit) > 0) {
grpc_error* shutdown_err = (grpc_error*)(curr & ~kShutdownBit);
GRPC_CLOSURE_SCHED(closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return;
}
@ -189,9 +190,9 @@ bool LockfreeEvent::SetShutdown(grpc_error* shutdown_err) {
happens-after on that edge), and a release to pair with anything
loading the shutdown state. */
if (gpr_atm_full_cas(&state_, curr, new_state)) {
GRPC_CLOSURE_SCHED((grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return true;
}
@ -239,7 +240,7 @@ void LockfreeEvent::SetReady() {
spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(&state_, curr, kClosureNotReady)) {
GRPC_CLOSURE_SCHED((grpc_closure*)curr, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr, GRPC_ERROR_NONE);
return;
}
/* else the state changed again (only possible by either a racing

@ -55,7 +55,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
static void pollset_shutdown(grpc_pollset* /*pollset*/, grpc_closure* closure) {
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
}
static void pollset_destroy(grpc_pollset* pollset) {

@ -98,7 +98,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@ -146,7 +146,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
GRPC_CLOSURE_SCHED(pollset->on_shutdown, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;

@ -79,7 +79,7 @@ void grpc_custom_resolve_callback(grpc_custom_resolver* r,
return;
}
if (r->on_done) {
GRPC_CLOSURE_SCHED(r->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
}
gpr_free(r->host);
gpr_free(r->port);
@ -161,7 +161,7 @@ static void resolve_address_impl(const char* name, const char* default_port,
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_done, err);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, err);
return;
}
r = (grpc_custom_resolver*)gpr_malloc(sizeof(grpc_custom_resolver));

@ -152,8 +152,9 @@ typedef struct {
* grpc_blocking_resolve_address */
static void do_request_thread(void* rp, grpc_error* /*error*/) {
request* r = static_cast<request*>(rp);
GRPC_CLOSURE_SCHED(r->on_done, grpc_blocking_resolve_address(
r->name, r->default_port, r->addrs_out));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, r->on_done,
grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);

@ -138,7 +138,7 @@ static void do_request_thread(void* rp, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
GRPC_CLOSURE_SCHED(r->on_done, error);
ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);

@ -424,7 +424,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = nullptr;
GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
return true;
}
@ -506,7 +506,7 @@ static bool ru_post_reclaimer(grpc_resource_user* resource_user,
resource_user->new_reclaimers[destructive] = nullptr;
GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@ -549,8 +549,10 @@ static void ru_shutdown(void* ru, grpc_error* /*error*/) {
}
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
gpr_mu_lock(&resource_user->mu);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
@ -572,8 +574,10 @@ static void ru_destroy(void* ru, grpc_error* /*error*/) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(resource_user->resource_quota);
@ -719,7 +723,7 @@ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
gpr_atm_no_barrier_store(&resource_quota->last_size,
(gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
}
size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
@ -994,8 +998,8 @@ bool grpc_resource_user_alloc_slices(
size_t count, grpc_slice_buffer* dest) {
if (GPR_UNLIKELY(
gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
GRPC_CLOSURE_SCHED(
&slice_allocator->on_allocated,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, &slice_allocator->on_allocated,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
return false;
}

@ -124,7 +124,7 @@ static void socket_notify_on_iocp(grpc_winsocket* socket, grpc_closure* closure,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@ -145,7 +145,7 @@ void grpc_socket_become_ready(grpc_winsocket* socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
GRPC_CLOSURE_SCHED(info->closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;

@ -96,7 +96,7 @@ static void OnAlarm(void* arg, grpc_error* error) {
} else {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
GRPC_CLOSURE_SCHED(closure, error);
ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
}
@ -137,7 +137,7 @@ static void OnOpen(void* arg, grpc_error* error) {
GRPC_ERROR_REF(error);
}
gpr_mu_unlock(&connect->mu);
GRPC_CLOSURE_SCHED(closure, error);
ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
}

@ -96,7 +96,7 @@ static void custom_connect_callback_internal(grpc_custom_socket* socket,
grpc_core::ExecCtx::Get()->Flush();
custom_tcp_connect_cleanup(connect);
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
static void custom_connect_callback(grpc_custom_socket* socket,

@ -241,7 +241,7 @@ finish:
grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
@ -298,12 +298,13 @@ void grpc_tcp_client_create_from_prepared_fd(
char* addr_str = grpc_sockaddr_to_uri(addr);
*ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
gpr_free(addr_str);
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_OS_ERROR(errno, "connect"));
return;
}
@ -344,7 +345,7 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
*ep = nullptr;
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fdobj)) != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
}
grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,

@ -117,7 +117,7 @@ static void on_connect(void* acp, grpc_error* error) {
async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
GRPC_CLOSURE_SCHED(on_done, error);
ExecCtx::Run(DEBUG_LOCATION, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@ -225,7 +225,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
GRPC_CLOSURE_SCHED(on_done, final_error);
ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
}
grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect};

@ -140,7 +140,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
TCP_UNREF(tcp, "read");
tcp->read_slices = nullptr;
tcp->read_cb = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void custom_read_callback(grpc_custom_socket* socket, size_t nread,
@ -220,7 +220,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
gpr_log(GPR_INFO, "write complete on %p: error=%s", tcp->socket, str);
}
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
@ -241,8 +241,9 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TCP socket is shutting down"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP socket is shutting down"));
return;
}
@ -252,7 +253,7 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
return;
}
tcp->write_cb = cb;
@ -289,10 +290,10 @@ static void endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
gpr_log(GPR_INFO, "TCP %p shutdown why=%s", tcp->socket, str);
}
tcp->shutting_down = true;
// GRPC_CLOSURE_SCHED(tcp->read_cb, GRPC_ERROR_REF(why));
// GRPC_CLOSURE_SCHED(tcp->write_cb, GRPC_ERROR_REF(why));
// tcp->read_cb = nullptr;
// tcp->write_cb = nullptr;
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->read_cb,
// GRPC_ERROR_REF(why));
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->write_cb,
// GRPC_ERROR_REF(why)); tcp->read_cb = nullptr; tcp->write_cb = nullptr;
grpc_resource_user_shutdown(tcp->resource_user);
grpc_custom_socket_vtable->shutdown(tcp->socket);
}

@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
#define MAX_READ_IOVEC 4
@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* right thing (i.e calls tcp_do_read() which either reads the available
* bytes or calls notify_on_read() to be notified when new bytes become
* available */
GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->read_done_closure,
GRPC_ERROR_NONE);
}
}
@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "write");
return;
}
@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
tcp->outgoing_buffer_arg = arg;
if (buf->length == 0) {
GRPC_CLOSURE_SCHED(
cb, grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
tcp)
: GRPC_ERROR_NONE);
tcp_shutdown_buffer_list(tcp);
return;
}
@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
}

@ -124,7 +124,8 @@ static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
static void finish_shutdown(grpc_tcp_server* s) {
GPR_ASSERT(s->shutdown);
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
while (s->head) {

@ -108,7 +108,8 @@ static void finish_shutdown(grpc_tcp_server* s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);

@ -135,10 +135,11 @@ static void destroy_server(void* arg, grpc_error* error) {
static void finish_shutdown_locked(grpc_tcp_server* s) {
if (s->shutdown_complete != NULL) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete, GRPC_ERROR_NONE);
}
GRPC_CLOSURE_SCHED(
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}

@ -233,7 +233,7 @@ static void on_read(void* tcpp, grpc_error* error) {
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
GRPC_CLOSURE_SCHED(cb, error);
ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
#define DEFAULT_TARGET_READ_SIZE 8192
@ -254,9 +254,9 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
ExecCtx::Run(DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@ -289,7 +289,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transferred = bytes_read;
GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@ -302,8 +302,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
GRPC_CLOSURE_SCHED(&tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
@ -338,7 +338,7 @@ static void on_write(void* tcpp, grpc_error* error) {
}
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, error);
ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
/* Initiates a write. */
@ -366,9 +366,9 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
ExecCtx::Run(DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@ -399,7 +399,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_error* error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
GRPC_CLOSURE_SCHED(cb, error);
ExecCtx::Run(DEBUG_LOCATION, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@ -417,7 +417,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}

@ -37,7 +37,7 @@ void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* /*error*/) {
grpc_timer* timer = t->original;
GPR_ASSERT(timer->pending);
timer->pending = 0;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
custom_timer_impl->stop(t);
gpr_free(t);
}
@ -48,7 +48,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
if (deadline <= grpc_core::ExecCtx::Get()->Now()) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
timer->pending = false;
return;
} else {
@ -69,7 +69,8 @@ static void timer_cancel(grpc_timer* timer) {
grpc_custom_timer* tw = (grpc_custom_timer*)timer->custom_timer;
if (timer->pending) {
timer->pending = 0;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CANCELLED);
custom_timer_impl->stop(tw);
gpr_free(tw);
}

@ -369,9 +369,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
if (!g_shared_mutables.initialized) {
timer->pending = false;
GRPC_CLOSURE_SCHED(timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
return;
}
@ -380,7 +381,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
if (deadline <= now) {
timer->pending = false;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
/* early out */
return;
@ -471,7 +472,8 @@ static void timer_cancel(grpc_timer* timer) {
if (timer->pending) {
REMOVE_FROM_HASH_TABLE(timer);
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CANCELLED);
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@ -564,7 +566,8 @@ static size_t pop_timers(timer_shard* shard, grpc_millis now,
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
REMOVE_FROM_HASH_TABLE(timer);
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);

@ -243,7 +243,8 @@ void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) {
static void finish_shutdown(grpc_udp_server* s) {
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);

@ -80,7 +80,8 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error) {
}
// We're done!
}
GRPC_CLOSURE_SCHED(ctx->on_request_metadata, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, ctx->on_request_metadata,
GRPC_ERROR_REF(error));
gpr_free(ctx);
}

@ -94,7 +94,8 @@ bool grpc_md_only_test_credentials::get_request_metadata(
grpc_error** /*error*/) {
grpc_credentials_mdelem_array_add(md_array, md_);
if (is_async_) {
GRPC_CLOSURE_SCHED(on_request_metadata, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_request_metadata,
GRPC_ERROR_NONE);
return false;
}
return true;

@ -256,7 +256,8 @@ void grpc_oauth2_token_fetcher_credentials::on_http_response(
new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Error occurred when fetching oauth2 token.", &error, 1);
}
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, new_error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata, new_error);
grpc_polling_entity_del_from_pollset_set(
pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_));
grpc_oauth2_pending_get_request_metadata* prev = pending_request;
@ -332,8 +333,9 @@ void grpc_oauth2_token_fetcher_credentials::cancel_get_request_metadata(
pending_requests_ = pending_request->next;
}
// Invoke the callback immediately with an error.
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
gpr_free(pending_request);
break;
}

@ -131,7 +131,7 @@ static void plugin_md_request_metadata_ready(void* request,
if (!r->cancelled) {
grpc_error* error =
process_plugin_result(r, md, num_md, status, error_details);
GRPC_CLOSURE_SCHED(r->on_request_metadata, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_request_metadata, error);
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_plugin_credentials_trace)) {
gpr_log(GPR_INFO,
"plugin_credentials[%p]: request %p: plugin was previously "
@ -228,8 +228,9 @@ void grpc_plugin_credentials::cancel_get_request_metadata(
pending_request);
}
pending_request->cancelled = true;
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
pending_request_remove_locked(pending_request);
break;
}

@ -59,7 +59,7 @@ void alts_check_peer(tsi_peer peer,
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Could not get ALTS auth context from TSI peer");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
class grpc_alts_channel_security_connector final

@ -245,7 +245,7 @@ static void fake_check_peer(
auth_context->get(), GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}

@ -100,7 +100,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/,
if (!is_endpoint_local) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Endpoint is neither UDS or TCP loopback address.");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
return;
}
/* Create an auth context which is necessary to pass the santiy check in
@ -112,7 +112,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/,
error = *auth_context != nullptr ? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Could not create local auth context");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
class grpc_local_channel_security_connector final

@ -167,7 +167,7 @@ class grpc_ssl_channel_security_connector final
}
}
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@ -302,7 +302,7 @@ class grpc_ssl_server_security_connector
grpc_closure* on_peer_checked) override {
grpc_error* error = ssl_check_peer(nullptr, &peer, auth_context);
tsi_peer_destruct(&peer);
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
int cmp(const grpc_security_connector* other) const override {

@ -172,7 +172,7 @@ void SpiffeChannelSecurityConnector::check_peer(
: target_name_.get();
grpc_error* error = grpc_ssl_check_alpn(&peer);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
return;
}
@ -213,7 +213,7 @@ void SpiffeChannelSecurityConnector::check_peer(
error = ProcessServerAuthorizationCheckResult(check_arg_);
}
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@ -342,7 +342,7 @@ void SpiffeChannelSecurityConnector::ServerAuthorizationCheckDone(
grpc_error* error = ProcessServerAuthorizationCheckResult(arg);
SpiffeChannelSecurityConnector* connector =
static_cast<SpiffeChannelSecurityConnector*>(arg->cb_user_data);
GRPC_CLOSURE_SCHED(connector->on_peer_checked_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, connector->on_peer_checked_, error);
}
grpc_error*
@ -446,7 +446,7 @@ void SpiffeServerSecurityConnector::check_peer(
*auth_context = grpc_ssl_peer_to_auth_context(
&peer, GRPC_TLS_SPIFFE_TRANSPORT_SECURITY_TYPE);
tsi_peer_destruct(&peer);
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
int SpiffeServerSecurityConnector::cmp(

@ -165,7 +165,7 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
}
}
ep->read_buffer = nullptr;
GRPC_CLOSURE_SCHED(ep->read_cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
SECURE_ENDPOINT_UNREF(ep, "read");
}
@ -363,9 +363,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
GRPC_CLOSURE_SCHED(
cb, grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
return;
}

@ -199,7 +199,7 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error* error) {
is_shutdown_ = true;
}
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
}
void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) {
@ -259,7 +259,7 @@ void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) {
args_->args = grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
grpc_channel_args_destroy(tmp_args);
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, GRPC_ERROR_NONE);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
is_shutdown_ = true;
@ -449,9 +449,9 @@ class FailHandshaker : public Handshaker {
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* /*args*/) override {
GRPC_CLOSURE_SCHED(on_handshake_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to create security handshaker"));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_handshake_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to create security handshaker"));
}
private:

@ -159,7 +159,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
// Called from application code.

@ -1228,8 +1228,9 @@ static void post_batch_completion(batch_control* bctl) {
* running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
* of GRPC_CLOSURE_RUN.
*/
GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
(grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs error */
@ -1574,7 +1575,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
static_cast<grpc_cq_completion*>(
gpr_malloc(sizeof(grpc_cq_completion))));
} else {
GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;

@ -131,7 +131,7 @@ grpc_error* non_polling_poller_work(grpc_pollset* pollset,
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = nullptr;
}
@ -166,7 +166,7 @@ void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
if (p->root == nullptr) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker* w = p->root;
do {

@ -111,18 +111,16 @@ static void lame_start_transport_op(grpc_channel_element* elem,
}
}
if (op->send_ping.on_initiate != nullptr) {
GRPC_CLOSURE_SCHED(
op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
if (op->send_ping.on_ack != nullptr) {
GRPC_CLOSURE_SCHED(
op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != nullptr) {
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
}
@ -136,7 +134,7 @@ static grpc_error* lame_init_call_elem(grpc_call_element* elem,
static void lame_destroy_call_elem(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
static grpc_error* lame_init_channel_elem(grpc_channel_element* elem,

@ -381,7 +381,8 @@ static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
}
}
@ -527,7 +528,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_REF(error));
return;
}
@ -588,7 +590,8 @@ static void finish_start_new_rpc(
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
return;
}
@ -843,7 +846,8 @@ static void got_initial_metadata(void* ptr, grpc_error* error) {
if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
/* zombied call will be destroyed when it's removed from the pending
queue... later */
@ -1448,7 +1452,8 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else {
publish_call(server, calld, cq_idx, rc);
}

@ -67,7 +67,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
} else {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
}

@ -52,7 +52,8 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount) {
there. */
grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE);
} else {
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
GRPC_ERROR_NONE);
}
}
@ -218,7 +219,8 @@ struct made_transport_op {
static void destroy_made_transport_op(void* arg, grpc_error* error) {
made_transport_op* op = static_cast<made_transport_op*>(arg);
GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete,
GRPC_ERROR_REF(error));
grpc_core::Delete<made_transport_op>(op);
}

Loading…
Cancel
Save