diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e33043c34a4..2b635d83d16 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1195,8 +1195,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( void* arg, grpc_error* /*ignored*/) { ExternalConnectivityWatcher* self = static_cast(arg); - Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, - GRPC_ERROR_NONE); + Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE); // Add new watcher. self->chand_->state_tracker_.AddWatcher( self->initial_state_, @@ -2268,9 +2267,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata, &calld->lb_call_state_); // Chain to original callback. - Closure::Run(DEBUG_LOCATION, - calld->original_recv_trailing_metadata_ready_, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, + GRPC_ERROR_REF(error)); } void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( @@ -2766,7 +2764,7 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) { batch_data->Unref(); // Invoke callback. Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error)); } void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { @@ -2856,8 +2854,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - Closure::Run(DEBUG_LOCATION, recv_message_ready, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error)); } void CallData::RecvMessageReady(void* arg, grpc_error* error) { diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9c4e2b93782..1566dabfe9c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -293,9 +293,8 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } else { channelz_subchannel->RecordCallFailed(); } - grpc_core::Closure::Run(DEBUG_LOCATION, - call->original_recv_trailing_metadata_, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); } void SubchannelCall::IncrementRefCount() { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 3c446aedcf8..5347d2692dc 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1836,7 +1836,7 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - grpc_core::Closure::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 9c72ea4dfea..d0e26902d7b 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -253,30 +253,25 @@ class Closure { public: static void Run(const DebugLocation& location, grpc_closure* closure, grpc_error* error) { - if (closure != nullptr) { + if (closure == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } #ifndef NDEBUG - closure->file_initiated = location.file(); - closure->line_initiated = location.line(); - closure->run = true; - closure->scheduled = false; - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", - closure, closure->file_created, closure->line_created, - closure->run ? "run" : "scheduled", closure->file_initiated, - closure->line_initiated); - } - GPR_ASSERT(closure->cb != nullptr); + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", + closure, closure->file_created, closure->line_created, "run", + location.file(), location.line()); + } + GPR_ASSERT(closure->cb != nullptr); #endif - closure->cb(closure->cb_arg, error); + closure->cb(closure->cb_arg, error); #ifndef NDEBUG - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "closure %p finished", closure); - } -#endif - GRPC_ERROR_UNREF(error); - } else { - GRPC_ERROR_UNREF(error); + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "closure %p finished", closure); } +#endif + GRPC_ERROR_UNREF(error); } }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index e422d17a60e..668a0c805e8 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } #define MAX_READ_IOVEC 4 @@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, * right thing (i.e calls tcp_do_read() which either reads the available * bytes or calls notify_on_read() to be notified when new bytes become * available */ - GRPC_CLOSURE_RUN(&tcp->read_done_closure, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure, + GRPC_ERROR_NONE); } } @@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; - GRPC_CLOSURE_RUN(cb, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "write"); return; } @@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { - GRPC_CLOSURE_RUN(cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) - : GRPC_ERROR_NONE); + grpc_core::Closure::Run( + DEBUG_LOCATION, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), + tcp) + : GRPC_ERROR_NONE); tcp_shutdown_buffer_list(tcp); return; } @@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index e3e54baa9f4..fcebe9bc410 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1224,8 +1224,9 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + grpc_core::Closure::Run(DEBUG_LOCATION, + (grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs error */ @@ -1571,7 +1572,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, static_cast( gpr_malloc(sizeof(grpc_cq_completion)))); } else { - GRPC_CLOSURE_RUN((grpc_closure*)notify_tag, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag, + GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 2ce21650a1f..7736012128a 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -78,19 +78,18 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { } BENCHMARK(BM_ClosureInitAgainstCombiner); -static void BM_ClosureRunOnExecCtx(benchmark::State& state) { +static void BM_ClosureRun(benchmark::State& state) { TrackCounters track_counters; grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { grpc_core::Closure::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); } track_counters.Finish(state); } -BENCHMARK(BM_ClosureRunOnExecCtx); +BENCHMARK(BM_ClosureRun); static void BM_ClosureCreateAndRun(benchmark::State& state) { TrackCounters track_counters;