client channel: refactor call objects in preparation for promise conversion (#32223)

This is a prerequisite for converting the client_channel filter to
promises. This refactors two objects:
- `ClientChannel::CallData`, which is primarily responsible for applying
the service config to the call
- `ClientChannel::LoadBalancedCall`, which is responsible for doing the
LB pick for the call attempt

Each of those classes has been split into two pieces:
- a base class with the functionality to be shared between the legacy
filter stack implementation and the new promise-based implementation
- a subclass providing the legacy filter stack implementation

A subsequent PR will add another subclass that provides the
promise-based implementation.
pull/32417/head^2
Mark D. Roth 2 years ago committed by GitHub
parent d98edb20ab
commit 3013c7e9b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1401
      src/core/ext/filters/client_channel/client_channel.cc
  3. 155
      src/core/ext/filters/client_channel/client_channel.h
  4. 7
      src/core/ext/filters/client_channel/config_selector.h
  5. 6
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  6. 11
      src/core/ext/filters/client_channel/retry_filter.cc

@ -2785,6 +2785,7 @@ grpc_cc_library(
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_set", "absl/container:flat_hash_set",
"absl/container:inlined_vector", "absl/container:inlined_vector",
"absl/status", "absl/status",

File diff suppressed because it is too large Load Diff

@ -69,7 +69,6 @@
#include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_parser.h" #include "src/core/lib/service_config/service_config_parser.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
@ -110,6 +109,7 @@ class ClientChannel {
static const grpc_channel_filter kFilterVtable; static const grpc_channel_filter kFilterVtable;
class LoadBalancedCall; class LoadBalancedCall;
class FilterBasedLoadBalancedCall;
// Flag that this object gets stored in channel args as a raw pointer. // Flag that this object gets stored in channel args as a raw pointer.
struct RawPointerChannelArgTag {}; struct RawPointerChannelArgTag {};
@ -163,7 +163,7 @@ class ClientChannel {
void RemoveConnectivityWatcher( void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher); AsyncConnectivityStateWatcherInterface* watcher);
OrphanablePtr<LoadBalancedCall> CreateLoadBalancedCall( OrphanablePtr<FilterBasedLoadBalancedCall> CreateLoadBalancedCall(
const grpc_call_element_args& args, grpc_polling_entity* pollent, const grpc_call_element_args& args, grpc_polling_entity* pollent,
grpc_closure* on_call_destruction_complete, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller,
@ -177,6 +177,7 @@ class ClientChannel {
private: private:
class CallData; class CallData;
class FilterBasedCallData;
class ResolverResultHandler; class ResolverResultHandler;
class SubchannelWrapper; class SubchannelWrapper;
class ClientChannelControlHelper; class ClientChannelControlHelper;
@ -297,7 +298,7 @@ class ClientChannel {
// //
mutable Mutex resolution_mu_; mutable Mutex resolution_mu_;
// List of calls queued waiting for resolver result. // List of calls queued waiting for resolver result.
absl::flat_hash_set<grpc_call_element*> resolver_queued_calls_ absl::flat_hash_set<CallData*> resolver_queued_calls_
ABSL_GUARDED_BY(resolution_mu_); ABSL_GUARDED_BY(resolution_mu_);
// Data from service config. // Data from service config.
absl::Status resolver_transient_failure_error_ absl::Status resolver_transient_failure_error_
@ -371,11 +372,12 @@ class ClientChannel {
class ClientChannel::LoadBalancedCall class ClientChannel::LoadBalancedCall
: public InternallyRefCounted<LoadBalancedCall, kUnrefCallDtor> { : public InternallyRefCounted<LoadBalancedCall, kUnrefCallDtor> {
public: public:
// TODO(roth): Make this private.
class LbCallState : public LbCallStateInternal { class LbCallState : public LbCallStateInternal {
public: public:
explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {} explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); } void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }
// Internal API to allow first-party LB policies to access per-call // Internal API to allow first-party LB policies to access per-call
// attributes set by the ConfigSelector. // attributes set by the ConfigSelector.
@ -385,37 +387,132 @@ class ClientChannel::LoadBalancedCall
LoadBalancedCall* lb_call_; LoadBalancedCall* lb_call_;
}; };
LoadBalancedCall(
ClientChannel* chand, grpc_call_context_element* call_context,
ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry);
~LoadBalancedCall() override;
void Orphan() override;
// Called by channel when removing a call from the list of queued calls.
void RemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
// Called by the channel for each queued call when a new picker
// becomes available.
virtual void RetryPickLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) = 0;
protected:
ClientChannel* chand() const { return chand_; }
ConfigSelector::CallDispatchController* call_dispatch_controller() const {
return call_dispatch_controller_;
}
CallTracer::CallAttemptTracer* call_attempt_tracer() const {
return call_attempt_tracer_;
}
gpr_cycle_counter lb_call_start_time() const { return lb_call_start_time_; }
ConnectedSubchannel* connected_subchannel() const {
return connected_subchannel_.get();
}
LoadBalancingPolicy::SubchannelCallTrackerInterface*
lb_subchannel_call_tracker() const {
return lb_subchannel_call_tracker_.get();
}
// Attempts an LB pick. The following outcomes are possible:
// - No pick result is available yet. The call will be queued and
// nullopt will be returned. The channel will later call
// RetryPickLocked() when a new picker is available and the pick
// should be retried.
// - The pick failed. If the call is not wait_for_ready, a non-OK
// status will be returned. (If the call *is* wait_for_ready,
// it will be queued instead.)
// - The pick completed successfully. A connected subchannel is
// stored and an OK status will be returned.
absl::optional<absl::Status> PickSubchannel(bool was_queued);
void RecordCallCompletion(absl::Status status,
grpc_metadata_batch* recv_trailing_metadata,
grpc_transport_stream_stats* transport_stream_stats,
absl::string_view peer_address);
private:
class Metadata;
class BackendMetricAccessor;
virtual Arena* arena() const = 0;
virtual grpc_call_context_element* call_context() const = 0;
virtual grpc_polling_entity* pollent() const = 0;
virtual grpc_metadata_batch* send_initial_metadata() const = 0;
// Helper function for performing an LB pick with a specified picker.
// Returns true if the pick is complete.
bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present.
void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
// Called when adding the call to the LB queue.
virtual void OnAddToQueueLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_) {}
ClientChannel* chand_;
ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const BackendMetricData* backend_metric_data_ = nullptr;
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
lb_subchannel_call_tracker_;
};
class ClientChannel::FilterBasedLoadBalancedCall
: public ClientChannel::LoadBalancedCall {
public:
// If on_call_destruction_complete is non-null, then it will be // If on_call_destruction_complete is non-null, then it will be
// invoked once the LoadBalancedCall is completely destroyed. // invoked once the LoadBalancedCall is completely destroyed.
// If it is null, then the caller is responsible for checking whether // If it is null, then the caller is responsible for checking whether
// the LB call has a subchannel call and ensuring that the // the LB call has a subchannel call and ensuring that the
// on_call_destruction_complete closure passed down from the surface // on_call_destruction_complete closure passed down from the surface
// is not invoked until after the subchannel call stack is destroyed. // is not invoked until after the subchannel call stack is destroyed.
LoadBalancedCall( FilterBasedLoadBalancedCall(
ClientChannel* chand, const grpc_call_element_args& args, ClientChannel* chand, const grpc_call_element_args& args,
grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete, grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry); bool is_transparent_retry);
~LoadBalancedCall() override; ~FilterBasedLoadBalancedCall() override;
void Orphan() override; void Orphan() override;
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
void PickSubchannel(bool was_queued);
// Called by channel when removing a call from the list of queued calls.
void RemoveCallFromLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
RefCountedPtr<SubchannelCall> subchannel_call() const { RefCountedPtr<SubchannelCall> subchannel_call() const {
return subchannel_call_; return subchannel_call_;
} }
private: private:
class LbQueuedCallCanceller; class LbQueuedCallCanceller;
class Metadata;
class BackendMetricAccessor; // Work-around for Windows compilers that don't allow nested classes
// to access protected members of the enclosing class's parent class.
using LoadBalancedCall::call_dispatch_controller;
using LoadBalancedCall::chand;
Arena* arena() const override { return arena_; }
grpc_call_context_element* call_context() const override {
return call_context_;
}
grpc_polling_entity* pollent() const override { return pollent_; }
grpc_metadata_batch* send_initial_metadata() const override {
return pending_batches_[0]
->payload->send_initial_metadata.send_initial_metadata;
}
// Returns the index into pending_batches_ to be used for batch. // Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
@ -451,36 +548,28 @@ class ClientChannel::LoadBalancedCall
static void RecvMessageReady(void* arg, grpc_error_handle error); static void RecvMessageReady(void* arg, grpc_error_handle error);
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
void RecordCallCompletion(absl::Status status); // Called to perform a pick, both when the call is initially started
// and when it is queued and the channel gets a new picker.
void TryPick(bool was_queued);
void CreateSubchannelCall(); void OnAddToQueueLocked() override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
// Helper function for performing an LB pick with a specified picker. void RetryPickLocked() override
// Returns true if the pick is complete.
bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
grpc_error_handle* error);
// Adds the call to the channel's list of queued picks if not already present.
void AddCallToLbQueuedCallsLocked()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::lb_mu_);
ClientChannel* chand_; void CreateSubchannelCall();
// TODO(roth): Instead of duplicating these fields in every filter // TODO(roth): Instead of duplicating these fields in every filter
// that uses any one of them, we should store them in the call // that uses any one of them, we should store them in the call
// context. This will save per-call memory overhead. // context. This will save per-call memory overhead.
Slice path_; // Request path.
Timestamp deadline_; Timestamp deadline_;
Arena* arena_; Arena* arena_;
grpc_call_context_element* call_context_;
grpc_call_stack* owning_call_; grpc_call_stack* owning_call_;
CallCombiner* call_combiner_; CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
grpc_polling_entity* pollent_; grpc_polling_entity* pollent_;
grpc_closure* on_call_destruction_complete_; grpc_closure* on_call_destruction_complete_;
ConfigSelector::CallDispatchController* call_dispatch_controller_;
CallTracer::CallAttemptTracer* call_attempt_tracer_;
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
// Set when we get a cancel_stream op. // Set when we get a cancel_stream op.
grpc_error_handle cancel_error_; grpc_error_handle cancel_error_;
@ -492,11 +581,6 @@ class ClientChannel::LoadBalancedCall
LbQueuedCallCanceller* lb_call_canceller_ LbQueuedCallCanceller* lb_call_canceller_
ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr; ABSL_GUARDED_BY(&ClientChannel::lb_mu_) = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
const BackendMetricData* backend_metric_data_ = nullptr;
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
lb_subchannel_call_tracker_;
RefCountedPtr<SubchannelCall> subchannel_call_; RefCountedPtr<SubchannelCall> subchannel_call_;
// For intercepting send_initial_metadata on_complete. // For intercepting send_initial_metadata on_complete.
@ -531,6 +615,7 @@ class ClientChannel::LoadBalancedCall
// A sub-class of ServiceConfigCallData used to access the // A sub-class of ServiceConfigCallData used to access the
// CallDispatchController. Allocated on the arena, stored in the call // CallDispatchController. Allocated on the arena, stored in the call
// context, and destroyed when the call is destroyed. // context, and destroyed when the call is destroyed.
// TODO(roth): Combine this with lb_call_state_internal.h.
class ClientChannelServiceConfigCallData : public ServiceConfigCallData { class ClientChannelServiceConfigCallData : public ServiceConfigCallData {
public: public:
ClientChannelServiceConfigCallData( ClientChannelServiceConfigCallData(

@ -28,7 +28,6 @@
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/channel/channel_fwd.h"
@ -39,6 +38,7 @@
#include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_call_data.h" #include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_parser.h" #include "src/core/lib/service_config/service_config_parser.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
// Channel arg key for ConfigSelector. // Channel arg key for ConfigSelector.
@ -65,7 +65,6 @@ class ConfigSelector : public RefCounted<ConfigSelector> {
}; };
struct GetCallConfigArgs { struct GetCallConfigArgs {
grpc_slice* path;
grpc_metadata_batch* initial_metadata; grpc_metadata_batch* initial_metadata;
Arena* arena; Arena* arena;
}; };
@ -132,8 +131,10 @@ class DefaultConfigSelector : public ConfigSelector {
absl::StatusOr<CallConfig> GetCallConfig(GetCallConfigArgs args) override { absl::StatusOr<CallConfig> GetCallConfig(GetCallConfigArgs args) override {
CallConfig call_config; CallConfig call_config;
Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
call_config.method_configs = call_config.method_configs =
service_config_->GetMethodParsedConfigVector(*args.path); service_config_->GetMethodParsedConfigVector(path->c_slice());
call_config.service_config = service_config_; call_config.service_config = service_config_;
return call_config; return call_config;
} }

@ -47,6 +47,7 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/slice/slice.h"
#define XXH_INLINE_ALL #define XXH_INLINE_ALL
#include "xxhash.h" #include "xxhash.h"
@ -84,7 +85,6 @@
#include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/service_config/service_config.h" #include "src/core/lib/service_config/service_config.h"
#include "src/core/lib/service_config/service_config_impl.h" #include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/uri/uri_parser.h" #include "src/core/lib/uri/uri_parser.h"
@ -658,8 +658,10 @@ absl::optional<uint64_t> HeaderHashHelper(
absl::StatusOr<ConfigSelector::CallConfig> absl::StatusOr<ConfigSelector::CallConfig>
XdsResolver::XdsConfigSelector::GetCallConfig(GetCallConfigArgs args) { XdsResolver::XdsConfigSelector::GetCallConfig(GetCallConfigArgs args) {
Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
auto route_index = XdsRouting::GetRouteForRequest( auto route_index = XdsRouting::GetRouteForRequest(
RouteListIterator(&route_table_), StringViewFromSlice(*args.path), RouteListIterator(&route_table_), path->as_string_view(),
args.initial_metadata); args.initial_metadata);
if (!route_index.has_value()) { if (!route_index.has_value()) {
return absl::UnavailableError( return absl::UnavailableError(

@ -448,7 +448,7 @@ class RetryFilter::CallData {
CallData* calld_; CallData* calld_;
AttemptDispatchController attempt_dispatch_controller_; AttemptDispatchController attempt_dispatch_controller_;
OrphanablePtr<ClientChannel::LoadBalancedCall> lb_call_; OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
bool lb_call_committed_ = false; bool lb_call_committed_ = false;
grpc_timer per_attempt_recv_timer_; grpc_timer per_attempt_recv_timer_;
@ -555,7 +555,8 @@ class RetryFilter::CallData {
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures); void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
static void StartTransparentRetry(void* arg, grpc_error_handle error); static void StartTransparentRetry(void* arg, grpc_error_handle error);
OrphanablePtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall( OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry); bool is_transparent_retry);
@ -586,7 +587,7 @@ class RetryFilter::CallData {
// LB call used when we've committed to a call attempt and the retry // LB call used when we've committed to a call attempt and the retry
// state for that attempt is no longer needed. This provides a fast // state for that attempt is no longer needed. This provides a fast
// path for long-running streaming calls that minimizes overhead. // path for long-running streaming calls that minimizes overhead.
OrphanablePtr<ClientChannel::LoadBalancedCall> committed_call_; OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> committed_call_;
// When are are not yet fully committed to a particular call (i.e., // When are are not yet fully committed to a particular call (i.e.,
// either we might still retry or we have committed to the call but // either we might still retry or we have committed to the call but
@ -881,7 +882,7 @@ namespace {
void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) { void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
grpc_transport_stream_op_batch* batch = grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg); static_cast<grpc_transport_stream_op_batch*>(arg);
auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>( auto* lb_call = static_cast<ClientChannel::FilterBasedLoadBalancedCall*>(
batch->handler_private.extra_arg); batch->handler_private.extra_arg);
// Note: This will release the call combiner. // Note: This will release the call combiner.
lb_call->StartTransportStreamOpBatch(batch); lb_call->StartTransportStreamOpBatch(batch);
@ -2301,7 +2302,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch(
call_attempt_->StartRetriableBatches(); call_attempt_->StartRetriableBatches();
} }
OrphanablePtr<ClientChannel::LoadBalancedCall> OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall( RetryFilter::CallData::CreateLoadBalancedCall(
ConfigSelector::CallDispatchController* call_dispatch_controller, ConfigSelector::CallDispatchController* call_dispatch_controller,
bool is_transparent_retry) { bool is_transparent_retry) {

Loading…
Cancel
Save