split out lb destination

pull/36732/head
Craig Tiller 11 months ago
parent a5cbcd8c7e
commit bca9cad9f7
  1. 24
      BUILD
  2. 1000
      src/core/client_channel/client_channel.cc
  3. 105
      src/core/client_channel/client_channel.h
  4. 96
      src/core/client_channel/lb_call_tracing_filter.cc
  5. 79
      src/core/client_channel/lb_call_tracing_filter.h
  6. 333
      src/core/client_channel/load_balanced_call_destination.cc
  7. 49
      src/core/client_channel/load_balanced_call_destination.h
  8. 16
      src/core/lib/channel/call_tracer.h
  9. 13
      src/core/lib/channel/context.h
  10. 7
      src/core/lib/promise/context.h

24
BUILD

@ -3646,6 +3646,7 @@ grpc_cc_library(
"//src/core:client_channel/client_channel_plugin.cc",
"//src/core:client_channel/dynamic_filters.cc",
"//src/core:client_channel/global_subchannel_pool.cc",
"//src/core:client_channel/load_balanced_call_destination.cc",
"//src/core:client_channel/local_subchannel_pool.cc",
"//src/core:client_channel/retry_filter.cc",
"//src/core:client_channel/retry_filter_legacy_call_data.cc",
@ -3658,6 +3659,7 @@ grpc_cc_library(
"//src/core:client_channel/client_channel_filter.h",
"//src/core:client_channel/dynamic_filters.h",
"//src/core:client_channel/global_subchannel_pool.h",
"//src/core:client_channel/load_balanced_call_destination.h",
"//src/core:client_channel/local_subchannel_pool.h",
"//src/core:client_channel/retry_filter.h",
"//src/core:client_channel/retry_filter_legacy_call_data.h",
@ -3678,14 +3680,10 @@ grpc_cc_library(
"absl/strings:cord",
"absl/types:optional",
"absl/types:variant",
"@com_google_protobuf//upb:base",
"@com_google_protobuf//upb:mem",
"@com_google_protobuf//upb:message",
],
language = "c++",
visibility = ["@grpc:client_channel"],
deps = [
"api_trace",
"backoff",
"call_combiner",
"call_tracer",
@ -3693,7 +3691,6 @@ grpc_cc_library(
"channel_arg_names",
"channelz",
"config",
"config_vars",
"debug_location",
"endpoint_addresses",
"exec_ctx",
@ -3704,26 +3701,21 @@ grpc_cc_library(
"grpc_security_base",
"grpc_service_config_impl",
"grpc_trace",
"http_connect_handshaker",
"iomgr",
"iomgr_timer",
"lb_child_policy_handler",
"legacy_context",
"orphanable",
"parse_address",
"promise",
"protobuf_duration_upb",
"ref_counted_ptr",
"sockaddr_utils",
"stats",
"uri_parser",
"work_serializer",
"xds_orca_service_upb",
"xds_orca_upb",
"//src/core:activity",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:backend_metric_parser",
"//src/core:call_destination",
"//src/core:call_filters",
"//src/core:call_spine",
"//src/core:cancel_callback",
@ -3739,26 +3731,19 @@ grpc_cc_library(
"//src/core:connectivity_state",
"//src/core:construct_destruct",
"//src/core:context",
"//src/core:delegating_helper",
"//src/core:dual_ref_counted",
"//src/core:env",
"//src/core:error",
"//src/core:error_utils",
"//src/core:exec_ctx_wakeup_scheduler",
"//src/core:experiments",
"//src/core:gpr_atm",
"//src/core:gpr_manual_constructor",
"//src/core:grpc_backend_metric_data",
"//src/core:grpc_channel_idle_filter",
"//src/core:grpc_message_size_filter",
"//src/core:grpc_service_config",
"//src/core:idle_filter_state",
"//src/core:init_internally",
"//src/core:iomgr_fwd",
"//src/core:json",
"//src/core:json_args",
"//src/core:json_channel_args",
"//src/core:json_object_loader",
"//src/core:latch",
"//src/core:lb_policy",
"//src/core:lb_policy_registry",
@ -3772,7 +3757,6 @@ grpc_cc_library(
"//src/core:pipe",
"//src/core:poll",
"//src/core:pollset_set",
"//src/core:proxy_mapper",
"//src/core:proxy_mapper_registry",
"//src/core:ref_counted",
"//src/core:resolved_address",
@ -3780,7 +3764,6 @@ grpc_cc_library(
"//src/core:retry_service_config",
"//src/core:retry_throttle",
"//src/core:seq",
"//src/core:service_config_parser",
"//src/core:single_set_ptr",
"//src/core:sleep",
"//src/core:slice",
@ -3796,7 +3779,6 @@ grpc_cc_library(
"//src/core:try_seq",
"//src/core:unique_type_name",
"//src/core:useful",
"//src/core:validation_errors",
],
)

File diff suppressed because it is too large Load Diff

@ -22,6 +22,7 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "subchannel.h"
#include "src/core/client_channel/client_channel_factory.h"
#include "src/core/client_channel/config_selector.h"
@ -29,18 +30,101 @@
#include "src/core/ext/filters/channel_idle/idle_filter_state.h"
#include "src/core/lib/gprpp/single_set_ptr.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/observable.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/call_filters.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/resolver/resolver.h"
#include "src/core/service_config/service_config.h"
#include "src/core/lib/promise/observable.h"
namespace grpc_core {
class ClientChannel : public Channel {
public:
// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the connected subchannel) from the
// LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane work_serializer.
class SubchannelWrapper : public SubchannelInterface {
public:
SubchannelWrapper(RefCountedPtr<ClientChannel> client_channel,
RefCountedPtr<Subchannel> subchannel);
~SubchannelWrapper() override;
void Orphaned() override;
void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
void CancelConnectivityStateWatch(
ConnectivityStateWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
return subchannel_->connected_subchannel();
}
void RequestConnection() override { subchannel_->RequestConnection(); }
void ResetBackoff() override { subchannel_->ResetBackoff(); }
void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
void CancelDataWatcher(DataWatcherInterface* watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
void ThrottleKeepaliveTime(int new_keepalive_time);
private:
class WatcherWrapper;
// A heterogenous lookup comparator for data watchers that allows
// unique_ptr keys to be looked up as raw pointers.
struct DataWatcherLessThan {
using is_transparent = void;
bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
const std::unique_ptr<DataWatcherInterface>& p2) const {
return p1 < p2;
}
bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
const DataWatcherInterface* p2) const {
return p1.get() < p2;
}
bool operator()(const DataWatcherInterface* p1,
const std::unique_ptr<DataWatcherInterface>& p2) const {
return p1 < p2.get();
}
};
RefCountedPtr<ClientChannel> client_channel_;
RefCountedPtr<Subchannel> subchannel_;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WrapperWatcher that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls
// CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WrapperWatcher to cancel on the underlying subchannel.
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
ABSL_GUARDED_BY(*client_channel_->work_serializer_);
std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
data_watchers_ ABSL_GUARDED_BY(*client_channel_->work_serializer_);
};
using PickerObservable =
Observable<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>;
class CallDestinationFactory {
public:
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return "grpc.internal.client_channel_call_destination"; }
virtual RefCountedPtr<UnstartedCallDestination> CreateCallDestination(
PickerObservable) = 0;
};
static absl::StatusOr<OrphanablePtr<Channel>> Create(
std::string target, ChannelArgs channel_args,
grpc_channel_stack_type channel_stack_type);
@ -49,7 +133,8 @@ class ClientChannel : public Channel {
ClientChannel(std::string target_uri, ChannelArgs args,
std::string uri_to_resolve,
RefCountedPtr<ServiceConfig> default_service_config,
ClientChannelFactory* client_channel_factory);
ClientChannelFactory* client_channel_factory,
CallDestinationFactory* call_destination_factory);
~ClientChannel() override;
@ -106,8 +191,6 @@ class ClientChannel : public Channel {
private:
class ResolverResultHandler;
class ClientChannelControlHelper;
class SubchannelWrapper;
class LoadBalancedCallDestination;
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void DestroyResolverAndLbPolicyLocked()
@ -156,17 +239,6 @@ class ClientChannel : public Channel {
ConfigSelector& config_selector,
ClientMetadata& client_initial_metadata) const;
// Does an LB pick for a call. Returns one of the following things:
// - Continue{}, meaning to queue the pick
// - a non-OK status, meaning to fail the call
// - a connected subchannel, meaning that the pick is complete
// When the pick is complete, pushes client_initial_metadata onto
// call_initiator. Also adds the subchannel call tracker (if any) to
// context.
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> PickSubchannel(
LoadBalancingPolicy::SubchannelPicker& picker,
UnstartedCallHandler& unstarted_handler);
const ChannelArgs channel_args_;
const std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine_;
@ -197,7 +269,8 @@ class ClientChannel : public Channel {
//
// Fields related to LB picks.
//
Observable<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> picker_;
PickerObservable picker_;
const RefCountedPtr<UnstartedCallDestination> call_destination_;
//
// Fields used in the control plane. Guarded by work_serializer.

@ -0,0 +1,96 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/client_channel/lb_call_tracing_filter.h"
#include "lb_call_tracing_filter.h"
namespace grpc_core {
const NoInterceptor LbCallTracingFilter::Call::OnClientToServerMessage;
const NoInterceptor LbCallTracingFilter::Call::OnServerToClientMessage;
void LbCallTracingFilter::Call::OnClientInitialMetadata(
ClientMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordSendInitialMetadata(&metadata);
}
void LbCallTracingFilter::Call::OnServerInitialMetadata(
ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordReceivedInitialMetadata(&metadata);
// Save peer string for later use.
Slice* peer_string = metadata.get_pointer(PeerString());
if (peer_string != nullptr) peer_string_ = peer_string->Ref();
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
void LbCallTracingFilter::Call::OnClientToServerHalfClose() {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
// TODO(roth): Change CallTracer API to not pass metadata
// batch to this method, since the batch is always empty.
grpc_metadata_batch metadata;
tracer->RecordSendTrailingMetadata(&metadata);
}
void LbCallTracingFilter::Call::OnServerTrailingMetadata(
ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
auto* call_tracker =
GetContext<LoadBalancingPolicy::SubchannelCallTrackerInterface*>();
absl::Status status;
if (tracer != nullptr ||
(call_tracker != nullptr && *call_tracker != nullptr)) {
grpc_status_code code =
metadata.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
if (code != GRPC_STATUS_OK) {
absl::string_view message;
if (const auto* grpc_message =
metadata.get_pointer(GrpcMessageMetadata())) {
message = grpc_message->as_string_view();
}
status = absl::Status(static_cast<absl::StatusCode>(code), message);
}
}
if (tracer != nullptr) {
tracer->RecordReceivedTrailingMetadata(
status, &metadata,
&GetContext<CallContext>()->call_stats()->transport_stream_stats);
}
if (call_tracker != nullptr && *call_tracker != nullptr) {
LbMetadata lb_metadata(&metadata);
BackendMetricAccessor backend_metric_accessor(&metadata);
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string_.as_string_view(), status, &lb_metadata,
&backend_metric_accessor};
(*call_tracker)->Finish(args);
delete *call_tracker;
}
}
void LbCallTracingFilter::Call::OnFinalize(const grpc_call_final_info*) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
tracer->RecordEnd(latency);
}
} // namespace grpc_core

@ -0,0 +1,79 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LB_CALL_TRACING_FILTER_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_LB_CALL_TRACING_FILTER_H
namespace grpc_core {
// A filter to handle updating with the call tracer and LB subchannel
// call tracker inside the LB call.
// FIXME: register only when call v3 experiment is enabled
class LbCallTracingFilter {
public:
static absl::StatusOr<LbCallTracingFilter> Create(const ChannelArgs&,
ChannelFilter::Args) {
return LbCallTracingFilter();
}
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& metadata);
void OnServerInitialMetadata(ServerMetadata& metadata);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
void OnClientToServerHalfClose();
void OnServerTrailingMetadata(ServerMetadata& metadata);
void OnFinalize(const grpc_call_final_info*);
private:
// Interface for accessing backend metric data in the LB call tracker.
class BackendMetricAccessor
: public LoadBalancingPolicy::BackendMetricAccessor {
public:
explicit BackendMetricAccessor(
grpc_metadata_batch* server_trailing_metadata)
: server_trailing_metadata_(server_trailing_metadata) {}
~BackendMetricAccessor() override;
const BackendMetricData* GetBackendMetricData() override;
private:
class BackendMetricAllocator : public BackendMetricAllocatorInterface {
public:
BackendMetricData* AllocateBackendMetricData() override {
return GetContext<Arena>()->New<BackendMetricData>();
}
char* AllocateString(size_t size) override {
return static_cast<char*>(GetContext<Arena>()->Alloc(size));
}
};
grpc_metadata_batch* server_trailing_metadata_;
const BackendMetricData* backend_metric_data_ = nullptr;
};
// FIXME: this isn't the right place to measure this from -- should be
// doing it from before we do the LB pick
gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
Slice peer_string_;
};
};
} // namespace grpc_core
#endif

@ -0,0 +1,333 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/client_channel/load_balanced_call_destination.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/status_util.h"
namespace grpc_core {
// Defined in legacy client channel filter.
// TODO(roth): Move these here when we remove the legacy filter.
extern TraceFlag grpc_client_channel_trace;
extern TraceFlag grpc_client_channel_call_trace;
extern TraceFlag grpc_client_channel_lb_call_trace;
namespace {
class LbMetadata : public LoadBalancingPolicy::MetadataInterface {
public:
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {}
void Add(absl::string_view key, absl::string_view value) override {
if (batch_ == nullptr) return;
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
batch_->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
return;
}
batch_->Append(key, Slice::FromStaticString(value),
[key](absl::string_view error, const Slice& value) {
gpr_log(GPR_ERROR, "%s",
absl::StrCat(error, " key:", key,
" value:", value.as_string_view())
.c_str());
});
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
private:
class Encoder {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
grpc_metadata_batch* batch_;
};
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
auto* call_tracer = GetContext<ClientCallTracer>();
if (call_tracer == nullptr) return;
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
SetContext<CallTracerInterface>(tracer);
}
class LbCallState : public ClientChannelLbCallState {
public:
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); }
// Internal API to allow first-party LB policies to access per-call
// attributes set by the ConfigSelector.
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
UniqueTypeName type) const override {
auto* service_config_call_data = GetContext<ServiceConfigCallData>();
return service_config_call_data->GetCallAttribute(type);
}
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientCallTracer::CallAttemptTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER].value);
}
};
// TODO(roth): Remove this in favor of the gprpp Match() function once
// we can do that without breaking lock annotations.
template <typename T>
T HandlePickResult(
LoadBalancingPolicy::PickResult* result,
std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
auto* complete_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
if (complete_pick != nullptr) {
return complete_func(complete_pick);
}
auto* queue_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
if (queue_pick != nullptr) {
return queue_func(queue_pick);
}
auto* fail_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
if (fail_pick != nullptr) {
return fail_func(fail_pick);
}
auto* drop_pick =
absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
GPR_ASSERT(drop_pick != nullptr);
return drop_func(drop_pick);
}
// Does an LB pick for a call. Returns one of the following things:
// - Continue{}, meaning to queue the pick
// - a non-OK status, meaning to fail the call
// - a connected subchannel, meaning that the pick is complete
// When the pick is complete, pushes client_initial_metadata onto
// call_initiator. Also adds the subchannel call tracker (if any) to
// context.
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> PickSubchannel(
LoadBalancingPolicy::SubchannelPicker& picker,
UnstartedCallHandler& unstarted_handler) {
// Perform LB pick.
auto& client_initial_metadata =
unstarted_handler.UnprocessedClientInitialMetadata();
LoadBalancingPolicy::PickArgs pick_args;
Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
pick_args.path = path->as_string_view();
LbCallState lb_call_state;
pick_args.call_state = &lb_call_state;
LbMetadata initial_metadata(&client_initial_metadata);
pick_args.initial_metadata = &initial_metadata;
auto result = picker.Pick(pick_args);
// Handle result.
return HandlePickResult<
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>>>(
&result,
// CompletePick
[&](LoadBalancingPolicy::PickResult::Complete* complete_pick)
-> LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"client_channel: %sLB pick succeeded: subchannel=%p",
GetContext<Activity>()->DebugTag().c_str(),
complete_pick->subchannel.get());
}
GPR_ASSERT(complete_pick->subchannel != nullptr);
// Grab a ref to the connected subchannel while we're still
// holding the data plane mutex.
auto* subchannel = DownCast<ClientChannel::SubchannelWrapper*>(
complete_pick->subchannel.get());
auto connected_subchannel = subchannel->connected_subchannel();
// If the subchannel has no connected subchannel (e.g., if the
// subchannel has moved out of state READY but the LB policy hasn't
// yet seen that change and given us a new picker), then just
// queue the pick. We'll try again as soon as we get a new picker.
if (connected_subchannel == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"client_channel: %ssubchannel returned by LB picker "
"has no connected subchannel; queueing pick",
GetContext<Activity>()->DebugTag().c_str());
}
return Continue{};
}
// If the LB policy returned a call tracker, inform it that the
// call is starting and add it to context, so that we can notify
// it when the call finishes.
if (complete_pick->subchannel_call_tracker != nullptr) {
complete_pick->subchannel_call_tracker->Start();
SetContext(complete_pick->subchannel_call_tracker.release());
}
// Return the connected subchannel.
return connected_subchannel;
},
// QueuePick
[&](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "client_channel: %sLB pick queued",
GetContext<Activity>()->DebugTag().c_str());
}
return Continue{};
},
// FailPick
[&](LoadBalancingPolicy::PickResult::Fail* fail_pick)
-> LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "client_channel: %sLB pick failed: %s",
GetContext<Activity>()->DebugTag().c_str(),
fail_pick->status.ToString().c_str());
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if (!unstarted_handler.UnprocessedClientInitialMetadata()
.GetOrCreatePointer(WaitForReady())
->value) {
return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status),
"LB pick");
}
// If wait_for_ready is true, then queue to retry when we get a new
// picker.
return Continue{};
},
// DropPick
[&](LoadBalancingPolicy::PickResult::Drop* drop_pick)
-> LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "client_channel: %sLB pick dropped: %s",
GetContext<Activity>()->DebugTag().c_str(),
drop_pick->status.ToString().c_str());
}
return grpc_error_set_int(MaybeRewriteIllegalStatusCode(
std::move(drop_pick->status), "LB drop"),
StatusIntProperty::kLbPolicyDrop, 1);
});
}
} // namespace
void LoadBalancedCallDestination::StartCall(
UnstartedCallHandler unstarted_handler) {
// If there is a call tracer, create a call attempt tracer.
bool* is_transparent_retry_metadata =
unstarted_handler.UnprocessedClientInitialMetadata().get_pointer(
IsTransparentRetry());
bool is_transparent_retry = is_transparent_retry_metadata != nullptr
? *is_transparent_retry_metadata
: false;
MaybeCreateCallAttemptTracer(is_transparent_retry);
// Spawn a promise to do the LB pick.
// This will eventually start the call.
unstarted_handler.SpawnGuarded("lb_pick", [was_queued = true,
unstarted_handler =
std::move(unstarted_handler),
picker = picker_]() mutable {
return Map(
// Wait for the LB picker.
Loop([last_picker =
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(),
unstarted_handler, &was_queued, picker]() mutable {
return Map(picker.Next(last_picker),
[unstarted_handler, &last_picker, &was_queued](
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
picker) mutable {
last_picker = std::move(picker);
// Returns 3 possible things:
// - Continue to queue the pick
// - non-OK status to fail the pick
// - a connected subchannel to complete the pick
auto result =
PickSubchannel(*last_picker, unstarted_handler);
if (absl::holds_alternative<Continue>(result)) {
was_queued = true;
}
return result;
});
}),
// Create call stack on the connected subchannel.
[unstarted_handler = std::move(unstarted_handler),
&was_queued](absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>
connected_subchannel) {
if (!connected_subchannel.ok()) {
return connected_subchannel.status();
}
// LB pick is done, so indicate that we've committed.
auto* on_commit = GetContext<LbOnCommit>();
if (on_commit != nullptr && *on_commit != nullptr) {
(*on_commit)();
}
// If it was queued, add a trace annotation.
if (was_queued) {
auto* tracer =
MaybeGetContext<ClientCallTracer::CallAttemptTracer>();
if (tracer != nullptr) {
tracer->RecordAnnotation("Delayed LB pick complete.");
}
}
// Delegate to connected subchannel.
// FIXME: need to insert LbCallTracingFilter at the top of the
// stack
(*connected_subchannel)->StartCall(std::move(unstarted_handler));
return absl::OkStatus();
});
});
}
} // namespace grpc_core

