diff --git a/BUILD b/BUILD index a8d588ffa63..7c23227f070 100644 --- a/BUILD +++ b/BUILD @@ -2851,6 +2851,7 @@ grpc_cc_library( "absl/cleanup", "absl/container:flat_hash_set", "absl/container:inlined_vector", + "absl/functional:any_invocable", "absl/status", "absl/status:statusor", "absl/strings", diff --git a/src/core/BUILD b/src/core/BUILD index 5688c43ed85..308c6dad11c 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4990,6 +4990,7 @@ grpc_cc_library( "ext/filters/client_channel/resolver/xds/xds_resolver.cc", ], external_deps = [ + "absl/functional:any_invocable", "absl/meta:type_traits", "absl/random", "absl/status", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 9d1f2574f2d..3fce6f5934a 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -400,7 +400,7 @@ class DynamicTerminationFilter::CallData { calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); calld->lb_call_ = client_channel->CreateLoadBalancedCall( args, pollent, nullptr, - service_config_call_data->call_dispatch_controller(), + [service_config_call_data]() { service_config_call_data->Commit(); }, /*is_transparent_retry=*/false); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) { gpr_log(GPR_INFO, @@ -1121,12 +1121,11 @@ OrphanablePtr ClientChannel::CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) { + absl::AnyInvocable on_commit, bool is_transparent_retry) { return OrphanablePtr( args.arena->New( this, args, pollent, on_call_destruction_complete, - call_dispatch_controller, is_transparent_retry)); + std::move(on_commit), is_transparent_retry)); } ChannelArgs ClientChannel::MakeSubchannelArgs( @@ -1849,7 +1848,7 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked( arena()->New( std::move(call_config->service_config), call_config->method_configs, std::move(call_config->call_attributes), - call_config->call_dispatch_controller, call_context()); + std::move(call_config->on_commit), call_context()); // Apply our own method params to the call. auto* method_params = static_cast( service_config_call_data->GetMethodParsedConfig( @@ -2010,9 +2009,8 @@ void ClientChannel::FilterBasedCallData::StartTransportStreamOpBatch( grpc_deadline_state_client_start_transport_stream_op_batch( &calld->deadline_state_, batch); } - // Intercept recv_trailing_metadata to call CallDispatchController::Commit(), - // in case we wind up failing the call before we get down to the retry - // or LB call layer. + // Intercept recv_trailing_metadata to commit the call, in case we wind up + // failing the call before we get down to the retry or LB call layer. if (batch->recv_trailing_metadata) { calld->original_recv_trailing_metadata_ready_ = batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; @@ -2337,7 +2335,7 @@ void ClientChannel::FilterBasedCallData:: service_config_call_data); } if (service_config_call_data != nullptr) { - service_config_call_data->call_dispatch_controller()->Commit(); + service_config_call_data->Commit(); } // Chain to original callback. Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_, @@ -2518,14 +2516,13 @@ ClientCallTracer::CallAttemptTracer* CreateCallAttemptTracer( ClientChannel::LoadBalancedCall::LoadBalancedCall( ClientChannel* chand, grpc_call_context_element* call_context, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) + absl::AnyInvocable on_commit, bool is_transparent_retry) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ? "LoadBalancedCall" : nullptr), chand_(chand), - call_dispatch_controller_(call_dispatch_controller) { + on_commit_(std::move(on_commit)) { CreateCallAttemptTracer(call_context, is_transparent_retry); if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this); @@ -2661,7 +2658,7 @@ absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( return error; } // Pick succeeded. - call_dispatch_controller_->Commit(); + Commit(); return absl::OkStatus(); } } @@ -2762,9 +2759,8 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelImpl( ClientChannel::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall( ClientChannel* chand, const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) - : LoadBalancedCall(chand, args.context, call_dispatch_controller, + absl::AnyInvocable on_commit, bool is_transparent_retry) + : LoadBalancedCall(chand, args.context, std::move(on_commit), is_transparent_retry), deadline_(args.deadline), arena_(args.arena), @@ -3131,7 +3127,7 @@ class ClientChannel::FilterBasedLoadBalancedCall::LbQueuedCallCanceller { lb_call->lb_call_canceller_); } if (lb_call->lb_call_canceller_ == self && !error.ok()) { - lb_call->call_dispatch_controller()->Commit(); + lb_call->Commit(); // Remove pick from list of queued picks. lb_call->RemoveCallFromLbQueuedCallsLocked(); // Remove from queued picks list. diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 957a4748c3b..1152b86eebc 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -25,9 +25,11 @@ #include #include #include +#include #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_set.h" +#include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -159,8 +161,7 @@ class ClientChannel { OrphanablePtr CreateLoadBalancedCall( const grpc_call_element_args& args, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + absl::AnyInvocable on_commit, bool is_transparent_retry); // Exposed for testing only. static ChannelArgs MakeSubchannelArgs( @@ -365,10 +366,10 @@ class ClientChannel { class ClientChannel::LoadBalancedCall : public InternallyRefCounted { public: - LoadBalancedCall( - ClientChannel* chand, grpc_call_context_element* call_context, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + LoadBalancedCall(ClientChannel* chand, + grpc_call_context_element* call_context, + absl::AnyInvocable on_commit, + bool is_transparent_retry); ~LoadBalancedCall() override; void Orphan() override; @@ -384,9 +385,6 @@ class ClientChannel::LoadBalancedCall protected: ClientChannel* chand() const { return chand_; } - ConfigSelector::CallDispatchController* call_dispatch_controller() const { - return call_dispatch_controller_; - } ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { return static_cast( call_context()[GRPC_CONTEXT_CALL_TRACER].value); @@ -400,6 +398,11 @@ class ClientChannel::LoadBalancedCall return lb_subchannel_call_tracker_.get(); } + void Commit() { + auto on_commit = std::move(on_commit_); + on_commit(); + } + // Attempts an LB pick. The following outcomes are possible: // - No pick result is available yet. The call will be queued and // nullopt will be returned. The channel will later call @@ -441,7 +444,7 @@ class ClientChannel::LoadBalancedCall ClientChannel* chand_; - ConfigSelector::CallDispatchController* call_dispatch_controller_; + absl::AnyInvocable on_commit_; gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter(); @@ -460,11 +463,12 @@ class ClientChannel::FilterBasedLoadBalancedCall // the LB call has a subchannel call and ensuring that the // on_call_destruction_complete closure passed down from the surface // is not invoked until after the subchannel call stack is destroyed. - FilterBasedLoadBalancedCall( - ClientChannel* chand, const grpc_call_element_args& args, - grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + FilterBasedLoadBalancedCall(ClientChannel* chand, + const grpc_call_element_args& args, + grpc_polling_entity* pollent, + grpc_closure* on_call_destruction_complete, + absl::AnyInvocable on_commit, + bool is_transparent_retry); ~FilterBasedLoadBalancedCall() override; void Orphan() override; @@ -480,8 +484,8 @@ class ClientChannel::FilterBasedLoadBalancedCall // Work-around for Windows compilers that don't allow nested classes // to access protected members of the enclosing class's parent class. - using LoadBalancedCall::call_dispatch_controller; using LoadBalancedCall::chand; + using LoadBalancedCall::Commit; Arena* arena() const override { return arena_; } grpc_call_context_element* call_context() const override { diff --git a/src/core/ext/filters/client_channel/client_channel_internal.h b/src/core/ext/filters/client_channel/client_channel_internal.h index a2eaf1bf968..402fb3ac1b5 100644 --- a/src/core/ext/filters/client_channel/client_channel_internal.h +++ b/src/core/ext/filters/client_channel/client_channel_internal.h @@ -21,9 +21,9 @@ #include +#include "absl/functional/any_invocable.h" #include "absl/strings/string_view.h" -#include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/unique_type_name.h" @@ -47,66 +47,34 @@ class ClientChannelLbCallState : public LoadBalancingPolicy::CallState { virtual absl::string_view GetCallAttribute(UniqueTypeName type) = 0; }; -// Internal type for ServiceConfigCallData. Provides access to the -// CallDispatchController. +// Internal type for ServiceConfigCallData. Handles call commits. class ClientChannelServiceConfigCallData : public ServiceConfigCallData { public: ClientChannelServiceConfigCallData( RefCountedPtr service_config, const ServiceConfigParser::ParsedConfigVector* method_configs, ServiceConfigCallData::CallAttributes call_attributes, - ConfigSelector::CallDispatchController* call_dispatch_controller, + absl::AnyInvocable on_commit, grpc_call_context_element* call_context) : ServiceConfigCallData(std::move(service_config), method_configs, std::move(call_attributes)), - call_dispatch_controller_(call_dispatch_controller) { + on_commit_(std::move(on_commit)) { call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = this; call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].destroy = Destroy; } - ConfigSelector::CallDispatchController* call_dispatch_controller() { - return &call_dispatch_controller_; + void Commit() { + auto on_commit = std::move(on_commit_); + if (on_commit != nullptr) on_commit(); } private: - // A wrapper for the CallDispatchController returned by the ConfigSelector. - // Handles the case where the ConfigSelector doees not return any - // CallDispatchController. - // Also ensures that we call Commit() at most once, which allows the - // client channel code to call Commit() when the call is complete in case - // it wasn't called earlier, without needing to know whether or not it was. - class CallDispatchControllerWrapper - : public ConfigSelector::CallDispatchController { - public: - explicit CallDispatchControllerWrapper( - ConfigSelector::CallDispatchController* call_dispatch_controller) - : call_dispatch_controller_(call_dispatch_controller) {} - - bool ShouldRetry() override { - if (call_dispatch_controller_ != nullptr) { - return call_dispatch_controller_->ShouldRetry(); - } - return true; - } - - void Commit() override { - if (call_dispatch_controller_ != nullptr && !commit_called_) { - call_dispatch_controller_->Commit(); - commit_called_ = true; - } - } - - private: - ConfigSelector::CallDispatchController* call_dispatch_controller_; - bool commit_called_ = false; - }; - static void Destroy(void* ptr) { auto* self = static_cast(ptr); self->~ClientChannelServiceConfigCallData(); } - CallDispatchControllerWrapper call_dispatch_controller_; + absl::AnyInvocable on_commit_; }; } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/config_selector.h b/src/core/ext/filters/client_channel/config_selector.h index e434bb9f7d9..a75652fceb5 100644 --- a/src/core/ext/filters/client_channel/config_selector.h +++ b/src/core/ext/filters/client_channel/config_selector.h @@ -24,6 +24,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" @@ -50,20 +51,6 @@ namespace grpc_core { // MethodConfig and provide input to LB policies on a per-call basis. class ConfigSelector : public RefCounted { public: - // An interface to be used by the channel when dispatching calls. - class CallDispatchController { - public: - virtual ~CallDispatchController() = default; - - // Called by the channel to decide if it should retry the call upon a - // failure. - virtual bool ShouldRetry() = 0; - - // Called by the channel when no more LB picks will be performed for - // the call. - virtual void Commit() = 0; - }; - struct GetCallConfigArgs { grpc_metadata_batch* initial_metadata; Arena* arena; @@ -78,8 +65,9 @@ class ConfigSelector : public RefCounted { RefCountedPtr service_config; // Call attributes that will be accessible to LB policy implementations. ServiceConfigCallData::CallAttributes call_attributes; - // Call dispatch controller. - CallDispatchController* call_dispatch_controller = nullptr; + // To be called exactly once, when the call has been committed to a + // particular subchannel (i.e., after all LB picks are complete). + absl::AnyInvocable on_commit; }; ~ConfigSelector() override = default; @@ -136,7 +124,7 @@ class DefaultConfigSelector : public ConfigSelector { call_config.method_configs = service_config_->GetMethodParsedConfigVector(path->c_slice()); call_config.service_config = service_config_; - return call_config; + return std::move(call_config); } // Only comparing the ConfigSelector itself, not the underlying diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 4e7cb136d65..752820d7e87 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -29,6 +29,7 @@ #include #include +#include "absl/functional/any_invocable.h" #include "absl/meta/type_traits.h" #include "absl/random/random.h" #include "absl/status/status.h" @@ -248,11 +249,10 @@ class XdsResolver : public Resolver { .first) {} void Orphan() override { - auto* resolver = resolver_.release(); + auto* resolver = resolver_.get(); resolver->work_serializer_->Run( - [resolver]() { + [resolver = std::move(resolver_)]() { resolver->MaybeRemoveUnusedClusters(); - resolver->Unref(); }, DEBUG_LOCATION); } @@ -264,34 +264,6 @@ class XdsResolver : public Resolver { ClusterStateMap::iterator it_; }; - // Call dispatch controller, created for each call handled by the - // ConfigSelector. Holds a ref to the ClusterState object until the - // call is committed. - class XdsCallDispatchController - : public ConfigSelector::CallDispatchController { - public: - explicit XdsCallDispatchController( - RefCountedPtr cluster_state) - : cluster_state_(std::move(cluster_state)) {} - - bool ShouldRetry() override { - // TODO(donnadionne): Implement the retry circuit breaker here. - return true; - } - - void Commit() override { - // TODO(donnadionne): If ShouldRetry() was called previously, - // decrement the retry circuit breaker counter. - cluster_state_.reset(); - } - - private: - // Note: The XdsCallDispatchController object is never actually destroyed, - // so do not add any data members that require destruction unless you have - // some other way to clean them up. - RefCountedPtr cluster_state_; - }; - class XdsConfigSelector : public ConfigSelector { public: XdsConfigSelector(RefCountedPtr resolver, @@ -766,9 +738,10 @@ XdsResolver::XdsConfigSelector::GetCallConfig(GetCallConfigArgs args) { memcpy(hash_value, hash_string.c_str(), hash_string.size()); hash_value[hash_string.size()] = '\0'; call_config.call_attributes[RequestHashAttributeName()] = hash_value; - call_config.call_dispatch_controller = - args.arena->New(it->second->Ref()); - return call_config; + call_config.on_commit = [cluster_state = it->second->Ref()]() mutable { + cluster_state.reset(); + }; + return std::move(call_config); } // diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index a92a1837bbf..936cbb26a3b 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -28,6 +28,7 @@ #include #include "absl/container/inlined_vector.h" +#include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" @@ -43,7 +44,6 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_internal.h" -#include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/retry_service_config.h" #include "src/core/ext/filters/client_channel/retry_throttle.h" #include "src/core/lib/backoff/backoff.h" @@ -364,31 +364,6 @@ class RetryFilter::CallData { grpc_closure on_complete_; }; - class AttemptDispatchController - : public ConfigSelector::CallDispatchController { - public: - explicit AttemptDispatchController(CallAttempt* call_attempt) - : call_attempt_(call_attempt) {} - - // Will never be called. - bool ShouldRetry() override { return false; } - - void Commit() override { - call_attempt_->lb_call_committed_ = true; - auto* calld = call_attempt_->calld_; - if (calld->retry_committed_) { - auto* service_config_call_data = - static_cast( - calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] - .value); - service_config_call_data->call_dispatch_controller()->Commit(); - } - } - - private: - CallAttempt* call_attempt_; - }; - // Creates a BatchData object on the call's arena with the // specified refcount. If set_on_complete is true, the batch's // on_complete callback will be set to point to on_complete(); @@ -450,7 +425,6 @@ class RetryFilter::CallData { void MaybeCancelPerAttemptRecvTimer(); CallData* calld_; - AttemptDispatchController attempt_dispatch_controller_; OrphanablePtr lb_call_; bool lb_call_committed_ = false; @@ -558,9 +532,8 @@ class RetryFilter::CallData { static void StartTransparentRetry(void* arg, grpc_error_handle error); OrphanablePtr - CreateLoadBalancedCall( - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry); + CreateLoadBalancedCall(absl::AnyInvocable on_commit, + bool is_transparent_retry); void CreateCallAttempt(bool is_transparent_retry); @@ -693,7 +666,6 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld, : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt" : nullptr), calld_(calld), - attempt_dispatch_controller_(this), batch_payload_(calld->call_context_), started_send_initial_metadata_(false), completed_send_initial_metadata_(false), @@ -706,8 +678,18 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld, sent_cancel_stream_(false), seen_recv_trailing_metadata_from_surface_(false), abandoned_(false) { - lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_, - is_transparent_retry); + lb_call_ = calld->CreateLoadBalancedCall( + [this]() { + lb_call_committed_ = true; + if (calld_->retry_committed_) { + auto* service_config_call_data = + static_cast( + calld_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA] + .value); + service_config_call_data->Commit(); + } + }, + is_transparent_retry); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: created attempt, lb_call=%p", @@ -1199,19 +1181,6 @@ bool RetryFilter::CallData::CallAttempt::ShouldRetry( } } } - // Check with call dispatch controller. - auto* service_config_call_data = - static_cast( - calld_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); - if (!service_config_call_data->call_dispatch_controller()->ShouldRetry()) { - if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { - gpr_log( - GPR_INFO, - "chand=%p calld=%p attempt=%p: call dispatch controller denied retry", - calld_->chand_, calld_, this); - } - return false; - } // We should retry. return true; } @@ -2272,7 +2241,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( static_cast( call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); committed_call_ = CreateLoadBalancedCall( - service_config_call_data->call_dispatch_controller(), + [service_config_call_data]() { service_config_call_data->Commit(); }, /*is_transparent_retry=*/false); committed_call_->StartTransportStreamOpBatch(batch); return; @@ -2298,8 +2267,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( OrphanablePtr RetryFilter::CallData::CreateLoadBalancedCall( - ConfigSelector::CallDispatchController* call_dispatch_controller, - bool is_transparent_retry) { + absl::AnyInvocable on_commit, bool is_transparent_retry) { grpc_call_element_args args = {owning_call_, nullptr, call_context_, path_, /*start_time=*/0, deadline_, arena_, call_combiner_}; @@ -2308,7 +2276,7 @@ RetryFilter::CallData::CreateLoadBalancedCall( // This callback holds a ref to the CallStackDestructionBarrier // object until the LB call is destroyed. call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this), - call_dispatch_controller, is_transparent_retry); + std::move(on_commit), is_transparent_retry); } void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) { @@ -2548,17 +2516,17 @@ void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) { gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this); } if (call_attempt != nullptr) { - // If the call attempt's LB call has been committed, inform the call - // dispatch controller that the call has been committed. + // If the call attempt's LB call has been committed, invoke the + // call's on_commit callback. // Note: If call_attempt is null, this is happening before the first // retry attempt is started, in which case we'll just pass the real - // call dispatch controller down into the LB call, and it won't be - // our problem anymore. + // on_commit callback down into the LB call, and it won't be our + // problem anymore. if (call_attempt->lb_call_committed()) { auto* service_config_call_data = static_cast( call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); - service_config_call_data->call_dispatch_controller()->Commit(); + service_config_call_data->Commit(); } // Free cached send ops. call_attempt->FreeCachedSendOpDataAfterCommit();