From ce3ff86763abd26e377d37798ba6e13581d413a3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 23 Apr 2019 15:27:06 -0700 Subject: [PATCH] Convert call_combiner to C++. --- .../filters/client_channel/client_channel.cc | 4 +- .../health/health_check_client.cc | 18 +- .../health/health_check_client.h | 2 +- .../ext/filters/client_channel/subchannel.h | 2 +- .../ext/filters/deadline/deadline_filter.cc | 5 +- .../ext/filters/deadline/deadline_filter.h | 5 +- .../filters/http/client/http_client_filter.cc | 2 +- .../filters/http/client_authority_filter.cc | 2 +- .../message_compress_filter.cc | 2 +- .../filters/http/server/http_server_filter.cc | 2 +- .../message_size/message_size_filter.cc | 2 +- src/core/lib/channel/channel_stack.h | 2 +- src/core/lib/channel/connected_channel.cc | 4 +- src/core/lib/iomgr/call_combiner.cc | 146 ++++++++--------- src/core/lib/iomgr/call_combiner.h | 155 +++++++++--------- .../security/transport/client_auth_filter.cc | 18 +- .../security/transport/server_auth_filter.cc | 5 +- src/core/lib/surface/call.cc | 8 +- src/core/lib/surface/lame_client.cc | 2 +- src/core/lib/surface/server.cc | 2 +- src/core/lib/transport/transport.cc | 2 +- src/core/lib/transport/transport.h | 2 +- test/cpp/microbenchmarks/bm_call_create.cc | 2 +- 23 files changed, 185 insertions(+), 209 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 412ac1662b8..1e407cd0625 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -617,7 +617,7 @@ class CallData { grpc_millis deadline_; gpr_arena* arena_; grpc_call_stack* owning_call_; - grpc_call_combiner* call_combiner_; + CallCombiner* call_combiner_; grpc_call_context_element* call_context_; RefCountedPtr retry_throttle_data_; @@ -3017,7 +3017,7 @@ class CallData::QueuedPickCanceller { GRPC_CALL_STACK_REF(calld->owning_call_, "QueuedPickCanceller"); GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, grpc_combiner_scheduler(chand->data_plane_combiner())); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner_, &closure_); + calld->call_combiner_->SetNotifyOnCancel(&closure_); } private: diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index a99f1e54062..2dab43b499f 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -37,11 +37,10 @@ #define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120 #define HEALTH_CHECK_RECONNECT_JITTER 0.2 -grpc_core::TraceFlag grpc_health_check_client_trace(false, - "health_check_client"); - namespace grpc_core { +TraceFlag grpc_health_check_client_trace(false, "health_check_client"); + // // HealthCheckClient // @@ -50,7 +49,7 @@ HealthCheckClient::HealthCheckClient( const char* service_name, RefCountedPtr connected_subchannel, grpc_pollset_set* interested_parties, - grpc_core::RefCountedPtr channelz_node) + RefCountedPtr channelz_node) : InternallyRefCounted(&grpc_health_check_client_trace), service_name_(service_name), connected_subchannel_(std::move(connected_subchannel)), @@ -283,9 +282,7 @@ HealthCheckClient::CallState::CallState( pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)), arena_(gpr_arena_create(health_check_client_->connected_subchannel_ ->GetInitialCallSizeEstimate(0))), - payload_(context_) { - grpc_call_combiner_init(&call_combiner_); -} + payload_(context_) {} HealthCheckClient::CallState::~CallState() { if (grpc_health_check_client_trace.enabled()) { @@ -303,14 +300,13 @@ HealthCheckClient::CallState::~CallState() { // holding to the call stack. Also flush the closures on exec_ctx so that // filters that schedule cancel notification closures on exec_ctx do not // need to take a ref of the call stack to guarantee closure liveness. - grpc_call_combiner_set_notify_on_cancel(&call_combiner_, nullptr); - grpc_core::ExecCtx::Get()->Flush(); - grpc_call_combiner_destroy(&call_combiner_); + call_combiner_.SetNotifyOnCancel(nullptr); + ExecCtx::Get()->Flush(); gpr_arena_destroy(arena_); } void HealthCheckClient::CallState::Orphan() { - grpc_call_combiner_cancel(&call_combiner_, GRPC_ERROR_CANCELLED); + call_combiner_.Cancel(GRPC_ERROR_CANCELLED); Cancel(); } diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 6e0123e4925..1a9fe47b02b 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -98,7 +98,7 @@ class HealthCheckClient : public InternallyRefCounted { grpc_polling_entity pollent_; gpr_arena* arena_; - grpc_call_combiner call_combiner_; + grpc_core::CallCombiner call_combiner_; grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; // The streaming call to the backend. Always non-NULL. diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 9c2e57d3e05..52175098f61 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -77,7 +77,7 @@ class ConnectedSubchannel : public RefCounted { grpc_millis deadline; gpr_arena* arena; grpc_call_context_element* context; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; size_t parent_data_size; }; diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index b4cb07f0f92..960641dc168 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -68,8 +68,7 @@ static void timer_callback(void* arg, grpc_error* error) { error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); - grpc_call_combiner_cancel(deadline_state->call_combiner, - GRPC_ERROR_REF(error)); + deadline_state->call_combiner->Cancel(GRPC_ERROR_REF(error)); GRPC_CLOSURE_INIT(&deadline_state->timer_callback, send_cancel_op_in_call_combiner, elem, grpc_schedule_on_exec_ctx); @@ -183,7 +182,7 @@ static void start_timer_after_init(void* arg, grpc_error* error) { grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, + grpc_core::CallCombiner* call_combiner, grpc_millis deadline) : call_stack(call_stack), call_combiner(call_combiner) { // Deadline will always be infinite on servers, so the timer will only be diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index e37032999c6..7c4e9aaed0e 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -32,12 +32,13 @@ enum grpc_deadline_timer_state { // Must be the first field in the filter's call_data. struct grpc_deadline_state { grpc_deadline_state(grpc_call_element* elem, grpc_call_stack* call_stack, - grpc_call_combiner* call_combiner, grpc_millis deadline); + grpc_core::CallCombiner* call_combiner, + grpc_millis deadline); ~grpc_deadline_state(); // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_deadline_timer_state timer_state = GRPC_DEADLINE_STATE_INITIAL; grpc_timer timer; grpc_closure timer_callback; diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index bf9a01f659b..3f6d464d5dd 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -62,7 +62,7 @@ struct call_data { ~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); } - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc index 125059c93a9..85b30bc13ca 100644 --- a/src/core/ext/filters/http/client_authority_filter.cc +++ b/src/core/ext/filters/http/client_authority_filter.cc @@ -40,7 +40,7 @@ namespace { struct call_data { grpc_linked_mdelem authority_storage; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; }; struct channel_data { diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index 1527ea440c4..a53d813c3f0 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -72,7 +72,7 @@ struct call_data { GRPC_ERROR_UNREF(cancel_error); } - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_linked_mdelem compression_algorithm_storage; grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index ce1be8370c6..a27a51a90ff 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -61,7 +61,7 @@ struct call_data { } } - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; // Outgoing headers to add to send_initial_metadata. grpc_linked_mdelem status; diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index 4d120c0eb76..ec506eaae52 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -137,7 +137,7 @@ struct call_data { ~call_data() { GRPC_ERROR_UNREF(error); } - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; message_size_limits limits; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 580e1e55100..d5dc418991d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -70,7 +70,7 @@ typedef struct { gpr_timespec start_time; grpc_millis deadline; gpr_arena* arena; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; } grpc_call_element_args; typedef struct { diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc index e2ea334dedf..bd30c3663a2 100644 --- a/src/core/lib/channel/connected_channel.cc +++ b/src/core/lib/channel/connected_channel.cc @@ -41,12 +41,12 @@ typedef struct connected_channel_channel_data { typedef struct { grpc_closure closure; grpc_closure* original_closure; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; const char* reason; } callback_state; typedef struct connected_channel_call_data { - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; // Closures used for returning results on the call combiner. callback_state on_complete[6]; // Max number of pending batches. callback_state recv_initial_metadata_ready; diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index 6b5759a036f..a245ff04874 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -26,23 +26,43 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" -grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner"); +namespace grpc_core { -static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { +TraceFlag grpc_call_combiner_trace(false, "call_combiner"); + +namespace { + +grpc_error* DecodeCancelStateError(gpr_atm cancel_state) { if (cancel_state & 1) { return (grpc_error*)(cancel_state & ~static_cast(1)); } return GRPC_ERROR_NONE; } -static gpr_atm encode_cancel_state_error(grpc_error* error) { +gpr_atm EncodeCancelStateError(grpc_error* error) { return static_cast(1) | (gpr_atm)error; } +} // namespace + +CallCombiner::CallCombiner() { + gpr_atm_no_barrier_store(&cancel_state_, 0); + gpr_atm_no_barrier_store(&size_, 0); + gpr_mpscq_init(&queue_); +#ifdef GRPC_TSAN_ENABLED + GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this, + grpc_schedule_on_exec_ctx); +#endif +} + +CallCombiner::~CallCombiner() { + gpr_mpscq_destroy(&queue_); + GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_)); +} + #ifdef GRPC_TSAN_ENABLED -static void tsan_closure(void* user_data, grpc_error* error) { - grpc_call_combiner* call_combiner = - static_cast(user_data); +void CallCombiner::TsanClosure(void* arg, grpc_error* error) { + CallCombiner* self = static_cast(arg); // We ref-count the lock, and check if it's already taken. // If it was taken, we should do nothing. Otherwise, we will mark it as // locked. Note that if two different threads try to do this, only one of @@ -51,18 +71,18 @@ static void tsan_closure(void* user_data, grpc_error* error) { // TSAN will correctly produce an error. // // TODO(soheil): This only covers the callbacks scheduled by - // grpc_call_combiner_(start|finish). If in the future, a - // callback gets scheduled using other mechanisms, we will need - // to add APIs to externally lock call combiners. - grpc_core::RefCountedPtr lock = - call_combiner->tsan_lock; + // CallCombiner::Start() and CallCombiner::Stop(). + // If in the future, a callback gets scheduled using other + // mechanisms, we will need to add APIs to externally lock + // call combiners. + RefCountedPtr lock = self->tsan_lock_; bool prev = false; if (lock->taken.compare_exchange_strong(prev, true)) { TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true); } else { lock.reset(); } - GRPC_CLOSURE_RUN(call_combiner->original_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(self->original_closure_, GRPC_ERROR_REF(error)); if (lock != nullptr) { TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true); bool prev = true; @@ -71,34 +91,17 @@ static void tsan_closure(void* user_data, grpc_error* error) { } #endif -static void call_combiner_sched_closure(grpc_call_combiner* call_combiner, - grpc_closure* closure, - grpc_error* error) { +void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) { #ifdef GRPC_TSAN_ENABLED - call_combiner->original_closure = closure; - GRPC_CLOSURE_SCHED(&call_combiner->tsan_closure, error); + original_closure_ = closure; + GRPC_CLOSURE_SCHED(&tsan_closure_, error); #else GRPC_CLOSURE_SCHED(closure, error); #endif } -void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { - gpr_atm_no_barrier_store(&call_combiner->cancel_state, 0); - gpr_atm_no_barrier_store(&call_combiner->size, 0); - gpr_mpscq_init(&call_combiner->queue); -#ifdef GRPC_TSAN_ENABLED - GRPC_CLOSURE_INIT(&call_combiner->tsan_closure, tsan_closure, call_combiner, - grpc_schedule_on_exec_ctx); -#endif -} - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { - gpr_mpscq_destroy(&call_combiner->queue); - GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); -} - #ifndef NDEBUG -#define DEBUG_ARGS , const char *file, int line +#define DEBUG_ARGS const char *file, int line, #define DEBUG_FMT_STR "%s:%d: " #define DEBUG_FMT_ARGS , file, line #else @@ -107,20 +110,17 @@ void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { #define DEBUG_FMT_ARGS #endif -void grpc_call_combiner_start(grpc_call_combiner* call_combiner, - grpc_closure* closure, - grpc_error* error DEBUG_ARGS, - const char* reason) { - GPR_TIMER_SCOPE("call_combiner_start", 0); +void CallCombiner::Start(grpc_closure* closure, grpc_error* error, + DEBUG_ARGS const char* reason) { + GPR_TIMER_SCOPE("CallCombiner::Start", 0); if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, - "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR + "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR "%s] error=%s", - call_combiner, closure DEBUG_FMT_ARGS, reason, - grpc_error_string(error)); + this, closure DEBUG_FMT_ARGS, reason, grpc_error_string(error)); } - size_t prev_size = static_cast( - gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1)); + size_t prev_size = + static_cast(gpr_atm_full_fetch_add(&size_, (gpr_atm)1)); if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size + 1); @@ -128,34 +128,30 @@ void grpc_call_combiner_start(grpc_call_combiner* call_combiner, GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(); if (prev_size == 0) { GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(); - GPR_TIMER_MARK("call_combiner_initiate", 0); if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " EXECUTING IMMEDIATELY"); } // Queue was empty, so execute this closure immediately. - call_combiner_sched_closure(call_combiner, closure, error); + ScheduleClosure(closure, error); } else { if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " QUEUING"); } // Queue was not empty, so add closure to queue. closure->error_data.error = error; - gpr_mpscq_push(&call_combiner->queue, - reinterpret_cast(closure)); + gpr_mpscq_push(&queue_, reinterpret_cast(closure)); } } -void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, - const char* reason) { - GPR_TIMER_SCOPE("call_combiner_stop", 0); +void CallCombiner::Stop(DEBUG_ARGS const char* reason) { + GPR_TIMER_SCOPE("CallCombiner::Stop", 0); if (grpc_call_combiner_trace.enabled()) { - gpr_log(GPR_INFO, - "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", - call_combiner DEBUG_FMT_ARGS, reason); + gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]", + this DEBUG_FMT_ARGS, reason); } - size_t prev_size = static_cast( - gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1)); + size_t prev_size = + static_cast(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1)); if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size - 1); @@ -168,10 +164,10 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, } bool empty; grpc_closure* closure = reinterpret_cast( - gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty)); + gpr_mpscq_pop_and_check_end(&queue_, &empty)); if (closure == nullptr) { // This can happen either due to a race condition within the mpscq - // code or because of a race with grpc_call_combiner_start(). + // code or because of a race with Start(). if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, " queue returned no result; checking again"); } @@ -181,8 +177,7 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s", closure, grpc_error_string(closure->error_data.error)); } - call_combiner_sched_closure(call_combiner, closure, - closure->error_data.error); + ScheduleClosure(closure, closure->error_data.error); break; } } else if (grpc_call_combiner_trace.enabled()) { @@ -190,13 +185,12 @@ void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS, } } -void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, - grpc_closure* closure) { +void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) { GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(); while (true) { // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); + gpr_atm original_state = gpr_atm_acq_load(&cancel_state_); + grpc_error* original_error = DecodeCancelStateError(original_state); // If error is set, invoke the cancellation closure immediately. // Otherwise, store the new closure. if (original_error != GRPC_ERROR_NONE) { @@ -204,16 +198,15 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, gpr_log(GPR_INFO, "call_combiner=%p: scheduling notify_on_cancel callback=%p " "for pre-existing cancellation", - call_combiner, closure); + this, closure); } GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error)); break; } else { - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - (gpr_atm)closure)) { + if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) { if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p", - call_combiner, closure); + this, closure); } // If we replaced an earlier closure, invoke the original // closure with GRPC_ERROR_NONE. This allows callers to clean @@ -222,8 +215,8 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, closure = (grpc_closure*)original_state; if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, - "call_combiner=%p: scheduling old cancel callback=%p", - call_combiner, closure); + "call_combiner=%p: scheduling old cancel callback=%p", this, + closure); } GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); } @@ -234,24 +227,23 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, } } -void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, - grpc_error* error) { +void CallCombiner::Cancel(grpc_error* error) { GRPC_STATS_INC_CALL_COMBINER_CANCELLED(); while (true) { - gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); - grpc_error* original_error = decode_cancel_state_error(original_state); + gpr_atm original_state = gpr_atm_acq_load(&cancel_state_); + grpc_error* original_error = DecodeCancelStateError(original_state); if (original_error != GRPC_ERROR_NONE) { GRPC_ERROR_UNREF(error); break; } - if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, - encode_cancel_state_error(error))) { + if (gpr_atm_full_cas(&cancel_state_, original_state, + EncodeCancelStateError(error))) { if (original_state != 0) { grpc_closure* notify_on_cancel = (grpc_closure*)original_state; if (grpc_call_combiner_trace.enabled()) { gpr_log(GPR_INFO, "call_combiner=%p: scheduling notify_on_cancel callback=%p", - call_combiner, notify_on_cancel); + this, notify_on_cancel); } GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error)); } @@ -260,3 +252,5 @@ void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, // cas failed, try again. } } + +} // namespace grpc_core diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h index 4ec0044f056..ffcd0f7fb76 100644 --- a/src/core/lib/iomgr/call_combiner.h +++ b/src/core/lib/iomgr/call_combiner.h @@ -41,15 +41,78 @@ // when it is done with the action that was kicked off by the original // callback. -extern grpc_core::TraceFlag grpc_call_combiner_trace; +namespace grpc_core { + +extern TraceFlag grpc_call_combiner_trace; + +class CallCombiner { + public: + CallCombiner(); + ~CallCombiner(); + +#ifndef NDEBUG +#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ + (call_combiner)->Start((closure), (error), __FILE__, __LINE__, (reason)) +#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ + (call_combiner)->Stop(__FILE__, __LINE__, (reason)) + /// Starts processing \a closure. + void Start(grpc_closure* closure, grpc_error* error, const char* file, + int line, const char* reason); + /// Yields the call combiner to the next closure in the queue, if any. + void Stop(const char* file, int line, const char* reason); +#else +#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ + (call_combiner)->Start((closure), (error), (reason)) +#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ + (call_combiner)->Stop((reason)) + /// Starts processing \a closure. + void Start(grpc_closure* closure, grpc_error* error, const char* reason); + /// Yields the call combiner to the next closure in the queue, if any. + void Stop(const char* reason); +#endif + + /// Registers \a closure to be invoked when Cancel() is called. + /// + /// Once a closure is registered, it will always be scheduled exactly + /// once; this allows the closure to hold references that will be freed + /// regardless of whether or not the call was cancelled. If a cancellation + /// does occur, the closure will be scheduled with the cancellation error; + /// otherwise, it will be scheduled with GRPC_ERROR_NONE. + /// + /// The closure will be scheduled in the following cases: + /// - If Cancel() was called prior to registering the closure, it will be + /// scheduled immediately with the cancelation error. + /// - If Cancel() is called after registering the closure, the closure will + /// be scheduled with the cancellation error. + /// - If SetNotifyOnCancel() is called again to register a new cancellation + /// closure, the previous cancellation closure will be scheduled with + /// GRPC_ERROR_NONE. + /// + /// If \a closure is NULL, then no closure will be invoked on + /// cancellation; this effectively unregisters the previously set closure. + /// However, most filters will not need to explicitly unregister their + /// callbacks, as this is done automatically when the call is destroyed. + /// Filters that schedule the cancellation closure on ExecCtx do not need + /// to take a ref on the call stack to guarantee closure liveness. This is + /// done by explicitly flushing ExecCtx after the unregistration during + /// call destruction. + void SetNotifyOnCancel(grpc_closure* closure); + + /// Indicates that the call has been cancelled. + void Cancel(grpc_error* error); + + private: + void ScheduleClosure(grpc_closure* closure, grpc_error* error); +#ifdef GRPC_TSAN_ENABLED + static void TsanClosure(void* arg, grpc_error* error); +#endif -struct grpc_call_combiner { - gpr_atm size = 0; // size_t, num closures in queue or currently executing - gpr_mpscq queue; + gpr_atm size_ = 0; // size_t, num closures in queue or currently executing + gpr_mpscq queue_; // Either 0 (if not cancelled and no cancellation closure set), // a grpc_closure* (if the lowest bit is 0), // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancel_state = 0; + gpr_atm cancel_state_ = 0; #ifdef GRPC_TSAN_ENABLED // A fake ref-counted lock that is kept alive after the destruction of // grpc_call_combiner, when we are running the original closure. @@ -58,90 +121,20 @@ struct grpc_call_combiner { // callback is called. However, original_closure is free to trigger // anything on the call combiner (including destruction of grpc_call). // Thus, we need a ref-counted structure that can outlive the call combiner. - struct TsanLock - : public grpc_core::RefCounted { + struct TsanLock : public RefCounted { TsanLock() { TSAN_ANNOTATE_RWLOCK_CREATE(&taken); } ~TsanLock() { TSAN_ANNOTATE_RWLOCK_DESTROY(&taken); } - // To avoid double-locking by the same thread, we should acquire/release // the lock only when taken is false. On each acquire taken must be set to // true. std::atomic taken{false}; }; - grpc_core::RefCountedPtr tsan_lock = - grpc_core::MakeRefCounted(); - grpc_closure tsan_closure; - grpc_closure* original_closure; + RefCountedPtr tsan_lock_ = MakeRefCounted(); + grpc_closure tsan_closure_; + grpc_closure* original_closure_; #endif }; -// Assumes memory was initialized to zero. -void grpc_call_combiner_init(grpc_call_combiner* call_combiner); - -void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); - -#ifndef NDEBUG -#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ - grpc_call_combiner_start((call_combiner), (closure), (error), __FILE__, \ - __LINE__, (reason)) -#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ - grpc_call_combiner_stop((call_combiner), __FILE__, __LINE__, (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* file, int line, const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_call_combiner* call_combiner, - const char* file, int line, const char* reason); -#else -#define GRPC_CALL_COMBINER_START(call_combiner, closure, error, reason) \ - grpc_call_combiner_start((call_combiner), (closure), (error), (reason)) -#define GRPC_CALL_COMBINER_STOP(call_combiner, reason) \ - grpc_call_combiner_stop((call_combiner), (reason)) -/// Starts processing \a closure on \a call_combiner. -void grpc_call_combiner_start(grpc_call_combiner* call_combiner, - grpc_closure* closure, grpc_error* error, - const char* reason); -/// Yields the call combiner to the next closure in the queue, if any. -void grpc_call_combiner_stop(grpc_call_combiner* call_combiner, - const char* reason); -#endif - -/// Registers \a closure to be invoked by \a call_combiner when -/// grpc_call_combiner_cancel() is called. -/// -/// Once a closure is registered, it will always be scheduled exactly -/// once; this allows the closure to hold references that will be freed -/// regardless of whether or not the call was cancelled. If a cancellation -/// does occur, the closure will be scheduled with the cancellation error; -/// otherwise, it will be scheduled with GRPC_ERROR_NONE. -/// -/// The closure will be scheduled in the following cases: -/// - If grpc_call_combiner_cancel() was called prior to registering the -/// closure, it will be scheduled immediately with the cancelation error. -/// - If grpc_call_combiner_cancel() is called after registering the -/// closure, the closure will be scheduled with the cancellation error. -/// - If grpc_call_combiner_set_notify_on_cancel() is called again to -/// register a new cancellation closure, the previous cancellation -/// closure will be scheduled with GRPC_ERROR_NONE. -/// -/// If \a closure is NULL, then no closure will be invoked on -/// cancellation; this effectively unregisters the previously set closure. -/// However, most filters will not need to explicitly unregister their -/// callbacks, as this is done automatically when the call is destroyed. Filters -/// that schedule the cancellation closure on ExecCtx do not need to take a ref -/// on the call stack to guarantee closure liveness. This is done by explicitly -/// flushing ExecCtx after the unregistration during call destruction. -void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner, - grpc_closure* closure); - -/// Indicates that the call has been cancelled. -void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner, - grpc_error* error); - -namespace grpc_core { - // Helper for running a list of closures in a call combiner. // // Each callback running in the call combiner will eventually be @@ -166,7 +159,7 @@ class CallCombinerClosureList { // scheduled via GRPC_CLOSURE_SCHED(), which will eventually result in // yielding the call combiner. If the list is empty, then the call // combiner will be yielded immediately. - void RunClosures(grpc_call_combiner* call_combiner) { + void RunClosures(CallCombiner* call_combiner) { if (closures_.empty()) { GRPC_CALL_COMBINER_STOP(call_combiner, "no closures to schedule"); return; @@ -190,7 +183,7 @@ class CallCombinerClosureList { // Runs all closures in the call combiner, but does NOT yield the call // combiner. All closures will be scheduled via GRPC_CALL_COMBINER_START(). - void RunClosuresWithoutYielding(grpc_call_combiner* call_combiner) { + void RunClosuresWithoutYielding(CallCombiner* call_combiner) { for (size_t i = 0; i < closures_.size(); ++i) { auto& closure = closures_[i]; GRPC_CALL_COMBINER_START(call_combiner, closure.closure, closure.error, diff --git a/src/core/lib/security/transport/client_auth_filter.cc b/src/core/lib/security/transport/client_auth_filter.cc index f90c92efdc2..0c40dd7ff1e 100644 --- a/src/core/lib/security/transport/client_auth_filter.cc +++ b/src/core/lib/security/transport/client_auth_filter.cc @@ -92,7 +92,7 @@ struct call_data { } grpc_call_stack* owning_call; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_core::RefCountedPtr creds; grpc_slice host = grpc_empty_slice(); grpc_slice method = grpc_empty_slice(); @@ -270,11 +270,9 @@ static void send_security_metadata(grpc_call_element* elem, GRPC_ERROR_UNREF(error); } else { // Async return; register cancellation closure with call combiner. - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure, - cancel_get_request_metadata, elem, - grpc_schedule_on_exec_ctx)); + calld->call_combiner->SetNotifyOnCancel(GRPC_CLOSURE_INIT( + &calld->get_request_metadata_cancel_closure, + cancel_get_request_metadata, elem, grpc_schedule_on_exec_ctx)); } } @@ -345,11 +343,9 @@ static void auth_start_transport_stream_op_batch( GRPC_ERROR_UNREF(error); } else { // Async return; register cancellation closure with call combiner. - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure, - cancel_check_call_host, elem, - grpc_schedule_on_exec_ctx)); + calld->call_combiner->SetNotifyOnCancel(GRPC_CLOSURE_INIT( + &calld->check_call_host_cancel_closure, cancel_check_call_host, + elem, grpc_schedule_on_exec_ctx)); } gpr_free(call_host); return; /* early exit */ diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc index 81b9c2ce074..43509e6c61b 100644 --- a/src/core/lib/security/transport/server_auth_filter.cc +++ b/src/core/lib/security/transport/server_auth_filter.cc @@ -74,7 +74,7 @@ struct call_data { ~call_data() { GRPC_ERROR_UNREF(recv_initial_metadata_error); } - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_call_stack* owning_call; grpc_transport_stream_op_batch* recv_initial_metadata_batch; grpc_closure* original_recv_initial_metadata_ready; @@ -219,8 +219,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { // to drop the call combiner early if we get cancelled. GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, grpc_schedule_on_exec_ctx); - grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, - &calld->cancel_closure); + calld->call_combiner->SetNotifyOnCancel(&calld->cancel_closure); GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( batch->payload->recv_initial_metadata.recv_initial_metadata); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 8aaff4a67d5..0b539b0d9b2 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -131,7 +131,6 @@ struct grpc_call { is_client(args.server_transport_data == nullptr), stream_op_payload(context) { gpr_ref_init(&ext_ref, 1); - grpc_call_combiner_init(&call_combiner); for (int i = 0; i < 2; i++) { for (int j = 0; j < 2; j++) { metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE; @@ -141,12 +140,11 @@ struct grpc_call { ~grpc_call() { gpr_free(static_cast(const_cast(final_info.error_string))); - grpc_call_combiner_destroy(&call_combiner); } gpr_refcount ext_ref; gpr_arena* arena; - grpc_call_combiner call_combiner; + grpc_core::CallCombiner call_combiner; grpc_completion_queue* cq; grpc_polling_entity pollent; grpc_channel* channel; @@ -589,7 +587,7 @@ void grpc_call_unref(grpc_call* c) { // holding to the call stack. Also flush the closures on exec_ctx so that // filters that schedule cancel notification closures on exec_ctx do not // need to take a ref of the call stack to guarantee closure liveness. - grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr); + c->call_combiner.SetNotifyOnCancel(nullptr); grpc_core::ExecCtx::Get()->Flush(); } GRPC_CALL_INTERNAL_UNREF(c, "destroy"); @@ -685,7 +683,7 @@ static void cancel_with_error(grpc_call* c, grpc_error* error) { // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. - grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error)); + c->call_combiner.Cancel(GRPC_ERROR_REF(error)); cancel_state* state = static_cast(gpr_malloc(sizeof(*state))); state->call = c; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index 5f5f10d2ebf..dde39b8c681 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -39,7 +39,7 @@ namespace grpc_core { namespace { struct CallData { - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; grpc_linked_mdelem status; grpc_linked_mdelem details; grpc_core::Atomic filled_metadata; diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 1e0ac0e9237..e3fcf116c2a 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -190,7 +190,7 @@ struct call_data { grpc_closure publish; call_data* pending_next = nullptr; - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; }; struct request_matcher { diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 9f666b382e9..5023c28ecca 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -174,7 +174,7 @@ grpc_endpoint* grpc_transport_get_endpoint(grpc_transport* transport) { // it's grpc_transport_stream_op_batch_finish_with_failure void grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch* batch, grpc_error* error, - grpc_call_combiner* call_combiner) { + grpc_core::CallCombiner* call_combiner) { if (batch->send_message) { batch->payload->send_message.send_message.reset(); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index c372003902a..cff39aacbca 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -379,7 +379,7 @@ void grpc_transport_destroy_stream(grpc_transport* transport, void grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch* op, grpc_error* error, - grpc_call_combiner* call_combiner); + grpc_core::CallCombiner* call_combiner); char* grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch* op); char* grpc_transport_op_string(grpc_transport_op* op); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index e84999b213f..707732ba6ad 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -609,7 +609,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, MessageSizeFilter, SendEmptyMetadata); namespace isolated_call_filter { typedef struct { - grpc_call_combiner* call_combiner; + grpc_core::CallCombiner* call_combiner; } call_data; static void StartTransportStreamOp(grpc_call_element* elem,