Merge pull request #20906 from yashykt/removrun

Remove GRPC_CLOSURE_RUN and replace with grpc_core::Closure::Run
pull/20990/head
Yash Tibrewal 5 years ago committed by GitHub
commit bd0aa9a600
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      src/core/ext/filters/client_channel/client_channel.cc
  2. 8
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
  3. 4
      src/core/ext/filters/client_channel/subchannel.cc
  4. 10
      src/core/ext/filters/deadline/deadline_filter.cc
  5. 10
      src/core/ext/filters/http/client/http_client_filter.cc
  6. 5
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  7. 9
      src/core/ext/filters/http/server/http_server_filter.cc
  8. 5
      src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
  9. 5
      src/core/ext/filters/message_size/message_size_filter.cc
  10. 5
      src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
  11. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  12. 3
      src/core/lib/iomgr/call_combiner.cc
  13. 45
      src/core/lib/iomgr/closure.h
  14. 3
      src/core/lib/iomgr/resource_quota.cc
  15. 22
      src/core/lib/iomgr/tcp_posix.cc
  16. 5
      src/core/lib/security/transport/server_auth_filter.cc
  17. 11
      src/core/lib/surface/call.cc
  18. 5
      src/core/lib/surface/server.cc
  19. 2
      src/core/lib/transport/transport.cc
  20. 8
      src/cpp/ext/filters/census/client_filter.cc
  21. 8
      src/cpp/ext/filters/census/server_filter.cc
  22. 4
      test/core/end2end/tests/filter_causes_close.cc
  23. 5
      test/core/iomgr/resource_quota_test.cc
  24. 6
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  25. 13
      test/cpp/microbenchmarks/bm_closure.cc

@ -1197,9 +1197,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
// This assumes that the closure is scheduled on the ExecCtx scheduler
// and that GRPC_CLOSURE_RUN() will run the closure immediately.
GRPC_CLOSURE_RUN(self->watcher_timer_init_, GRPC_ERROR_NONE);
Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE);
// Add new watcher.
self->chand_->state_tracker_.AddWatcher(
self->initial_state_,
@ -2272,8 +2270,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata,
&calld->lb_call_state_);
// Chain to original callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
}
void CallData::MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
@ -2768,7 +2766,8 @@ void CallData::InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error) {
calld->MaybeClearPendingBatch(batch_data->elem, pending);
batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
void CallData::RecvInitialMetadataReady(void* arg, grpc_error* error) {
@ -2858,7 +2857,7 @@ void CallData::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
calld->MaybeClearPendingBatch(batch_data->elem, pending);
batch_data->Unref();
// Invoke callback.
GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
}
void CallData::RecvMessageReady(void* arg, grpc_error* error) {

@ -59,7 +59,8 @@ static void on_complete_for_send(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
calld->send_initial_metadata_succeeded = true;
}
GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_on_complete_for_send,
GRPC_ERROR_REF(error));
}
static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
@ -67,8 +68,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
calld->recv_initial_metadata_succeeded = true;
}
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
static grpc_error* clr_init_call_elem(grpc_call_element* elem,

@ -293,8 +293,8 @@ void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
} else {
channelz_subchannel->RecordCallFailed();
}
GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
}
void SubchannelCall::IncrementRefCount() {

@ -133,8 +133,9 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
cancel_timer_if_needed(deadline_state);
// Invoke the original callback.
GRPC_CLOSURE_RUN(deadline_state->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
deadline_state->original_recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
// Inject our own recv_trailing_metadata_ready callback into op.
@ -290,8 +291,9 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
start_timer_if_needed(elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
// Method for starting a call op for server filter.

@ -188,7 +188,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
calld->call_combiner, &calld->recv_trailing_metadata_ready,
calld->recv_trailing_metadata_error, "continue recv_trailing_metadata");
}
GRPC_CLOSURE_RUN(closure, error);
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
@ -209,15 +209,17 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
}
error = grpc_error_add_child(
error, GRPC_ERROR_REF(calld->recv_initial_metadata_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
static void send_message_on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->send_message_cache.Destroy();
GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
// Pulls a slice from the send_message byte stream, updating

@ -227,8 +227,9 @@ static void send_message_on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_slice_buffer_reset_and_unref_internal(&calld->slices);
GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
static void send_message_batch_continue(grpc_call_element* elem) {

@ -363,7 +363,8 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
"resuming hs_recv_trailing_metadata_ready from "
"hs_recv_initial_metadata_ready");
}
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_initial_metadata_ready, err);
}
static void hs_recv_message_ready(void* user_data, grpc_error* err) {
@ -378,7 +379,8 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
calld->recv_message->reset(calld->read_stream.get());
calld->have_read_stream = false;
}
GRPC_CLOSURE_RUN(calld->original_recv_message_ready, GRPC_ERROR_REF(err));
grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_recv_message_ready,
GRPC_ERROR_REF(err));
} else {
// We have not yet seen the recv_initial_metadata callback, so we
// need to wait to see if this is a GET request.
@ -404,7 +406,8 @@ static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
err = grpc_error_add_child(
GRPC_ERROR_REF(err),
GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, err);
}
static grpc_error* hs_mutate_op(grpc_call_element* elem,

@ -267,8 +267,9 @@ void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
{::grpc::load_reporter::TagKeyUserId(),
{chand->peer_identity(), chand->peer_identity_len()}}});
}
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(err));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_initial_metadata_ready_,
GRPC_ERROR_REF(err));
}
grpc_error* ServerLoadReportingCallData::Init(

@ -216,7 +216,7 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
}
GRPC_CLOSURE_RUN(closure, error);
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
// Callback invoked on completion of recv_trailing_metadata
@ -235,7 +235,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
error =
grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error));
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
// Start transport stream op.

