[call-v3] New client channel implementation (#36723)
Closes #36723
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36723 from ctiller:transport-refs-7 e019b8f5ea
PiperOrigin-RevId: 638424601
pull/36759/head
parent
ecee02a226
commit
34871fafa3
73 changed files with 6570 additions and 397 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,245 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2015 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H |
||||||
|
#define GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/status/statusor.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
|
||||||
|
#include "src/core/client_channel/client_channel_factory.h" |
||||||
|
#include "src/core/client_channel/config_selector.h" |
||||||
|
#include "src/core/client_channel/subchannel.h" |
||||||
|
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
||||||
|
#include "src/core/lib/gprpp/single_set_ptr.h" |
||||||
|
#include "src/core/lib/promise/observable.h" |
||||||
|
#include "src/core/lib/surface/channel.h" |
||||||
|
#include "src/core/lib/transport/metadata.h" |
||||||
|
#include "src/core/load_balancing/lb_policy.h" |
||||||
|
#include "src/core/resolver/resolver.h" |
||||||
|
#include "src/core/service_config/service_config.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
class ClientChannel : public Channel { |
||||||
|
public: |
||||||
|
using PickerObservable = |
||||||
|
Observable<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>; |
||||||
|
|
||||||
|
class CallDestinationFactory { |
||||||
|
public: |
||||||
|
struct RawPointerChannelArgTag {}; |
||||||
|
|
||||||
|
static absl::string_view ChannelArgName() { |
||||||
|
return "grpc.internal.client_channel_call_destination"; |
||||||
|
} |
||||||
|
|
||||||
|
virtual RefCountedPtr<UnstartedCallDestination> CreateCallDestination( |
||||||
|
PickerObservable) = 0; |
||||||
|
|
||||||
|
protected: |
||||||
|
~CallDestinationFactory() = default; |
||||||
|
}; |
||||||
|
|
||||||
|
static absl::StatusOr<OrphanablePtr<Channel>> Create( |
||||||
|
std::string target, ChannelArgs channel_args); |
||||||
|
|
||||||
|
// Do not instantiate directly -- use Create() instead.
|
||||||
|
ClientChannel(std::string target_uri, ChannelArgs args, |
||||||
|
std::string uri_to_resolve, |
||||||
|
RefCountedPtr<ServiceConfig> default_service_config, |
||||||
|
ClientChannelFactory* client_channel_factory, |
||||||
|
CallDestinationFactory* call_destination_factory); |
||||||
|
|
||||||
|
~ClientChannel() override; |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask, |
||||||
|
grpc_completion_queue* cq, |
||||||
|
grpc_pollset_set* /*pollset_set_alternative*/, |
||||||
|
Slice path, absl::optional<Slice> authority, |
||||||
|
Timestamp deadline, bool registered_method) override; |
||||||
|
|
||||||
|
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata); |
||||||
|
|
||||||
|
grpc_event_engine::experimental::EventEngine* event_engine() const override { |
||||||
|
return event_engine_.get(); |
||||||
|
} |
||||||
|
|
||||||
|
// TODO(ctiller): lame channels
|
||||||
|
bool IsLame() const override { return false; } |
||||||
|
|
||||||
|
bool SupportsConnectivityWatcher() const override { return true; } |
||||||
|
|
||||||
|
// Returns the current connectivity state. If try_to_connect is true,
|
||||||
|
// triggers a connection attempt if not already connected.
|
||||||
|
grpc_connectivity_state CheckConnectivityState(bool try_to_connect) override; |
||||||
|
|
||||||
|
void WatchConnectivityState(grpc_connectivity_state last_observed_state, |
||||||
|
Timestamp deadline, grpc_completion_queue* cq, |
||||||
|
void* tag) override; |
||||||
|
|
||||||
|
// Starts and stops a connectivity watch. The watcher will be initially
|
||||||
|
// notified as soon as the state changes from initial_state and then on
|
||||||
|
// every subsequent state change until either the watch is stopped or
|
||||||
|
// it is notified that the state has changed to SHUTDOWN.
|
||||||
|
//
|
||||||
|
// This is intended to be used when starting watches from code inside of
|
||||||
|
// C-core (e.g., for a nested control plane channel for things like xds).
|
||||||
|
void AddConnectivityWatcher( |
||||||
|
grpc_connectivity_state initial_state, |
||||||
|
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) override; |
||||||
|
void RemoveConnectivityWatcher( |
||||||
|
AsyncConnectivityStateWatcherInterface* watcher) override; |
||||||
|
|
||||||
|
void GetInfo(const grpc_channel_info* channel_info) override; |
||||||
|
|
||||||
|
void ResetConnectionBackoff() override; |
||||||
|
|
||||||
|
void Ping(grpc_completion_queue* cq, void* tag) override; |
||||||
|
|
||||||
|
// Flag that this object gets stored in channel args as a raw pointer.
|
||||||
|
struct RawPointerChannelArgTag {}; |
||||||
|
static absl::string_view ChannelArgName() { |
||||||
|
return "grpc.internal.client_channel"; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class ClientChannelControlHelper; |
||||||
|
class ResolverResultHandler; |
||||||
|
class SubchannelWrapper; |
||||||
|
|
||||||
|
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
void DestroyResolverAndLbPolicyLocked() |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void OnResolverResultChangedLocked(Resolver::Result result) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
void OnResolverErrorLocked(absl::Status status) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
absl::Status CreateOrUpdateLbPolicyLocked( |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, |
||||||
|
const absl::optional<std::string>& health_check_service_name, |
||||||
|
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( |
||||||
|
const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void UpdateServiceConfigInControlPlaneLocked( |
||||||
|
RefCountedPtr<ServiceConfig> service_config, |
||||||
|
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void UpdateServiceConfigInDataPlaneLocked() |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void UpdateStateLocked(grpc_connectivity_state state, |
||||||
|
const absl::Status& status, const char* reason) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void UpdateStateAndPickerLocked( |
||||||
|
grpc_connectivity_state state, const absl::Status& status, |
||||||
|
const char* reason, |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) |
||||||
|
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||||
|
|
||||||
|
void StartIdleTimer(); |
||||||
|
|
||||||
|
// Applies service config settings from config_selector to the call.
|
||||||
|
// May modify call context and client_initial_metadata.
|
||||||
|
absl::Status ApplyServiceConfigToCall( |
||||||
|
ConfigSelector& config_selector, |
||||||
|
ClientMetadata& client_initial_metadata) const; |
||||||
|
|
||||||
|
const ChannelArgs channel_args_; |
||||||
|
const std::shared_ptr<grpc_event_engine::experimental::EventEngine> |
||||||
|
event_engine_; |
||||||
|
const std::string uri_to_resolve_; |
||||||
|
const size_t service_config_parser_index_; |
||||||
|
const RefCountedPtr<ServiceConfig> default_service_config_; |
||||||
|
ClientChannelFactory* const client_channel_factory_; |
||||||
|
const std::string default_authority_; |
||||||
|
channelz::ChannelNode* const channelz_node_; |
||||||
|
// TODO(ctiller): unify with Channel
|
||||||
|
const RefCountedPtr<CallArenaAllocator> call_arena_allocator_; |
||||||
|
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_; |
||||||
|
|
||||||
|
//
|
||||||
|
// Idleness state.
|
||||||
|
//
|
||||||
|
const Duration idle_timeout_; |
||||||
|
IdleFilterState idle_state_{false}; |
||||||
|
SingleSetPtr<Activity, typename ActivityPtr::deleter_type> idle_activity_; |
||||||
|
|
||||||
|
//
|
||||||
|
// Fields related to name resolution.
|
||||||
|
//
|
||||||
|
struct ResolverDataForCalls { |
||||||
|
RefCountedPtr<ConfigSelector> config_selector; |
||||||
|
RefCountedPtr<UnstartedCallDestination> call_destination; |
||||||
|
}; |
||||||
|
Observable<absl::StatusOr<ResolverDataForCalls>> resolver_data_for_calls_; |
||||||
|
|
||||||
|
//
|
||||||
|
// Fields related to LB picks.
|
||||||
|
//
|
||||||
|
PickerObservable picker_; |
||||||
|
const RefCountedPtr<UnstartedCallDestination> call_destination_; |
||||||
|
|
||||||
|
//
|
||||||
|
// Fields used in the control plane. Guarded by work_serializer.
|
||||||
|
//
|
||||||
|
std::shared_ptr<WorkSerializer> work_serializer_; |
||||||
|
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
bool previous_resolution_contained_addresses_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_) = false; |
||||||
|
RefCountedPtr<ServiceConfig> saved_service_config_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
RefCountedPtr<ConfigSelector> saved_config_selector_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> lb_policy_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
||||||
|
std::map<Subchannel*, int> subchannel_refcount_map_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
// The set of SubchannelWrappers that currently exist.
|
||||||
|
// No need to hold a ref, since the set is updated in the control-plane
|
||||||
|
// work_serializer when the SubchannelWrappers are created and destroyed.
|
||||||
|
absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_ |
||||||
|
ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1; |
||||||
|
absl::Status disconnect_error_ ABSL_GUARDED_BY(*work_serializer_); |
||||||
|
|
||||||
|
//
|
||||||
|
// Fields accessed via GetChannelInfo().
|
||||||
|
//
|
||||||
|
Mutex info_mu_; |
||||||
|
std::string info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_); |
||||||
|
std::string info_service_config_json_ ABSL_GUARDED_BY(info_mu_); |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H
|
@ -0,0 +1,335 @@ |
|||||||
|
// Copyright 2024 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include "src/core/client_channel/load_balanced_call_destination.h" |
||||||
|
|
||||||
|
#include "src/core/client_channel/client_channel.h" |
||||||
|
#include "src/core/client_channel/client_channel_internal.h" |
||||||
|
#include "src/core/client_channel/subchannel.h" |
||||||
|
#include "src/core/lib/channel/status_util.h" |
||||||
|
#include "src/core/lib/promise/loop.h" |
||||||
|
#include "src/core/telemetry/call_tracer.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// Defined in legacy client channel filter.
|
||||||
|
// TODO(roth): Move these here when we remove the legacy filter.
|
||||||
|
extern TraceFlag grpc_client_channel_trace; |
||||||
|
extern TraceFlag grpc_client_channel_call_trace; |
||||||
|
extern TraceFlag grpc_client_channel_lb_call_trace; |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
class LbMetadata : public LoadBalancingPolicy::MetadataInterface { |
||||||
|
public: |
||||||
|
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} |
||||||
|
|
||||||
|
void Add(absl::string_view key, absl::string_view value) override { |
||||||
|
if (batch_ == nullptr) return; |
||||||
|
// Gross, egregious hack to support legacy grpclb behavior.
|
||||||
|
// TODO(ctiller): Use a promise context for this once that plumbing is done.
|
||||||
|
if (key == GrpcLbClientStatsMetadata::key()) { |
||||||
|
batch_->Set( |
||||||
|
GrpcLbClientStatsMetadata(), |
||||||
|
const_cast<GrpcLbClientStats*>( |
||||||
|
reinterpret_cast<const GrpcLbClientStats*>(value.data()))); |
||||||
|
return; |
||||||
|
} |
||||||
|
batch_->Append(key, Slice::FromStaticString(value), |
||||||
|
[key](absl::string_view error, const Slice& value) { |
||||||
|
gpr_log(GPR_ERROR, "%s", |
||||||
|
absl::StrCat(error, " key:", key, |
||||||
|
" value:", value.as_string_view()) |
||||||
|
.c_str()); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector() |
||||||
|
override { |
||||||
|
if (batch_ == nullptr) return {}; |
||||||
|
Encoder encoder; |
||||||
|
batch_->Encode(&encoder); |
||||||
|
return encoder.Take(); |
||||||
|
} |
||||||
|
|
||||||
|
absl::optional<absl::string_view> Lookup(absl::string_view key, |
||||||
|
std::string* buffer) const override { |
||||||
|
if (batch_ == nullptr) return absl::nullopt; |
||||||
|
return batch_->GetStringValue(key, buffer); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class Encoder { |
||||||
|
public: |
||||||
|
void Encode(const Slice& key, const Slice& value) { |
||||||
|
out_.emplace_back(std::string(key.as_string_view()), |
||||||
|
std::string(value.as_string_view())); |
||||||
|
} |
||||||
|
|
||||||
|
template <class Which> |
||||||
|
void Encode(Which, const typename Which::ValueType& value) { |
||||||
|
auto value_slice = Which::Encode(value); |
||||||
|
out_.emplace_back(std::string(Which::key()), |
||||||
|
std::string(value_slice.as_string_view())); |
||||||
|
} |
||||||
|
|
||||||
|
void Encode(GrpcTimeoutMetadata, |
||||||
|
const typename GrpcTimeoutMetadata::ValueType&) {} |
||||||
|
void Encode(HttpPathMetadata, const Slice&) {} |
||||||
|
void Encode(HttpMethodMetadata, |
||||||
|
const typename HttpMethodMetadata::ValueType&) {} |
||||||
|
|
||||||
|
std::vector<std::pair<std::string, std::string>> Take() { |
||||||
|
return std::move(out_); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
std::vector<std::pair<std::string, std::string>> out_; |
||||||
|
}; |
||||||
|
|
||||||
|
grpc_metadata_batch* batch_; |
||||||
|
}; |
||||||
|
|
||||||
|
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) { |
||||||
|
auto* call_tracer = MaybeGetContext<ClientCallTracer>(); |
||||||
|
if (call_tracer == nullptr) return; |
||||||
|
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); |
||||||
|
SetContext<CallTracerInterface>(tracer); |
||||||
|
} |
||||||
|
|
||||||
|
class LbCallState : public ClientChannelLbCallState { |
||||||
|
public: |
||||||
|
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); } |
||||||
|
|
||||||
|
// Internal API to allow first-party LB policies to access per-call
|
||||||
|
// attributes set by the ConfigSelector.
|
||||||
|
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( |
||||||
|
UniqueTypeName type) const override { |
||||||
|
auto* service_config_call_data = GetContext<ServiceConfigCallData>(); |
||||||
|
return service_config_call_data->GetCallAttribute(type); |
||||||
|
} |
||||||
|
|
||||||
|
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override { |
||||||
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
||||||
|
return static_cast<ClientCallTracer::CallAttemptTracer*>( |
||||||
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
// TODO(roth): Remove this in favor of the gprpp Match() function once
|
||||||
|
// we can do that without breaking lock annotations.
|
||||||
|
template <typename T> |
||||||
|
T HandlePickResult( |
||||||
|
LoadBalancingPolicy::PickResult* result, |
||||||
|
std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func, |
||||||
|
std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func, |
||||||
|
std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func, |
||||||
|
std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) { |
||||||
|
auto* complete_pick = |
||||||
|
absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result); |
||||||
|
if (complete_pick != nullptr) { |
||||||
|
return complete_func(complete_pick); |
||||||
|
} |
||||||
|
auto* queue_pick = |
||||||
|
absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result); |
||||||
|
if (queue_pick != nullptr) { |
||||||
|
return queue_func(queue_pick); |
||||||
|
} |
||||||
|
auto* fail_pick = |
||||||
|
absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result); |
||||||
|
if (fail_pick != nullptr) { |
||||||
|
return fail_func(fail_pick); |
||||||
|
} |
||||||
|
auto* drop_pick = |
||||||
|
absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result); |
||||||
|
CHECK(drop_pick != nullptr); |
||||||
|
return drop_func(drop_pick); |
||||||
|
} |
||||||
|
|
||||||
|
// Does an LB pick for a call. Returns one of the following things:
|
||||||
|
// - Continue{}, meaning to queue the pick
|
||||||
|
// - a non-OK status, meaning to fail the call
|
||||||
|
// - a call destination, meaning that the pick is complete
|
||||||
|
// When the pick is complete, pushes client_initial_metadata onto
|
||||||
|
// call_initiator. Also adds the subchannel call tracker (if any) to
|
||||||
|
// context.
|
||||||
|
LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> PickSubchannel( |
||||||
|
LoadBalancingPolicy::SubchannelPicker& picker, |
||||||
|
UnstartedCallHandler& unstarted_handler) { |
||||||
|
// Perform LB pick.
|
||||||
|
auto& client_initial_metadata = |
||||||
|
unstarted_handler.UnprocessedClientInitialMetadata(); |
||||||
|
LoadBalancingPolicy::PickArgs pick_args; |
||||||
|
Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata()); |
||||||
|
CHECK(path != nullptr); |
||||||
|
pick_args.path = path->as_string_view(); |
||||||
|
LbCallState lb_call_state; |
||||||
|
pick_args.call_state = &lb_call_state; |
||||||
|
LbMetadata initial_metadata(&client_initial_metadata); |
||||||
|
pick_args.initial_metadata = &initial_metadata; |
||||||
|
auto result = picker.Pick(pick_args); |
||||||
|
// Handle result.
|
||||||
|
return HandlePickResult< |
||||||
|
LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>>>( |
||||||
|
&result, |
||||||
|
// CompletePick
|
||||||
|
[&](LoadBalancingPolicy::PickResult::Complete* complete_pick) |
||||||
|
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"client_channel: %sLB pick succeeded: subchannel=%p", |
||||||
|
GetContext<Activity>()->DebugTag().c_str(), |
||||||
|
complete_pick->subchannel.get()); |
||||||
|
} |
||||||
|
CHECK(complete_pick->subchannel != nullptr); |
||||||
|
// Grab a ref to the call destination while we're still
|
||||||
|
// holding the data plane mutex.
|
||||||
|
auto call_destination = |
||||||
|
DownCast<SubchannelInterfaceWithCallDestination*>( |
||||||
|
complete_pick->subchannel.get()) |
||||||
|
->call_destination(); |
||||||
|
// If the subchannel has no call destination (e.g., if the
|
||||||
|
// subchannel has moved out of state READY but the LB policy hasn't
|
||||||
|
// yet seen that change and given us a new picker), then just
|
||||||
|
// queue the pick. We'll try again as soon as we get a new picker.
|
||||||
|
if (call_destination == nullptr) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"client_channel: %ssubchannel returned by LB picker " |
||||||
|
"has no connected subchannel; queueing pick", |
||||||
|
GetContext<Activity>()->DebugTag().c_str()); |
||||||
|
} |
||||||
|
return Continue{}; |
||||||
|
} |
||||||
|
// If the LB policy returned a call tracker, inform it that the
|
||||||
|
// call is starting and add it to context, so that we can notify
|
||||||
|
// it when the call finishes.
|
||||||
|
if (complete_pick->subchannel_call_tracker != nullptr) { |
||||||
|
complete_pick->subchannel_call_tracker->Start(); |
||||||
|
SetContext(complete_pick->subchannel_call_tracker.release()); |
||||||
|
} |
||||||
|
// Return the connected subchannel.
|
||||||
|
return call_destination; |
||||||
|
}, |
||||||
|
// QueuePick
|
||||||
|
[&](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||||
|
gpr_log(GPR_INFO, "client_channel: %sLB pick queued", |
||||||
|
GetContext<Activity>()->DebugTag().c_str()); |
||||||
|
} |
||||||
|
return Continue{}; |
||||||
|
}, |
||||||
|
// FailPick
|
||||||
|
[&](LoadBalancingPolicy::PickResult::Fail* fail_pick) |
||||||
|
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||||
|
gpr_log(GPR_INFO, "client_channel: %sLB pick failed: %s", |
||||||
|
GetContext<Activity>()->DebugTag().c_str(), |
||||||
|
fail_pick->status.ToString().c_str()); |
||||||
|
} |
||||||
|
// If wait_for_ready is false, then the error indicates the RPC
|
||||||
|
// attempt's final status.
|
||||||
|
if (!unstarted_handler.UnprocessedClientInitialMetadata() |
||||||
|
.GetOrCreatePointer(WaitForReady()) |
||||||
|
->value) { |
||||||
|
return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status), |
||||||
|
"LB pick"); |
||||||
|
} |
||||||
|
// If wait_for_ready is true, then queue to retry when we get a new
|
||||||
|
// picker.
|
||||||
|
return Continue{}; |
||||||
|
}, |
||||||
|
// DropPick
|
||||||
|
[&](LoadBalancingPolicy::PickResult::Drop* drop_pick) |
||||||
|
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||||
|
gpr_log(GPR_INFO, "client_channel: %sLB pick dropped: %s", |
||||||
|
GetContext<Activity>()->DebugTag().c_str(), |
||||||
|
drop_pick->status.ToString().c_str()); |
||||||
|
} |
||||||
|
return grpc_error_set_int(MaybeRewriteIllegalStatusCode( |
||||||
|
std::move(drop_pick->status), "LB drop"), |
||||||
|
StatusIntProperty::kLbPolicyDrop, 1); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
void LoadBalancedCallDestination::StartCall( |
||||||
|
UnstartedCallHandler unstarted_handler) { |
||||||
|
// If there is a call tracer, create a call attempt tracer.
|
||||||
|
bool* is_transparent_retry_metadata = |
||||||
|
unstarted_handler.UnprocessedClientInitialMetadata().get_pointer( |
||||||
|
IsTransparentRetry()); |
||||||
|
bool is_transparent_retry = is_transparent_retry_metadata != nullptr |
||||||
|
? *is_transparent_retry_metadata |
||||||
|
: false; |
||||||
|
MaybeCreateCallAttemptTracer(is_transparent_retry); |
||||||
|
// Spawn a promise to do the LB pick.
|
||||||
|
// This will eventually start the call.
|
||||||
|
unstarted_handler.SpawnGuardedUntilCallCompletes( |
||||||
|
"lb_pick", [unstarted_handler, picker = picker_]() mutable { |
||||||
|
return Map( |
||||||
|
// Wait for the LB picker.
|
||||||
|
CheckDelayed(Loop( |
||||||
|
[last_picker = |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(), |
||||||
|
unstarted_handler, picker]() mutable { |
||||||
|
return Map( |
||||||
|
picker.Next(last_picker), |
||||||
|
[unstarted_handler, &last_picker]( |
||||||
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
||||||
|
picker) mutable { |
||||||
|
last_picker = std::move(picker); |
||||||
|
// Returns 3 possible things:
|
||||||
|
// - Continue to queue the pick
|
||||||
|
// - non-OK status to fail the pick
|
||||||
|
// - a connected subchannel to complete the pick
|
||||||
|
return PickSubchannel(*last_picker, unstarted_handler); |
||||||
|
}); |
||||||
|
})), |
||||||
|
// Create call stack on the connected subchannel.
|
||||||
|
[unstarted_handler]( |
||||||
|
std::tuple< |
||||||
|
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>, |
||||||
|
bool> |
||||||
|
pick_result) { |
||||||
|
auto& call_destination = std::get<0>(pick_result); |
||||||
|
const bool was_queued = std::get<1>(pick_result); |
||||||
|
if (!call_destination.ok()) { |
||||||
|
return call_destination.status(); |
||||||
|
} |
||||||
|
// LB pick is done, so indicate that we've committed.
|
||||||
|
auto* on_commit = MaybeGetContext<LbOnCommit>(); |
||||||
|
if (on_commit != nullptr && *on_commit != nullptr) { |
||||||
|
(*on_commit)(); |
||||||
|
} |
||||||
|
// If it was queued, add a trace annotation.
|
||||||
|
if (was_queued) { |
||||||
|
auto* tracer = |
||||||
|
MaybeGetContext<ClientCallTracer::CallAttemptTracer>(); |
||||||
|
if (tracer != nullptr) { |
||||||
|
tracer->RecordAnnotation("Delayed LB pick complete."); |
||||||
|
} |
||||||
|
} |
||||||
|
// Delegate to connected subchannel.
|
||||||
|
// TODO(ctiller): need to insert LbCallTracingFilter at the top of
|
||||||
|
// the stack
|
||||||
|
(*call_destination)->StartCall(unstarted_handler); |
||||||
|
return absl::OkStatus(); |
||||||
|
}); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,49 @@ |
|||||||
|
// Copyright 2024 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H |
||||||
|
#define GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H |
||||||
|
|
||||||
|
#include "absl/functional/any_invocable.h" |
||||||
|
|
||||||
|
#include "src/core/client_channel/client_channel.h" |
||||||
|
#include "src/core/lib/promise/context.h" |
||||||
|
#include "src/core/lib/transport/call_destination.h" |
||||||
|
#include "src/core/load_balancing/lb_policy.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// Context type for LB on_commit callback.
|
||||||
|
// TODO(ctiller): make this a struct, so we don't accidentally alias context
|
||||||
|
// types
|
||||||
|
using LbOnCommit = absl::AnyInvocable<void()>; |
||||||
|
template <> |
||||||
|
struct ContextType<LbOnCommit> {}; |
||||||
|
|
||||||
|
class LoadBalancedCallDestination final : public UnstartedCallDestination { |
||||||
|
public: |
||||||
|
explicit LoadBalancedCallDestination(ClientChannel::PickerObservable picker) |
||||||
|
: picker_(std::move(picker)) {} |
||||||
|
|
||||||
|
void Orphaned() override {} |
||||||
|
|
||||||
|
void StartCall(UnstartedCallHandler unstarted_handler) override; |
||||||
|
|
||||||
|
private: |
||||||
|
ClientChannel::PickerObservable picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H
|
@ -0,0 +1,178 @@ |
|||||||
|
// Copyright 2024 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include <memory> |
||||||
|
|
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
#include "gtest/gtest.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/client_channel/client_channel.h" |
||||||
|
#include "src/core/client_channel/local_subchannel_pool.h" |
||||||
|
#include "src/core/lib/address_utils/parse_address.h" |
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
#include "test/core/call/yodel/yodel_test.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
using EventEngine = grpc_event_engine::experimental::EventEngine; |
||||||
|
|
||||||
|
namespace { |
||||||
|
const absl::string_view kTestPath = "/test_method"; |
||||||
|
const absl::string_view kTestAddress = "ipv4:127.0.0.1:1234"; |
||||||
|
const absl::string_view kDefaultAuthority = "test-authority"; |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
class ConnectedSubchannelTest : public YodelTest { |
||||||
|
protected: |
||||||
|
using YodelTest::YodelTest; |
||||||
|
|
||||||
|
RefCountedPtr<ConnectedSubchannel> InitChannel(const ChannelArgs& args) { |
||||||
|
grpc_resolved_address addr; |
||||||
|
CHECK(grpc_parse_uri(URI::Parse(kTestAddress).value(), &addr)); |
||||||
|
auto subchannel = Subchannel::Create(MakeOrphanable<TestConnector>(this), |
||||||
|
addr, CompleteArgs(args)); |
||||||
|
{ |
||||||
|
ExecCtx exec_ctx; |
||||||
|
subchannel->RequestConnection(); |
||||||
|
} |
||||||
|
return TickUntil<RefCountedPtr<ConnectedSubchannel>>( |
||||||
|
[subchannel]() -> Poll<RefCountedPtr<ConnectedSubchannel>> { |
||||||
|
auto connected_subchannel = subchannel->connected_subchannel(); |
||||||
|
if (connected_subchannel != nullptr) return connected_subchannel; |
||||||
|
return Pending(); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
ClientMetadataHandle MakeClientInitialMetadata() { |
||||||
|
auto client_initial_metadata = Arena::MakePooled<ClientMetadata>(); |
||||||
|
client_initial_metadata->Set(HttpPathMetadata(), |
||||||
|
Slice::FromCopiedString(kTestPath)); |
||||||
|
return client_initial_metadata; |
||||||
|
} |
||||||
|
|
||||||
|
CallInitiatorAndHandler MakeCall( |
||||||
|
ClientMetadataHandle client_initial_metadata) { |
||||||
|
return MakeCallPair( |
||||||
|
std::move(client_initial_metadata), event_engine().get(), |
||||||
|
call_arena_allocator_->MakeArena(), call_arena_allocator_, nullptr); |
||||||
|
} |
||||||
|
|
||||||
|
CallHandler TickUntilCallStarted() { |
||||||
|
return TickUntil<CallHandler>([this]() -> Poll<CallHandler> { |
||||||
|
auto handler = PopHandler(); |
||||||
|
if (handler.has_value()) return std::move(*handler); |
||||||
|
return Pending(); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class TestTransport final : public ClientTransport { |
||||||
|
public: |
||||||
|
explicit TestTransport(ConnectedSubchannelTest* test) : test_(test) {} |
||||||
|
|
||||||
|
void Orphan() override { |
||||||
|
state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN, absl::OkStatus(), |
||||||
|
"transport-orphaned"); |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
FilterStackTransport* filter_stack_transport() override { return nullptr; } |
||||||
|
ClientTransport* client_transport() override { return this; } |
||||||
|
ServerTransport* server_transport() override { return nullptr; } |
||||||
|
absl::string_view GetTransportName() const override { return "test"; } |
||||||
|
void SetPollset(grpc_stream*, grpc_pollset*) override {} |
||||||
|
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} |
||||||
|
void PerformOp(grpc_transport_op* op) override { |
||||||
|
LOG(INFO) << "PerformOp: " << grpc_transport_op_string(op); |
||||||
|
if (op->start_connectivity_watch != nullptr) { |
||||||
|
state_tracker_.AddWatcher(op->start_connectivity_watch_state, |
||||||
|
std::move(op->start_connectivity_watch)); |
||||||
|
} |
||||||
|
ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus()); |
||||||
|
} |
||||||
|
|
||||||
|
void StartCall(CallHandler call_handler) override { |
||||||
|
test_->handlers_.push(std::move(call_handler)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
ConnectedSubchannelTest* const test_; |
||||||
|
ConnectivityStateTracker state_tracker_{"test-transport"}; |
||||||
|
}; |
||||||
|
|
||||||
|
class TestConnector final : public SubchannelConnector { |
||||||
|
public: |
||||||
|
explicit TestConnector(ConnectedSubchannelTest* test) : test_(test) {} |
||||||
|
|
||||||
|
void Connect(const Args& args, Result* result, |
||||||
|
grpc_closure* notify) override { |
||||||
|
result->channel_args = args.channel_args; |
||||||
|
result->transport = MakeOrphanable<TestTransport>(test_).release(); |
||||||
|
ExecCtx::Run(DEBUG_LOCATION, notify, absl::OkStatus()); |
||||||
|
} |
||||||
|
|
||||||
|
void Shutdown(grpc_error_handle) override {} |
||||||
|
|
||||||
|
private: |
||||||
|
ConnectedSubchannelTest* const test_; |
||||||
|
}; |
||||||
|
|
||||||
|
ChannelArgs CompleteArgs(const ChannelArgs& args) { |
||||||
|
return args.SetObject(ResourceQuota::Default()) |
||||||
|
.SetObject(std::static_pointer_cast<EventEngine>(event_engine())) |
||||||
|
.SetObject(MakeRefCounted<LocalSubchannelPool>()) |
||||||
|
.Set(GRPC_ARG_DEFAULT_AUTHORITY, kDefaultAuthority); |
||||||
|
} |
||||||
|
|
||||||
|
void InitCoreConfiguration() override {} |
||||||
|
|
||||||
|
void Shutdown() override {} |
||||||
|
|
||||||
|
absl::optional<CallHandler> PopHandler() { |
||||||
|
if (handlers_.empty()) return absl::nullopt; |
||||||
|
auto handler = std::move(handlers_.front()); |
||||||
|
handlers_.pop(); |
||||||
|
return handler; |
||||||
|
} |
||||||
|
|
||||||
|
std::queue<CallHandler> handlers_; |
||||||
|
RefCountedPtr<CallArenaAllocator> call_arena_allocator_ = |
||||||
|
MakeRefCounted<CallArenaAllocator>( |
||||||
|
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||||
|
"test"), |
||||||
|
1024); |
||||||
|
}; |
||||||
|
|
||||||
|
#define CONNECTED_SUBCHANNEL_CHANNEL_TEST(name) \ |
||||||
|
YODEL_TEST(ConnectedSubchannelTest, name) |
||||||
|
|
||||||
|
CONNECTED_SUBCHANNEL_CHANNEL_TEST(NoOp) { InitChannel(ChannelArgs()); } |
||||||
|
|
||||||
|
CONNECTED_SUBCHANNEL_CHANNEL_TEST(StartCall) { |
||||||
|
auto channel = InitChannel(ChannelArgs()); |
||||||
|
auto call = MakeCall(MakeClientInitialMetadata()); |
||||||
|
SpawnTestSeq( |
||||||
|
call.handler, "start-call", [channel, handler = call.handler]() mutable { |
||||||
|
channel->unstarted_call_destination()->StartCall(std::move(handler)); |
||||||
|
return Empty{}; |
||||||
|
}); |
||||||
|
auto handler = TickUntilCallStarted(); |
||||||
|
WaitForAllPendingWork(); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1 @@ |
|||||||
|
|
@ -0,0 +1,202 @@ |
|||||||
|
// Copyright 2024 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#include "src/core/client_channel/load_balanced_call_destination.h" |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include <memory> |
||||||
|
#include <queue> |
||||||
|
|
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
#include "gmock/gmock.h" |
||||||
|
#include "gtest/gtest.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "test/core/call/yodel/yodel_test.h" |
||||||
|
|
||||||
|
using testing::StrictMock; |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
using EventEngine = grpc_event_engine::experimental::EventEngine; |
||||||
|
|
||||||
|
namespace { |
||||||
|
const absl::string_view kTestPath = "/test_method"; |
||||||
|
} // namespace
|
||||||
|
|
||||||
|
class LoadBalancedCallDestinationTest : public YodelTest { |
||||||
|
protected: |
||||||
|
using YodelTest::YodelTest; |
||||||
|
|
||||||
|
ClientMetadataHandle MakeClientInitialMetadata() { |
||||||
|
auto client_initial_metadata = Arena::MakePooled<ClientMetadata>(); |
||||||
|
client_initial_metadata->Set(HttpPathMetadata(), |
||||||
|
Slice::FromCopiedString(kTestPath)); |
||||||
|
return client_initial_metadata; |
||||||
|
} |
||||||
|
|
||||||
|
CallInitiatorAndHandler MakeCall( |
||||||
|
ClientMetadataHandle client_initial_metadata) { |
||||||
|
return MakeCallPair( |
||||||
|
std::move(client_initial_metadata), event_engine().get(), |
||||||
|
call_arena_allocator_->MakeArena(), call_arena_allocator_, nullptr); |
||||||
|
} |
||||||
|
|
||||||
|
CallHandler TickUntilCallStarted() { |
||||||
|
auto poll = [this]() -> Poll<CallHandler> { |
||||||
|
auto handler = call_destination_->PopHandler(); |
||||||
|
if (handler.has_value()) return std::move(*handler); |
||||||
|
return Pending(); |
||||||
|
}; |
||||||
|
return TickUntil(absl::FunctionRef<Poll<CallHandler>()>(poll)); |
||||||
|
} |
||||||
|
|
||||||
|
LoadBalancedCallDestination& destination_under_test() { |
||||||
|
return *destination_under_test_; |
||||||
|
} |
||||||
|
|
||||||
|
ClientChannel::PickerObservable& picker() { return picker_; } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> subchannel() { return subchannel_; } |
||||||
|
|
||||||
|
private: |
||||||
|
class TestCallDestination final : public UnstartedCallDestination { |
||||||
|
public: |
||||||
|
void StartCall(UnstartedCallHandler unstarted_call_handler) override { |
||||||
|
handlers_.push( |
||||||
|
unstarted_call_handler.V2HackToStartCallWithoutACallFilterStack()); |
||||||
|
} |
||||||
|
|
||||||
|
absl::optional<CallHandler> PopHandler() { |
||||||
|
if (handlers_.empty()) return absl::nullopt; |
||||||
|
auto handler = std::move(handlers_.front()); |
||||||
|
handlers_.pop(); |
||||||
|
return handler; |
||||||
|
} |
||||||
|
|
||||||
|
void Orphaned() override {} |
||||||
|
|
||||||
|
private: |
||||||
|
std::queue<CallHandler> handlers_; |
||||||
|
}; |
||||||
|
|
||||||
|
class TestSubchannel : public SubchannelInterfaceWithCallDestination { |
||||||
|
public: |
||||||
|
explicit TestSubchannel( |
||||||
|
RefCountedPtr<UnstartedCallDestination> call_destination) |
||||||
|
: call_destination_(std::move(call_destination)) {} |
||||||
|
|
||||||
|
void WatchConnectivityState( |
||||||
|
std::unique_ptr<ConnectivityStateWatcherInterface>) override { |
||||||
|
Crash("not implemented"); |
||||||
|
} |
||||||
|
void CancelConnectivityStateWatch( |
||||||
|
ConnectivityStateWatcherInterface*) override { |
||||||
|
Crash("not implemented"); |
||||||
|
} |
||||||
|
void RequestConnection() override { Crash("not implemented"); } |
||||||
|
void ResetBackoff() override { Crash("not implemented"); } |
||||||
|
void AddDataWatcher(std::unique_ptr<DataWatcherInterface>) override { |
||||||
|
Crash("not implemented"); |
||||||
|
} |
||||||
|
void CancelDataWatcher(DataWatcherInterface*) override { |
||||||
|
Crash("not implemented"); |
||||||
|
} |
||||||
|
RefCountedPtr<UnstartedCallDestination> call_destination() override { |
||||||
|
return call_destination_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
const RefCountedPtr<UnstartedCallDestination> call_destination_; |
||||||
|
}; |
||||||
|
|
||||||
|
void InitCoreConfiguration() override {} |
||||||
|
|
||||||
|
void Shutdown() override { |
||||||
|
channel_.reset(); |
||||||
|
picker_ = ClientChannel::PickerObservable(nullptr); |
||||||
|
call_destination_.reset(); |
||||||
|
destination_under_test_.reset(); |
||||||
|
call_arena_allocator_.reset(); |
||||||
|
subchannel_.reset(); |
||||||
|
} |
||||||
|
|
||||||
|
OrphanablePtr<ClientChannel> channel_; |
||||||
|
ClientChannel::PickerObservable picker_{nullptr}; |
||||||
|
RefCountedPtr<TestCallDestination> call_destination_ = |
||||||
|
MakeRefCounted<TestCallDestination>(); |
||||||
|
RefCountedPtr<LoadBalancedCallDestination> destination_under_test_ = |
||||||
|
MakeRefCounted<LoadBalancedCallDestination>(picker_); |
||||||
|
RefCountedPtr<CallArenaAllocator> call_arena_allocator_ = |
||||||
|
MakeRefCounted<CallArenaAllocator>( |
||||||
|
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||||
|
"test"), |
||||||
|
1024); |
||||||
|
RefCountedPtr<TestSubchannel> subchannel_ = |
||||||
|
MakeRefCounted<TestSubchannel>(call_destination_); |
||||||
|
}; |
||||||
|
|
||||||
|
#define LOAD_BALANCED_CALL_DESTINATION_TEST(name) \ |
||||||
|
YODEL_TEST(LoadBalancedCallDestinationTest, name) |
||||||
|
|
||||||
|
class MockPicker : public LoadBalancingPolicy::SubchannelPicker { |
||||||
|
public: |
||||||
|
MOCK_METHOD(LoadBalancingPolicy::PickResult, Pick, |
||||||
|
(LoadBalancingPolicy::PickArgs)); |
||||||
|
}; |
||||||
|
|
||||||
|
LOAD_BALANCED_CALL_DESTINATION_TEST(NoOp) {} |
||||||
|
|
||||||
|
LOAD_BALANCED_CALL_DESTINATION_TEST(CreateCall) { |
||||||
|
auto call = MakeCall(MakeClientInitialMetadata()); |
||||||
|
SpawnTestSeq( |
||||||
|
call.initiator, "initiator", |
||||||
|
[this, handler = std::move(call.handler)]() { |
||||||
|
destination_under_test().StartCall(handler); |
||||||
|
return Empty{}; |
||||||
|
}, |
||||||
|
[call_initiator = call.initiator]() mutable { |
||||||
|
call_initiator.Cancel(); |
||||||
|
return Empty{}; |
||||||
|
}); |
||||||
|
WaitForAllPendingWork(); |
||||||
|
} |
||||||
|
|
||||||
|
LOAD_BALANCED_CALL_DESTINATION_TEST(StartCall) { |
||||||
|
auto call = MakeCall(MakeClientInitialMetadata()); |
||||||
|
SpawnTestSeq(call.initiator, "initiator", |
||||||
|
[this, handler = std::move(call.handler)]() { |
||||||
|
destination_under_test().StartCall(handler); |
||||||
|
return Empty{}; |
||||||
|
}); |
||||||
|
auto mock_picker = MakeRefCounted<StrictMock<MockPicker>>(); |
||||||
|
EXPECT_CALL(*mock_picker, Pick) |
||||||
|
.WillOnce([this](LoadBalancingPolicy::PickArgs) { |
||||||
|
return LoadBalancingPolicy::PickResult::Complete{subchannel()}; |
||||||
|
}); |
||||||
|
picker().Set(mock_picker); |
||||||
|
auto handler = TickUntilCallStarted(); |
||||||
|
SpawnTestSeq(call.initiator, "cancel", |
||||||
|
[call_initiator = call.initiator]() mutable { |
||||||
|
call_initiator.Cancel(); |
||||||
|
return Empty{}; |
||||||
|
}); |
||||||
|
WaitForAllPendingWork(); |
||||||
|
} |
||||||
|
|
||||||
|
// TODO(roth, ctiller): more tests
|
||||||
|
// - tests for the picker returning queue, fail, and drop results.
|
||||||
|
|
||||||
|
} // namespace grpc_core
|
@ -0,0 +1,90 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2022 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <memory> |
||||||
|
|
||||||
|
#include "absl/types/optional.h" |
||||||
|
#include "gtest/gtest.h" |
||||||
|
|
||||||
|
#include <grpc/impl/channel_arg_names.h> |
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/client_channel/subchannel.h" |
||||||
|
#include "src/core/client_channel/subchannel_pool_interface.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/resolver/endpoint_addresses.h" |
||||||
|
#include "test/core/test_util/test_config.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
namespace testing { |
||||||
|
namespace { |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, UsesChannelDefaultAuthorityByDefault) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs( |
||||||
|
ChannelArgs(), ChannelArgs(), nullptr, "foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "foo.example.com"); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, DefaultAuthorityFromChannelArgs) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs( |
||||||
|
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), |
||||||
|
ChannelArgs(), nullptr, "foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com"); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, DefaultAuthorityFromResolver) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs( |
||||||
|
ChannelArgs(), |
||||||
|
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), nullptr, |
||||||
|
"foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com"); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, |
||||||
|
DefaultAuthorityFromChannelArgsOverridesValueFromResolver) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs( |
||||||
|
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "bar.example.com"), |
||||||
|
ChannelArgs().Set(GRPC_ARG_DEFAULT_AUTHORITY, "baz.example.com"), nullptr, |
||||||
|
"foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_DEFAULT_AUTHORITY), "bar.example.com"); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, ArgsFromChannelTrumpPerAddressArgs) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs(ChannelArgs().Set("foo", 1), |
||||||
|
ChannelArgs().Set("foo", 2), |
||||||
|
nullptr, "foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetInt("foo"), 1); |
||||||
|
} |
||||||
|
|
||||||
|
TEST(MakeSubchannelArgs, StripsOutNoSubchannelArgs) { |
||||||
|
ChannelArgs args = Subchannel::MakeSubchannelArgs( |
||||||
|
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo", 1), |
||||||
|
ChannelArgs().Set(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar", 1), nullptr, |
||||||
|
"foo.example.com"); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_NO_SUBCHANNEL_PREFIX "foo"), absl::nullopt); |
||||||
|
EXPECT_EQ(args.GetString(GRPC_ARG_NO_SUBCHANNEL_PREFIX "bar"), absl::nullopt); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace testing
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
grpc::testing::TestEnvironment env(&argc, argv); |
||||||
|
auto result = RUN_ALL_TESTS(); |
||||||
|
return result; |
||||||
|
} |
Loading…
Reference in new issue