Convert call_combiner to C++.

pull/18596/head
Mark D. Roth 6 years ago
parent 031f61e135
commit ce3ff86763
  1. 4
      src/core/ext/filters/client_channel/client_channel.cc
  2. 18
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 2
      src/core/ext/filters/client_channel/health/health_check_client.h
  4. 2
      src/core/ext/filters/client_channel/subchannel.h
  5. 5
      src/core/ext/filters/deadline/deadline_filter.cc
  6. 5
      src/core/ext/filters/deadline/deadline_filter.h
  7. 2
      src/core/ext/filters/http/client/http_client_filter.cc
  8. 2
      src/core/ext/filters/http/client_authority_filter.cc
  9. 2
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  10. 2
      src/core/ext/filters/http/server/http_server_filter.cc
  11. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  12. 2
      src/core/lib/channel/channel_stack.h
  13. 4
      src/core/lib/channel/connected_channel.cc
  14. 146
      src/core/lib/iomgr/call_combiner.cc
  15. 155
      src/core/lib/iomgr/call_combiner.h
  16. 18
      src/core/lib/security/transport/client_auth_filter.cc
  17. 5
      src/core/lib/security/transport/server_auth_filter.cc
  18. 8
      src/core/lib/surface/call.cc
  19. 2
      src/core/lib/surface/lame_client.cc
  20. 2
      src/core/lib/surface/server.cc
  21. 2
      src/core/lib/transport/transport.cc
  22. 2
      src/core/lib/transport/transport.h
  23. 2
      test/cpp/microbenchmarks/bm_call_create.cc

@ -617,7 +617,7 @@ class CallData {
grpc_millis deadline_;
gpr_arena* arena_;
grpc_call_stack* owning_call_;
grpc_call_combiner* call_combiner_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
@ -3017,7 +3017,7 @@ class CallData::QueuedPickCanceller {
GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller");
GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
grpc_combiner_scheduler(chand->data_plane_combiner()));
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner_, &closure_);
calld->call_combiner_->SetNotifyOnCancel(&closure_);
}
private:

@ -37,11 +37,10 @@
#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
#define HEALTH_CHECK_RECONNECT_JITTER 0.2
grpc_core::TraceFlag grpc_health_check_client_trace(false,
"health_check_client");
namespace grpc_core {
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
//
// HealthCheckClient
//
@ -50,7 +49,7 @@ HealthCheckClient::HealthCheckClient(
const char* service_name,
RefCountedPtr<ConnectedSubchannel> connected_subchannel,
grpc_pollset_set* interested_parties,
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode> channelz_node)
RefCountedPtr<channelz::SubchannelNode> channelz_node)
: InternallyRefCounted<HealthCheckClient>(&grpc_health_check_client_trace),
service_name_(service_name),
connected_subchannel_(std::move(connected_subchannel)),
@ -283,9 +282,7 @@ HealthCheckClient::CallState::CallState(
pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
arena_(gpr_arena_create(health_check_client_->connected_subchannel_
->GetInitialCallSizeEstimate(0))),
payload_(context_) {
grpc_call_combiner_init(&call_combiner_);
}
payload_(context_) {}
HealthCheckClient::CallState::~CallState() {
if (grpc_health_check_client_trace.enabled()) {
@ -303,14 +300,13 @@ HealthCheckClient::CallState::~CallState() {
// holding to the call stack. Also flush the closures on exec_ctx so that
// filters that schedule cancel notification closures on exec_ctx do not
// need to take a ref of the call stack to guarantee closure liveness.
grpc_call_combiner_set_notify_on_cancel(&call_combiner_, nullptr);
grpc_core::ExecCtx::Get()->Flush();
grpc_call_combiner_destroy(&call_combiner_);
call_combiner_.SetNotifyOnCancel(nullptr);
ExecCtx::Get()->Flush();
gpr_arena_destroy(arena_);
}
void HealthCheckClient::CallState::Orphan() {
grpc_call_combiner_cancel(&call_combiner_, GRPC_ERROR_CANCELLED);
call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
Cancel();
}

@ -98,7 +98,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_polling_entity pollent_;
gpr_arena* arena_;
grpc_call_combiner call_combiner_;
grpc_core::CallCombiner call_combiner_;
grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
// The streaming call to the backend. Always non-NULL.

@ -77,7 +77,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_millis deadline;
gpr_arena* arena;
grpc_call_context_element* context;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
size_t parent_data_size;
};

@ -68,8 +68,7 @@ static void timer_callback(void* arg, grpc_error* error) {
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED);
grpc_call_combiner_cancel(deadline_state->call_combiner,
GRPC_ERROR_REF(error));
deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error));
GRPC_CLOSURE_INIT(&deadline_state->timer_callback,
send_cancel_op_in_call_combiner, elem,
grpc_schedule_on_exec_ctx);
@ -183,7 +182,7 @@ static void start_timer_after_init(void* arg, grpc_error* error) {
grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
grpc_call_stack* call_stack,
grpc_call_combiner* call_combiner,
grpc_core::CallCombiner* call_combiner,
grpc_millis deadline)
: call_stack(call_stack), call_combiner(call_combiner) {
// Deadline will always be infinite on servers, so the timer will only be

@ -32,12 +32,13 @@ enum grpc_deadline_timer_state {
// Must be the first field in the filter's call_data.
struct grpc_deadline_state {
grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack,
grpc_call_combiner* call_combiner, grpc_millis deadline);
grpc_core::CallCombiner* call_combiner,
grpc_millis deadline);
~grpc_deadline_state();
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL;
grpc_timer timer;
grpc_closure timer_callback;

@ -62,7 +62,7 @@ struct call_data {
~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); }
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method;
grpc_linked_mdelem scheme;

