From 08ba7246c024cdb082837c7e997dfb476a4880b3 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 1 Nov 2019 16:02:00 -0700 Subject: [PATCH 1/4] Remove GRPC_CLOSURE_RUN and replace with grpc_core::Closure::Run --- .../filters/client_channel/client_channel.cc | 16 +++--- .../grpclb/client_load_reporting_filter.cc | 8 +-- .../ext/filters/client_channel/subchannel.cc | 5 +- .../ext/filters/deadline/deadline_filter.cc | 10 ++-- .../filters/http/client/http_client_filter.cc | 10 ++-- .../message_compress_filter.cc | 5 +- .../filters/http/server/http_server_filter.cc | 9 ++-- .../server_load_reporting_filter.cc | 5 +- .../message_size/message_size_filter.cc | 5 +- .../workaround_cronet_compression_filter.cc | 5 +- .../chttp2/transport/chttp2_transport.cc | 2 +- src/core/lib/iomgr/call_combiner.cc | 3 +- src/core/lib/iomgr/closure.h | 52 ++++++++++--------- src/core/lib/iomgr/resource_quota.cc | 3 +- src/core/lib/iomgr/tcp_posix.cc | 2 +- .../security/transport/server_auth_filter.cc | 5 +- src/core/lib/surface/call.cc | 3 +- src/core/lib/surface/server.cc | 5 +- src/core/lib/transport/transport.cc | 2 +- src/cpp/ext/filters/census/client_filter.cc | 8 +-- src/cpp/ext/filters/census/server_filter.cc | 8 +-- .../core/end2end/tests/filter_causes_close.cc | 4 +- test/core/iomgr/resource_quota_test.cc | 5 +- .../microbenchmarks/bm_chttp2_transport.cc | 6 +-- test/cpp/microbenchmarks/bm_closure.cc | 8 +-- 25 files changed, 113 insertions(+), 81 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8812b2b1ad6..282204a49f4 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1191,9 +1191,8 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( void* arg, grpc_error* /*ignored*/) { ExternalConnectivityWatcher* self = static_cast(arg); - // This assumes that the closure is scheduled on the ExecCtx scheduler - // and that GRPC_CLOSURE_RUN() will run the closure immediately. - GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, + GRPC_ERROR_NONE); // Add new watcher. self->chand_->state_tracker_.AddWatcher( self->initial_state_, @@ -2263,8 +2262,9 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata, &calld->lb_call_state_); // Chain to original callback. - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready_, + GRPC_ERROR_REF(error)); } void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( @@ -2759,7 +2759,8 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { @@ -2849,7 +2850,8 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, recv_message_ready, + GRPC_ERROR_REF(error)); } void CallData::RecvMessageReady(void* arg, grpc_error* error) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc index 27f5bb8c030..c8d1b1d7af8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc @@ -59,7 +59,8 @@ static void on_complete_for_send(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { calld->send_initial_metadata_succeeded = true; } - GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_on_complete_for_send, + GRPC_ERROR_REF(error)); } static void recv_initial_metadata_ready(void* arg, grpc_error* error) { @@ -67,8 +68,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { calld->recv_initial_metadata_succeeded = true; } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } static grpc_error* clr_init_call_elem(grpc_call_element* elem, diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 1c245ae427a..14eff2f0cd7 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -293,8 +293,9 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } else { channelz_subchannel->RecordCallFailed(); } - GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata_, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + call->original_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); } void SubchannelCall::IncrementRefCount() { diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index a99d9cb595b..004cee47726 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -133,8 +133,9 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { grpc_deadline_state* deadline_state = static_cast(arg); cancel_timer_if_needed(deadline_state); // Invoke the original callback. - GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + deadline_state->original_recv_trailing_metadata_ready, + GRPC_ERROR_REF(error)); } // Inject our own recv_trailing_metadata_ready callback into op. @@ -290,8 +291,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { server_call_data* calld = static_cast(elem->call_data); start_timer_if_needed(elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Method for starting a call op for server filter. diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 81b9e68445f..b1956c344dc 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -188,7 +188,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { calld->call_combiner, &calld->recv_trailing_metadata_ready, calld->recv_trailing_metadata_error, "continue recv_trailing_metadata"); } - GRPC_CLOSURE_RUN(closure, error); + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { @@ -209,15 +209,17 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { } error = grpc_error_add_child( error, GRPC_ERROR_REF(calld->recv_initial_metadata_error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready, error); } static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); calld->send_message_cache.Destroy(); - GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); } // Pulls a slice from the send_message byte stream, updating diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index f2b2ff34582..0d6bc1212dc 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -227,8 +227,9 @@ static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); grpc_slice_buffer_reset_and_unref_internal(&calld->slices); - GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); } static void send_message_batch_continue(grpc_call_element* elem) { diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 905f2ebc55d..62dc76fb6cc 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -363,7 +363,8 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) { "resuming hs_recv_trailing_metadata_ready from " "hs_recv_initial_metadata_ready"); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_initial_metadata_ready, err); } static void hs_recv_message_ready(void* user_data, grpc_error* err) { @@ -378,7 +379,8 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) { calld->recv_message->reset(calld->read_stream.get()); calld->have_read_stream = false; } - GRPC_CLOSURE_RUN(calld->original_recv_message_ready, GRPC_ERROR_REF(err)); + grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_recv_message_ready, + GRPC_ERROR_REF(err)); } else { // We have not yet seen the recv_initial_metadata callback, so we // need to wait to see if this is a GET request. @@ -404,7 +406,8 @@ static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) { err = grpc_error_add_child( GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready, err); } static grpc_error* hs_mutate_op(grpc_call_element* elem, diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index f48b0f4fdcf..83c9f3aff29 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -267,8 +267,9 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg, {::grpc::load_reporter::TagKeyUserId(), {chand->peer_identity(), chand->peer_identity_len()}}}); } - GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_, - GRPC_ERROR_REF(err)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_initial_metadata_ready_, + GRPC_ERROR_REF(err)); } grpc_error* ServerLoadReportingCallData::Init( diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index f13efd76236..2b15e7ecd5f 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -215,7 +215,7 @@ static void recv_message_ready(void* user_data, grpc_error* error) { calld->recv_trailing_metadata_error, "continue recv_trailing_metadata_ready"); } - GRPC_CLOSURE_RUN(closure, error); + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); } // Callback invoked on completion of recv_trailing_metadata @@ -234,7 +234,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) { error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error)); // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready, error); } // Start transport stream op. diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc index ee491411cfd..7f9a4fcaf82 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc @@ -70,8 +70,9 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) { } // Invoke the next callback. - GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Start transport stream op. diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0936ee854cc..1921f41a3a0 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1831,7 +1831,7 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index bfbbb7f385d..7c282eb3f24 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -80,7 +80,8 @@ void CallCombiner::TsanClosure(void* arg, grpc_error* error) { } else { lock.reset(); } - GRPC_CLOSURE_RUN(self->original_closure_, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_, + GRPC_ERROR_REF(error)); if (lock != nullptr) { TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true); bool prev = true; diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index c7b2e8299b9..18c751d7aef 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -27,6 +27,7 @@ #include #include +#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/gprpp/mpscq.h" #include "src/core/lib/iomgr/error.h" @@ -247,35 +248,38 @@ inline bool grpc_closure_list_empty(grpc_closure_list closure_list) { return closure_list.head == nullptr; } +namespace grpc_core { +class Closure { + public: + static void Run(const DebugLocation& location, grpc_closure* closure, + grpc_error* error) { + if (closure != nullptr) { #ifndef NDEBUG -inline void grpc_closure_run(const char* file, int line, grpc_closure* c, - grpc_error* error) { -#else -inline void grpc_closure_run(grpc_closure* c, grpc_error* error) { + closure->file_initiated = location.file(); + closure->line_initiated = location.line(); + closure->run = true; + closure->scheduled = false; + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", + closure, closure->file_created, closure->line_created, + closure->run ? "run" : "scheduled", closure->file_initiated, + closure->line_initiated); + } + GPR_ASSERT(closure->cb != nullptr); #endif - GPR_TIMER_SCOPE("grpc_closure_run", 0); - if (c != nullptr) { + closure->cb(closure->cb_arg, error); #ifndef NDEBUG - c->file_initiated = file; - c->line_initiated = line; - c->run = true; - GPR_ASSERT(c->cb != nullptr); + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "closure %p finished", closure); + } #endif - c->scheduler->vtable->run(c, error); - } else { - GRPC_ERROR_UNREF(error); + GRPC_ERROR_UNREF(error); + } else { + GRPC_ERROR_UNREF(error); + } } -} - -/** Run a closure directly. Caller ensures that no locks are being held above. - * Note that calling this at the end of a closure callback function itself is - * by definition safe. */ -#ifndef NDEBUG -#define GRPC_CLOSURE_RUN(closure, error) \ - grpc_closure_run(__FILE__, __LINE__, closure, error) -#else -#define GRPC_CLOSURE_RUN(closure, error) grpc_closure_run(closure, error) -#endif +}; +} // namespace grpc_core #ifndef NDEBUG inline void grpc_closure_sched(const char* file, int line, grpc_closure* c, diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 661783eeedd..ab1f86e4a92 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -597,7 +597,8 @@ static void ru_allocated_slices(void* arg, grpc_error* error) { grpc_resource_user_slice_allocator* slice_allocator = static_cast(arg); if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator); - GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done, + GRPC_ERROR_REF(error)); } /******************************************************************************* diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index f783a4fd6cf..bcb7e6128dc 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -1046,7 +1046,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } // No need to take a ref on error since tcp_flush provides a ref. - GRPC_CLOSURE_RUN(cb, error); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); TCP_UNREF(tcp, "write"); } } diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index d7b33f99509..698a5a30dba 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -238,7 +238,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { calld->recv_trailing_metadata_error, "continue recv_trailing_metadata_ready"); } - GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); } static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { @@ -254,7 +254,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) { } err = grpc_error_add_child( GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready, err); } static void server_auth_start_transport_stream_op_batch( diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 605d69a2ebf..1094573f8c6 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1515,7 +1515,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) { } } if (saved_rsr_closure != nullptr) { - GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure, + GRPC_ERROR_REF(error)); } finish_batch_step(bctl); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index eed32a9a23d..4300d7d3a54 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -781,7 +781,7 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) { calld->recv_trailing_metadata_error, "continue server_recv_trailing_metadata_ready"); } - GRPC_CLOSURE_RUN(closure, error); + grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); } static void server_recv_trailing_metadata_ready(void* user_data, @@ -802,7 +802,8 @@ static void server_recv_trailing_metadata_ready(void* user_data, error = grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->recv_initial_metadata_error)); - GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->original_recv_trailing_metadata_ready, error); } static void server_mutate_op(grpc_call_element* elem, diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 0a82f7cb8ca..1651b5a2292 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -242,7 +242,7 @@ static void destroy_made_transport_stream_op(void* arg, grpc_error* error) { made_transport_stream_op* op = static_cast(arg); grpc_closure* c = op->inner_on_complete; gpr_free(op); - GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op_batch* grpc_make_transport_stream_op( diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc index 45a883c45b8..f2053a6183a 100644 --- a/src/cpp/ext/filters/census/client_filter.cc +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -58,8 +58,9 @@ void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, FilterTrailingMetadata(calld->recv_trailing_metadata_, &calld->elapsed_time_); } - GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->initial_on_done_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); } void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, @@ -75,7 +76,8 @@ void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, if ((*calld->recv_message_) != nullptr) { calld->recv_message_count_++; } - GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_, + GRPC_ERROR_REF(error)); } void CensusClientCallData::StartTransportStreamOpBatch( diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc index 09ceb9f1da8..ab7fadc50ed 100644 --- a/src/cpp/ext/filters/census/server_filter.cc +++ b/src/cpp/ext/filters/census/server_filter.cc @@ -74,7 +74,8 @@ void CensusServerCallData::OnDoneRecvMessageCb(void* user_data, if ((*calld->recv_message_) != nullptr) { ++calld->recv_message_count_; } - GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_, + GRPC_ERROR_REF(error)); } void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, @@ -121,8 +122,9 @@ void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, grpc_census_call_set_context( calld->gc_, reinterpret_cast(&calld->context_)); } - GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_, - GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, + calld->initial_on_done_recv_initial_metadata_, + GRPC_ERROR_REF(error)); } void CensusServerCallData::StartTransportStreamOpBatch( diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc index 2e8d1e56663..7f99b9219e2 100644 --- a/test/core/end2end/tests/filter_causes_close.cc +++ b/test/core/end2end/tests/filter_causes_close.cc @@ -198,8 +198,8 @@ typedef struct { static void recv_im_ready(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); - GRPC_CLOSURE_RUN( - calld->recv_im_ready, + grpc_core::Closure::Run( + DEBUG_LOCATION, calld->recv_im_ready, grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failure that's not preventable.", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, diff --git a/test/core/iomgr/resource_quota_test.cc b/test/core/iomgr/resource_quota_test.cc index 27ffd3f9ec9..d5d86a37e7a 100644 --- a/test/core/iomgr/resource_quota_test.cc +++ b/test/core/iomgr/resource_quota_test.cc @@ -62,7 +62,7 @@ static void reclaimer_cb(void* args, grpc_error* error) { reclaimer_args* a = static_cast(args); grpc_resource_user_free(a->resource_user, a->size); grpc_resource_user_finish_reclamation(a->resource_user); - GRPC_CLOSURE_RUN(a->then, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, a->then, GRPC_ERROR_NONE); gpr_free(a); } @@ -77,7 +77,8 @@ grpc_closure* make_reclaimer(grpc_resource_user* resource_user, size_t size, static void unused_reclaimer_cb(void* arg, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_CANCELLED); - GRPC_CLOSURE_RUN(static_cast(arg), GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, static_cast(arg), + GRPC_ERROR_NONE); } grpc_closure* make_unused_reclaimer(grpc_closure* then) { return GRPC_CLOSURE_CREATE(unused_reclaimer_cb, then, diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index ac77d6cab1f..e04c2276f06 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -277,7 +277,7 @@ static void BM_StreamCreateDestroy(benchmark::State& state) { s->Op(&op); s->DestroyThen(next.get()); }); - GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, next.get(), GRPC_ERROR_NONE); f.FlushExecCtx(); track_counters.Finish(state); } @@ -606,7 +606,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) { GPR_ASSERT(!state.KeepRunning()); return; } - GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE); }); drain = MakeClosure([&](grpc_error* /*error*/) { @@ -627,7 +627,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) { recv_stream->Pull(&recv_slice); received += GRPC_SLICE_LENGTH(recv_slice); grpc_slice_unref_internal(recv_slice); - GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE); }); reset_op(); diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 0a0e8d664e7..d51b29752e8 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -84,7 +84,7 @@ static void BM_ClosureRunOnExecCtx(benchmark::State& state) { GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_RUN(&c, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } @@ -96,7 +96,8 @@ static void BM_ClosureCreateAndRun(benchmark::State& state) { TrackCounters track_counters; grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_RUN( + grpc_core::Closure::Run( + DEBUG_LOCATION, GRPC_CLOSURE_CREATE(DoNothing, nullptr, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -110,7 +111,8 @@ static void BM_ClosureInitAndRun(benchmark::State& state) { grpc_core::ExecCtx exec_ctx; grpc_closure c; for (auto _ : state) { - GRPC_CLOSURE_RUN( + grpc_core::Closure::Run( + DEBUG_LOCATION, GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } From a9da023413d16e7396436131683e6b01cab9152d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 4 Nov 2019 10:37:51 -0800 Subject: [PATCH 2/4] Reviewer comments --- src/core/ext/filters/client_channel/client_channel.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 282204a49f4..1fa26ad1745 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1191,7 +1191,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( void* arg, grpc_error* /*ignored*/) { ExternalConnectivityWatcher* self = static_cast(arg); - grpc_core::Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, + Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE); // Add new watcher. self->chand_->state_tracker_.AddWatcher( @@ -2262,7 +2262,7 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata, &calld->lb_call_state_); // Chain to original callback. - grpc_core::Closure::Run(DEBUG_LOCATION, + Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, GRPC_ERROR_REF(error)); } @@ -2759,7 +2759,7 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - grpc_core::Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, + Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } @@ -2850,7 +2850,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - grpc_core::Closure::Run(DEBUG_LOCATION, recv_message_ready, + Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error)); } From d60b60a837a5c1198f03792d5ec4b03618aa5992 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 7 Nov 2019 15:40:30 -0800 Subject: [PATCH 3/4] Reviewer comments --- .../filters/client_channel/client_channel.cc | 13 +++---- .../ext/filters/client_channel/subchannel.cc | 5 ++- .../chttp2/transport/chttp2_transport.cc | 2 +- src/core/lib/iomgr/closure.h | 35 ++++++++----------- src/core/lib/iomgr/tcp_posix.cc | 20 ++++++----- src/core/lib/surface/call.cc | 8 +++-- test/cpp/microbenchmarks/bm_closure.cc | 5 ++- 7 files changed, 41 insertions(+), 47 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e33043c34a4..2b635d83d16 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1195,8 +1195,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked( void* arg, grpc_error* /*ignored*/) { ExternalConnectivityWatcher* self = static_cast(arg); - Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, - GRPC_ERROR_NONE); + Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE); // Add new watcher. self->chand_->state_tracker_.AddWatcher( self->initial_state_, @@ -2268,9 +2267,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy( calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata, &calld->lb_call_state_); // Chain to original callback. - Closure::Run(DEBUG_LOCATION, - calld->original_recv_trailing_metadata_ready_, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, + GRPC_ERROR_REF(error)); } void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy( @@ -2766,7 +2764,7 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) { batch_data->Unref(); // Invoke callback. Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error)); } void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) { @@ -2856,8 +2854,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) { calld->MaybeClearPendingBatch(batch_data->elem, pending); batch_data->Unref(); // Invoke callback. - Closure::Run(DEBUG_LOCATION, recv_message_ready, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error)); } void CallData::RecvMessageReady(void* arg, grpc_error* error) { diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9c4e2b93782..1566dabfe9c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -293,9 +293,8 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { } else { channelz_subchannel->RecordCallFailed(); } - grpc_core::Closure::Run(DEBUG_LOCATION, - call->original_recv_trailing_metadata_, - GRPC_ERROR_REF(error)); + Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); } void SubchannelCall::IncrementRefCount() { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 3c446aedcf8..5347d2692dc 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1836,7 +1836,7 @@ static void perform_transport_op_locked(void* stream_op, close_transport_locked(t, op->disconnect_with_error); } - grpc_core::Closure::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE); GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op"); } diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 9c72ea4dfea..d0e26902d7b 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -253,30 +253,25 @@ class Closure { public: static void Run(const DebugLocation& location, grpc_closure* closure, grpc_error* error) { - if (closure != nullptr) { + if (closure == nullptr) { + GRPC_ERROR_UNREF(error); + return; + } #ifndef NDEBUG - closure->file_initiated = location.file(); - closure->line_initiated = location.line(); - closure->run = true; - closure->scheduled = false; - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", - closure, closure->file_created, closure->line_created, - closure->run ? "run" : "scheduled", closure->file_initiated, - closure->line_initiated); - } - GPR_ASSERT(closure->cb != nullptr); + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", + closure, closure->file_created, closure->line_created, "run", + location.file(), location.line()); + } + GPR_ASSERT(closure->cb != nullptr); #endif - closure->cb(closure->cb_arg, error); + closure->cb(closure->cb_arg, error); #ifndef NDEBUG - if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "closure %p finished", closure); - } -#endif - GRPC_ERROR_UNREF(error); - } else { - GRPC_ERROR_UNREF(error); + if (grpc_trace_closure.enabled()) { + gpr_log(GPR_DEBUG, "closure %p finished", closure); } +#endif + GRPC_ERROR_UNREF(error); } }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index e422d17a60e..668a0c805e8 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } #define MAX_READ_IOVEC 4 @@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, * right thing (i.e calls tcp_do_read() which either reads the available * bytes or calls notify_on_read() to be notified when new bytes become * available */ - GRPC_CLOSURE_RUN(&tcp->read_done_closure, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure, + GRPC_ERROR_NONE); } } @@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; - GRPC_CLOSURE_RUN(cb, GRPC_ERROR_REF(error)); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "write"); return; } @@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { - GRPC_CLOSURE_RUN(cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) - : GRPC_ERROR_NONE); + grpc_core::Closure::Run( + DEBUG_LOCATION, cb, + grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), + tcp) + : GRPC_ERROR_NONE); tcp_shutdown_buffer_list(tcp); return; } @@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + grpc_core::Closure::Run(DEBUG_LOCATION, cb, error); } } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index e3e54baa9f4..fcebe9bc410 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1224,8 +1224,9 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + grpc_core::Closure::Run(DEBUG_LOCATION, + (grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs error */ @@ -1571,7 +1572,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, static_cast( gpr_malloc(sizeof(grpc_cq_completion)))); } else { - GRPC_CLOSURE_RUN((grpc_closure*)notify_tag, GRPC_ERROR_NONE); + grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag, + GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 2ce21650a1f..7736012128a 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -78,19 +78,18 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { } BENCHMARK(BM_ClosureInitAgainstCombiner); -static void BM_ClosureRunOnExecCtx(benchmark::State& state) { +static void BM_ClosureRun(benchmark::State& state) { TrackCounters track_counters; grpc_closure c; GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { grpc_core::Closure::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE); - grpc_core::ExecCtx::Get()->Flush(); } track_counters.Finish(state); } -BENCHMARK(BM_ClosureRunOnExecCtx); +BENCHMARK(BM_ClosureRun); static void BM_ClosureCreateAndRun(benchmark::State& state) { TrackCounters track_counters; From 0a9a0eb2e04a31606e6381050bfe90b19f192231 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Fri, 8 Nov 2019 11:49:00 -0800 Subject: [PATCH 4/4] Reviewer comments --- src/core/lib/iomgr/closure.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index d0e26902d7b..60400c8017a 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -259,8 +259,8 @@ class Closure { } #ifndef NDEBUG if (grpc_trace_closure.enabled()) { - gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", - closure, closure->file_created, closure->line_created, "run", + gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: run [%s:%d]", + closure, closure->file_created, closure->line_created, location.file(), location.line()); } GPR_ASSERT(closure->cb != nullptr);