|
|
|
@ -107,6 +107,8 @@ |
|
|
|
|
#include "src/core/lib/transport/error_utils.h" |
|
|
|
|
#include "src/core/lib/transport/metadata_batch.h" |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/channel/promise_based_filter.h" |
|
|
|
|
#include "src/core/lib/promise/context.h" |
|
|
|
|
#include "src/core/lib/transport/call_spine.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
@ -1513,18 +1515,6 @@ CallInitiator ClientChannel::CreateCall( |
|
|
|
|
CheckConnectivityState(/*try_to_connect=*/true); |
|
|
|
|
// Create an initiator/handler pair.
|
|
|
|
|
auto call = MakeCall(GetContext<EventEngine>(), arena); |
|
|
|
|
// FIXME: handle adding annotation if we were queued waiting for the
|
|
|
|
|
// resolver result
|
|
|
|
|
#if 0 |
|
|
|
|
// If the call was queued, add trace annotation.
|
|
|
|
|
if (was_queued) { |
|
|
|
|
auto* call_tracer = static_cast<CallTracerAnnotationInterface*>( |
|
|
|
|
call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); |
|
|
|
|
if (call_tracer != nullptr) { |
|
|
|
|
call_tracer->RecordAnnotation("Delayed name resolution complete."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
// Spawn a promise to wait for the resolver result.
|
|
|
|
|
// This will eventually start using the handler, which will allow the
|
|
|
|
|
// initiator to make progress.
|
|
|
|
@ -1533,24 +1523,29 @@ CallInitiator ClientChannel::CreateCall( |
|
|
|
|
[self = RefAsSubclass<ClientChannel>(), |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata), |
|
|
|
|
initiator = call.initiator, |
|
|
|
|
handler = std::move(call.handler)]() mutable { |
|
|
|
|
handler = std::move(call.handler), was_queued = false]() mutable { |
|
|
|
|
const bool wait_for_ready = |
|
|
|
|
client_initial_metadata->GetOrCreatePointer(WaitForReady())->value; |
|
|
|
|
return Map( |
|
|
|
|
// Wait for the resolver result.
|
|
|
|
|
resolver_data_for_calls_.NextWhen( |
|
|
|
|
[wait_for_ready]( |
|
|
|
|
[wait_for_ready, &was_queued]( |
|
|
|
|
const absl::StatusOr<ResolverDataForCalls> result) { |
|
|
|
|
bool got_result = false; |
|
|
|
|
// If the resolver reports an error but the call is
|
|
|
|
|
// wait_for_ready, keep waiting for the next result
|
|
|
|
|
// instead of failing the call.
|
|
|
|
|
if (!result.ok()) return !wait_for_ready; |
|
|
|
|
// Not an error. Make sure we actually have a result.
|
|
|
|
|
return *result != nullptr; |
|
|
|
|
if (!result.ok()) { |
|
|
|
|
got_result = !wait_for_ready; |
|
|
|
|
} else { |
|
|
|
|
// Not an error. Make sure we actually have a result.
|
|
|
|
|
got_result = *result != nullptr; |
|
|
|
|
} |
|
|
|
|
if (!got_result) was_queued = true; |
|
|
|
|
return got_result; |
|
|
|
|
}), |
|
|
|
|
// Handle resolver result.
|
|
|
|
|
[self, initiator = std::move(initiator), |
|
|
|
|
handler = std::move(handler), |
|
|
|
|
[self, &was_queued, &initiator, &handler, |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata)]( |
|
|
|
|
ResolverDataForCalls resolver_data) mutable { |
|
|
|
|
// Apply service config to call.
|
|
|
|
@ -1558,9 +1553,21 @@ CallInitiator ClientChannel::CreateCall( |
|
|
|
|
self.get(), *resolver_data.config_selector, |
|
|
|
|
client_initial_metadata); |
|
|
|
|
if (!status.ok()) return status; |
|
|
|
|
// If the call was queued, add trace annotation.
|
|
|
|
|
if (was_queued) { |
|
|
|
|
auto* legacy_context = |
|
|
|
|
GetContext<grpc_call_context_element>(); |
|
|
|
|
auto* call_tracer = |
|
|
|
|
static_cast<CallTracerAnnotationInterface*>( |
|
|
|
|
legacy_context[ |
|
|
|
|
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE] |
|
|
|
|
.value); |
|
|
|
|
if (call_tracer != nullptr) { |
|
|
|
|
call_tracer->RecordAnnotation( |
|
|
|
|
"Delayed name resolution complete."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Now inject initial metadata into the call.
|
|
|
|
|
// FIXME: how do I chain this such that it doesn't need another call to
|
|
|
|
|
// SpawnGuarded()?
|
|
|
|
|
initiator.SpawnGuarded( |
|
|
|
|
"send_initial_metadata", |
|
|
|
|
[initiator, client_initial_metadata = |
|
|
|
@ -1579,114 +1586,9 @@ CallInitiator ClientChannel::CreateCall( |
|
|
|
|
return call.initiator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
CallInitiator ClientChannel::CreateLoadBalancedCall( |
|
|
|
|
ClientMetadataHandle client_initial_metadata, |
|
|
|
|
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) { |
|
|
|
|
// If there is a call tracer, record events.
|
|
|
|
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
|
|
|
|
auto* call_attempt_tracer = static_cast<ClientCallTracer::CallAttemptTracer*>( |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
|
|
|
if (call_attempt_tracer != nullptr) { |
|
|
|
|
call_attempt_tracer->RecordSendInitialMetadata( |
|
|
|
|
client_initial_metadata.get()); |
|
|
|
|
// FIXME: call call_attempt_tracer->RecordSendTrailingMetadata() when we
|
|
|
|
|
// send the half close (when we're done sending messages?)
|
|
|
|
|
} |
|
|
|
|
// FIXME: extract peer name from server initial metadata
|
|
|
|
|
// Create an arena.
|
|
|
|
|
const size_t initial_size = lb_call_size_estimator_.CallSizeEstimate(); |
|
|
|
|
// FIXME: do we want to do this for LB calls, or do we want a separate stat for this?
|
|
|
|
|
//global_stats().IncrementCallInitialSize(initial_size);
|
|
|
|
|
Arena* arena = Arena::Create(initial_size, &lb_call_allocator_); |
|
|
|
|
// Create an initiator/handler pair using the arena.
|
|
|
|
|
// FIXME: pass in a callback that the CallSpine will use to destroy the arena:
|
|
|
|
|
// [](Arena* arena) {
|
|
|
|
|
// lb_call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
|
|
|
|
|
// arena->Destroy();
|
|
|
|
|
// }
|
|
|
|
|
auto call = MakeCall(GetContext<EventEngine>(), arena); |
|
|
|
|
// FIXME: handle adding annotation if we were queued waiting for the
|
|
|
|
|
// LB pick
|
|
|
|
|
#if 0 |
|
|
|
|
// If it was queued, add a trace annotation.
|
|
|
|
|
if (was_queued && call_attempt_tracer() != nullptr) { |
|
|
|
|
call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete."); |
|
|
|
|
} |
|
|
|
|
#endif |
|
|
|
|
// FIXME: invoke the on_commit callback when LB pick is complete
|
|
|
|
|
// Spawn a promise to do the LB pick.
|
|
|
|
|
// This will eventually start using the handler, which will allow the
|
|
|
|
|
// initiator to make progress.
|
|
|
|
|
call.initiator.SpawnGuarded( |
|
|
|
|
"lb_pick", |
|
|
|
|
[self = RefAsSubclass<ClientChannel>(), |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata), |
|
|
|
|
initiator = call.initiator, |
|
|
|
|
handler = std::move(call.handler)]() mutable { |
|
|
|
|
return Map( |
|
|
|
|
// Wait for the LB picker.
|
|
|
|
|
Loop([last_picker = |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(), |
|
|
|
|
client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata), |
|
|
|
|
initiator, handler]() mutable { |
|
|
|
|
return Map( |
|
|
|
|
picker_.Next(last_picker), |
|
|
|
|
[&last_picker, &client_initial_metadata, &initiator]( |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
|
|
|
|
picker) mutable { |
|
|
|
|
last_picker = std::move(picker); |
|
|
|
|
// Returns 3 possible things:
|
|
|
|
|
// - Continue to queue the pick
|
|
|
|
|
// - non-OK status to fail the pick
|
|
|
|
|
// - a connected subchannel to complete the pick
|
|
|
|
|
return PickSubchannel( |
|
|
|
|
*last_picker, client_initial_metadata, initiator); |
|
|
|
|
}); |
|
|
|
|
}), |
|
|
|
|
// Create call stack on the connected subchannel.
|
|
|
|
|
[handler = std::move(handler)]( |
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { |
|
|
|
|
handler.SetStack(connected_subchannel->GetStack()); |
|
|
|
|
connected_subchannel->call_destination_->StartCall( |
|
|
|
|
std::move(handler)); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
// Return the initiator.
|
|
|
|
|
return call.initiator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ClientChannel::LbCallState
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
class ClientChannel::LbCallState : public ClientChannelLbCallState { |
|
|
|
|
public: |
|
|
|
|
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); } |
|
|
|
|
|
|
|
|
|
// Internal API to allow first-party LB policies to access per-call
|
|
|
|
|
// attributes set by the ConfigSelector.
|
|
|
|
|
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( |
|
|
|
|
UniqueTypeName type) const override { |
|
|
|
|
auto* service_config_call_data = GetServiceConfigCallDataFromContext(); |
|
|
|
|
return service_config_call_data->GetCallAttribute(type); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override { |
|
|
|
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
|
|
|
|
return static_cast<ClientCallAttemptTracer*>( |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ClientChannel::LbMetadata
|
|
|
|
|
//
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
class ClientChannel::LbMetadata |
|
|
|
|
: public LoadBalancingPolicy::MetadataInterface { |
|
|
|
|
class LbMetadata : public LoadBalancingPolicy::MetadataInterface { |
|
|
|
|
public: |
|
|
|
|
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} |
|
|
|
|
|
|
|
|
@ -1756,115 +1658,248 @@ class ClientChannel::LbMetadata |
|
|
|
|
grpc_metadata_batch* batch_; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ClientChannel::LoadBalancedCall::BackendMetricAccessor
|
|
|
|
|
//
|
|
|
|
|
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) { |
|
|
|
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
|
|
|
|
auto* call_tracer = static_cast<ClientCallTracer*>( |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); |
|
|
|
|
if (call_tracer == nullptr) return; |
|
|
|
|
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value = tracer; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// FIXME: move this to a filter
|
|
|
|
|
class ClientChannel::LoadBalancedCall::BackendMetricAccessor |
|
|
|
|
: public LoadBalancingPolicy::BackendMetricAccessor { |
|
|
|
|
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracerFromContext() { |
|
|
|
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
|
|
|
|
return static_cast<ClientCallTracer::CallAttemptTracer*>( |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Context type for subchannel call tracker.
|
|
|
|
|
template <> |
|
|
|
|
struct ContextType<LoadBalancingPolicy::SubchannelCallTracker*> {}; |
|
|
|
|
|
|
|
|
|
// A filter to handle updating with the call tracer and LB subchannel
|
|
|
|
|
// call tracker inside the LB call.
|
|
|
|
|
class LbCallTracingFilter : public ImplementChannelFilter<LbCallTracingFilter> { |
|
|
|
|
public: |
|
|
|
|
BackendMetricAccessor(LoadBalancedCall* lb_call, |
|
|
|
|
grpc_metadata_batch* recv_trailing_metadata) |
|
|
|
|
: lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {} |
|
|
|
|
|
|
|
|
|
const BackendMetricData* GetBackendMetricData() override { |
|
|
|
|
if (lb_call_->backend_metric_data_ == nullptr && |
|
|
|
|
recv_trailing_metadata_ != nullptr) { |
|
|
|
|
if (const auto* md = recv_trailing_metadata_->get_pointer( |
|
|
|
|
EndpointLoadMetricsBinMetadata())) { |
|
|
|
|
BackendMetricAllocator allocator(lb_call_->arena()); |
|
|
|
|
lb_call_->backend_metric_data_ = |
|
|
|
|
ParseBackendMetricData(md->as_string_view(), &allocator); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return lb_call_->backend_metric_data_; |
|
|
|
|
static absl::StatusOr<LbCallTracingFilter> Create(const ChannelArgs&, |
|
|
|
|
ChannelFilter::Args) { |
|
|
|
|
return LbCallTracingFilter(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class BackendMetricAllocator : public BackendMetricAllocatorInterface { |
|
|
|
|
class Call { |
|
|
|
|
public: |
|
|
|
|
BackendMetricData* AllocateBackendMetricData() override { |
|
|
|
|
return GetContext<Arena>()->New<BackendMetricData>(); |
|
|
|
|
void OnClientInitialMetadata(ClientMetadata& metadata) { |
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
if (tracer == nullptr) return; |
|
|
|
|
tracer->RecordSendInitialMetadata(metadata.get()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
char* AllocateString(size_t size) override { |
|
|
|
|
return static_cast<char*>(GetContext<Arena>()->Alloc(size)); |
|
|
|
|
void OnServerInitialMetadata(ServerMetadata& metadata) { |
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
if (tracer == nullptr) return; |
|
|
|
|
tracer->RecordReceivedInitialMetadata(metadata.get()); |
|
|
|
|
// Save peer string for later use.
|
|
|
|
|
Slice* peer_string = metadata->get_pointer(PeerString()); |
|
|
|
|
if (peer_string != nullptr) peer_string_ = peer_string->Ref(); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
LoadBalancedCall* lb_call_; |
|
|
|
|
grpc_metadata_batch* recv_trailing_metadata_; |
|
|
|
|
}; |
|
|
|
|
static const NoInterceptor OnClientToServerMessage; |
|
|
|
|
static const NoInterceptor OnServerToClientMessage; |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ClientChannel::LoadBalancedCall
|
|
|
|
|
//
|
|
|
|
|
void OnClientToServerMessagesClosed() { |
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
if (tracer == nullptr) return; |
|
|
|
|
// TODO(roth): Change CallTracer API to not pass metadata
|
|
|
|
|
// batch to this method, since the batch is always empty.
|
|
|
|
|
grpc_metadata_batch metadata(GetContext<Arena>()); |
|
|
|
|
tracer->RecordSendTrailingMetadata(&metadata); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
void OnServerTrailingMetadata(ServerMetadata& metadata) { |
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
auto* call_tracker = |
|
|
|
|
GetContext<LoadBalancingPolicy::SubchannelCallTracker*>(); |
|
|
|
|
absl::Status status; |
|
|
|
|
if (tracer != nullptr || call_tracker != nullptr) { |
|
|
|
|
grpc_status_code code = metadata->get(GrpcStatusMetadata()) |
|
|
|
|
.value_or(GRPC_STATUS_UNKNOWN); |
|
|
|
|
if (code != GRPC_STATUS_OK) { |
|
|
|
|
absl::string_view message; |
|
|
|
|
if (const auto* grpc_message = |
|
|
|
|
metadata->get_pointer(GrpcMessageMetadata())) { |
|
|
|
|
message = grpc_message->as_string_view(); |
|
|
|
|
} |
|
|
|
|
status = |
|
|
|
|
absl::Status(static_cast<absl::StatusCode>(code), message); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (tracer != nullptr) { |
|
|
|
|
tracer->RecordReceivedTrailingMetadata( |
|
|
|
|
status, metadata.get(), |
|
|
|
|
&GetContext<CallContext>()->call_stats()->transport_stream_stats, |
|
|
|
|
peer_string_.as_string_view()); |
|
|
|
|
} |
|
|
|
|
if (call_tracker != nullptr) { |
|
|
|
|
LbMetadata metadata(metadata.get()); |
|
|
|
|
BackendMetricAccessor backend_metric_accessor(metadata.get()); |
|
|
|
|
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = { |
|
|
|
|
peer_string_.as_string_view(), status, metadata.get(), |
|
|
|
|
&backend_metric_accessor}; |
|
|
|
|
call_tracker->Finish(args); |
|
|
|
|
delete call_tracker; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void CreateCallAttemptTracer(grpc_call_context_element* context, |
|
|
|
|
bool is_transparent_retry) { |
|
|
|
|
auto* call_tracer = static_cast<ClientCallTracer*>( |
|
|
|
|
context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value); |
|
|
|
|
if (call_tracer == nullptr) return; |
|
|
|
|
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); |
|
|
|
|
context[GRPC_CONTEXT_CALL_TRACER].value = tracer; |
|
|
|
|
} |
|
|
|
|
void OnFinalize(const grpc_call_final_info*) { |
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
if (tracer == nullptr) return; |
|
|
|
|
gpr_timespec latency = |
|
|
|
|
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_); |
|
|
|
|
tracer->RecordEnd(latency); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
// Interface for accessing backend metric data in the LB call tracker.
|
|
|
|
|
class BackendMetricAccessor |
|
|
|
|
: public LoadBalancingPolicy::BackendMetricAccessor { |
|
|
|
|
public: |
|
|
|
|
explicit BackendMetricAccessor( |
|
|
|
|
grpc_metadata_batch* server_trailing_metadata) |
|
|
|
|
: server_trailing_metadata_(server_trailing_metadata) {} |
|
|
|
|
|
|
|
|
|
~BackendMetricAccessor() override { |
|
|
|
|
if (backend_metric_data_ != nullptr) { |
|
|
|
|
backend_metric_data_->BackendMetricData::~BackendMetricData(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const BackendMetricData* GetBackendMetricData() override { |
|
|
|
|
if (backend_metric_data_ == nullptr) { |
|
|
|
|
if (const auto* md = recv_trailing_metadata_->get_pointer( |
|
|
|
|
EndpointLoadMetricsBinMetadata())) { |
|
|
|
|
BackendMetricAllocator allocator; |
|
|
|
|
backend_metric_data_ = |
|
|
|
|
ParseBackendMetricData(md->as_string_view(), &allocator); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return backend_metric_data_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
class BackendMetricAllocator : public BackendMetricAllocatorInterface { |
|
|
|
|
public: |
|
|
|
|
BackendMetricData* AllocateBackendMetricData() override { |
|
|
|
|
return GetContext<Arena>()->New<BackendMetricData>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
char* AllocateString(size_t size) override { |
|
|
|
|
return static_cast<char*>(GetContext<Arena>()->Alloc(size)); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
grpc_metadata_batch* send_trailing_metadata_; |
|
|
|
|
const BackendMetricData* backend_metric_data_ = nullptr; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
Slice peer_string_; |
|
|
|
|
}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
ClientChannel::LoadBalancedCall::LoadBalancedCall( |
|
|
|
|
ClientChannel* client_channel, grpc_call_context_element* call_context, |
|
|
|
|
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) |
|
|
|
|
: InternallyRefCounted( |
|
|
|
|
GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) |
|
|
|
|
? "LoadBalancedCall" |
|
|
|
|
: nullptr), |
|
|
|
|
client_channel_(client_channel), |
|
|
|
|
on_commit_(std::move(on_commit)), |
|
|
|
|
call_context_(call_context) { |
|
|
|
|
CreateCallAttemptTracer(call_context, is_transparent_retry); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, "client_channel=%p lb_call=%p: created", client_channel_, this); |
|
|
|
|
} |
|
|
|
|
CallInitiator ClientChannel::CreateLoadBalancedCall( |
|
|
|
|
ClientMetadataHandle client_initial_metadata, |
|
|
|
|
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) { |
|
|
|
|
// If there is a call tracer, create a call attempt tracer.
|
|
|
|
|
MaybeCreateCallAttemptTracer(is_transparent_retry); |
|
|
|
|
// Create an arena.
|
|
|
|
|
const size_t initial_size = lb_call_size_estimator_.CallSizeEstimate(); |
|
|
|
|
// FIXME: do we want to do this for LB calls, or do we want a separate stat for this?
|
|
|
|
|
//global_stats().IncrementCallInitialSize(initial_size);
|
|
|
|
|
Arena* arena = Arena::Create(initial_size, &lb_call_allocator_); |
|
|
|
|
// Create an initiator/handler pair using the arena.
|
|
|
|
|
// FIXME: pass in a callback that the CallSpine will use to destroy the arena:
|
|
|
|
|
// [](Arena* arena) {
|
|
|
|
|
// lb_call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
|
|
|
|
|
// arena->Destroy();
|
|
|
|
|
// }
|
|
|
|
|
auto call = MakeCall(GetContext<EventEngine>(), arena); |
|
|
|
|
// Spawn a promise to do the LB pick.
|
|
|
|
|
// This will eventually start using the handler, which will allow the
|
|
|
|
|
// initiator to make progress.
|
|
|
|
|
call.initiator.SpawnGuarded( |
|
|
|
|
"lb_pick", |
|
|
|
|
[self = RefAsSubclass<ClientChannel>(), |
|
|
|
|
client_initial_metadata = std::move(client_initial_metadata), |
|
|
|
|
initiator = call.initiator, handler = std::move(call.handler), |
|
|
|
|
on_commit = std::move(on_commit), was_queued = true]() mutable { |
|
|
|
|
return Map( |
|
|
|
|
// Wait for the LB picker.
|
|
|
|
|
Loop([last_picker = |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(), |
|
|
|
|
client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata), |
|
|
|
|
initiator, &was_queued]() mutable { |
|
|
|
|
return Map( |
|
|
|
|
picker_.Next(last_picker), |
|
|
|
|
[&last_picker, &client_initial_metadata, &initiator, |
|
|
|
|
&was_queued]( |
|
|
|
|
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
|
|
|
|
picker) mutable { |
|
|
|
|
last_picker = std::move(picker); |
|
|
|
|
// Returns 3 possible things:
|
|
|
|
|
// - Continue to queue the pick
|
|
|
|
|
// - non-OK status to fail the pick
|
|
|
|
|
// - a connected subchannel to complete the pick
|
|
|
|
|
auto result = PickSubchannel( |
|
|
|
|
*last_picker, client_initial_metadata, initiator); |
|
|
|
|
if (result == Continue{}) was_queued = true; |
|
|
|
|
return result; |
|
|
|
|
}); |
|
|
|
|
}), |
|
|
|
|
// Create call stack on the connected subchannel.
|
|
|
|
|
[handler = std::move(handler), on_commit = std::move(on_commit), |
|
|
|
|
&was_queued]( |
|
|
|
|
RefCountedPtr<ConnectedSubchannel> connected_subchannel) { |
|
|
|
|
// LB pick is done, so indicate that we've committed.
|
|
|
|
|
on_commit(); |
|
|
|
|
// If it was queued, add a trace annotation.
|
|
|
|
|
auto* tracer = GetCallAttemptTracerFromContext(); |
|
|
|
|
if (was_queued && tracer != nullptr) { |
|
|
|
|
tracer->RecordAnnotation("Delayed LB pick complete."); |
|
|
|
|
} |
|
|
|
|
// Build call stack.
|
|
|
|
|
// FIXME: need to insert LbCallTracingFilter at the top of the stack
|
|
|
|
|
handler.SetStack(connected_subchannel->GetStack()); |
|
|
|
|
connected_subchannel->StartCall(std::move(handler)); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
// Return the initiator.
|
|
|
|
|
return call.initiator; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ClientChannel::LoadBalancedCall::~LoadBalancedCall() { |
|
|
|
|
if (backend_metric_data_ != nullptr) { |
|
|
|
|
backend_metric_data_->BackendMetricData::~BackendMetricData(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
class LbCallState : public ClientChannelLbCallState { |
|
|
|
|
public: |
|
|
|
|
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); } |
|
|
|
|
|
|
|
|
|
void ClientChannel::LoadBalancedCall::RecordCallCompletion( |
|
|
|
|
absl::Status status, grpc_metadata_batch* recv_trailing_metadata, |
|
|
|
|
grpc_transport_stream_stats* transport_stream_stats, |
|
|
|
|
absl::string_view peer_address) { |
|
|
|
|
// If we have a tracer, notify it.
|
|
|
|
|
if (call_attempt_tracer() != nullptr) { |
|
|
|
|
call_attempt_tracer()->RecordReceivedTrailingMetadata( |
|
|
|
|
status, recv_trailing_metadata, transport_stream_stats); |
|
|
|
|
} |
|
|
|
|
// If the LB policy requested a callback for trailing metadata, invoke
|
|
|
|
|
// the callback.
|
|
|
|
|
if (lb_subchannel_call_tracker_ != nullptr) { |
|
|
|
|
Metadata trailing_metadata(recv_trailing_metadata); |
|
|
|
|
BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata); |
|
|
|
|
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = { |
|
|
|
|
peer_address, status, &trailing_metadata, &backend_metric_accessor}; |
|
|
|
|
lb_subchannel_call_tracker_->Finish(args); |
|
|
|
|
lb_subchannel_call_tracker_.reset(); |
|
|
|
|
// Internal API to allow first-party LB policies to access per-call
|
|
|
|
|
// attributes set by the ConfigSelector.
|
|
|
|
|
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( |
|
|
|
|
UniqueTypeName type) const override { |
|
|
|
|
auto* service_config_call_data = GetServiceConfigCallDataFromContext(); |
|
|
|
|
return service_config_call_data->GetCallAttribute(type); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ClientChannel::LoadBalancedCall::RecordLatency() { |
|
|
|
|
// Compute latency and report it to the tracer.
|
|
|
|
|
if (call_attempt_tracer() != nullptr) { |
|
|
|
|
gpr_timespec latency = |
|
|
|
|
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_); |
|
|
|
|
call_attempt_tracer()->RecordEnd(latency); |
|
|
|
|
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override { |
|
|
|
|
auto* legacy_context = GetContext<grpc_call_context_element>(); |
|
|
|
|
return static_cast<ClientCallAttemptTracer*>( |
|
|
|
|
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> |
|
|
|
|
ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, |
|
|
|
@ -1915,7 +1950,7 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, |
|
|
|
|
if (complete_pick->subchannel_call_tracker != nullptr) { |
|
|
|
|
complete_pick->subchannel_call_tracker->Start(); |
|
|
|
|
call_initiator.SetContext( |
|
|
|
|
std::move(complete_pick->subchannel_call_tracker)); |
|
|
|
|
complete_pick->subchannel_call_tracker.release()); |
|
|
|
|
} |
|
|
|
|
// Now that we're done with client initial metadata, push it
|
|
|
|
|
// into the call initiator.
|
|
|
|
@ -1968,123 +2003,4 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// ClientChannel::PromiseBasedLoadBalancedCall
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// FIXME: remove
|
|
|
|
|
ArenaPromise<ServerMetadataHandle> |
|
|
|
|
ClientChannel::PromiseBasedLoadBalancedCall::MakeCallPromise( |
|
|
|
|
CallArgs call_args, OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call) { |
|
|
|
|
pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value(); |
|
|
|
|
// Record ops in tracer.
|
|
|
|
|
if (call_attempt_tracer() != nullptr) { |
|
|
|
|
call_attempt_tracer()->RecordSendInitialMetadata( |
|
|
|
|
call_args.client_initial_metadata.get()); |
|
|
|
|
// TODO(ctiller): Find a way to do this without registering a no-op mapper.
|
|
|
|
|
call_args.client_to_server_messages->InterceptAndMapWithHalfClose( |
|
|
|
|
[](MessageHandle message) { return message; }, // No-op.
|
|
|
|
|
[this]() { |
|
|
|
|
// TODO(roth): Change CallTracer API to not pass metadata
|
|
|
|
|
// batch to this method, since the batch is always empty.
|
|
|
|
|
grpc_metadata_batch metadata(GetContext<Arena>()); |
|
|
|
|
call_attempt_tracer()->RecordSendTrailingMetadata(&metadata); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
// Extract peer name from server initial metadata.
|
|
|
|
|
call_args.server_initial_metadata->InterceptAndMap( |
|
|
|
|
[self = lb_call->RefAsSubclass<PromiseBasedLoadBalancedCall>()]( |
|
|
|
|
ServerMetadataHandle metadata) { |
|
|
|
|
if (self->call_attempt_tracer() != nullptr) { |
|
|
|
|
self->call_attempt_tracer()->RecordReceivedInitialMetadata( |
|
|
|
|
metadata.get()); |
|
|
|
|
} |
|
|
|
|
Slice* peer_string = metadata->get_pointer(PeerString()); |
|
|
|
|
if (peer_string != nullptr) self->peer_string_ = peer_string->Ref(); |
|
|
|
|
return metadata; |
|
|
|
|
}); |
|
|
|
|
client_initial_metadata_ = std::move(call_args.client_initial_metadata); |
|
|
|
|
return OnCancel( |
|
|
|
|
Map(TrySeq( |
|
|
|
|
// LB pick.
|
|
|
|
|
[this]() -> Poll<absl::Status> { |
|
|
|
|
auto result = PickSubchannel(was_queued_); |
|
|
|
|
if (GRPC_TRACE_FLAG_ENABLED( |
|
|
|
|
grpc_client_channel_lb_call_trace)) { |
|
|
|
|
gpr_log(GPR_INFO, |
|
|
|
|
"client_channel=%p lb_call=%p: %sPickSubchannel() returns %s", |
|
|
|
|
client_channel(), this, |
|
|
|
|
GetContext<Activity>()->DebugTag().c_str(), |
|
|
|
|
result.has_value() ? result->ToString().c_str() |
|
|
|
|
: "Pending"); |
|
|
|
|
} |
|
|
|
|
if (result == absl::nullopt) return Pending{}; |
|
|
|
|
return std::move(*result); |
|
|
|
|
}, |
|
|
|
|
[this, call_args = std::move(call_args)]() mutable |
|
|
|
|
-> ArenaPromise<ServerMetadataHandle> { |
|
|
|
|
call_args.client_initial_metadata = |
|
|
|
|
std::move(client_initial_metadata_); |
|
|
|
|
return connected_subchannel()->MakeCallPromise( |
|
|
|
|
std::move(call_args)); |
|
|
|
|
}), |
|
|
|
|
// Record call completion.
|
|
|
|
|
[this](ServerMetadataHandle metadata) { |
|
|
|
|
if (call_attempt_tracer() != nullptr || |
|
|
|
|
lb_subchannel_call_tracker() != nullptr) { |
|
|
|
|
absl::Status status; |
|
|
|
|
grpc_status_code code = metadata->get(GrpcStatusMetadata()) |
|
|
|
|
.value_or(GRPC_STATUS_UNKNOWN); |
|
|
|
|
if (code != GRPC_STATUS_OK) { |
|
|
|
|
absl::string_view message; |
|
|
|
|
if (const auto* grpc_message = |
|
|
|
|
metadata->get_pointer(GrpcMessageMetadata())) { |
|
|
|
|
message = grpc_message->as_string_view(); |
|
|
|
|
} |
|
|
|
|
status = |
|
|
|
|
absl::Status(static_cast<absl::StatusCode>(code), message); |
|
|
|
|
} |
|
|
|
|
RecordCallCompletion(status, metadata.get(), |
|
|
|
|
&GetContext<CallContext>() |
|
|
|
|
->call_stats() |
|
|
|
|
->transport_stream_stats, |
|
|
|
|
peer_string_.as_string_view()); |
|
|
|
|
} |
|
|
|
|
RecordLatency(); |
|
|
|
|
return metadata; |
|
|
|
|
}), |
|
|
|
|
[lb_call = std::move(lb_call)]() { |
|
|
|
|
// If the waker is pending, then we need to remove ourself from
|
|
|
|
|
// the list of queued LB calls.
|
|
|
|
|
if (!lb_call->waker_.is_unwakeable()) { |
|
|
|
|
MutexLock lock(&lb_call->client_channel()->lb_mu_); |
|
|
|
|
lb_call->Commit(); |
|
|
|
|
// Remove pick from list of queued picks.
|
|
|
|
|
lb_call->RemoveCallFromLbQueuedCallsLocked(); |
|
|
|
|
// Remove from queued picks list.
|
|
|
|
|
lb_call->client_channel()->lb_queued_calls_.erase(lb_call.get()); |
|
|
|
|
} |
|
|
|
|
// TODO(ctiller): We don't have access to the call's actual status
|
|
|
|
|
// here, so we just assume CANCELLED. We could change this to use
|
|
|
|
|
// CallFinalization instead of OnCancel() so that we can get the
|
|
|
|
|
// actual status. But we should also have access to the trailing
|
|
|
|
|
// metadata, which we don't have in either case. Ultimately, we
|
|
|
|
|
// need a better story for code that needs to run at the end of a
|
|
|
|
|
// call in both cancellation and non-cancellation cases that needs
|
|
|
|
|
// access to server trailing metadata and the call's real status.
|
|
|
|
|
if (lb_call->call_attempt_tracer() != nullptr) { |
|
|
|
|
lb_call->call_attempt_tracer()->RecordCancel( |
|
|
|
|
absl::CancelledError("call cancelled")); |
|
|
|
|
} |
|
|
|
|
if (lb_call->call_attempt_tracer() != nullptr || |
|
|
|
|
lb_call->lb_subchannel_call_tracker() != nullptr) { |
|
|
|
|
// If we were cancelled without recording call completion, then
|
|
|
|
|
// record call completion here, as best we can. We assume status
|
|
|
|
|
// CANCELLED in this case.
|
|
|
|
|
lb_call->RecordCallCompletion(absl::CancelledError("call cancelled"), |
|
|
|
|
nullptr, nullptr, ""); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace grpc_core
|
|
|
|
|