@ -40,7 +40,7 @@ namespace {
struct call_data {
grpc_linked_mdelem authority_storage;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
};
struct channel_data {

@ -72,7 +72,7 @@ struct call_data {
GRPC_ERROR_UNREF(cancel_error);
}
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
grpc_linked_mdelem compression_algorithm_storage;
grpc_linked_mdelem stream_compression_algorithm_storage;
grpc_linked_mdelem accept_encoding_storage;

@ -61,7 +61,7 @@ struct call_data {
}
}
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
// Outgoing headers to add to send_initial_metadata.
grpc_linked_mdelem status;

@ -137,7 +137,7 @@ struct call_data {
~call_data() { GRPC_ERROR_UNREF(error); }
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
message_size_limits limits;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to

@ -70,7 +70,7 @@ typedef struct {
gpr_timespec start_time;
grpc_millis deadline;
gpr_arena* arena;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
} grpc_call_element_args;
typedef struct {

@ -41,12 +41,12 @@ typedef struct connected_channel_channel_data {
typedef struct {
grpc_closure closure;
grpc_closure* original_closure;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
const char* reason;
} callback_state;
typedef struct connected_channel_call_data {
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
// Closures used for returning results on the call combiner.
callback_state on_complete[6]; // Max number of pending batches.
callback_state recv_initial_metadata_ready;

@ -26,23 +26,43 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"
grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
namespace grpc_core {
static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
TraceFlag grpc_call_combiner_trace(false, "call_combiner");
namespace {
grpc_error* DecodeCancelStateError(gpr_atm cancel_state) {
if (cancel_state & 1) {
return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
}
return GRPC_ERROR_NONE;
}
static gpr_atm encode_cancel_state_error(grpc_error* error) {
gpr_atm EncodeCancelStateError(grpc_error* error) {
return static_cast<gpr_atm>(1) | (gpr_atm)error;
}
} // namespace
CallCombiner::CallCombiner() {
gpr_atm_no_barrier_store(&cancel_state_, 0);
gpr_atm_no_barrier_store(&size_, 0);
gpr_mpscq_init(&queue_);
#ifdef GRPC_TSAN_ENABLED
GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
grpc_schedule_on_exec_ctx);
#endif
}
CallCombiner::~CallCombiner() {
gpr_mpscq_destroy(&queue_);
GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
}
#ifdef GRPC_TSAN_ENABLED
static void tsan_closure(void* user_data, grpc_error* error) {
grpc_call_combiner* call_combiner =
static_cast<grpc_call_combiner*>(user_data);
void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
CallCombiner* self = static_cast<CallCombiner*>(arg);
// We ref-count the lock, and check if it's already taken.
// If it was taken, we should do nothing. Otherwise, we will mark it as
// locked. Note that if two different threads try to do this, only one of
@ -51,18 +71,18 @@ static void tsan_closure(void* user_data, grpc_error* error) {
// TSAN will correctly produce an error.
//
// TODO(soheil): This only covers the callbacks scheduled by
// grpc_call_combiner_(start|finish). If in the future, a
// callback gets scheduled using other mechanisms, we will need
// to add APIs to externally lock call combiners.
grpc_core::RefCountedPtr<grpc_call_combiner::TsanLock> lock =
call_combiner->tsan_lock;
// CallCombiner::Start() and CallCombiner::Stop().
// If in the future, a callback gets scheduled using other
// mechanisms, we will need to add APIs to externally lock
// call combiners.
RefCountedPtr<TsanLock> lock = self->tsan_lock_;
bool prev = false;
if (lock->taken.compare_exchange_strong(prev, true)) {
TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
} else {
lock.reset();
}
GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error));
GRPC_CLOSURE_RUN(self->original_closure_, GRPC_ERROR_REF(error));
if (lock != nullptr) {
TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
bool prev = true;
@ -71,34 +91,17 @@ static void tsan_closure(void* user_data, grpc_error* error) {
}
#endif
static void call_combiner_sched_closure(grpc_call_combiner* call_combiner,
grpc_closure* closure,
grpc_error* error) {
void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
#ifdef GRPC_TSAN_ENABLED
call_combiner->original_closure = closure;
GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error);
original_closure_ = closure;
GRPC_CLOSURE_SCHED(&tsan_closure_, error);
#else
GRPC_CLOSURE_SCHED(closure, error);
#endif
}
void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0);
gpr_atm_no_barrier_store(&call_combiner->size, 0);
gpr_mpscq_init(&call_combiner->queue);
#ifdef GRPC_TSAN_ENABLED
GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner,
grpc_schedule_on_exec_ctx);
#endif
}
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
gpr_mpscq_destroy(&call_combiner->queue);
GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
}
#ifndef NDEBUG
#define DEBUG_ARGS , const char *file, int line
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
@ -107,20 +110,17 @@ void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
#define DEBUG_FMT_ARGS
#endif
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
grpc_closure* closure,
grpc_error* error DEBUG_ARGS,
const char* reason) {
GPR_TIMER_SCOPE("call_combiner_start", 0);
void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
DEBUG_ARGS const char* reason) {
GPR_TIMER_SCOPE("CallCombiner::Start", 0);
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
"==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
"%s] error=%s",
call_combiner, closure DEBUG_FMT_ARGS, reason,
grpc_error_string(error));
this, closure DEBUG_FMT_ARGS, reason, grpc_error_string(error));
}
size_t prev_size = static_cast<size_t>(
gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
size_t prev_size =
static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size + 1);
@ -128,34 +128,30 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
if (prev_size == 0) {
GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
GPR_TIMER_MARK("call_combiner_initiate", 0);
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY");
}
// Queue was empty, so execute this closure immediately.
call_combiner_sched_closure(call_combiner, closure, error);
ScheduleClosure(closure, error);
} else {
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " QUEUING");
}
// Queue was not empty, so add closure to queue.
closure->error_data.error = error;
gpr_mpscq_push(&call_combiner->queue,
reinterpret_cast<gpr_mpscq_node*>(closure));
gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(closure));
}
}
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
const char* reason) {
GPR_TIMER_SCOPE("call_combiner_stop", 0);
void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
call_combiner DEBUG_FMT_ARGS, reason);
gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
this DEBUG_FMT_ARGS, reason);
}
size_t prev_size = static_cast<size_t>(
gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
size_t prev_size =
static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
prev_size - 1);
@ -168,10 +164,10 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
}
bool empty;
grpc_closure* closure = reinterpret_cast<grpc_closure*>(
gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
gpr_mpscq_pop_and_check_end(&queue_, &empty));
if (closure == nullptr) {
// This can happen either due to a race condition within the mpscq
// code or because of a race with grpc_call_combiner_start().
// code or because of a race with Start().
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, " queue returned no result; checking again");
}
@ -181,8 +177,7 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
closure, grpc_error_string(closure->error_data.error));
}
call_combiner_sched_closure(call_combiner, closure,
closure->error_data.error);
ScheduleClosure(closure, closure->error_data.error);
break;
}
} else if (grpc_call_combiner_trace.enabled()) {
@ -190,13 +185,12 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
}
}
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
grpc_closure* closure) {
void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
while (true) {
// Decode original state.
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);
gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
grpc_error* original_error = DecodeCancelStateError(original_state);
// If error is set, invoke the cancellation closure immediately.
// Otherwise, store the new closure.
if (original_error != GRPC_ERROR_NONE) {
@ -204,16 +198,15 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
gpr_log(GPR_INFO,
"call_combiner=%p: scheduling notify_on_cancel callback=%p "
"for pre-existing cancellation",
call_combiner, closure);
this, closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
break;
} else {
if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
(gpr_atm)closure)) {
if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) {
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
call_combiner, closure);
this, closure);
}
// If we replaced an earlier closure, invoke the original
// closure with GRPC_ERROR_NONE. This allows callers to clean
@ -222,8 +215,8 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
closure = (grpc_closure*)original_state;
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"call_combiner=%p: scheduling old cancel callback=%p",
call_combiner, closure);
"call_combiner=%p: scheduling old cancel callback=%p", this,
closure);
}
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
}
@ -234,24 +227,23 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
}
}
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error) {
void CallCombiner::Cancel(grpc_error* error) {
GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
grpc_error* original_error = decode_cancel_state_error(original_state);
gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
grpc_error* original_error = DecodeCancelStateError(original_state);
if (original_error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
break;
}
if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
encode_cancel_state_error(error))) {
if (gpr_atm_full_cas(&cancel_state_, original_state,
EncodeCancelStateError(error))) {
if (original_state != 0) {
grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
if (grpc_call_combiner_trace.enabled()) {
gpr_log(GPR_INFO,
"call_combiner=%p: scheduling notify_on_cancel callback=%p",
call_combiner, notify_on_cancel);
this, notify_on_cancel);
}
GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
}
@ -260,3 +252,5 @@ void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
// cas failed, try again.
}
}
} // namespace grpc_core