@ -0,0 +1,49 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H
#define GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H
#include "absl/functional/any_invocable.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/transport/call_destination.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/client_channel/client_channel.h"
namespace grpc_core {
// Context type for LB on_commit callback.
// TODO(ctiller): make this a struct, so we don't accidentally alias context
// types
using LbOnCommit = absl::AnyInvocable<void()>;
template <>
struct ContextType<LbOnCommit> {};
class LoadBalancedCallDestination final : public UnstartedCallDestination {
public:
explicit LoadBalancedCallDestination(ClientChannel::PickerObservable picker)
: picker_(std::move(picker)) {}
void Orphaned() override {}
void StartCall(UnstartedCallHandler unstarted_handler) override;
private:
ClientChannel::PickerObservable picker_;
};
} // namespace grpc_core
#endif

@ -34,6 +34,7 @@
#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/call_final_info.h"
@ -221,6 +222,21 @@ void AddClientCallTracerToContext(grpc_call_context_element* call_context,
void AddServerCallTracerToContext(grpc_call_context_element* call_context,
ServerCallTracer* tracer);
template <>
struct ContextSubclass<ClientCallTracer::CallAttemptTracer> {
using Base = CallTracerInterface;
};
template <>
struct ContextSubclass<ServerCallTracer> {
using Base = CallTracerInterface;
};
template <>
struct ContextSubclass<ClientCallTracer> {
using Base = CallTracerAnnotationInterface;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_CHANNEL_CALL_TRACER_H

@ -76,6 +76,8 @@ struct grpc_call_context_element {
namespace grpc_core {
class Call;
class CallTracerAnnotationInterface;
class CallTracerInterface;
class ServiceConfigCallData;
// Bind the legacy context array into the new style structure
// TODO(ctiller): remove as we migrate these contexts to the new system.
@ -99,6 +101,17 @@ struct OldStyleContext<CallTracerAnnotationInterface> {
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE;
};
template <>
struct OldStyleContext<CallTracerInterface> {
static constexpr grpc_context_index kIndex = GRPC_CONTEXT_CALL_TRACER;
};
template <>
struct OldStyleContext<ServiceConfigCallData> {
static constexpr grpc_context_index kIndex =
GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA;
};
template <typename T>
class Context<T, absl::void_t<decltype(OldStyleContext<T>::kIndex)>> {
public:

@ -119,6 +119,13 @@ T* GetContext() {
return p;
}
// Retrieve the current value of a context, or return nullptr if the value is
// unset.
template <typename T>
T* MaybeGetContext() {
return promise_detail::Context<T>::get();
}
template <typename T>
void SetContext(T* p) {
promise_detail::Context<T>::set(p);

Loading…
Cancel
Save