Merge pull request #20892 from yashykt/removsched

Replace GRPC_CLOSURE_SCHED with ExecCtx::Run
pull/20935/head
Yash Tibrewal 5 years ago committed by GitHub
commit 4b31415e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      src/core/ext/filters/client_channel/client_channel.cc
  2. 3
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 6
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  4. 10
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc
  5. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  6. 7
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  7. 7
      src/core/ext/filters/client_channel/subchannel.cc
  8. 2
      src/core/ext/filters/deadline/deadline_filter.cc
  9. 9
      src/core/ext/filters/max_age/max_age_filter.cc
  10. 4
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  11. 3
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  12. 32
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  13. 2
      src/core/ext/transport/chttp2/transport/frame_data.cc
  14. 5
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  15. 71
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  16. 87
      src/core/ext/transport/inproc/inproc_transport.cc
  17. 2
      src/core/lib/channel/handshaker.cc
  18. 2
      src/core/lib/http/httpcli.cc
  19. 2
      src/core/lib/http/httpcli_security_connector.cc
  20. 10
      src/core/lib/iomgr/call_combiner.cc
  21. 7
      src/core/lib/iomgr/call_combiner.h
  22. 38
      src/core/lib/iomgr/closure.h
  23. 4
      src/core/lib/iomgr/endpoint_cfstream.cc
  24. 5
      src/core/lib/iomgr/ev_epoll1_linux.cc
  25. 8
      src/core/lib/iomgr/ev_epollex_linux.cc
  26. 20
      src/core/lib/iomgr/ev_poll_posix.cc
  27. 26
      src/core/lib/iomgr/exec_ctx.cc
  28. 4
      src/core/lib/iomgr/exec_ctx.h
  29. 17
      src/core/lib/iomgr/lockfree_event.cc
  30. 2
      src/core/lib/iomgr/pollset_custom.cc
  31. 5
      src/core/lib/iomgr/pollset_windows.cc
  32. 4
      src/core/lib/iomgr/resolve_address_custom.cc
  33. 5
      src/core/lib/iomgr/resolve_address_posix.cc
  34. 2
      src/core/lib/iomgr/resolve_address_windows.cc
  35. 22
      src/core/lib/iomgr/resource_quota.cc
  36. 4
      src/core/lib/iomgr/socket_windows.cc
  37. 4
      src/core/lib/iomgr/tcp_client_cfstream.cc
  38. 2
      src/core/lib/iomgr/tcp_client_custom.cc
  39. 9
      src/core/lib/iomgr/tcp_client_posix.cc
  40. 4
      src/core/lib/iomgr/tcp_client_windows.cc
  41. 19
      src/core/lib/iomgr/tcp_custom.cc
  42. 3
      src/core/lib/iomgr/tcp_server_custom.cc
  43. 3
      src/core/lib/iomgr/tcp_server_posix.cc
  44. 6
      src/core/lib/iomgr/tcp_server_windows.cc
  45. 29
      src/core/lib/iomgr/tcp_windows.cc
  46. 7
      src/core/lib/iomgr/timer_custom.cc
  47. 15
      src/core/lib/iomgr/timer_generic.cc
  48. 3
      src/core/lib/iomgr/udp_server.cc
  49. 3
      src/core/lib/security/credentials/composite/composite_credentials.cc
  50. 3
      src/core/lib/security/credentials/fake/fake_credentials.cc
  51. 8
      src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
  52. 7
      src/core/lib/security/credentials/plugin/plugin_credentials.cc
  53. 2
      src/core/lib/security/security_connector/alts/alts_security_connector.cc
  54. 2
      src/core/lib/security/security_connector/fake/fake_security_connector.cc
  55. 4
      src/core/lib/security/security_connector/local/local_security_connector.cc
  56. 4
      src/core/lib/security/security_connector/ssl/ssl_security_connector.cc
  57. 8
      src/core/lib/security/security_connector/tls/spiffe_security_connector.cc
  58. 9
      src/core/lib/security/transport/secure_endpoint.cc
  59. 18
      src/core/lib/security/transport/security_handshaker.cc
  60. 2
      src/core/lib/security/transport/server_auth_filter.cc
  61. 4
      src/core/lib/surface/completion_queue.cc
  62. 14
      src/core/lib/surface/lame_client.cc
  63. 15
      src/core/lib/surface/server.cc
  64. 2
      src/core/lib/transport/connectivity_state.cc
  65. 6
      src/core/lib/transport/transport.cc
  66. 4
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  67. 17
      test/core/end2end/fuzzers/api_fuzzer.cc
  68. 4
      test/core/end2end/goaway_server_test.cc
  69. 6
      test/core/iomgr/endpoint_tests.cc
  70. 2
      test/core/iomgr/udp_server_test.cc
  71. 14
      test/core/security/credentials_test.cc
  72. 10
      test/core/security/jwt_verifier_test.cc
  73. 12
      test/core/util/mock_endpoint.cc
  74. 19
      test/core/util/passthru_endpoint.cc
  75. 3
      test/core/util/trickle_endpoint.cc
  76. 6
      test/cpp/microbenchmarks/bm_call_create.cc
  77. 15
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  78. 23
      test/cpp/microbenchmarks/bm_closure.cc
  79. 2
      test/cpp/microbenchmarks/bm_cq_multiple_threads.cc

@ -1163,7 +1163,7 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
// Report new state to the user.
*state_ = state;
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
// Hop back into the combiner to clean up.
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
@ -1180,7 +1180,7 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
MemoryOrder::RELAXED)) {
return; // Already done.
}
GRPC_CLOSURE_SCHED(on_complete_, GRPC_ERROR_CANCELLED);
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
@ -1826,8 +1826,9 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
grpc_error* error = chand->DoPingLocked(op);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
GRPC_ERROR_REF(error));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
}
op->bind_pollset = nullptr;
op->send_ping.on_initiate = nullptr;
@ -1869,7 +1870,7 @@ void ChannelData::StartTransportOpLocked(void* arg, grpc_error* /*ignored*/) {
}
}
GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "start_transport_op");
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
void ChannelData::StartTransportOp(grpc_channel_element* elem,
@ -2058,7 +2059,8 @@ void CallData::Destroy(grpc_call_element* elem,
then_schedule_closure = nullptr;
}
calld->~CallData();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
// TODO(yashkt) : This can potentially be a Closure::Run
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
void CallData::StartTransportStreamOpBatch(
@ -3679,7 +3681,7 @@ void CallData::CreateSubchannelCall(grpc_call_element* elem) {
void CallData::AsyncPickDone(grpc_call_element* elem, grpc_error* error) {
GRPC_CLOSURE_INIT(&pick_closure_, PickDone, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&pick_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}
void CallData::PickDone(void* arg, grpc_error* error) {

@ -300,7 +300,8 @@ void HealthCheckClient::CallState::StartCall() {
// Schedule instead of running directly, since we must not be
// holding health_check_client_->mu_ when CallEnded() is called.
call_->Ref(DEBUG_LOCATION, "call_end_closure").release();
GRPC_CLOSURE_SCHED(
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);

@ -124,7 +124,7 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error* error) {
is_shutdown_ = true;
}
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
}
// Callback invoked when finished writing HTTP CONNECT request.
@ -220,7 +220,7 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
goto done;
}
// Success. Invoke handshake-done callback.
GRPC_CLOSURE_SCHED(handshaker->on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, handshaker->on_handshake_done_, error);
done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
@ -260,7 +260,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
MutexLock lock(&mu_);
is_shutdown_ = true;
}
GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done, GRPC_ERROR_NONE);
return;
}
// Get headers from channel args.