@ -41,15 +41,78 @@
// when it is done with the action that was kicked off by the original
// callback.
extern grpc_core::TraceFlag grpc_call_combiner_trace;
namespace grpc_core {
extern TraceFlag grpc_call_combiner_trace;
class CallCombiner {
public:
CallCombiner();
~CallCombiner();
#ifndef NDEBUG
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
(call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
(call_combiner)->Stop(__FILE__, __LINE__, (reason))
/// Starts processing \a closure.
void Start(grpc_closure* closure, grpc_error* error, const char* file,
int line, const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void Stop(const char* file, int line, const char* reason);
#else
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
(call_combiner)->Start((closure), (error), (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
(call_combiner)->Stop((reason))
/// Starts processing \a closure.
void Start(grpc_closure* closure, grpc_error* error, const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void Stop(const char* reason);
#endif
/// Registers \a closure to be invoked when Cancel() is called.
///
/// Once a closure is registered, it will always be scheduled exactly
/// once; this allows the closure to hold references that will be freed
/// regardless of whether or not the call was cancelled. If a cancellation
/// does occur, the closure will be scheduled with the cancellation error;
/// otherwise, it will be scheduled with GRPC_ERROR_NONE.
///
/// The closure will be scheduled in the following cases:
/// - If Cancel() was called prior to registering the closure, it will be
/// scheduled immediately with the cancelation error.
/// - If Cancel() is called after registering the closure, the closure will
/// be scheduled with the cancellation error.
/// - If SetNotifyOnCancel() is called again to register a new cancellation
/// closure, the previous cancellation closure will be scheduled with
/// GRPC_ERROR_NONE.
///
/// If \a closure is NULL, then no closure will be invoked on
/// cancellation; this effectively unregisters the previously set closure.
/// However, most filters will not need to explicitly unregister their
/// callbacks, as this is done automatically when the call is destroyed.
/// Filters that schedule the cancellation closure on ExecCtx do not need
/// to take a ref on the call stack to guarantee closure liveness. This is
/// done by explicitly flushing ExecCtx after the unregistration during
/// call destruction.
void SetNotifyOnCancel(grpc_closure* closure);
/// Indicates that the call has been cancelled.
void Cancel(grpc_error* error);
private:
void ScheduleClosure(grpc_closure* closure, grpc_error* error);
#ifdef GRPC_TSAN_ENABLED
static void TsanClosure(void* arg, grpc_error* error);
#endif
struct grpc_call_combiner {
gpr_atm size = 0; // size_t, num closures in queue or currently executing
gpr_mpscq queue;
gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
gpr_mpscq queue_;
// Either 0 (if not cancelled and no cancellation closure set),
// a grpc_closure* (if the lowest bit is 0),
// or a grpc_error* (if the lowest bit is 1).
gpr_atm cancel_state = 0;
gpr_atm cancel_state_ = 0;
#ifdef GRPC_TSAN_ENABLED
// A fake ref-counted lock that is kept alive after the destruction of
// grpc_call_combiner, when we are running the original closure.
@ -58,90 +121,20 @@ struct grpc_call_combiner {
// callback is called. However, original_closure is free to trigger
// anything on the call combiner (including destruction of grpc_call).
// Thus, we need a ref-counted structure that can outlive the call combiner.
struct TsanLock
: public grpc_core::RefCounted<TsanLock,
grpc_core::NonPolymorphicRefCount> {
struct TsanLock : public RefCounted<TsanLock, NonPolymorphicRefCount> {
TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); }
~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); }
// To avoid double-locking by the same thread, we should acquire/release
// the lock only when taken is false. On each acquire taken must be set to
// true.
std::atomic<bool> taken{false};
};
grpc_core::RefCountedPtr<TsanLock> tsan_lock =
grpc_core::MakeRefCounted<TsanLock>();
grpc_closure tsan_closure;
grpc_closure* original_closure;
RefCountedPtr<TsanLock> tsan_lock_ = MakeRefCounted<TsanLock>();
grpc_closure tsan_closure_;
grpc_closure* original_closure_;
#endif
};
// Assumes memory was initialized to zero.
void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
#ifndef NDEBUG
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
grpc_call_combiner_start((call_combiner), (closure), (error), __FILE__, \
__LINE__, (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
grpc_call_combiner_stop((call_combiner), __FILE__, __LINE__, (reason))
/// Starts processing \a closure on \a call_combiner.
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
grpc_closure* closure, grpc_error* error,
const char* file, int line, const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
const char* file, int line, const char* reason);
#else
#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \
grpc_call_combiner_start((call_combiner), (closure), (error), (reason))
#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \
grpc_call_combiner_stop((call_combiner), (reason))
/// Starts processing \a closure on \a call_combiner.
void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
grpc_closure* closure, grpc_error* error,
const char* reason);
/// Yields the call combiner to the next closure in the queue, if any.
void grpc_call_combiner_stop(grpc_call_combiner* call_combiner,
const char* reason);
#endif
/// Registers \a closure to be invoked by \a call_combiner when
/// grpc_call_combiner_cancel() is called.
///
/// Once a closure is registered, it will always be scheduled exactly
/// once; this allows the closure to hold references that will be freed
/// regardless of whether or not the call was cancelled. If a cancellation
/// does occur, the closure will be scheduled with the cancellation error;
/// otherwise, it will be scheduled with GRPC_ERROR_NONE.
///
/// The closure will be scheduled in the following cases:
/// - If grpc_call_combiner_cancel() was called prior to registering the
/// closure, it will be scheduled immediately with the cancelation error.
/// - If grpc_call_combiner_cancel() is called after registering the
/// closure, the closure will be scheduled with the cancellation error.
/// - If grpc_call_combiner_set_notify_on_cancel() is called again to
/// register a new cancellation closure, the previous cancellation
/// closure will be scheduled with GRPC_ERROR_NONE.
///
/// If \a closure is NULL, then no closure will be invoked on
/// cancellation; this effectively unregisters the previously set closure.
/// However, most filters will not need to explicitly unregister their
/// callbacks, as this is done automatically when the call is destroyed. Filters
/// that schedule the cancellation closure on ExecCtx do not need to take a ref
/// on the call stack to guarantee closure liveness. This is done by explicitly
/// flushing ExecCtx after the unregistration during call destruction.
void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
grpc_closure* closure);
/// Indicates that the call has been cancelled.
void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
grpc_error* error);
namespace grpc_core {
// Helper for running a list of closures in a call combiner.
//
// Each callback running in the call combiner will eventually be
@ -166,7 +159,7 @@ class CallCombinerClosureList {
// scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in
// yielding the call combiner. If the list is empty, then the call
// combiner will be yielded immediately.
void RunClosures(grpc_call_combiner* call_combiner) {
void RunClosures(CallCombiner* call_combiner) {
if (closures_.empty()) {
GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule");
return;
@ -190,7 +183,7 @@ class CallCombinerClosureList {
// Runs all closures in the call combiner, but does NOT yield the call
// combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START().
void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) {
void RunClosuresWithoutYielding(CallCombiner* call_combiner) {
for (size_t i = 0; i < closures_.size(); ++i) {
auto& closure = closures_[i];
GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error,

@ -92,7 +92,7 @@ struct call_data {
}
grpc_call_stack* owning_call;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
grpc_core::RefCountedPtr<grpc_call_credentials> creds;
grpc_slice host = grpc_empty_slice();
grpc_slice method = grpc_empty_slice();
@ -270,11 +270,9 @@ static void send_security_metadata(grpc_call_element* elem,
GRPC_ERROR_UNREF(error);
} else {
// Async return; register cancellation closure with call combiner.
grpc_call_combiner_set_notify_on_cancel(
calld->call_combiner,
GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure,
cancel_get_request_metadata, elem,
grpc_schedule_on_exec_ctx));
calld->call_combiner->SetNotifyOnCancel(GRPC_CLOSURE_INIT(
&calld->get_request_metadata_cancel_closure,
cancel_get_request_metadata, elem, grpc_schedule_on_exec_ctx));
}
}
@ -345,11 +343,9 @@ static void auth_start_transport_stream_op_batch(
GRPC_ERROR_UNREF(error);
} else {
// Async return; register cancellation closure with call combiner.
grpc_call_combiner_set_notify_on_cancel(
calld->call_combiner,
GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure,
cancel_check_call_host, elem,
grpc_schedule_on_exec_ctx));
calld->call_combiner->SetNotifyOnCancel(GRPC_CLOSURE_INIT(
&calld->check_call_host_cancel_closure, cancel_check_call_host,
elem, grpc_schedule_on_exec_ctx));
}
gpr_free(call_host);
return; /* early exit */

@ -74,7 +74,7 @@ struct call_data {
~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); }
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
grpc_call_stack* owning_call;
grpc_transport_stream_op_batch* recv_initial_metadata_batch;
grpc_closure* original_recv_initial_metadata_ready;
@ -219,8 +219,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
// to drop the call combiner early if we get cancelled.
GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
grpc_schedule_on_exec_ctx);
grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
&calld->cancel_closure);
calld->call_combiner->SetNotifyOnCancel(&calld->cancel_closure);
GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);

@ -131,7 +131,6 @@ struct grpc_call {
is_client(args.server_transport_data == nullptr),
stream_op_payload(context) {
gpr_ref_init(&ext_ref, 1);
grpc_call_combiner_init(&call_combiner);
for (int i = 0; i < 2; i++) {
for (int j = 0; j < 2; j++) {
metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
@ -141,12 +140,11 @@ struct grpc_call {
~grpc_call() {
gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
grpc_call_combiner_destroy(&call_combiner);
}
gpr_refcount ext_ref;
gpr_arena* arena;
grpc_call_combiner call_combiner;
grpc_core::CallCombiner call_combiner;
grpc_completion_queue* cq;
grpc_polling_entity pollent;
grpc_channel* channel;
@ -589,7 +587,7 @@ void grpc_call_unref(grpc_call* c) {
// holding to the call stack. Also flush the closures on exec_ctx so that
// filters that schedule cancel notification closures on exec_ctx do not
// need to take a ref of the call stack to guarantee closure liveness.
grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
c->call_combiner.SetNotifyOnCancel(nullptr);
grpc_core::ExecCtx::Get()->Flush();
}
GRPC_CALL_INTERNAL_UNREF(c, "destroy");
@ -685,7 +683,7 @@ static void cancel_with_error(grpc_call* c, grpc_error* error) {
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
c->call_combiner.Cancel(GRPC_ERROR_REF(error));
cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,

@ -39,7 +39,7 @@ namespace grpc_core {
namespace {
struct CallData {
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::Atomic<bool> filled_metadata;

@ -190,7 +190,7 @@ struct call_data {
grpc_closure publish;
call_data* pending_next = nullptr;
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
};
struct request_matcher {

@ -174,7 +174,7 @@ grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) {
// it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_transport_stream_op_batch* batch, grpc_error* error,
grpc_call_combiner* call_combiner) {
grpc_core::CallCombiner* call_combiner) {
if (batch->send_message) {
batch->payload->send_message.send_message.reset();
}

@ -379,7 +379,7 @@ void grpc_transport_destroy_stream(grpc_transport* transport,
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_transport_stream_op_batch* op, grpc_error* error,
grpc_call_combiner* call_combiner);
grpc_core::CallCombiner* call_combiner);
char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op);
char* grpc_transport_op_string(grpc_transport_op* op);

@ -609,7 +609,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata);
namespace isolated_call_filter {
typedef struct {
grpc_call_combiner* call_combiner;
grpc_core::CallCombiner* call_combiner;
} call_data;
static void StartTransportStreamOp(grpc_call_element* elem,

Loading…
Cancel
Save