Merge pull request #15200 from yashykt/combiner_run

Add combiner_run
reviewable/pr15436/r1
Yash Tibrewal 7 years ago committed by GitHub
commit 627bba4925
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
Changed files (8 files, 49 changes):
  1. src/core/ext/filters/client_channel/client_channel.cc (2 changes)
  2. src/core/ext/filters/client_channel/subchannel.cc (2 changes)
  3. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (10 changes)
  4. src/core/lib/iomgr/combiner.cc (19 changes)
  5. src/core/lib/iomgr/resource_quota.cc (2 changes)
  6. src/core/lib/iomgr/tcp_custom.cc (2 changes)
  7. src/core/lib/iomgr/tcp_posix.cc (4 changes)
  8. src/core/lib/surface/call.cc (8 changes)

@@ -3242,7 +3242,7 @@ static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
                            "external_connectivity_watcher");
   external_connectivity_watcher_list_remove(w->chand, w);
   gpr_free(w);
-  GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
+  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
 }

 static void watch_connectivity_state_locked(void* arg,

@@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) {
   gpr_mu_unlock(&w->subchannel->mu);
   GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
   gpr_free(w);
-  GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
+  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
 }

 static void on_alarm(void* arg, grpc_error* error) {

@@ -1684,16 +1684,16 @@ static void send_ping_locked(grpc_chttp2_transport* t,
  */
 static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
   if (t->closed_with_error != GRPC_ERROR_NONE) {
-    GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked,
-                       GRPC_ERROR_REF(t->closed_with_error));
-    GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked,
-                       GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
+                     GRPC_ERROR_REF(t->closed_with_error));
+    GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
+                     GRPC_ERROR_REF(t->closed_with_error));
     return;
   }
   grpc_chttp2_ping_queue* pq = &t->ping_queue;
   if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
     /* There is a ping in flight. Add yourself to the inflight closure list. */
-    GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+    GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
     grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
                              &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
     return;

@@ -63,11 +63,12 @@ struct grpc_combiner {
   gpr_refcount refs;
 };

+static void combiner_run(grpc_closure* closure, grpc_error* error);
 static void combiner_exec(grpc_closure* closure, grpc_error* error);
 static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);

 static const grpc_closure_scheduler_vtable scheduler = {
-    combiner_exec, combiner_exec, "combiner:immediately"};
+    combiner_run, combiner_exec, "combiner:immediately"};
 static const grpc_closure_scheduler_vtable finally_scheduler = {
     combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
@@ -343,6 +344,22 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
   grpc_closure_list_append(&lock->final_list, closure, error);
 }
/* Runs `closure` synchronously on the calling thread.  Installed as the
 * first entry of the combiner's `scheduler` vtable (see the vtable change
 * above, which pairs combiner_run with combiner_exec), so it presumably
 * backs GRPC_CLOSURE_RUN for combiner-scheduled closures -- TODO confirm
 * vtable slot order against grpc_closure_scheduler_vtable.
 *
 * Unlike combiner_exec this does not enqueue: it invokes the callback
 * inline, and therefore asserts that the current ExecCtx is already
 * executing inside this combiner.  Takes ownership of `error`, which is
 * unreffed after the callback returns. */
static void combiner_run(grpc_closure* closure, grpc_error* error) {
/* Recover the owning combiner from the closure's scheduler pointer. */
grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler);
#ifndef NDEBUG
/* Debug builds track pending closures; clear the flag before running so the
 * closure may legally be re-scheduled from within its own callback. */
closure->scheduled = false;
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG,
"Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]",
lock, closure, closure->file_created, closure->line_created,
closure->file_initiated, closure->line_initiated));
#endif
/* Inline execution is only legal while this combiner is the thread's
 * active combiner -- otherwise the caller should have used combiner_exec. */
GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner ==
lock);
closure->cb(closure->cb_arg, error);
GRPC_ERROR_UNREF(error);
}
 static void enqueue_finally(void* closure, grpc_error* error) {
   combiner_finally_exec(static_cast<grpc_closure*>(closure),
                         GRPC_ERROR_REF(error));

@@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
   resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
   resource_quota->debug_only_last_initiated_reclaimer = c;
   resource_user->reclaimers[destructive] = nullptr;
-  GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE);
+  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
   return true;
 }

@@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
   TCP_UNREF(tcp, "read");
   tcp->read_slices = nullptr;
   tcp->read_cb = nullptr;
-  GRPC_CLOSURE_RUN(cb, error);
+  GRPC_CLOSURE_SCHED(cb, error);
 }

 static void custom_read_callback(grpc_custom_socket* socket, size_t nread,

@@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
   tcp->read_cb = nullptr;
   tcp->incoming_buffer = nullptr;
-  GRPC_CLOSURE_RUN(cb, error);
+  GRPC_CLOSURE_SCHED(cb, error);
 }

 #define MAX_READ_IOVEC 4
@@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
       gpr_log(GPR_INFO, "write: %s", str);
     }
-    GRPC_CLOSURE_RUN(cb, error);
+    GRPC_CLOSURE_SCHED(cb, error);
     TCP_UNREF(tcp, "write");
   }
 }

@@ -1259,8 +1259,12 @@ static void post_batch_completion(batch_control* bctl) {
   if (bctl->completion_data.notify_tag.is_closure) {
     /* unrefs bctl->error */
     bctl->call = nullptr;
-    GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
-                     error);
+    /* This closure may be meant to be run within some combiner. Since we aren't
+     * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
+     * of GRPC_CLOSURE_RUN.
+     */
+    GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
+                       error);
     GRPC_CALL_INTERNAL_UNREF(call, "completion");
   } else {
     /* unrefs bctl->error */

Loading…
Cancel
Save