@ -81,10 +81,12 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
uv_poll_stop(handle_);
uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
if (read_closure_ != nullptr) {
GRPC_CLOSURE_SCHED(read_closure_, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_,
GRPC_ERROR_CANCELLED);
}
if (write_closure_ != nullptr) {
GRPC_CLOSURE_SCHED(write_closure_, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_,
GRPC_ERROR_CANCELLED);
}
}
@ -135,13 +137,13 @@ static void ares_uv_poll_cb_locked(void* arg, grpc_error* error) {
}
if (events & UV_READABLE) {
GPR_ASSERT(polled_fd->read_closure_ != nullptr);
GRPC_CLOSURE_SCHED(polled_fd->read_closure_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->read_closure_, error);
polled_fd->read_closure_ = nullptr;
polled_fd->poll_events_ &= ~UV_READABLE;
}
if (events & UV_WRITABLE) {
GPR_ASSERT(polled_fd->write_closure_ != nullptr);
GRPC_CLOSURE_SCHED(polled_fd->write_closure_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, polled_fd->write_closure_, error);
polled_fd->write_closure_ = nullptr;
polled_fd->poll_events_ &= ~UV_WRITABLE;
}

@ -128,12 +128,12 @@ class GrpcPolledFdWindows {
}
void ScheduleAndNullReadClosure(grpc_error* error) {
GRPC_CLOSURE_SCHED(read_closure_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_closure_, error);
read_closure_ = nullptr;
}
void ScheduleAndNullWriteClosure(grpc_error* error) {
GRPC_CLOSURE_SCHED(write_closure_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, write_closure_, error);
write_closure_ = nullptr;
}

@ -148,7 +148,7 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r) {
// TODO(apolcyn): allow c-ares to return a service config
// with no addresses along side it
}
GRPC_CLOSURE_SCHED(r->on_done, r->error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, r->error);
}
static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
@ -447,7 +447,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
return;
error_cleanup:
GRPC_CLOSURE_SCHED(r->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
}
static bool inner_resolve_as_ip_literal_locked(
@ -714,7 +714,8 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
sizeof(grpc_resolved_address));
}
}
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done,
GRPC_ERROR_REF(error));
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
grpc_core::Delete(r);
}

@ -734,9 +734,10 @@ void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
old_refs = RefMutate(-static_cast<gpr_atm>(1),
1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(subchannel_destroy, this,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
}

@ -199,7 +199,7 @@ grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
grpc_core::New<start_timer_after_init_state>(elem, deadline);
GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&state->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, GRPC_ERROR_NONE);
}
}

@ -492,7 +492,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
initialization is done. */
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_age_timer_after_init");
GRPC_CLOSURE_SCHED(&chand->start_max_age_timer_after_init, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
&chand->start_max_age_timer_after_init,
GRPC_ERROR_NONE);
}
/* Initialize the number of calls as 1, so that the max_idle_timer will not
@ -501,8 +503,9 @@ static grpc_error* max_age_init_channel_elem(grpc_channel_element* elem,
if (chand->max_connection_idle != GRPC_MILLIS_INF_FUTURE) {
GRPC_CHANNEL_STACK_REF(chand->channel_stack,
"max_age start_max_idle_timer_after_init");
GRPC_CLOSURE_SCHED(&chand->start_max_idle_timer_after_init,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
&chand->start_max_idle_timer_after_init,
GRPC_ERROR_NONE);
}
return GRPC_ERROR_NONE;
}

@ -150,7 +150,7 @@ static void on_handshake_done(void* arg, grpc_error* error) {
}
grpc_closure* notify = c->notify;
c->notify = nullptr;
GRPC_CLOSURE_SCHED(notify, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
c->handshake_mgr.reset();
gpr_mu_unlock(&c->mu);
chttp2_connector_unref(reinterpret_cast<grpc_connector*>(c));
@ -182,7 +182,7 @@ static void connected(void* arg, grpc_error* error) {
c->result->reset();
grpc_closure* notify = c->notify;
c->notify = nullptr;
GRPC_CLOSURE_SCHED(notify, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify, error);
if (c->endpoint != nullptr) {
grpc_endpoint_shutdown(c->endpoint, GRPC_ERROR_REF(error));
}

@ -266,7 +266,8 @@ static void tcp_server_shutdown_complete(void* arg, grpc_error* error) {
// may do a synchronous unref.
grpc_core::ExecCtx::Get()->Flush();
if (destroy_done != nullptr) {
GRPC_CLOSURE_SCHED(destroy_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_done,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Get()->Flush();
}
grpc_channel_args_destroy(state->args);

@ -593,7 +593,8 @@ static void close_transport_locked(grpc_chttp2_transport* t,
grpc_endpoint_shutdown(t->ep, GRPC_ERROR_REF(error));
}
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(t->notify_on_receive_settings, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
GRPC_ERROR_CANCELLED);
t->notify_on_receive_settings = nullptr;
}
GRPC_ERROR_UNREF(error);
@ -706,7 +707,7 @@ grpc_chttp2_stream::~grpc_chttp2_stream() {
}
GRPC_CHTTP2_UNREF_TRANSPORT(t, "stream");
GRPC_CLOSURE_SCHED(destroy_stream_arg, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, GRPC_ERROR_NONE);
}
static int init_stream(grpc_transport* gt, grpc_stream* gs,
@ -1177,7 +1178,7 @@ static grpc_closure* add_closure_barrier(grpc_closure* closure) {
static void null_then_sched_closure(grpc_closure** closure) {
grpc_closure* c = *closure;
*closure = nullptr;
GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
}
void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
@ -1220,7 +1221,8 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
// Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
// closures earlier than when it is safe to do so.
GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
closure->error_data.error);
} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
@ -1678,8 +1680,10 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) {
static void send_ping_locked(grpc_chttp2_transport* t,
grpc_closure* on_initiate, grpc_closure* on_ack) {
if (t->closed_with_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error));
GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate,
GRPC_ERROR_REF(t->closed_with_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack,
GRPC_ERROR_REF(t->closed_with_error));
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
@ -2933,7 +2937,7 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_REF(error));
s->on_next = nullptr;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
@ -2991,10 +2995,11 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
@ -3003,8 +3008,8 @@ void Chttp2IncomingByteStream::NextLocked(void* arg,
if (bs->remaining_bytes_ != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
@ -3092,7 +3097,8 @@ grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_->on_next,
GRPC_ERROR_REF(error));
stream_->on_next = nullptr;
GRPC_ERROR_UNREF(stream_->byte_stream_error);
stream_->byte_stream_error = GRPC_ERROR_REF(error);

@ -291,7 +291,7 @@ grpc_error* grpc_chttp2_data_parser_parse(void* /*parser*/,
GPR_ASSERT(s->frame_storage.length == 0);
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->on_next, GRPC_ERROR_NONE);
s->on_next = nullptr;
s->unprocessed_incoming_frames_decompressed = false;
} else {

@ -135,8 +135,9 @@ grpc_error* grpc_chttp2_settings_parser_parse(void* p, grpc_chttp2_transport* t,
t->num_pending_induced_frames++;
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(t->notify_on_receive_settings,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
GRPC_ERROR_NONE);
t->notify_on_receive_settings = nullptr;
}
}

