Reviewer comments

reviewable/pr20906/r3
Yash Tibrewal 5 years ago
parent fe88f14410
commit d60b60a837
  1. 13
      src/core/ext/filters/client_channel/client_channel.cc
  2. 5
      src/core/ext/filters/client_channel/subchannel.cc
  3. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 35
      src/core/lib/iomgr/closure.h
  5. 20
      src/core/lib/iomgr/tcp_posix.cc
  6. 8
      src/core/lib/surface/call.cc
  7. 5
      test/cpp/microbenchmarks/bm_closure.cc

@ -1195,8 +1195,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_,
GRPC_ERROR_NONE);
Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE);
// Add new watcher.
self->chand_->state_tracker_.AddWatcher(
self->initial_state_,
@ -2268,9 +2267,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata,
&calld->lb_call_state_);
// Chain to original callback.
Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
}
void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
@ -2766,7 +2764,7 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
batch_data->Unref();
// Invoke callback.
Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error));
}
void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
@ -2856,8 +2854,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
calld->MaybeClearPendingBatch(batch_data->elem, pending);
batch_data->Unref();
// Invoke callback.
Closure::Run(DEBUG_LOCATION, recv_message_ready,
GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
}
void CallData::RecvMessageReady(void* arg, grpc_error* error) {

@ -293,9 +293,8 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
} else {
channelz_subchannel->RecordCallFailed();
}
grpc_core::Closure::Run(DEBUG_LOCATION,
call->original_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
}
void SubchannelCall::IncrementRefCount() {

@ -1836,7 +1836,7 @@ static void perform_transport_op_locked(void* stream_op,
close_transport_locked(t, op->disconnect_with_error);
}
grpc_core::Closure::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
}

@ -253,30 +253,25 @@ class Closure {
public:
static void Run(const DebugLocation& location, grpc_closure* closure,
grpc_error* error) {
if (closure != nullptr) {
if (closure == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
#ifndef NDEBUG
closure->file_initiated = location.file();
closure->line_initiated = location.line();
closure->run = true;
closure->scheduled = false;
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]",
closure, closure->file_created, closure->line_created,
closure->run ? "run" : "scheduled", closure->file_initiated,
closure->line_initiated);
}
GPR_ASSERT(closure->cb != nullptr);
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]",
closure, closure->file_created, closure->line_created, "run",
location.file(), location.line());
}
GPR_ASSERT(closure->cb != nullptr);
#endif
closure->cb(closure->cb_arg, error);
closure->cb(closure->cb_arg, error);
#ifndef NDEBUG
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
GRPC_ERROR_UNREF(error);
} else {
GRPC_ERROR_UNREF(error);
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
GRPC_ERROR_UNREF(error);
}
};
} // namespace grpc_core

@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
GRPC_CLOSURE_RUN(cb, error);
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
#define MAX_READ_IOVEC 4
@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* right thing (i.e calls tcp_do_read() which either reads the available
* bytes or calls notify_on_read() to be notified when new bytes become
* available */
GRPC_CLOSURE_RUN(&tcp->read_done_closure, GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure,
GRPC_ERROR_NONE);
}
}
@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
GRPC_CLOSURE_RUN(cb, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "write");
return;
}
@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
tcp->outgoing_buffer_arg = arg;
if (buf->length == 0) {
GRPC_CLOSURE_RUN(cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
grpc_core::Closure::Run(
DEBUG_LOCATION, cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
tcp)
: GRPC_ERROR_NONE);
tcp_shutdown_buffer_list(tcp);
return;
}
@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
GRPC_CLOSURE_RUN(cb, error);
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
}

@ -1224,8 +1224,9 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs error */
bctl->call = nullptr;
GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
grpc_core::Closure::Run(DEBUG_LOCATION,
(grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs error */
@ -1571,7 +1572,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
static_cast<grpc_cq_completion*>(
gpr_malloc(sizeof(grpc_cq_completion))));
} else {
GRPC_CLOSURE_RUN((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;

@ -78,19 +78,18 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
}
BENCHMARK(BM_ClosureInitAgainstCombiner);
static void BM_ClosureRunOnExecCtx(benchmark::State& state) {
static void BM_ClosureRun(benchmark::State& state) {
TrackCounters track_counters;
grpc_closure c;
GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
grpc_core::Closure::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
}
track_counters.Finish(state);
}
BENCHMARK(BM_ClosureRunOnExecCtx);
BENCHMARK(BM_ClosureRun);
static void BM_ClosureCreateAndRun(benchmark::State& state) {
TrackCounters track_counters;

Loading…
Cancel
Save