@ -70,8 +70,9 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
}
// Invoke the next callback.
GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->next_recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
// Start transport stream op.

@ -1836,7 +1836,7 @@ static void perform_transport_op_locked(void* stream_op,
close_transport_locked(t, op->disconnect_with_error);
}
GRPC_CLOSURE_RUN(op->on_consumed, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
GRPC_CHTTP2_UNREF_TRANSPORT(t, "transport_op");
}

@ -80,7 +80,8 @@ void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
} else {
lock.reset();
}
GRPC_CLOSURE_RUN(self->original_closure_, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
GRPC_ERROR_REF(error));
if (lock != nullptr) {
TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
bool prev = true;

@ -27,6 +27,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/error.h"
@ -247,35 +248,33 @@ inline bool grpc_closure_list_empty(grpc_closure_list closure_list) {
return closure_list.head == nullptr;
}
namespace grpc_core {
class Closure {
public:
static void Run(const DebugLocation& location, grpc_closure* closure,
grpc_error* error) {
if (closure == nullptr) {
GRPC_ERROR_UNREF(error);
return;
}
#ifndef NDEBUG
inline void grpc_closure_run(const char* file, int line, grpc_closure* c,
grpc_error* error) {
#else
inline void grpc_closure_run(grpc_closure* c, grpc_error* error) {
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: run [%s:%d]",
closure, closure->file_created, closure->line_created,
location.file(), location.line());
}
GPR_ASSERT(closure->cb != nullptr);
#endif
GPR_TIMER_SCOPE("grpc_closure_run", 0);
if (c != nullptr) {
closure->cb(closure->cb_arg, error);
#ifndef NDEBUG
c->file_initiated = file;
c->line_initiated = line;
c->run = true;
GPR_ASSERT(c->cb != nullptr);
if (grpc_trace_closure.enabled()) {
gpr_log(GPR_DEBUG, "closure %p finished", closure);
}
#endif
c->scheduler->vtable->run(c, error);
} else {
GRPC_ERROR_UNREF(error);
}
}
/** Run a closure directly. Caller ensures that no locks are being held above.
* Note that calling this at the end of a closure callback function itself is
* by definition safe. */
#ifndef NDEBUG
#define GRPC_CLOSURE_RUN(closure, error) \
grpc_closure_run(__FILE__, __LINE__, closure, error)
#else
#define GRPC_CLOSURE_RUN(closure, error) grpc_closure_run(closure, error)
#endif
};
} // namespace grpc_core
#ifndef NDEBUG
inline void grpc_closure_list_sched(const char* file, int line,

@ -601,7 +601,8 @@ static void ru_allocated_slices(void* arg, grpc_error* error) {
grpc_resource_user_slice_allocator* slice_allocator =
static_cast<grpc_resource_user_slice_allocator*>(arg);
if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator);
GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done,
GRPC_ERROR_REF(error));
}
/*******************************************************************************

@ -417,7 +417,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
GRPC_CLOSURE_RUN(cb, error);
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
#define MAX_READ_IOVEC 4
@ -645,7 +645,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* right thing (i.e calls tcp_do_read() which either reads the available
* bytes or calls notify_on_read() to be notified when new bytes become
* available */
GRPC_CLOSURE_RUN(&tcp->read_done_closure, GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, &tcp->read_done_closure,
GRPC_ERROR_NONE);
}
}
@ -1026,7 +1027,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
GRPC_CLOSURE_RUN(cb, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, cb, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "write");
return;
}
@ -1046,7 +1047,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
gpr_log(GPR_INFO, "write: %s", str);
}
// No need to take a ref on error since tcp_flush provides a ref.
GRPC_CLOSURE_RUN(cb, error);
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
TCP_UNREF(tcp, "write");
}
}
@ -1075,11 +1076,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
tcp->outgoing_buffer_arg = arg;
if (buf->length == 0) {
GRPC_CLOSURE_RUN(cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
grpc_core::Closure::Run(
DEBUG_LOCATION, cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
tcp)
: GRPC_ERROR_NONE);
tcp_shutdown_buffer_list(tcp);
return;
}
@ -1101,7 +1103,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
GRPC_CLOSURE_RUN(cb, error);
grpc_core::Closure::Run(DEBUG_LOCATION, cb, error);
}
}