@ -1150,22 +1150,26 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_callback_received[OP_FAILED]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
} else {
grpc_chttp2_incoming_metadata_buffer_publish(
&oas->s->state.rs.initial_metadata,
stream_op->payload->recv_initial_metadata.recv_initial_metadata);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_NONE);
}
@ -1176,30 +1180,34 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->state_callback_received[OP_FAILED]) {
CRONET_LOG(GPR_DEBUG, "Stream failed.");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->rs.read_stream_closed == true) {
/* No more data will be received */
CRONET_LOG(GPR_DEBUG, "read stream closed");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
} else if (stream_state->flush_read) {
CRONET_LOG(GPR_DEBUG, "flush read");
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
@ -1240,7 +1248,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
@ -1296,8 +1305,9 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
oas->state.state_op_done[OP_RECV_MESSAGE] = true;
/* Do an extra read to trigger on_succeeded() callback in case connection
@ -1328,7 +1338,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
error);
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
@ -1352,13 +1363,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
GRPC_ERROR_REF(stream_state->cancel_error));
}
} else if (stream_state->state_callback_received[OP_FAILED]) {
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(
stream_op->on_complete,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, stream_op->on_complete,
make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
}
} else {
@ -1366,7 +1377,8 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
* callback
*/
if (stream_op->on_complete) {
GRPC_CLOSURE_SCHED(stream_op->on_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
GRPC_ERROR_NONE);
}
}
oas->state.state_op_done[OP_ON_COMPLETE] = true;
@ -1433,20 +1445,24 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
/* Cronet does not support :authority header field. We cancel the call when
this field is present in metadata */
if (op->recv_initial_metadata) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_message) {
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_CANCELLED);
}
if (op->recv_trailing_metadata) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_CANCELLED);
}
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
GRPC_ERROR_CANCELLED);
return;
}
stream_obj* s = reinterpret_cast<stream_obj*>(gs);
@ -1458,7 +1474,8 @@ static void destroy_stream(grpc_transport* gt, grpc_stream* gs,
grpc_closure* then_schedule_closure) {
stream_obj* s = reinterpret_cast<stream_obj*>(gs);
s->~stream_obj();
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
GRPC_ERROR_NONE);
}
static void destroy_transport(grpc_transport* gt) {}

