From eaddd597d75cbf05835318b8d047654eb8b37e72 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 26 Apr 2018 14:31:45 -0700 Subject: [PATCH 1/6] Add combiner_run to run closures immediately if we already have the combiner lock --- .../ext/filters/client_channel/subchannel.cc | 2 +- .../chttp2/transport/chttp2_transport.cc | 14 +++---- src/core/lib/iomgr/combiner.cc | 39 ++++++++++++++++++- src/core/lib/iomgr/tcp_posix.cc | 4 +- 4 files changed, 47 insertions(+), 12 deletions(-) diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index d7815fb7e14..450e6842739 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) { gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(void* arg, grpc_error* error) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 0ef73961a56..eb16c1b9104 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1665,8 +1665,8 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(on_ack, GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; @@ -1683,16 +1683,16 @@ static void send_ping_locked(grpc_chttp2_transport* t, */ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* There is a ping in flight. Add yourself to the inflight closure list. */ - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); return; diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 9429842eb85..c11dd65f5a9 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -63,13 +63,15 @@ struct grpc_combiner { gpr_refcount refs; }; +static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); +static void combiner_finally_run(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { - combiner_exec, combiner_exec, "combiner:immediately"}; + combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { - combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; + combiner_finally_run, combiner_finally_exec, "combiner:finally"}; static void offload(void* arg, grpc_error* error); @@ -343,6 +345,39 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { grpc_closure_list_append(&lock->final_list, closure, error); } +static void combiner_run(grpc_closure* closure, grpc_error* error) { +#ifndef NDEBUG + closure->scheduled = false; + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); +#endif + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + +static void combiner_finally_run(grpc_closure* closure, grpc_error* error) { +#ifndef NDEBUG + closure->scheduled = false; + grpc_combiner* lock = + COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); +#endif + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + static void enqueue_finally(void* closure, grpc_error* error) { combiner_finally_exec(static_cast(closure), GRPC_ERROR_REF(error)); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 153be05e83c..b79ffe20f1a 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } #define MAX_READ_IOVEC 4 @@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } From 730b7de15874732d8c7c0eb081b29d43c805f81a Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 30 Apr 2018 13:02:08 -0700 Subject: [PATCH 2/6] Revert combiner_finally_run and restrict changes to combiner_run --- src/core/lib/iomgr/combiner.cc | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index c11dd65f5a9..60785b6b430 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -65,13 +65,12 @@ struct grpc_combiner { static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); -static void combiner_finally_run(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { - combiner_finally_run, combiner_finally_exec, "combiner:finally"}; + combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; static void offload(void* arg, grpc_error* error); @@ -347,25 +346,8 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { static void combiner_run(grpc_closure* closure, grpc_error* error) { #ifndef NDEBUG - closure->scheduled = false; grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", - lock, closure, closure->file_created, closure->line_created, - closure->file_initiated, closure->line_initiated)); - GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == - lock); -#endif - closure->cb(closure->cb_arg, error); - GRPC_ERROR_UNREF(error); -} - -static void combiner_finally_run(grpc_closure* closure, grpc_error* error) { -#ifndef NDEBUG closure->scheduled = false; - grpc_combiner* lock = - COMBINER_FROM_CLOSURE_SCHEDULER(closure, finally_scheduler); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", From 2629f466dd074f14041fe8e65707f18836ae0546 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 30 Apr 2018 14:52:31 -0700 Subject: [PATCH 3/6] Remove illegal GRPC_CLOSURE_RUNs --- src/core/ext/filters/client_channel/client_channel.cc | 2 +- src/core/lib/iomgr/combiner.cc | 4 ++-- src/core/lib/iomgr/resource_quota.cc | 2 +- src/core/lib/surface/call.cc | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 80a647fa94d..532ff11ff0e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -3242,7 +3242,7 @@ static void on_external_watch_complete_locked(void* arg, grpc_error* error) { "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(void* arg, diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 60785b6b430..6789e4d12db 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -345,17 +345,17 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { } static void combiner_run(grpc_closure* closure, grpc_error* error) { -#ifndef NDEBUG grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); +#ifndef NDEBUG closure->scheduled = false; GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", lock, closure, closure->file_created, closure->line_created, closure->file_initiated, closure->line_initiated)); +#endif GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == lock); -#endif closure->cb(closure->cb_arg, error); GRPC_ERROR_UNREF(error); } diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8cf4fe99282..539bc120cec 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); return true; } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index da488034ca9..0a4282188cc 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1259,8 +1259,8 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs bctl->error */ From 9c2d4a08378ca6e90f96f0eaf064dced1cfba31e Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 30 Apr 2018 16:27:41 -0700 Subject: [PATCH 4/6] Remove further invalid usecases of GRPC_CLOSURE_RUN --- src/core/lib/iomgr/tcp_custom.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index b3b29340146..990e8d632b9 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) { TCP_UNREF(tcp, "read"); tcp->read_slices = nullptr; tcp->read_cb = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } static void custom_read_callback(grpc_custom_socket* socket, size_t nread, From 592dc0fa1fa2934ce2848a5b8b91a9e19b368ba5 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 1 May 2018 16:52:26 -0700 Subject: [PATCH 5/6] Revert GRPC_CLOSURE_RUN to GRPC_CLOSURE_SCHED for send_ping_locked --- src/core/ext/transport/chttp2/transport/chttp2_transport.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index eb16c1b9104..f52a7f03714 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1665,8 +1665,8 @@ static void cancel_pings(grpc_chttp2_transport* t, grpc_error* error) { static void send_ping_locked(grpc_chttp2_transport* t, grpc_closure* on_initiate, grpc_closure* on_ack) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_RUN(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_RUN(on_ack, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; From f609e62578da510297b0592ba94a9aa2f2c06c72 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 16 May 2018 15:14:50 -0700 Subject: [PATCH 6/6] Add comment on use of GPRC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN --- src/core/lib/surface/call.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 0a4282188cc..7ed1696f802 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1259,6 +1259,10 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = nullptr; + /* This closure may be meant to be run within some combiner. Since we aren't + * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead + * of GRPC_CLOSURE_RUN. + */ GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(call, "completion");