diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 38131907942..fa0c280f801 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -3242,7 +3242,7 @@ static void on_external_watch_complete_locked(void* arg, grpc_error* error) { "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(void* arg, diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 140441da108..ad6b6dd1920 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) { gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(void* arg, grpc_error* error) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7ff7cabfbd0..cc4a8237981 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1684,16 +1684,16 @@ static void send_ping_locked(grpc_chttp2_transport* t, */ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* There is a ping in flight. Add yourself to the inflight closure list. */ - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); return; diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 9429842eb85..6789e4d12db 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -63,11 +63,12 @@ struct grpc_combiner { gpr_refcount refs; }; +static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { - combiner_exec, combiner_exec, "combiner:immediately"}; + combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; @@ -343,6 +344,22 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { grpc_closure_list_append(&lock->final_list, closure, error); } +static void combiner_run(grpc_closure* closure, grpc_error* error) { + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); +#ifndef NDEBUG + closure->scheduled = false; + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); +#endif + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + static void enqueue_finally(void* closure, grpc_error* error) { combiner_finally_exec(static_cast(closure), GRPC_ERROR_REF(error)); diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8cf4fe99282..539bc120cec 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); return true; } diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index b3b29340146..990e8d632b9 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) { TCP_UNREF(tcp, "read"); tcp->read_slices = nullptr; tcp->read_cb = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } static void custom_read_callback(grpc_custom_socket* socket, size_t nread, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 153be05e83c..b79ffe20f1a 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } #define MAX_READ_IOVEC 4 @@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index da488034ca9..7ed1696f802 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1259,8 +1259,12 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + /* This closure may be meant to be run within some combiner. Since we aren't + * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead + * of GRPC_CLOSURE_RUN. + */ + GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs bctl->error */