@ -204,7 +204,8 @@ struct inproc_stream {
t->unref();
if (closure_at_destroy) {
GRPC_CLOSURE_SCHED(closure_at_destroy, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_at_destroy,
GRPC_ERROR_NONE);
}
}
@ -390,13 +391,15 @@ void complete_if_batch_end_locked(inproc_stream* s, grpc_error* error,
if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
INPROC_LOG(GPR_INFO, "%s %p %p %p", msg, s, op, error);
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete,
GRPC_ERROR_REF(error));
}
}
void maybe_schedule_op_closure_locked(inproc_stream* s, grpc_error* error) {
if (s && s->ops_needed && !s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_REF(error));
s->op_closure_scheduled = true;
s->ops_needed = false;
}
@ -471,9 +474,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
err);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
err);
// Last use of err so no need to REF and then UNREF it
complete_if_batch_end_locked(
@ -484,7 +489,8 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
if (s->recv_message_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %p", s,
error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
complete_if_batch_end_locked(
@ -508,9 +514,11 @@ void fail_helper_locked(inproc_stream* s, grpc_error* error) {
if (s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %p",
s, error);
complete_if_batch_end_locked(
@ -564,7 +572,8 @@ void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
receiver->recv_stream.get());
INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
receiver->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -656,14 +665,16 @@ void op_state_machine(void* arg, grpc_error* error) {
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready", s);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = nullptr;
needs_close = true;
}
@ -708,9 +719,11 @@ void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
GRPC_CLOSURE_SCHED(s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
complete_if_batch_end_locked(
s, new_err, s->recv_initial_md_op,
"op_state_machine scheduling recv-initial-metadata-on-complete");
@ -748,7 +761,8 @@ void op_state_machine(void* arg, grpc_error* error) {
// satisfied
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -785,12 +799,14 @@ void op_state_machine(void* arg, grpc_error* error) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(new_err));
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
needs_close = true;
} else {
@ -810,7 +826,8 @@ void op_state_machine(void* arg, grpc_error* error) {
// recv_message_op
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
@ -883,9 +900,11 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error* error) {
// couldn't complete that because we hadn't yet sent out trailing
// md, now's the chance
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
GRPC_CLOSURE_SCHED(s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(s->cancel_self_error));
complete_if_batch_end_locked(
s, s->cancel_self_error, s->recv_trailing_md_op,
"cancel_stream scheduling trailing-md-on-complete");
@ -1026,7 +1045,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
(op->recv_message && other && other->send_message_op != nullptr) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
if (!s->op_closure_scheduled) {
GRPC_CLOSURE_SCHED(&s->op_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &s->op_closure,
GRPC_ERROR_NONE);
s->op_closure_scheduled = true;
}
} else {
@ -1054,7 +1074,8 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GPR_INFO,
"perform_stream_op error %p scheduling initial-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
@ -1063,22 +1084,24 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
GPR_INFO,
"perform_stream_op error %p scheduling recv message-ready %p", s,
error);
GRPC_CLOSURE_SCHED(op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_trailing_metadata) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling trailing-metadata-ready %p",
s, error);
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
GRPC_CLOSURE_SCHED(on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
}
if (needs_close) {
close_other_side_locked(s, "perform_stream_op:other_side");
@ -1121,7 +1144,7 @@ void perform_transport_op(grpc_transport* gt, grpc_transport_op* op) {
t->accept_stream_data = op->set_accept_stream_user_data;
}
if (op->on_consumed) {
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
bool do_close = false;

@ -169,7 +169,7 @@ bool HandshakeManager::CallNextHandshakerLocked(grpc_error* error) {
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(&deadline_timer_);
GRPC_CLOSURE_SCHED(&on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, &on_handshake_done_, error);
is_shutdown_ = true;
} else {
auto handshaker = handshakers_[index_];

@ -88,7 +88,7 @@ static void next_address(internal_request* req, grpc_error* due_to_error);
static void finish(internal_request* req, grpc_error* error) {
grpc_polling_entity_del_from_pollset_set(req->pollent,
req->context->pollset_set);
GRPC_CLOSURE_SCHED(req->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, req->on_done, error);
grpc_http_parser_destroy(&req->parser);
if (req->addresses != nullptr) {
grpc_resolved_addresses_destroy(req->addresses);

@ -100,7 +100,7 @@ class grpc_httpcli_ssl_channel_security_connector final
error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}

@ -92,9 +92,9 @@ void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
#ifdef GRPC_TSAN_ENABLED
original_closure_ = closure;
GRPC_CLOSURE_SCHED(&tsan_closure_, error);
ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
#else
GRPC_CLOSURE_SCHED(closure, error);
ExecCtx::Run(DEBUG_LOCATION, closure, error);
#endif
}
@ -199,7 +199,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
"for pre-existing cancellation",
this, closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
break;
} else {
if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) {
@ -217,7 +217,7 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
"call_combiner=%p: scheduling old cancel callback=%p", this,
closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
}
break;
}
@ -244,7 +244,7 @@ void CallCombiner::Cancel(grpc_error* error) {
"call_combiner=%p: scheduling notify_on_cancel callback=%p",
this, notify_on_cancel);
}
GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
}
break;
}

@ -31,6 +31,7 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/dynamic_annotations.h"
#include "src/core/lib/iomgr/exec_ctx.h"
// A simple, lock-free mechanism for serializing activity related to a
// single call. This is similar to a combiner but is more lightweight.
@ -156,8 +157,8 @@ class CallCombinerClosureList {
//
// All but one of the closures in the list will be scheduled via
// GRPC_CALL_COMBINER_START(), and the remaining closure will be
// scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
// yielding the call combiner. If the list is empty, then the call
// scheduled via ExecCtx::Run(), which will eventually result
// in yielding the call combiner. If the list is empty, then the call
// combiner will be yielded immediately.
void RunClosures(CallCombiner* call_combiner) {
if (closures_.empty()) {
@ -177,7 +178,7 @@ class CallCombinerClosureList {
grpc_error_string(closures_[0].error), closures_[0].reason);
}
// This will release the call combiner.
GRPC_CLOSURE_SCHED(closures_[0].closure, closures_[0].error);
ExecCtx::Run(DEBUG_LOCATION, closures_[0].closure, closures_[0].error);
closures_.clear();
}

@ -277,44 +277,6 @@ inline void grpc_closure_run(grpc_closure* c, grpc_error* error) {
#define GRPC_CLOSURE_RUN(closure, error) grpc_closure_run(closure, error)
#endif
#ifndef NDEBUG
inline void grpc_closure_sched(const char* file, int line, grpc_closure* c,
grpc_error* error) {
#else
inline void grpc_closure_sched(grpc_closure* c, grpc_error* error) {
#endif
GPR_TIMER_SCOPE("grpc_closure_sched", 0);
if (c != nullptr) {
#ifndef NDEBUG
if (c->scheduled) {
gpr_log(GPR_ERROR,
"Closure already scheduled. (closure: %p, created: [%s:%d], "
"previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
"run?: %s",
c, c->file_created, c->line_created, c->file_initiated,
c->line_initiated, file, line, c->run ? "true" : "false");
abort();
}
c->scheduled = true;
c->file_initiated = file;
c->line_initiated = line;
c->run = false;
GPR_ASSERT(c->cb != nullptr);
#endif
c->scheduler->vtable->sched(c, error);
} else {
GRPC_ERROR_UNREF(error);
}
}
/** Schedule a closure to be run. Does not need to be run from a safe point. */
#ifndef NDEBUG
#define GRPC_CLOSURE_SCHED(closure, error) \
grpc_closure_sched(__FILE__, __LINE__, closure, error)
#else
#define GRPC_CLOSURE_SCHED(closure, error) grpc_closure_sched(closure, error)
#endif
#ifndef NDEBUG
inline void grpc_closure_list_sched(const char* file, int line,
grpc_closure_list* list) {

@ -132,7 +132,7 @@ static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
grpc_closure* cb = ep->read_cb;
ep->read_cb = nullptr;
ep->read_slices = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
@ -145,7 +145,7 @@ static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
grpc_closure* cb = ep->write_cb;
ep->write_cb = nullptr;
ep->write_slices = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void ReadAction(void* arg, grpc_error* error) {

@ -420,7 +420,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
close(fd->fd);
}
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
fork_fd_list_remove_grpc_fd(fd);
@ -623,7 +623,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
pollset->begin_refs == 0) {
GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
GRPC_ERROR_NONE);
pollset->shutdown_closure = nullptr;
}
}

@ -393,7 +393,8 @@ static void unref_by(grpc_fd* fd, int n) {
#endif
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
} else {
@ -487,7 +488,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
if (pollable_obj) {
gpr_mu_unlock(&pollable_obj->owner_orphan_mu);
@ -662,7 +663,8 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
pollset->containing_pollset_set_count == 0) {
GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
GRPC_ERROR_NONE);
pollset->shutdown_closure = nullptr;
pollset->already_shutdown = true;
}

@ -436,7 +436,7 @@ static void close_fd_locked(grpc_fd* fd) {
if (!fd->released) {
close(fd->fd);
}
GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd* fd) {
@ -497,17 +497,18 @@ static grpc_error* fd_shutdown_error(grpc_fd* fd) {
static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
grpc_closure* closure) {
if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
GRPC_CLOSURE_SCHED(
closure, grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, closure,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@ -529,7 +530,7 @@ static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
return 0;
} else {
/* waiting ==> queue closure */
GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@ -574,7 +575,7 @@ static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
}
static void fd_set_readable(grpc_fd* fd) {
@ -896,7 +897,8 @@ static void finish_shutdown(grpc_pollset* pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error** composite, grpc_error* error) {

@ -174,4 +174,30 @@ grpc_millis ExecCtx::Now() {
return now_;
}
// Schedules |closure| to run with |error| by handing it to exec_ctx_sched()
// (defined elsewhere in this file). A null |closure| is a no-op except that
// |error|'s reference is released, so callers may unconditionally pass
// ownership of |error| here.
//
// |location| is the caller's file/line (via DEBUG_LOCATION); in debug builds
// it is recorded on the closure for double-scheduling diagnostics.
void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure,
                  grpc_error* error) {
  if (closure == nullptr) {
    // No closure to schedule — release the error ref to avoid leaking it.
    GRPC_ERROR_UNREF(error);
    return;
  }
#ifndef NDEBUG
  // Debug-only bookkeeping: a closure must not be scheduled again before it
  // has run. If it is, dump where it was created, where it was previously
  // scheduled, and where this conflicting scheduling came from, then abort.
  if (closure->scheduled) {
    gpr_log(GPR_ERROR,
            "Closure already scheduled. (closure: %p, created: [%s:%d], "
            "previously scheduled at: [%s: %d], newly scheduled at [%s: %d], "
            "run?: %s",
            closure, closure->file_created, closure->line_created,
            closure->file_initiated, closure->line_initiated, location.file(),
            location.line(), closure->run ? "true" : "false");
    abort();
  }
  // Mark as scheduled and remember this call site for the next diagnostic.
  closure->scheduled = true;
  closure->file_initiated = location.file();
  closure->line_initiated = location.line();
  closure->run = false;
  // A closure with no callback would crash when executed — fail fast here.
  GPR_ASSERT(closure->cb != nullptr);
#endif
  exec_ctx_sched(closure, error);
}
} // namespace grpc_core

@ -28,6 +28,7 @@
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/iomgr/closure.h"
@ -221,6 +222,9 @@ class ExecCtx {
gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
}
static void Run(const DebugLocation& location, grpc_closure* closure,
grpc_error* error);
protected:
/** Check if ready to finish. */
virtual bool CheckReadyToFinish() { return false; }

@ -23,6 +23,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::DebugOnlyTraceFlag grpc_polling_trace;
@ -124,7 +125,7 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(&state_, kClosureReady, kClosureNotReady)) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */
}
@ -137,9 +138,9 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
schedule the closure with the shutdown error */
if ((curr & kShutdownBit) > 0) {
grpc_error* shutdown_err = (grpc_error*)(curr & ~kShutdownBit);
GRPC_CLOSURE_SCHED(closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return;
}
@ -189,9 +190,9 @@ bool LockfreeEvent::SetShutdown(grpc_error* shutdown_err) {
happens-after on that edge), and a release to pair with anything
loading the shutdown state. */
if (gpr_atm_full_cas(&state_, curr, new_state)) {
GRPC_CLOSURE_SCHED((grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return true;
}
@ -239,7 +240,7 @@ void LockfreeEvent::SetReady() {
spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(&state_, curr, kClosureNotReady)) {
GRPC_CLOSURE_SCHED((grpc_closure*)curr, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, (grpc_closure*)curr, GRPC_ERROR_NONE);
return;
}
/* else the state changed again (only possible by either a racing

@ -55,7 +55,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
static void pollset_shutdown(grpc_pollset* /*pollset*/, grpc_closure* closure) {
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
}
static void pollset_destroy(grpc_pollset* pollset) {

@ -98,7 +98,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@ -146,7 +146,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
GRPC_CLOSURE_SCHED(pollset->on_shutdown, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->on_shutdown,
GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;

@ -79,7 +79,7 @@ void grpc_custom_resolve_callback(grpc_custom_resolver* r,
return;
}
if (r->on_done) {
GRPC_CLOSURE_SCHED(r->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
}
gpr_free(r->host);
gpr_free(r->port);
@ -161,7 +161,7 @@ static void resolve_address_impl(const char* name, const char* default_port,
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_done, err);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, err);
return;
}
r = (grpc_custom_resolver*)gpr_malloc(sizeof(grpc_custom_resolver));

@ -152,8 +152,9 @@ typedef struct {
* grpc_blocking_resolve_address */
static void do_request_thread(void* rp, grpc_error* /*error*/) {
request* r = static_cast<request*>(rp);
GRPC_CLOSURE_SCHED(r->on_done, grpc_blocking_resolve_address(
r->name, r->default_port, r->addrs_out));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, r->on_done,
grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);

@ -138,7 +138,7 @@ static void do_request_thread(void* rp, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
GRPC_CLOSURE_SCHED(r->on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);

@ -424,7 +424,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = nullptr;
GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
return true;
}
@ -506,7 +506,7 @@ static bool ru_post_reclaimer(grpc_resource_user* resource_user,
resource_user->new_reclaimers[destructive] = nullptr;
GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@ -549,8 +549,10 @@ static void ru_shutdown(void* ru, grpc_error* /*error*/) {
}
grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
gpr_mu_lock(&resource_user->mu);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = nullptr;
resource_user->reclaimers[1] = nullptr;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
@ -572,8 +574,10 @@ static void ru_destroy(void* ru, grpc_error* /*error*/) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, static_cast<grpc_rulist>(i));
}
GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(resource_user->resource_quota);
@ -719,7 +723,7 @@ void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
gpr_atm_no_barrier_store(&resource_quota->last_size,
(gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
}
size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
@ -994,8 +998,8 @@ bool grpc_resource_user_alloc_slices(
size_t count, grpc_slice_buffer* dest) {
if (GPR_UNLIKELY(
gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
GRPC_CLOSURE_SCHED(
&slice_allocator->on_allocated,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, &slice_allocator->on_allocated,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
return false;
}

@ -124,7 +124,7 @@ static void socket_notify_on_iocp(grpc_winsocket* socket, grpc_closure* closure,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@ -145,7 +145,7 @@ void grpc_socket_become_ready(grpc_winsocket* socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
GRPC_CLOSURE_SCHED(info->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;

@ -96,7 +96,7 @@ static void OnAlarm(void* arg, grpc_error* error) {
} else {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
}
@ -137,7 +137,7 @@ static void OnOpen(void* arg, grpc_error* error) {
GRPC_ERROR_REF(error);
}
gpr_mu_unlock(&connect->mu);
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
}

@ -96,7 +96,7 @@ static void custom_connect_callback_internal(grpc_custom_socket* socket,
grpc_core::ExecCtx::Get()->Flush();
custom_tcp_connect_cleanup(connect);
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
static void custom_connect_callback(grpc_custom_socket* socket,

@ -241,7 +241,7 @@ finish:
grpc_channel_args_destroy(ac->channel_args);
gpr_free(ac);
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
@ -298,12 +298,13 @@ void grpc_tcp_client_create_from_prepared_fd(
char* addr_str = grpc_sockaddr_to_uri(addr);
*ep = grpc_tcp_client_create_from_fd(fdobj, channel_args, addr_str);
gpr_free(addr_str);
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
return;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect"));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
GRPC_OS_ERROR(errno, "connect"));
return;
}
@ -344,7 +345,7 @@ static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
*ep = nullptr;
if ((error = grpc_tcp_client_prepare_fd(channel_args, addr, &mapped_addr,
&fdobj)) != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
return;
}
grpc_tcp_client_create_from_prepared_fd(interested_parties, closure, fdobj,

@ -117,7 +117,7 @@ static void on_connect(void* acp, grpc_error* error) {
async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
GRPC_CLOSURE_SCHED(on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@ -225,7 +225,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
GRPC_CLOSURE_SCHED(on_done, final_error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, final_error);
}
grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect};

@ -140,7 +140,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
TCP_UNREF(tcp, "read");
tcp->read_slices = nullptr;
tcp->read_cb = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void custom_read_callback(grpc_custom_socket* socket, size_t nread,
@ -220,7 +220,7 @@ static void custom_write_callback(grpc_custom_socket* socket,
gpr_log(GPR_INFO, "write complete on %p: error=%s", tcp->socket, str);
}
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
@ -241,8 +241,9 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TCP socket is shutting down"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP socket is shutting down"));
return;
}
@ -252,7 +253,7 @@ static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
return;
}
tcp->write_cb = cb;
@ -289,10 +290,10 @@ static void endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
gpr_log(GPR_INFO, "TCP %p shutdown why=%s", tcp->socket, str);
}
tcp->shutting_down = true;
// GRPC_CLOSURE_SCHED(tcp->read_cb, GRPC_ERROR_REF(why));
// GRPC_CLOSURE_SCHED(tcp->write_cb, GRPC_ERROR_REF(why));
// tcp->read_cb = nullptr;
// tcp->write_cb = nullptr;
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->read_cb,
// GRPC_ERROR_REF(why));
// grpc_core::ExecCtx::Run(DEBUG_LOCATION,tcp->write_cb,
// GRPC_ERROR_REF(why)); tcp->read_cb = nullptr; tcp->write_cb = nullptr;
grpc_resource_user_shutdown(tcp->resource_user);
grpc_custom_socket_vtable->shutdown(tcp->socket);
}

@ -124,7 +124,8 @@ static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
static void finish_shutdown(grpc_tcp_server* s) {
GPR_ASSERT(s->shutdown);
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
while (s->head) {

@ -108,7 +108,8 @@ static void finish_shutdown(grpc_tcp_server* s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);

@ -135,10 +135,12 @@ static void destroy_server(void* arg, grpc_error* error) {
static void finish_shutdown_locked(grpc_tcp_server* s) {
if (s->shutdown_complete != NULL) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
GRPC_CLOSURE_SCHED(
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}

@ -233,7 +233,7 @@ static void on_read(void* tcpp, grpc_error* error) {
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
#define DEFAULT_TARGET_READ_SIZE 8192
@ -254,9 +254,10 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@ -289,7 +290,7 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transferred = bytes_read;
GRPC_CLOSURE_SCHED(&tcp->on_read, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@ -302,8 +303,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
GRPC_CLOSURE_SCHED(&tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
@ -338,7 +339,7 @@ static void on_write(void* tcpp, grpc_error* error) {
}
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
/* Initiates a write. */
@ -366,9 +367,10 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
if (tcp->shutting_down) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
return;
}
@ -399,7 +401,7 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_error* error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@ -417,7 +419,8 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
GRPC_CLOSURE_SCHED(cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb,
GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}

@ -37,7 +37,7 @@ void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* /*error*/) {
grpc_timer* timer = t->original;
GPR_ASSERT(timer->pending);
timer->pending = 0;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
custom_timer_impl->stop(t);
gpr_free(t);
}
@ -48,7 +48,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
if (deadline <= grpc_core::ExecCtx::Get()->Now()) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
timer->pending = false;
return;
} else {
@ -69,7 +69,8 @@ static void timer_cancel(grpc_timer* timer) {
grpc_custom_timer* tw = (grpc_custom_timer*)timer->custom_timer;
if (timer->pending) {
timer->pending = 0;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CANCELLED);
custom_timer_impl->stop(tw);
gpr_free(tw);
}

@ -369,9 +369,10 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
if (!g_shared_mutables.initialized) {
timer->pending = false;
GRPC_CLOSURE_SCHED(timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
return;
}
@ -380,7 +381,7 @@ static void timer_init(grpc_timer* timer, grpc_millis deadline,
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
if (deadline <= now) {
timer->pending = false;
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
/* early out */
return;
@ -471,7 +472,8 @@ static void timer_cancel(grpc_timer* timer) {
if (timer->pending) {
REMOVE_FROM_HASH_TABLE(timer);
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_CANCELLED);
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@ -564,7 +566,8 @@ static size_t pop_timers(timer_shard* shard, grpc_millis now,
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
REMOVE_FROM_HASH_TABLE(timer);
GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, timer->closure,
GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);

@ -243,7 +243,8 @@ void GrpcUdpListener::shutdown_fd(void* args, grpc_error* error) {
static void finish_shutdown(grpc_udp_server* s) {
if (s->shutdown_complete != nullptr) {
GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);

@ -80,7 +80,8 @@ static void composite_call_metadata_cb(void* arg, grpc_error* error) {
}
// We're done!
}
GRPC_CLOSURE_SCHED(ctx->on_request_metadata, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, ctx->on_request_metadata,
GRPC_ERROR_REF(error));
gpr_free(ctx);
}

@ -94,7 +94,8 @@ bool grpc_md_only_test_credentials::get_request_metadata(
grpc_error** /*error*/) {
grpc_credentials_mdelem_array_add(md_array, md_);
if (is_async_) {
GRPC_CLOSURE_SCHED(on_request_metadata, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_request_metadata,
GRPC_ERROR_NONE);
return false;
}
return true;

@ -256,7 +256,8 @@ void grpc_oauth2_token_fetcher_credentials::on_http_response(
new_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Error occurred when fetching oauth2 token.", &error, 1);
}
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata, new_error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata, new_error);
grpc_polling_entity_del_from_pollset_set(
pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_));
grpc_oauth2_pending_get_request_metadata* prev = pending_request;
@ -332,8 +333,9 @@ void grpc_oauth2_token_fetcher_credentials::cancel_get_request_metadata(
pending_requests_ = pending_request->next;
}
// Invoke the callback immediately with an error.
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
gpr_free(pending_request);
break;
}

@ -131,7 +131,7 @@ static void plugin_md_request_metadata_ready(void* request,
if (!r->cancelled) {
grpc_error* error =
process_plugin_result(r, md, num_md, status, error_details);
GRPC_CLOSURE_SCHED(r->on_request_metadata, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_request_metadata, error);
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_plugin_credentials_trace)) {
gpr_log(GPR_INFO,
"plugin_credentials[%p]: request %p: plugin was previously "
@ -228,8 +228,9 @@ void grpc_plugin_credentials::cancel_get_request_metadata(
pending_request);
}
pending_request->cancelled = true;
GRPC_CLOSURE_SCHED(pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
pending_request->on_request_metadata,
GRPC_ERROR_REF(error));
pending_request_remove_locked(pending_request);
break;
}

@ -59,7 +59,7 @@ void alts_check_peer(tsi_peer peer,
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Could not get ALTS auth context from TSI peer");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
class grpc_alts_channel_security_connector final

@ -245,7 +245,7 @@ static void fake_check_peer(
auth_context->get(), GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME,
GRPC_FAKE_TRANSPORT_SECURITY_TYPE);
end:
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}

@ -100,7 +100,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/,
if (!is_endpoint_local) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Endpoint is neither UDS or TCP loopback address.");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
return;
}
/* Create an auth context which is necessary to pass the santiy check in
@ -112,7 +112,7 @@ void local_check_peer(grpc_security_connector* /*sc*/, tsi_peer /*peer*/,
error = *auth_context != nullptr ? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Could not create local auth context");
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
class grpc_local_channel_security_connector final

@ -167,7 +167,7 @@ class grpc_ssl_channel_security_connector final
}
}
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@ -302,7 +302,7 @@ class grpc_ssl_server_security_connector
grpc_closure* on_peer_checked) override {
grpc_error* error = ssl_check_peer(nullptr, &peer, auth_context);
tsi_peer_destruct(&peer);
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
int cmp(const grpc_security_connector* other) const override {

@ -172,7 +172,7 @@ void SpiffeChannelSecurityConnector::check_peer(
: target_name_.get();
grpc_error* error = grpc_ssl_check_alpn(&peer);
if (error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
return;
}
@ -213,7 +213,7 @@ void SpiffeChannelSecurityConnector::check_peer(
error = ProcessServerAuthorizationCheckResult(check_arg_);
}
}
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
tsi_peer_destruct(&peer);
}
@ -342,7 +342,7 @@ void SpiffeChannelSecurityConnector::ServerAuthorizationCheckDone(
grpc_error* error = ProcessServerAuthorizationCheckResult(arg);
SpiffeChannelSecurityConnector* connector =
static_cast<SpiffeChannelSecurityConnector*>(arg->cb_user_data);
GRPC_CLOSURE_SCHED(connector->on_peer_checked_, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, connector->on_peer_checked_, error);
}
grpc_error*
@ -446,7 +446,7 @@ void SpiffeServerSecurityConnector::check_peer(
*auth_context = grpc_ssl_peer_to_auth_context(
&peer, GRPC_TLS_SPIFFE_TRANSPORT_SECURITY_TYPE);
tsi_peer_destruct(&peer);
GRPC_CLOSURE_SCHED(on_peer_checked, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_peer_checked, error);
}
int SpiffeServerSecurityConnector::cmp(

@ -165,7 +165,7 @@ static void call_read_cb(secure_endpoint* ep, grpc_error* error) {
}
}
ep->read_buffer = nullptr;
GRPC_CLOSURE_SCHED(ep->read_cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
SECURE_ENDPOINT_UNREF(ep, "read");
}
@ -363,9 +363,10 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
grpc_slice_buffer_reset_and_unref_internal(&ep->output_buffer);
GRPC_CLOSURE_SCHED(
cb, grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
grpc_set_tsi_error_result(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result));
return;
}

@ -208,7 +208,7 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error* error) {
is_shutdown_ = true;
}
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, error);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error);
}
void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) {
@ -268,7 +268,7 @@ void SecurityHandshaker::OnPeerCheckedInner(grpc_error* error) {
args_->args = grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1);
grpc_channel_args_destroy(tmp_args);
// Invoke callback.
GRPC_CLOSURE_SCHED(on_handshake_done_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, GRPC_ERROR_NONE);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
is_shutdown_ = true;
@ -316,7 +316,7 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
// Read more if we need to.
if (result == TSI_INCOMPLETE_DATA) {
GPR_ASSERT(bytes_to_send_size == 0);
GRPC_CLOSURE_SCHED(&schedule_read_closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &schedule_read_closure_, GRPC_ERROR_NONE);
return error;
}
if (result != TSI_OK) {
@ -334,10 +334,10 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked(
reinterpret_cast<const char*>(bytes_to_send), bytes_to_send_size);
grpc_slice_buffer_reset_and_unref_internal(&outgoing_);
grpc_slice_buffer_add(&outgoing_, to_send);
GRPC_CLOSURE_SCHED(&schedule_write_closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &schedule_write_closure_, GRPC_ERROR_NONE);
} else if (handshaker_result == nullptr) {
// There is nothing to send, but need to read from peer.
GRPC_CLOSURE_SCHED(&schedule_read_closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &schedule_read_closure_, GRPC_ERROR_NONE);
} else {
// Handshake has finished, check peer and so on.
error = CheckPeerLocked();
@ -412,7 +412,7 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg,
}
// We may be done.
if (h->handshaker_result_ == nullptr) {
GRPC_CLOSURE_SCHED(&h->schedule_read_closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &h->schedule_read_closure_, GRPC_ERROR_NONE);
} else {
error = h->CheckPeerLocked();
if (error != GRPC_ERROR_NONE) {
@ -466,9 +466,9 @@ class FailHandshaker : public Handshaker {
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* /*args*/) override {
GRPC_CLOSURE_SCHED(on_handshake_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to create security handshaker"));
ExecCtx::Run(DEBUG_LOCATION, on_handshake_done,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to create security handshaker"));
}
private:

@ -159,7 +159,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
}
GRPC_CLOSURE_SCHED(closure, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, error);
}
// Called from application code.

@ -131,7 +131,7 @@ grpc_error* non_polling_poller_work(grpc_pollset* pollset,
npp->root = w.next;
if (&w == npp->root) {
if (npp->shutdown) {
GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, npp->shutdown, GRPC_ERROR_NONE);
}
npp->root = nullptr;
}
@ -166,7 +166,7 @@ void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GPR_ASSERT(closure != nullptr);
p->shutdown = closure;
if (p->root == nullptr) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
} else {
non_polling_worker* w = p->root;
do {

@ -111,18 +111,16 @@ static void lame_start_transport_op(grpc_channel_element* elem,
}
}
if (op->send_ping.on_initiate != nullptr) {
GRPC_CLOSURE_SCHED(
op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
if (op->send_ping.on_ack != nullptr) {
GRPC_CLOSURE_SCHED(
op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
if (op->on_consumed != nullptr) {
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
}
@ -136,7 +134,7 @@ static grpc_error* lame_init_call_elem(grpc_call_element* elem,
static void lame_destroy_call_elem(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure) {
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
}
static grpc_error* lame_init_channel_elem(grpc_channel_element* elem,

@ -381,7 +381,8 @@ static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
}
}
@ -527,7 +528,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) {
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_REF(error));
return;
}
@ -588,7 +590,8 @@ static void finish_start_new_rpc(
gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
return;
}
@ -843,7 +846,8 @@ static void got_initial_metadata(void* ptr, grpc_error* error) {
if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
/* zombied call will be destroyed when it's removed from the pending
queue... later */
@ -1448,7 +1452,8 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
GRPC_ERROR_NONE);
} else {
publish_call(server, calld, cq_idx, rc);
}

@ -67,7 +67,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
} else {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
}

@ -52,7 +52,8 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount) {
there. */
grpc_core::Executor::Run(&refcount->destroy, GRPC_ERROR_NONE);
} else {
GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &refcount->destroy,
GRPC_ERROR_NONE);
}
}
@ -218,7 +219,8 @@ struct made_transport_op {
static void destroy_made_transport_op(void* arg, grpc_error* error) {
made_transport_op* op = static_cast<made_transport_op*>(arg);
GRPC_CLOSURE_SCHED(op->inner_on_complete, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->inner_on_complete,
GRPC_ERROR_REF(error));
grpc_core::Delete<made_transport_op>(op);
}

@ -54,7 +54,7 @@ static void my_resolve_address(const char* addr, const char* /*default_port*/,
gpr_malloc(sizeof(*(*addrs)->addrs)));
(*addrs)->addrs[0].len = 123;
}
GRPC_CLOSURE_SCHED(on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
}
static grpc_address_resolver_vtable test_resolver = {my_resolve_address,
@ -81,7 +81,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
dummy_resolved_address.len = 123;
(*addresses)->emplace_back(dummy_resolved_address, nullptr);
}
GRPC_CLOSURE_SCHED(on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
return nullptr;
}

@ -347,11 +347,11 @@ static void finish_resolve(void* arg, grpc_error* error) {
dummy_resolved_address.len = 0;
(*r->addresses)->emplace_back(dummy_resolved_address, nullptr);
}
GRPC_CLOSURE_SCHED(r->on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, GRPC_ERROR_NONE);
} else {
GRPC_CLOSURE_SCHED(r->on_done,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolution failed", &error, 1));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolution failed", &error, 1));
}
gpr_free(r->addr);
@ -412,7 +412,7 @@ static void do_connect(void* arg, grpc_error* error) {
future_connect* fc = static_cast<future_connect*>(arg);
if (error != GRPC_ERROR_NONE) {
*fc->ep = nullptr;
GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_REF(error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_REF(error));
} else if (g_server != nullptr) {
grpc_endpoint* client;
grpc_endpoint* server;
@ -424,7 +424,7 @@ static void do_connect(void* arg, grpc_error* error) {
grpc_server_setup_transport(g_server, transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
GRPC_CLOSURE_SCHED(fc->closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_NONE);
} else {
sched_connect(fc->closure, fc->ep, fc->deadline);
}
@ -435,8 +435,9 @@ static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
gpr_timespec deadline) {
if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) {
*ep = nullptr;
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Connect deadline exceeded"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connect deadline exceeded"));
return;
}

@ -88,7 +88,7 @@ static void my_resolve_address(const char* addr, const char* default_port,
(*addrs)->addrs[0].len = static_cast<socklen_t>(sizeof(*sa));
gpr_mu_unlock(&g_mu);
}
GRPC_CLOSURE_SCHED(on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
}
static grpc_error* my_blocking_resolve_address(
@ -127,7 +127,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
(*addresses)->emplace_back(&sa, sizeof(sa), nullptr);
gpr_mu_unlock(&g_mu);
}
GRPC_CLOSURE_SCHED(on_done, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);
return nullptr;
}

@ -141,7 +141,8 @@ static void read_and_write_test_read_handler(void* data, grpc_error* error) {
/* We perform many reads one after another. If grpc_endpoint_read and the
* read_handler are both run inline, we might end up growing the stack
* beyond the limit. Schedule the read on ExecCtx to avoid this. */
GRPC_CLOSURE_SCHED(&state->read_scheduler, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
GRPC_ERROR_NONE);
}
}
@ -172,7 +173,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
/* We perform many writes one after another. If grpc_endpoint_write and
* the write_handler are both run inline, we might end up growing the
* stack beyond the limit. Schedule the write on ExecCtx to avoid this. */
GRPC_CLOSURE_SCHED(&state->write_scheduler, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
GRPC_ERROR_NONE);
gpr_free(slices);
return;
}

@ -99,7 +99,7 @@ class TestGrpcUdpHandler : public GrpcUdpHandler {
void* /*user_data*/) override {
gpr_log(GPR_INFO, "gRPC FD about to be orphaned: %d",
grpc_fd_wrapped_fd(emfd()));
GRPC_CLOSURE_SCHED(orphan_fd_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, orphan_fd_closure, GRPC_ERROR_NONE);
g_number_of_orphan_calls++;
}

@ -564,7 +564,7 @@ static int compute_engine_httpcli_get_success_override(
grpc_closure* on_done, grpc_httpcli_response* response) {
validate_compute_engine_http_request(request);
*response = http_response(200, valid_oauth2_json_response);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -573,7 +573,7 @@ static int compute_engine_httpcli_get_failure_override(
grpc_closure* on_done, grpc_httpcli_response* response) {
validate_compute_engine_http_request(request);
*response = http_response(403, "Not Authorized.");
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -667,7 +667,7 @@ static int refresh_token_httpcli_post_success(
grpc_httpcli_response* response) {
validate_refresh_token_http_request(request, body, body_size);
*response = http_response(200, valid_oauth2_json_response);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -678,7 +678,7 @@ static int token_httpcli_post_failure(const grpc_httpcli_request* /*request*/,
grpc_closure* on_done,
grpc_httpcli_response* response) {
*response = http_response(403, "Not Authorized.");
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -881,7 +881,7 @@ static int sts_token_httpcli_post_success(const grpc_httpcli_request* request,
grpc_httpcli_response* response) {
validate_sts_token_http_request(request, body, body_size);
*response = http_response(200, valid_sts_json_response);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -1215,7 +1215,7 @@ static int default_creds_metadata_server_detection_httpcli_get_success_override(
response->hdrs = headers;
GPR_ASSERT(strcmp(request->http.path, "/") == 0);
GPR_ASSERT(strcmp(request->host, "metadata.google.internal.") == 0);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -1306,7 +1306,7 @@ static int default_creds_gce_detection_httpcli_get_failure_override(
GPR_ASSERT(strcmp(request->http.path, "/") == 0);
GPR_ASSERT(strcmp(request->host, "metadata.google.internal.") == 0);
*response = http_response(200, "");
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}

@ -335,7 +335,7 @@ static int httpcli_get_google_keys_for_email(
"/robot/v1/metadata/x509/"
"777-abaslkan11hlb6nmim3bpspl31ud@developer."
"gserviceaccount.com") == 0);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -378,7 +378,7 @@ static int httpcli_get_custom_keys_for_email(
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0);
GPR_ASSERT(strcmp(request->http.path, "/jwk/foo@bar.com") == 0);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -411,7 +411,7 @@ static int httpcli_get_jwk_set(const grpc_httpcli_request* request,
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0);
GPR_ASSERT(strcmp(request->http.path, "/oauth2/v3/certs") == 0);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -425,7 +425,7 @@ static int httpcli_get_openid_config(const grpc_httpcli_request* request,
GPR_ASSERT(strcmp(request->http.path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0);
grpc_httpcli_set_override(httpcli_get_jwk_set,
httpcli_post_should_not_be_called);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}
@ -464,7 +464,7 @@ static int httpcli_get_bad_json(const grpc_httpcli_request* request,
grpc_httpcli_response* response) {
*response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}"));
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_NONE);
return 1;
}

@ -46,7 +46,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
gpr_mu_lock(&m->mu);
if (m->read_buffer.count > 0) {
grpc_slice_buffer_swap(&m->read_buffer, slices);
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
} else {
m->on_read = cb;
m->on_read_out = slices;
@ -60,7 +60,7 @@ static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
}
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
}
static void me_add_to_pollset(grpc_endpoint* /*ep*/,
@ -76,9 +76,9 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
gpr_mu_lock(&m->mu);
if (m->on_read) {
GRPC_CLOSURE_SCHED(m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Endpoint Shutdown", &why, 1));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Endpoint Shutdown", &why, 1));
m->on_read = nullptr;
}
gpr_mu_unlock(&m->mu);
@ -139,7 +139,7 @@ void grpc_mock_endpoint_put_read(grpc_endpoint* ep, grpc_slice slice) {
gpr_mu_lock(&m->mu);
if (m->on_read != nullptr) {
grpc_slice_buffer_add(m->on_read_out, slice);
GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
m->on_read = nullptr;
} else {
grpc_slice_buffer_add(&m->read_buffer, slice);

@ -58,11 +58,12 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
half* m = reinterpret_cast<half*>(ep);
gpr_mu_lock(&m->parent->mu);
if (m->parent->shutdown) {
GRPC_CLOSURE_SCHED(
cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
} else if (m->read_buffer.count > 0) {
grpc_slice_buffer_swap(&m->read_buffer, slices);
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
} else {
m->on_read = cb;
m->on_read_out = slices;
@ -87,7 +88,7 @@ static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
for (size_t i = 0; i < slices->count; i++) {
grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
}
GRPC_CLOSURE_SCHED(m->on_read, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
m->on_read = nullptr;
} else {
for (size_t i = 0; i < slices->count; i++) {
@ -96,7 +97,7 @@ static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
}
gpr_mu_unlock(&m->parent->mu);
GRPC_CLOSURE_SCHED(cb, error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
static void me_add_to_pollset(grpc_endpoint* /*ep*/,
@ -113,15 +114,15 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error* why) {
gpr_mu_lock(&m->parent->mu);
m->parent->shutdown = true;
if (m->on_read) {
GRPC_CLOSURE_SCHED(
m->on_read,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
m->on_read = nullptr;
}
m = other_half(m);
if (m->on_read) {
GRPC_CLOSURE_SCHED(
m->on_read,
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, m->on_read,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Shutdown", &why, 1));
m->on_read = nullptr;
}

@ -56,7 +56,8 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) {
if (te->write_cb != nullptr &&
(te->error != GRPC_ERROR_NONE ||
te->write_buffer.length <= WRITE_BUFFER_SIZE)) {
GRPC_CLOSURE_SCHED(te->write_cb, GRPC_ERROR_REF(te->error));
grpc_core::ExecCtx::Run(DEBUG_LOCATION, te->write_cb,
GRPC_ERROR_REF(te->error));
te->write_cb = nullptr;
}
}

@ -414,7 +414,7 @@ void SetPollsetSet(grpc_transport* /*self*/, grpc_stream* /*stream*/,
/* implementation of grpc_transport_perform_stream_op */
void PerformStreamOp(grpc_transport* /*self*/, grpc_stream* /*stream*/,
grpc_transport_stream_op_batch* op) {
GRPC_CLOSURE_SCHED(op->on_complete, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, GRPC_ERROR_NONE);
}
/* implementation of grpc_transport_perform_op */
@ -636,7 +636,7 @@ static void StartTransportOp(grpc_channel_element* /*elem*/,
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
static grpc_error* InitCallElem(grpc_call_element* elem,
@ -652,7 +652,7 @@ static void SetPollsetOrPollsetSet(grpc_call_element* /*elem*/,
static void DestroyCallElem(grpc_call_element* /*elem*/,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_sched_closure) {
GRPC_CLOSURE_SCHED(then_sched_closure, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_sched_closure, GRPC_ERROR_NONE);
}
grpc_error* InitChannelElem(grpc_channel_element* /*elem*/,

@ -67,7 +67,7 @@ class DummyEndpoint : public grpc_endpoint {
return;
}
grpc_slice_buffer_add(slices_, slice);
GRPC_CLOSURE_SCHED(read_cb_, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, read_cb_, GRPC_ERROR_NONE);
read_cb_ = nullptr;
}
@ -83,7 +83,7 @@ class DummyEndpoint : public grpc_endpoint {
if (have_slice_) {
have_slice_ = false;
grpc_slice_buffer_add(slices, buffered_slice_);
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
return;
}
read_cb_ = cb;
@ -97,7 +97,7 @@ class DummyEndpoint : public grpc_endpoint {
static void write(grpc_endpoint* /*ep*/, grpc_slice_buffer* /*slices*/,
grpc_closure* cb, void* /*arg*/) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
}
static void add_to_pollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {
@ -111,7 +111,8 @@ class DummyEndpoint : public grpc_endpoint {
static void shutdown(grpc_endpoint* ep, grpc_error* why) {
grpc_resource_user_shutdown(static_cast<DummyEndpoint*>(ep)->ru_);
GRPC_CLOSURE_SCHED(static_cast<DummyEndpoint*>(ep)->read_cb_, why);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
static_cast<DummyEndpoint*>(ep)->read_cb_, why);
}
static void destroy(grpc_endpoint* ep) {
@ -354,7 +355,7 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State& state) {
s->Op(&op);
s->DestroyThen(start.get());
});
GRPC_CLOSURE_SCHED(start.get(), GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, start.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
gpr_event_wait(&bm_done, gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_metadata_batch_destroy(&b);
@ -381,7 +382,7 @@ static void BM_TransportEmptyOp(benchmark::State& state) {
op.on_complete = c.get();
s->Op(&op);
});
GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
reset_op();
op.cancel_stream = true;
@ -613,7 +614,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
do {
if (received == recv_stream->length()) {
recv_stream.reset();
GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, c.get(), GRPC_ERROR_NONE);
return;
}
} while (recv_stream->Next(recv_stream->length() - received,

@ -125,7 +125,7 @@ static void BM_ClosureSchedOnExecCtx(benchmark::State& state) {
GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
}
@@ -141,8 +141,8 @@ static void BM_ClosureSched2OnExecCtx(benchmark::State& state) {
GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c1, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c2, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
}
@@ -160,9 +160,9 @@ static void BM_ClosureSched3OnExecCtx(benchmark::State& state) {
GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c1, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c2, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &c3, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
}
@@ -354,12 +354,15 @@ class Rescheduler {
GRPC_CLOSURE_INIT(&closure_, Step, this, scheduler);
}
// Kicks off the reschedule loop by queuing closure_ on the current ExecCtx.
// The old single-line definition using GRPC_CLOSURE_SCHED is dropped: keeping
// both would redefine the method (and the macro is removed by this refactor).
void ScheduleFirst() {
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
}
// Builds a heap-allocated closure bound to the supplied scheduler and queues
// it once. The stale GRPC_CLOSURE_SCHED call is removed — leaving it in would
// create and schedule a second closure on every invocation, doubling the work
// the benchmark is trying to measure.
void ScheduleFirstAgainstDifferentScheduler(
    grpc_closure_scheduler* scheduler) {
  grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                          GRPC_CLOSURE_CREATE(Step, this, scheduler),
                          GRPC_ERROR_NONE);
}
private:
@@ -369,7 +372,7 @@ class Rescheduler {
// Closure body: re-queues itself until the benchmark state machine says stop.
// Exactly one schedule per step — the leftover GRPC_CLOSURE_SCHED line queued
// &self->closure_ a second time while it was already on the closure list,
// which is not permitted for a grpc_closure.
static void Step(void* arg, grpc_error* /*error*/) {
  Rescheduler* self = static_cast<Rescheduler*>(arg);
  if (self->state_.KeepRunning()) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &self->closure_, GRPC_ERROR_NONE);
  }
}
};

@@ -45,7 +45,7 @@ static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
// Stub pollset shutdown for the custom event-engine vtable: there is nothing
// to tear down, so the shutdown closure completes immediately with success.
// The duplicate GRPC_CLOSURE_SCHED line is removed — it scheduled the same
// closure twice.
static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
}
static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {

Loading…
Cancel
Save