@ -238,7 +238,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
calld->recv_trailing_metadata_error,
"continue recv_trailing_metadata_ready");
}
GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
}
static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
@ -254,7 +254,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
}
err = grpc_error_add_child(
GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, err);
}
static void server_auth_start_transport_stream_op_batch(

@ -1224,8 +1224,9 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs error */
bctl->call = nullptr;
GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
grpc_core::Closure::Run(DEBUG_LOCATION,
(grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs error */
@ -1511,7 +1512,8 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
}
}
if (saved_rsr_closure != nullptr) {
GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
GRPC_ERROR_REF(error));
}
finish_batch_step(bctl);
@ -1570,7 +1572,8 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
static_cast<grpc_cq_completion*>(
gpr_malloc(sizeof(grpc_cq_completion))));
} else {
GRPC_CLOSURE_RUN((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;

@ -784,7 +784,7 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
calld->recv_trailing_metadata_error,
"continue server_recv_trailing_metadata_ready");
}
GRPC_CLOSURE_RUN(closure, error);
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
}
static void server_recv_trailing_metadata_ready(void* user_data,
@ -805,7 +805,8 @@ static void server_recv_trailing_metadata_ready(void* user_data,
error =
grpc_error_add_child(GRPC_ERROR_REF(error),
GRPC_ERROR_REF(calld->recv_initial_metadata_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_trailing_metadata_ready, error);
}
static void server_mutate_op(grpc_call_element* elem,

@ -244,7 +244,7 @@ static void destroy_made_transport_stream_op(void* arg, grpc_error* error) {
made_transport_stream_op* op = static_cast<made_transport_stream_op*>(arg);
grpc_closure* c = op->inner_on_complete;
gpr_free(op);
GRPC_CLOSURE_RUN(c, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, c, GRPC_ERROR_REF(error));
}
grpc_transport_stream_op_batch* grpc_make_transport_stream_op(

@ -58,8 +58,9 @@ void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
FilterTrailingMetadata(calld->recv_trailing_metadata_,
&calld->elapsed_time_);
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->initial_on_done_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
}
void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
@ -75,7 +76,8 @@ void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
if ((*calld->recv_message_) != nullptr) {
calld->recv_message_count_++;
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
GRPC_ERROR_REF(error));
}
void CensusClientCallData::StartTransportStreamOpBatch(

@ -74,7 +74,8 @@ void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
if ((*calld->recv_message_) != nullptr) {
++calld->recv_message_count_;
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION, calld->initial_on_done_recv_message_,
GRPC_ERROR_REF(error));
}
void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
@ -121,8 +122,9 @@ void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
grpc_census_call_set_context(
calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
GRPC_ERROR_REF(error));
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->initial_on_done_recv_initial_metadata_,
GRPC_ERROR_REF(error));
}
void CensusServerCallData::StartTransportStreamOpBatch(

@ -198,8 +198,8 @@ typedef struct {
static void recv_im_ready(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_CLOSURE_RUN(
calld->recv_im_ready,
grpc_core::Closure::Run(
DEBUG_LOCATION, calld->recv_im_ready,
grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failure that's not preventable.", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS,

@ -62,7 +62,7 @@ static void reclaimer_cb(void* args, grpc_error* error) {
reclaimer_args* a = static_cast<reclaimer_args*>(args);
grpc_resource_user_free(a->resource_user, a->size);
grpc_resource_user_finish_reclamation(a->resource_user);
GRPC_CLOSURE_RUN(a->then, GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, a->then, GRPC_ERROR_NONE);
gpr_free(a);
}
@ -77,7 +77,8 @@ grpc_closure* make_reclaimer(grpc_resource_user* resource_user, size_t size,
static void unused_reclaimer_cb(void* arg, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_CANCELLED);
GRPC_CLOSURE_RUN(static_cast<grpc_closure*>(arg), GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(arg),
GRPC_ERROR_NONE);
}
grpc_closure* make_unused_reclaimer(grpc_closure* then) {
return GRPC_CLOSURE_CREATE(unused_reclaimer_cb, then,

@ -278,7 +278,7 @@ static void BM_StreamCreateDestroy(benchmark::State& state) {
s->Op(&op);
s->DestroyThen(next.get());
});
GRPC_CLOSURE_RUN(next.get(), GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, next.get(), GRPC_ERROR_NONE);
f.FlushExecCtx();
track_counters.Finish(state);
}
@ -607,7 +607,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
GPR_ASSERT(!state.KeepRunning());
return;
}
GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE);
});
drain = MakeClosure([&](grpc_error* /*error*/) {
@ -628,7 +628,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
recv_stream->Pull(&recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(recv_slice);
GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE);
grpc_core::Closure::Run(DEBUG_LOCATION, drain.get(), GRPC_ERROR_NONE);
});
reset_op();

@ -78,25 +78,25 @@ static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
}
BENCHMARK(BM_ClosureInitAgainstCombiner);
static void BM_ClosureRunOnExecCtx(benchmark::State& state) {
static void BM_ClosureRun(benchmark::State& state) {
TrackCounters track_counters;
grpc_closure c;
GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx);
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
GRPC_CLOSURE_RUN(&c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush();
grpc_core::Closure::Run(DEBUG_LOCATION, &c, GRPC_ERROR_NONE);
}
track_counters.Finish(state);
}
BENCHMARK(BM_ClosureRunOnExecCtx);
BENCHMARK(BM_ClosureRun);
static void BM_ClosureCreateAndRun(benchmark::State& state) {
TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx;
for (auto _ : state) {
GRPC_CLOSURE_RUN(
grpc_core::Closure::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(DoNothing, nullptr, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
@ -110,7 +110,8 @@ static void BM_ClosureInitAndRun(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
grpc_closure c;
for (auto _ : state) {
GRPC_CLOSURE_RUN(
grpc_core::Closure::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}

Loading…
Cancel
Save