implement new approach with UnstartedCallHandler

pull/35791/head
Mark D. Roth 1 year ago
parent c83f1bb586
commit 214126ae16
  1. 1
      BUILD
  2. 1
      Package.swift
  3. 2
      build_autogenerated.yaml
  4. 2
      gRPC-C++.podspec
  5. 2
      gRPC-Core.podspec
  6. 1
      grpc.gemspec
  7. 1
      package.xml
  8. 13
      src/core/BUILD
  9. 772
      src/core/client_channel/client_channel.cc
  10. 12
      src/core/client_channel/client_channel.h
  11. 206
      src/core/client_channel/subchannel.cc
  12. 35
      src/core/client_channel/subchannel.h
  13. 5
      src/core/lib/surface/channel.h
  14. 13
      src/core/lib/surface/legacy_channel.cc
  15. 35
      src/core/lib/transport/call_destination.h
  16. 20
      src/core/lib/transport/call_spine.cc
  17. 109
      src/core/lib/transport/call_spine.h
  18. 21
      src/core/lib/transport/metadata_batch.h
  19. 1
      src/core/lib/transport/transport.h
  20. 1
      tools/doxygen/Doxyfile.c++.internal
  21. 1
      tools/doxygen/Doxyfile.core.internal

@ -3649,7 +3649,6 @@ grpc_cc_library(
"//src/core:arena_promise",
"//src/core:backend_metric_parser",
"//src/core:call_filters",
"//src/core:call_destination",
"//src/core:call_spine",
"//src/core:cancel_callback",
"//src/core:channel_args",

1
Package.swift generated

@ -1805,7 +1805,6 @@ let package = Package(
"src/core/lib/transport/batch_builder.h",
"src/core/lib/transport/bdp_estimator.cc",
"src/core/lib/transport/bdp_estimator.h",
"src/core/lib/transport/call_destination.h",
"src/core/lib/transport/call_filters.cc",
"src/core/lib/transport/call_filters.h",
"src/core/lib/transport/call_final_info.cc",

@ -1133,7 +1133,6 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h
@ -2599,7 +2598,6 @@ libs:
- src/core/lib/surface/wait_for_cq_end_op.h
- src/core/lib/transport/batch_builder.h
- src/core/lib/transport/bdp_estimator.h
- src/core/lib/transport/call_destination.h
- src/core/lib/transport/call_filters.h
- src/core/lib/transport/call_final_info.h
- src/core/lib/transport/call_spine.h

2
gRPC-C++.podspec generated

@ -1238,7 +1238,6 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',
@ -2500,7 +2499,6 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',

2
gRPC-Core.podspec generated

@ -1914,7 +1914,6 @@ Pod::Spec.new do |s|
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.cc',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.cc',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.cc',
@ -3279,7 +3278,6 @@ Pod::Spec.new do |s|
'src/core/lib/surface/wait_for_cq_end_op.h',
'src/core/lib/transport/batch_builder.h',
'src/core/lib/transport/bdp_estimator.h',
'src/core/lib/transport/call_destination.h',
'src/core/lib/transport/call_filters.h',
'src/core/lib/transport/call_final_info.h',
'src/core/lib/transport/call_spine.h',

1
grpc.gemspec generated

@ -1807,7 +1807,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/transport/batch_builder.h )
s.files += %w( src/core/lib/transport/bdp_estimator.cc )
s.files += %w( src/core/lib/transport/bdp_estimator.h )
s.files += %w( src/core/lib/transport/call_destination.h )
s.files += %w( src/core/lib/transport/call_filters.cc )
s.files += %w( src/core/lib/transport/call_filters.h )
s.files += %w( src/core/lib/transport/call_final_info.cc )

1
package.xml generated

@ -1789,7 +1789,6 @@
<file baseinstalldir="/" name="src/core/lib/transport/batch_builder.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/bdp_estimator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_destination.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_filters.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/transport/call_final_info.cc" role="src" />

@ -7214,18 +7214,6 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "call_destination",
hdrs = [
"lib/transport/call_destination.h",
],
deps = [
"call_spine",
"dual_ref_counted",
"//:gpr_platform",
],
)
grpc_cc_library(
name = "parsed_metadata",
srcs = [
@ -7292,6 +7280,7 @@ grpc_cc_library(
"1999",
"call_filters",
"call_final_info",
"dual_ref_counted",
"for_each",
"if",
"latch",

@ -88,6 +88,7 @@
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/security/credentials/credentials.h"
@ -527,58 +528,378 @@ class ClientChannel::ClientChannelControlHelper
};
//
// NoRetryCallDestination
// ClientChannel::LoadBalancedCallDestination
//
namespace {
ClientChannelServiceConfigCallData* GetServiceConfigCallDataFromContext() {
class LbMetadata : public LoadBalancingPolicy::MetadataInterface {
public:
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {}
void Add(absl::string_view key, absl::string_view value) override {
if (batch_ == nullptr) return;
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
batch_->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
return;
}
batch_->Append(key, Slice::FromStaticString(value),
[key](absl::string_view error, const Slice& value) {
gpr_log(GPR_ERROR, "%s",
absl::StrCat(error, " key:", key,
" value:", value.as_string_view())
.c_str());
});
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
private:
class Encoder {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
grpc_metadata_batch* batch_;
};
ClientCallTracer* GetCallTracerFromContext() {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientChannelServiceConfigCallData*>(
legacy_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
return static_cast<ClientCallTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
}
// A call destination that does not support retries.
class NoRetryCallDestination : public CallDestination {
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
auto* call_tracer = GetCallTracerFromContext();
if (call_tracer == nullptr) return;
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
legacy_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
}
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracerFromContext() {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientCallTracer::CallAttemptTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER].value);
}
// Context type for subchannel call tracker.
template <>
struct ContextType<LoadBalancingPolicy::SubchannelCallTracker*> {};
// A filter to handle updating with the call tracer and LB subchannel
// call tracker inside the LB call.
// FIXME: move this to its own file, register only when call v3
// experiment is enabled
class LbCallTracingFilter : public ImplementChannelFilter<LbCallTracingFilter> {
public:
static absl::StatusOr<LbCallTracingFilter> Create(const ChannelArgs&,
ChannelFilter::Args) {
return LbCallTracingFilter();
}
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordSendInitialMetadata(metadata.get());
}
void OnServerInitialMetadata(ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordReceivedInitialMetadata(metadata.get());
// Save peer string for later use.
Slice* peer_string = metadata->get_pointer(PeerString());
if (peer_string != nullptr) peer_string_ = peer_string->Ref();
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
// FIXME(ctiller): Add this hook to the L1 filter API
void OnClientToServerMessagesClosed() {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
// TODO(roth): Change CallTracer API to not pass metadata
// batch to this method, since the batch is always empty.
grpc_metadata_batch metadata(GetContext<Arena>());
tracer->RecordSendTrailingMetadata(&metadata);
}
void OnServerTrailingMetadata(ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
auto* call_tracker =
GetContext<LoadBalancingPolicy::SubchannelCallTracker*>();
absl::Status status;
if (tracer != nullptr || call_tracker != nullptr) {
grpc_status_code code = metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN);
if (code != GRPC_STATUS_OK) {
absl::string_view message;
if (const auto* grpc_message =
metadata->get_pointer(GrpcMessageMetadata())) {
message = grpc_message->as_string_view();
}
status =
absl::Status(static_cast<absl::StatusCode>(code), message);
}
}
if (tracer != nullptr) {
tracer->RecordReceivedTrailingMetadata(
status, metadata.get(),
&GetContext<CallContext>()->call_stats()->transport_stream_stats,
peer_string_.as_string_view());
}
if (call_tracker != nullptr) {
LbMetadata metadata(metadata.get());
BackendMetricAccessor backend_metric_accessor(metadata.get());
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string_.as_string_view(), status, metadata.get(),
&backend_metric_accessor};
call_tracker->Finish(args);
delete call_tracker;
}
}
void OnFinalize(const grpc_call_final_info*) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
tracer->RecordEnd(latency);
}
private:
// Interface for accessing backend metric data in the LB call tracker.
class BackendMetricAccessor
: public LoadBalancingPolicy::BackendMetricAccessor {
public:
explicit BackendMetricAccessor(
grpc_metadata_batch* server_trailing_metadata)
: server_trailing_metadata_(server_trailing_metadata) {}
~BackendMetricAccessor() override {
if (backend_metric_data_ != nullptr) {
backend_metric_data_->BackendMetricData::~BackendMetricData();
}
}
const BackendMetricData* GetBackendMetricData() override {
if (backend_metric_data_ == nullptr) {
if (const auto* md = recv_trailing_metadata_->get_pointer(
EndpointLoadMetricsBinMetadata())) {
BackendMetricAllocator allocator;
backend_metric_data_ =
ParseBackendMetricData(md->as_string_view(), &allocator);
}
}
return backend_metric_data_;
}
private:
class BackendMetricAllocator : public BackendMetricAllocatorInterface {
public:
BackendMetricData* AllocateBackendMetricData() override {
return GetContext<Arena>()->New<BackendMetricData>();
}
char* AllocateString(size_t size) override {
return static_cast<char*>(GetContext<Arena>()->Alloc(size));
}
};
grpc_metadata_batch* send_trailing_metadata_;
const BackendMetricData* backend_metric_data_ = nullptr;
};
Slice peer_string_;
};
};
} // namespace
class ClientChannel::LoadBalancedCallDestination : public CallDestination {
public:
explicit NoRetryCallDestination(
explicit LoadBalancedCallDestination(
RefCountedPtr<ClientChannel> client_channel)
: client_channel_(std::move(client_channel)) {}
void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
void Orphan() override {}
void StartCall(UnstartedCallHandler unstarted_handler) override {
// If there is a call tracer, create a call attempt tracer.
bool* is_transparent_retry_metadata =
unstarted_call_handler.UnprocessedClientInitialMetadata->get_pointer(
IsTransparentRetry());
bool is_transparent_retry =
is_transparent_retry_metadata != nullptr
? *is_transparent_retry_metadata
: false;
MaybeCreateCallAttemptTracer(is_transparent_retry);
// Spawn a promise to do the LB pick.
// This will eventually start the call.
unstarted_handler.SpawnGuarded(
"lb_pick",
[client_channel = client_channel_, was_queued = true,
unstarted_handler = std::move(unstarted_handler)]() mutable {
return Map(
// Wait for the LB picker.
Loop([last_picker =
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(),
client_channel = std::move(client_channel),
unstarted_handler, &was_queued]() mutable {
return Map(
client_channel->picker_.Next(last_picker),
[client_channel, unstarted_handler, &last_picker,
&was_queued](
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
picker) mutable {
last_picker = std::move(picker);
// Returns 3 possible things:
// - Continue to queue the pick
// - non-OK status to fail the pick
// - a connected subchannel to complete the pick
auto result = client_channel->PickSubchannel(
*last_picker, unstarted_handler);
if (result == Continue{}) was_queued = true;
return result;
});
}),
// Create call stack on the connected subchannel.
[unstarted_handler = std::move(unstarted_handler),
&was_queued](
absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>
connected_subchannel) {
if (!connected_subchannel.ok()) {
return connected_subchannel.status();
}
// LB pick is done, so indicate that we've committed.
absl::AnyInvocable<void()>* on_commit =
metadata->get_pointer(LoadBalancingOnCommit());
if (on_commit != nullptr && *on_commit != nullptr) {
(*on_commit)();
}
// If it was queued, add a trace annotation.
if (was_queued) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer != nullptr) {
tracer->RecordAnnotation("Delayed LB pick complete.");
}
}
// Delegate to connected subchannel.
// FIXME: need to insert LbCallTracingFilter at the top of the stack
connected_subchannel->StartCall(std::move(unstarted_handler));
return absl::OkStatus();
});
});
}
private:
RefCountedPtr<ClientChannel> client_channel_;
};
//
// NoRetryCallDestination
//
namespace {
// A call destination that does not support retries.
// To be used as an L2 filter.
class NoRetryCallDestination : public DelegatingCallDestination {
public:
NoRetryCallDestination(RefCountedPtr<CallDestination> next,
RefCountedPtr<CallFilters::Stack> filter_stack)
: DelegatingCallDestination(std::move(next)),
filter_stack_(std::move(filter_stack)) {}
void Orphan() override {}
void StartCall(UnstartedCallHandler unstarted_handler) override {
// Start the parent call. We take ownership of the handler.
CallHandler handler = unstarted_handler.StartCall(filter_stack_);
// Start a promise to drain the client initial metadata from the
// parent call, create a new child call, and forward between them.
handler.SpawnGuarded(
"drain_send_initial_metadata",
[client_channel = client_channel_,
call_handler = std::move(call_handler)]() mutable {
// Wait to get client initial metadata from the call handler.
[self = RefAsSubclass<NoRetryCallDestination>(), handler]() mutable {
return Map(
call_handler.PullClientInitialMetadata(),
[client_channel = std::move(client_channel), call_handler](
handler.PullClientInitialMetadata(),
[self = std::move(self), handler](
ValueOrFailure<ClientMetadataHandle> client_initial_metadata)
mutable {
if (!client_initial_metadata.ok()) return;
// Create the LoadBalancedCall.
CallInitiator call_initiator =
client_channel->CreateLoadBalancedCall(
std::move(*client_initial_metadata),
/*on_commit=*/[]() {
auto* service_config_call_data =
GetServiceConfigCallDataFromContext();
service_config_call_data->Commit();
},
/*is_transparent_retry=*/false);
// Propagate operations from the parent call's handler to
// the LoadBalancedCall's initiator.
ForwardCall(std::move(call_handler),
std::move(call_initiator));
mutable -> StatusFlag {
if (!client_initial_metadata.ok()) return Failure{};
// Create an arena for the child call.
const size_t initial_size =
self->call_size_estimator_.CallSizeEstimate();
// FIXME: do we want to do this for LB calls, or do we want a separate stat for this?
//global_stats().IncrementCallInitialSize(initial_size);
Arena* arena =
Arena::Create(initial_size, &self->call_allocator_);
// Create an initiator/unstarted-handler pair using the arena.
// FIXME: pass in a callback that the CallSpine will use to destroy the arena:
// [](Arena* arena) {
// call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
// arena->Destroy();
// }
auto child_call = MakeCall(std::move(*client_initial_metadata),
GetContext<EventEngine>(), arena);
// Pass the child call's unstarted handler to the next
// destination.
wrapped_destination()->StartCall(
std::move(child_call.unstarted_handler));
// Forward everything from the parent call to the child call.
ForwardCall(std::move(handler), std::move(call.initiator));
return Success{};
});
});
}
void Orphan() override { delete this; }
private:
RefCountedPtr<ClientChannel> client_channel_;
RefCountedPtr<CallFilters::Stack> filter_stack_;
CallSizeEstimator call_size_estimator_;
MemoryAllocator call_allocator_;
};
} // namespace
@ -668,15 +989,6 @@ ClientChannel::ClientChannel(
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "client_channel=%p: creating client_channel", this);
}
// Create call destination.
const bool enable_retries =
!channel_args_.WantMinimalStack() &&
channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
if (enable_retries) {
Crash("call v3 stack does not yet support retries");
} else {
call_destination_ = MakeOrphanable<NoRetryCallDestination>();
}
// Set initial keepalive time.
auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
if (keepalive_arg.has_value()) {
@ -876,6 +1188,7 @@ grpc_call* ClientChannel::CreateCall(
bool registered_method) {
// FIXME: code to convert from C-core batch API to v3 call, then invoke
// CreateCall(client_initial_metadata, arena)
// FIXME: make sure call holds a ref to ClientChannel for its entire lifetime
}
CallInitiator ClientChannel::CreateCall(
@ -884,17 +1197,16 @@ CallInitiator ClientChannel::CreateCall(
if (idle_timeout_ != Duration::Zero()) idle_state_.IncreaseCallCount();
// Exit IDLE if needed.
CheckConnectivityState(/*try_to_connect=*/true);
// Create an initiator/handler pair.
auto call = MakeCall(GetContext<EventEngine>(), arena);
// Create an initiator/unstarted-handler pair.
auto call = MakeCall(std::move(client_initial_metadata),
GetContext<EventEngine>(), arena);
// Spawn a promise to wait for the resolver result.
// This will eventually start using the handler, which will allow the
// initiator to make progress.
// This will eventually start the call.
call.initiator.SpawnGuarded(
"wait-for-name-resolution",
[self = RefAsSubclass<ClientChannel>(),
client_initial_metadata = std::move(client_initial_metadata),
initiator = call.initiator,
handler = std::move(call.handler), was_queued = false]() mutable {
unstarted_handler = std::move(call.unstarted_handler),
was_queued = false]() mutable {
const bool wait_for_ready =
client_initial_metadata->GetOrCreatePointer(WaitForReady())
->value;
@ -917,43 +1229,27 @@ CallInitiator ClientChannel::CreateCall(
return got_result;
}),
// Handle resolver result.
[self, &was_queued, &initiator, &handler,
client_initial_metadata =
std::move(client_initial_metadata)](
[self, &was_queued, &unstarted_handler](
absl::StatusOr<ResolverDataForCalls> resolver_data)
mutable {
if (!resolver_data.ok()) return resolver_data.status();
// Apply service config to call.
absl::Status status = self->ApplyServiceConfigToCall(
*resolver_data->config_selector,
client_initial_metadata);
unstarted_handler.UnprocessedClientInitialMetadata());
if (!status.ok()) return status;
// If the call was queued, add trace annotation.
if (was_queued) {
auto* legacy_context =
GetContext<grpc_call_context_element>();
auto* call_tracer =
static_cast<CallTracerAnnotationInterface*>(
legacy_context[
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
.value);
auto* call_tracer = GetCallTracerFromContext();
if (call_tracer != nullptr) {
call_tracer->RecordAnnotation(
"Delayed name resolution complete.");
}
}
// Now inject initial metadata into the call.
initiator.SpawnGuarded(
"send_initial_metadata",
[initiator, client_initial_metadata =
std::move(client_initial_metadata)]() mutable {
return initiator.PushClientInitialMetadata(
std::move(client_initial_metadata));
});
// Finish constructing the call with the right filter
// stack and destination.
handler.SetStack(std::move(resolver_data->filter_stack));
self->call_destination_->StartCall(std::move(handler));
// Start the call on the destination provided by the
// resolver.
resolver_data->call_destination->StartCall(
std::move(unstarted_handler));
return absl::OkStatus();
});
});
@ -961,301 +1257,6 @@ CallInitiator ClientChannel::CreateCall(
return call.initiator;
}
namespace {
class LbMetadata : public LoadBalancingPolicy::MetadataInterface {
public:
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {}
void Add(absl::string_view key, absl::string_view value) override {
if (batch_ == nullptr) return;
// Gross, egregious hack to support legacy grpclb behavior.
// TODO(ctiller): Use a promise context for this once that plumbing is done.
if (key == GrpcLbClientStatsMetadata::key()) {
batch_->Set(
GrpcLbClientStatsMetadata(),
const_cast<GrpcLbClientStats*>(
reinterpret_cast<const GrpcLbClientStats*>(value.data())));
return;
}
batch_->Append(key, Slice::FromStaticString(value),
[key](absl::string_view error, const Slice& value) {
gpr_log(GPR_ERROR, "%s",
absl::StrCat(error, " key:", key,
" value:", value.as_string_view())
.c_str());
});
}
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
override {
if (batch_ == nullptr) return {};
Encoder encoder;
batch_->Encode(&encoder);
return encoder.Take();
}
absl::optional<absl::string_view> Lookup(absl::string_view key,
std::string* buffer) const override {
if (batch_ == nullptr) return absl::nullopt;
return batch_->GetStringValue(key, buffer);
}
private:
class Encoder {
public:
void Encode(const Slice& key, const Slice& value) {
out_.emplace_back(std::string(key.as_string_view()),
std::string(value.as_string_view()));
}
template <class Which>
void Encode(Which, const typename Which::ValueType& value) {
auto value_slice = Which::Encode(value);
out_.emplace_back(std::string(Which::key()),
std::string(value_slice.as_string_view()));
}
void Encode(GrpcTimeoutMetadata,
const typename GrpcTimeoutMetadata::ValueType&) {}
void Encode(HttpPathMetadata, const Slice&) {}
void Encode(HttpMethodMetadata,
const typename HttpMethodMetadata::ValueType&) {}
std::vector<std::pair<std::string, std::string>> Take() {
return std::move(out_);
}
private:
std::vector<std::pair<std::string, std::string>> out_;
};
grpc_metadata_batch* batch_;
};
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) {
auto* legacy_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<ClientCallTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
if (call_tracer == nullptr) return;
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
legacy_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
}
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracerFromContext() {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientCallTracer::CallAttemptTracer*>(
legacy_context[GRPC_CONTEXT_CALL_TRACER].value);
}
// Context type for subchannel call tracker.
template <>
struct ContextType<LoadBalancingPolicy::SubchannelCallTracker*> {};
// A filter to handle updating with the call tracer and LB subchannel
// call tracker inside the LB call.
// FIXME: move this to its own file, register only when call v3
// experiment is enabled
class LbCallTracingFilter : public ImplementChannelFilter<LbCallTracingFilter> {
public:
static absl::StatusOr<LbCallTracingFilter> Create(const ChannelArgs&,
ChannelFilter::Args) {
return LbCallTracingFilter();
}
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordSendInitialMetadata(metadata.get());
}
void OnServerInitialMetadata(ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
tracer->RecordReceivedInitialMetadata(metadata.get());
// Save peer string for later use.
Slice* peer_string = metadata->get_pointer(PeerString());
if (peer_string != nullptr) peer_string_ = peer_string->Ref();
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
// FIXME(ctiller): Add this hook to the L1 filter API
void OnClientToServerMessagesClosed() {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
// TODO(roth): Change CallTracer API to not pass metadata
// batch to this method, since the batch is always empty.
grpc_metadata_batch metadata(GetContext<Arena>());
tracer->RecordSendTrailingMetadata(&metadata);
}
void OnServerTrailingMetadata(ServerMetadata& metadata) {
auto* tracer = GetCallAttemptTracerFromContext();
auto* call_tracker =
GetContext<LoadBalancingPolicy::SubchannelCallTracker*>();
absl::Status status;
if (tracer != nullptr || call_tracker != nullptr) {
grpc_status_code code = metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN);
if (code != GRPC_STATUS_OK) {
absl::string_view message;
if (const auto* grpc_message =
metadata->get_pointer(GrpcMessageMetadata())) {
message = grpc_message->as_string_view();
}
status =
absl::Status(static_cast<absl::StatusCode>(code), message);
}
}
if (tracer != nullptr) {
tracer->RecordReceivedTrailingMetadata(
status, metadata.get(),
&GetContext<CallContext>()->call_stats()->transport_stream_stats,
peer_string_.as_string_view());
}
if (call_tracker != nullptr) {
LbMetadata metadata(metadata.get());
BackendMetricAccessor backend_metric_accessor(metadata.get());
LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
peer_string_.as_string_view(), status, metadata.get(),
&backend_metric_accessor};
call_tracker->Finish(args);
delete call_tracker;
}
}
void OnFinalize(const grpc_call_final_info*) {
auto* tracer = GetCallAttemptTracerFromContext();
if (tracer == nullptr) return;
gpr_timespec latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
tracer->RecordEnd(latency);
}
private:
// Interface for accessing backend metric data in the LB call tracker.
class BackendMetricAccessor
: public LoadBalancingPolicy::BackendMetricAccessor {
public:
explicit BackendMetricAccessor(
grpc_metadata_batch* server_trailing_metadata)
: server_trailing_metadata_(server_trailing_metadata) {}
~BackendMetricAccessor() override {
if (backend_metric_data_ != nullptr) {
backend_metric_data_->BackendMetricData::~BackendMetricData();
}
}
const BackendMetricData* GetBackendMetricData() override {
if (backend_metric_data_ == nullptr) {
if (const auto* md = recv_trailing_metadata_->get_pointer(
EndpointLoadMetricsBinMetadata())) {
BackendMetricAllocator allocator;
backend_metric_data_ =
ParseBackendMetricData(md->as_string_view(), &allocator);
}
}
return backend_metric_data_;
}
private:
class BackendMetricAllocator : public BackendMetricAllocatorInterface {
public:
BackendMetricData* AllocateBackendMetricData() override {
return GetContext<Arena>()->New<BackendMetricData>();
}
char* AllocateString(size_t size) override {
return static_cast<char*>(GetContext<Arena>()->Alloc(size));
}
};
grpc_metadata_batch* send_trailing_metadata_;
const BackendMetricData* backend_metric_data_ = nullptr;
};
Slice peer_string_;
};
};
} // namespace
CallInitiator ClientChannel::CreateLoadBalancedCall(
ClientMetadataHandle client_initial_metadata,
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
// If there is a call tracer, create a call attempt tracer.
MaybeCreateCallAttemptTracer(is_transparent_retry);
// Create an arena.
const size_t initial_size = lb_call_size_estimator_.CallSizeEstimate();
// FIXME: do we want to do this for LB calls, or do we want a separate stat for this?
//global_stats().IncrementCallInitialSize(initial_size);
Arena* arena = Arena::Create(initial_size, &lb_call_allocator_);
// Create an initiator/handler pair using the arena.
// FIXME: pass in a callback that the CallSpine will use to destroy the arena:
// [](Arena* arena) {
// lb_call_size_estimator_.UpdateCallSizeEstimate(arena->TotalUsedBytes());
// arena->Destroy();
// }
auto call = MakeCall(GetContext<EventEngine>(), arena);
// Spawn a promise to do the LB pick.
// This will eventually start using the handler, which will allow the
// initiator to make progress.
call.initiator.SpawnGuarded(
"lb_pick",
[self = RefAsSubclass<ClientChannel>(),
client_initial_metadata = std::move(client_initial_metadata),
initiator = call.initiator, handler = std::move(call.handler),
on_commit = std::move(on_commit), was_queued = true]() mutable {
return Map(
// Wait for the LB picker.
Loop([last_picker =
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(),
client_initial_metadata =
std::move(client_initial_metadata),
initiator, &was_queued]() mutable {
return Map(
picker_.Next(last_picker),
[&last_picker, &client_initial_metadata, &initiator,
&was_queued](
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
picker) mutable {
last_picker = std::move(picker);
// Returns 3 possible things:
// - Continue to queue the pick
// - non-OK status to fail the pick
// - a connected subchannel to complete the pick
auto result = PickSubchannel(
*last_picker, client_initial_metadata, initiator);
if (result == Continue{}) was_queued = true;
return result;
});
}),
// Create call stack on the connected subchannel.
[handler = std::move(handler), on_commit = std::move(on_commit),
&was_queued](
RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
// LB pick is done, so indicate that we've committed.
on_commit();
// If it was queued, add a trace annotation.
auto* tracer = GetCallAttemptTracerFromContext();
if (was_queued && tracer != nullptr) {
tracer->RecordAnnotation("Delayed LB pick complete.");
}
// Build call stack.
// FIXME: need to insert LbCallTracingFilter at the top of the stack
handler.SetStack(connected_subchannel->GetStack());
connected_subchannel->StartCall(std::move(handler));
});
});
// Return the initiator.
return call.initiator;
}
void ClientChannel::CreateResolverLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
gpr_log(GPR_INFO, "client_channel=%p: starting name resolution for %s",
@ -1645,10 +1646,23 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
GPR_ASSERT(dynamic_filters != nullptr);
#endif
auto filter_stack = builder.Build();
// Create call destination.
std::unique_ptr<CallDestination> call_destination;
const bool enable_retries =
!channel_args_.WantMinimalStack() &&
channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
if (enable_retries) {
Crash("call v3 stack does not yet support retries");
} else {
call_destination = MakeRefCounted<NoRetryCallDestination>(
MakeRefCounted<LoadBalancedCallDestination>(
RefAsSubclass<ClientChannel>()),
std::move(filter_stack));
}
// Send result to data plane.
resolver_data_for_calls_.Set(
ResolverDataForCalls{std::move(config_selector),
std::move(filter_stack)});
std::move(call_destination)});
}
void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
@ -1763,6 +1777,12 @@ absl::Status ClientChannel::ApplyServiceConfigToCall(
namespace {
ClientChannelServiceConfigCallData* GetServiceConfigCallDataFromContext() {
auto* legacy_context = GetContext<grpc_call_context_element>();
return static_cast<ClientChannelServiceConfigCallData*>(
legacy_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
}
class LbCallState : public ClientChannelLbCallState {
public:
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); }
@ -1786,9 +1806,10 @@ class LbCallState : public ClientChannelLbCallState {
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>>
ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker,
ClientMetadataHandle& client_initial_metadata,
CallInitiator& call_initiator) {
UnstartedCallHandler& unstarted_handler) {
// Perform LB pick.
auto& client_initial_metadata =
unstarted_handler.UnprocessedClientInitialMetadata();
LoadBalancingPolicy::PickArgs pick_args;
Slice* path = client_initial_metadata->get_pointer(HttpPathMetadata());
GPR_ASSERT(path != nullptr);
@ -1833,18 +1854,9 @@ ClientChannel::PickSubchannel(LoadBalancingPolicy::SubchannelPicker& picker,
// it when the call finishes.
if (complete_pick->subchannel_call_tracker != nullptr) {
complete_pick->subchannel_call_tracker->Start();
call_initiator.SetContext(
unstarted_call_handler.SetContext(
complete_pick->subchannel_call_tracker.release());
}
// Now that we're done with client initial metadata, push it
// into the call initiator.
call_initiator.SpawnGuarded(
"send_initial_metadata",
[call_initiator, client_initial_metadata =
std::move(client_initial_metadata)]() mutable {
return call_initiator.PushClientInitialMetadata(
std::move(client_initial_metadata));
});
// Return the connected subchannel.
return connected_subchannel;
},

@ -96,11 +96,6 @@ class ClientChannel : public Channel {
void Ping(grpc_completion_queue* cq, void* tag) override;
// Creates a load balanced call on the channel.
CallInitiator CreateLoadBalancedCall(
ClientMetadataHandle client_initial_metadata,
absl::AnyInvocable<void()> on_commit, bool is_transparent_retry);
// Flag that this object gets stored in channel args as a raw pointer.
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() {
@ -111,6 +106,7 @@ class ClientChannel : public Channel {
class ResolverResultHandler;
class ClientChannelControlHelper;
class SubchannelWrapper;
class LoadBalancedCallDestination;
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
void DestroyResolverAndLbPolicyLocked()
@ -168,8 +164,7 @@ class ClientChannel : public Channel {
// context.
LoopCtl<absl::StatusOr<RefCountedPtr<ConnectedSubchannel>>> PickSubchannel(
LoadBalancingPolicy::SubchannelPicker& picker,
ClientMetadataHandle& client_initial_metadata,
CallInitiator& call_initiator);
UnstartedCallHandler& unstarted_handler);
//
// Fields set at construction and never modified.
@ -181,7 +176,6 @@ class ClientChannel : public Channel {
ClientChannelFactory* client_channel_factory_;
std::string default_authority_;
channelz::ChannelNode* channelz_node_;
OrphanablePtr<CallDestination> call_destination_;
//
// State for LB calls.
@ -201,7 +195,7 @@ class ClientChannel : public Channel {
//
struct ResolverDataForCalls {
RefCountedPtr<ConfigSelector> config_selector;
RefCountedPtr<CallFilters::Stack> filter_stack;
RefCountedPtr<CallDestination> call_destination;
};
Observable<absl::StatusOr<ResolverDataForCalls>> resolver_data_for_calls_;

@ -97,75 +97,148 @@ DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
//
ConnectedSubchannel::ConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: RefCounted<ConnectedSubchannel>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
? "ConnectedSubchannel"
: nullptr),
channel_stack_(std::move(channel_stack)),
args_(args),
channelz_subchannel_(std::move(channelz_subchannel)) {}
ConnectedSubchannel::~ConnectedSubchannel() {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}
void ConnectedSubchannel::StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher);
op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
op->bind_pollset_set = interested_parties;
grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
channel_stack_->call_stack_size;
}
ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise(
CallArgs call_args) {
// If not using channelz, we just need to call the channel stack.
if (channelz_subchannel() == nullptr) {
return channel_stack_->MakeClientCallPromise(std::move(call_args));
}
// Otherwise, we need to wrap the channel stack promise with code that
// handles the channelz updates.
return OnCancel(
Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
[self = Ref()](ServerMetadataHandle metadata) {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
channelz_subchannel->RecordCallFailed();
} else {
channelz_subchannel->RecordCallSucceeded();
}
return metadata;
}),
[self = Ref()]() {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
channelz_subchannel->RecordCallFailed();
});
}
//
// LegacyConnectedSubchannel
//
class LegacyConnectedSubchannel : public ConnectedSubchannel {
public:
LegacyConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
channel_stack_(std::move(channel_stack)) {}
~LegacyConnectedSubchannel() override {
channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
}
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->start_connectivity_watch = std::move(watcher);
op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
op->bind_pollset_set = interested_parties;
grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) override {
Crash("call v3 ping method called in legacy impl");
}
void StartCall(UnstartedCallHandler) override {
Crash("call v3 StartCall() method called in legacy impl");
}
grpc_channel_stack* channel_stack() const override {
return channel_stack_.get();
}
size_t GetInitialCallSizeEstimate() const override {
return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
channel_stack_->call_stack_size;
}
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) override {
// If not using channelz, we just need to call the channel stack.
if (channelz_subchannel() == nullptr) {
return channel_stack_->MakeClientCallPromise(std::move(call_args));
}
// Otherwise, we need to wrap the channel stack promise with code that
// handles the channelz updates.
return OnCancel(
Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
[self = Ref()](ServerMetadataHandle metadata) {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
if (metadata->get(GrpcStatusMetadata())
.value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
channelz_subchannel->RecordCallFailed();
} else {
channelz_subchannel->RecordCallSucceeded();
}
return metadata;
}),
[self = Ref()]() {
channelz::SubchannelNode* channelz_subchannel =
self->channelz_subchannel();
GPR_ASSERT(channelz_subchannel != nullptr);
channelz_subchannel->RecordCallFailed();
});
}
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem;
op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(channel_stack_, 0);
elem->filter->start_transport_op(elem, op);
}
private:
RefCountedPtr<grpc_channel_stack> channel_stack_;
};
//
// NewConnectedSubchannel
//
class NewConnectedSubchannel : public ConnectedSubchannel {
public:
NewConnectedSubchannel(
RefCountedPtr<CallFilters::Stack> filter_stack,
OrphanablePtr<Transport> transport, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
: ConnectedSubchannel(args, std::move(channelz_subchannel)),
filter_stack_(std::move(filter_stack)),
transport_(std::move(transport)) {}
void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
// FIXME: add new transport API for this in v3 stack
}
void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) override {
// FIXME: add new transport API for this in v3 stack
}
void StartCall(UnstartedCallHandler unstarted_handler) override {
auto handler = unstarted_handler.StartCall(filter_stack_);
transport_->StartCall(std::move(handler));
}
grpc_channel_stack* channel_stack() const override { return nullptr; }
size_t GetInitialCallSizeEstimate() const override { return 0; }
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) override {
Crash("legacy MakeCallPromise() method called in call v3 impl");
}
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
Crash("legacy ping method called in call v3 impl");
}
private:
RefCountedPtr<CallFilters::Stack> filter_stack_;
OrphanablePtr<Transport> transport_;
};
//
// SubchannelCall
@ -789,11 +862,18 @@ bool Subchannel::PublishTransportLocked() {
key_.ToString().c_str(), stack.status().ToString().c_str());
return false;
}
connected_subchannel_ = MakeRefCounted<ConnectedSubchannel>(
connected_subchannel_ = MakeRefCounted<LegacyConnectedSubchannel>(
std::move(stack), args_, channelz_node_);
} else {
// Call v3 stack.
// FIXME
CallFilters::StackBuilder builder;
// FIXME: add filters registered for CLIENT_SUBCHANNEL
auto filter_stack = builder.Build();
connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
std::move(filter_stack),
OrphanablePtr<Transport>(
std::exchange(connecting_result_.transport, nullptr)),
args_, channelz_node_);
}
connecting_result_.Reset();
// Publish.

@ -68,17 +68,6 @@ class SubchannelCall;
class ConnectedSubchannel : public CallDestination {
public:
// Ctor for legacy stack.
ConnectedSubchannel(
RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
// Ctor for call v3 stack.
ConnectedSubchannel(
Transport* transport, const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
~ConnectedSubchannel() override;
void Orphan() override {}
const ChannelArgs& args() const { return args_; }
@ -86,22 +75,26 @@ class ConnectedSubchannel : public CallDestination {
return channelz_subchannel_.get();
}
void StartWatch(grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
virtual void StartWatch(
grpc_pollset_set* interested_parties,
OrphanablePtr<ConnectivityStateWatcherInterface> watcher) = 0;
// Methods for v3 stack.
RefCountedPtr<CallFilters::Stack> GetStack();
void StartCall(CallHandler call_handler) override;
void Ping(absl::AnyInvocable<void(absl::Status)> on_ack);
virtual void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) = 0;
// Methods for legacy stack.
grpc_channel_stack* channel_stack() const { return channel_stack_.get(); }
size_t GetInitialCallSizeEstimate() const;
ArenaPromise<ServerMetadataHandle> MakeCallPromise(CallArgs call_args);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
virtual grpc_channel_stack* channel_stack() const = 0;
virtual size_t GetInitialCallSizeEstimate() const = 0;
virtual ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) = 0;
virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
protected:
ConnectedSubchannel(
const ChannelArgs& args,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
private:
RefCountedPtr<grpc_channel_stack> channel_stack_;
ChannelArgs args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.

@ -69,9 +69,6 @@ class Channel : public RefCounted<Channel>,
virtual void Orphan() = 0;
Arena* CreateArena();
void DestroyArena(Arena* arena);
virtual bool IsLame() const = 0;
// TODO(roth): This should return a C++ type.
@ -126,6 +123,8 @@ class Channel : public RefCounted<Channel>,
virtual void Ping(grpc_completion_queue* cq, void* tag) = 0;
// TODO(roth): Remove these methods when LegacyChannel goes away.
Arena* CreateArena();
void DestroyArena(Arena* arena);
virtual grpc_channel_stack* channel_stack() const { return nullptr; }
virtual bool is_client() const { return true; }
virtual bool is_promising() const { return true; }

@ -99,19 +99,6 @@ absl::StatusOr<OrphanablePtr<Channel>> LegacyChannel::Create(
builder.IsPromising(), std::move(target), args, std::move(*r));
}
namespace {
class NotReallyACallFactory final : public CallFactory {
public:
using CallFactory::CallFactory;
void Orphan() override { delete this; }
CallInitiator CreateCall(ClientMetadataHandle, Arena*) override {
Crash("NotReallyACallFactory::CreateCall should never be called");
}
};
} // namespace
LegacyChannel::LegacyChannel(bool is_client, bool is_promising,
std::string target,
const ChannelArgs& channel_args,

@ -1,35 +0,0 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/transport/call_spine.h"
namespace grpc_core {
// CallDestination is responsible for the processing of a CallHandler.
// It might be a transport, the server API, or a subchannel on the client (for
// instance).
class CallDestination : public DualRefCounted<CallDestination> {
public:
virtual void StartCall(CallHandler call_handler) = 0;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_DESTINATION_H

@ -89,10 +89,28 @@ void ForwardCall(CallHandler call_handler, CallInitiator call_initiator) {
});
}
CallHandler UnstartedCallHandler::StartCall(
RefCountedPtr<CallFilters::Stack> stack,
RefCountedPtr<CallDestination> call_destination) {
call_initiator->SpawnGuarded(
"send_initial_metadata",
[client_initial_metadata = std::move(client_initial_metadata_),
spine = spine_]() mutable {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine->party());
return Map(spine_->client_initial_metadata().sender.Push(
std::move(client_initial_metadata)),
[](bool ok) { return StatusFlag(ok); });
});
// FIXME: attach stack and destination to CallHandler
return CallHandler(std::move(spine_));
}
CallInitiatorAndHandler MakeCall(
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena) {
auto spine = CallSpine::Create(event_engine, arena);
return {CallInitiator(spine), CallHandler(spine)};
return {CallInitiator(spine),
UnstartedCallHandler(std::move(client_initial_metadata), spine)};
}
} // namespace grpc_core

@ -19,6 +19,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
@ -228,6 +229,10 @@ class CallInitiator {
explicit CallInitiator(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
// FIXME: should this method go away, since we now want this to be
// handled in UnstartedCallHandler instead?
// Note: this would require changing ServerTransport::AcceptCall() to
// act like a CallDestination
auto PushClientInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().sender.Push(std::move(md)),
@ -323,10 +328,6 @@ class CallHandler {
explicit CallHandler(RefCountedPtr<CallSpineInterface> spine)
: spine_(std::move(spine)) {}
void SetStack(RefCountedPtr<CallFilters::Stack> stack) {
// FIXME: Implement.
}
auto PullClientInitialMetadata() {
GPR_DEBUG_ASSERT(GetContext<Activity>() == &spine_->party());
return Map(spine_->client_initial_metadata().receiver.Next(),
@ -407,12 +408,110 @@ class CallHandler {
RefCountedPtr<CallSpineInterface> spine_;
};
class UnstartedCallHandler;
// CallDestination is responsible for starting an UnstartedCallHandler
// and then processing operations on the resulting CallHandler.
//
// Examples of CallDestinations include:
// - a client transport
// - the server API
// - a load-balanced call in the client channel
// - a hijacking filter (see DelegatingCallDestination below)
//
// FIXME: do we want this to be ref-counted? that might not be
// desirable for the hijacking filter case, where we want the filter stack
// to own the filter rather than having every call take its own ref to every
// hijacking filter.
class CallDestination : public DualRefCounted<CallDestination> {
public:
virtual void StartCall(UnstartedCallHandler unstarted_call_handler) = 0;
};
// A delegating CallDestination for use as a hijacking filter.
// Implementations may look at the unprocessed initial metadata
// and decide to do one of two things:
//
// 1. It can be a no-op. In this case, it will simply pass the
// unstarted_call_handler to the wrapped CallDestination.
//
// 2. It can hijack the call by doing the following:
// - Start unstarted_call_handler and take ownership of the
// resulting handler.
// - Create a new CallInitiator/UnstartedCallHandler pair, and pass
// that new UnstartedCallHandler down to the wrapped CallDestination.
// - The implementation is then responsible for forwarding between
// the started handler and the new initiator. Note that in
// simple cases, this can be done via ForwardCall().
class DelegatingCallDestination : public CallDestination {
protected:
explicit DelegatingCallDestination(
RefCountedPtr<CallDestination> wrapped_destination)
: wrapped_destination_(std::move(wrapped_destination)) {}
CallDestination* wrapped_destination() const {
return wrapped_destination_.get();
}
private:
RefCountedPtr<CallDestination> wrapped_destination_;
};
class UnstartedCallHandler {
public:
UnstartedCallHandler(RefCountedPtr<CallSpineInterface> spine,
ClientMetadataHandle client_initial_metadata)
: spine_(std::move(spine)),
client_initial_metadata_(std::move(client_initial_metadata)) {}
// Returns the client initial metadata, which has not yet been
// processed by the stack that will ultimately be used for this call.
ClientMetadataHandle& UnprocessedClientInitialMetadata() {
return client_initial_metadata_;
}
// Starts the call using the specified stack.
// This must be called only once, and the UnstartedCallHandler object
// may not be used after this is called.
CallHandler StartCall(RefCountedPtr<CallFilters::Stack> stack);
template <typename ContextType>
void SetContext(ContextType context) {
// FIXME: implement
}
template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}
template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
}
template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}
template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
return spine_->party().SpawnWaitable(name, std::move(promise_factory));
}
private:
RefCountedPtr<CallSpineInterface> spine_;
ClientMetadataHandle client_initial_metadata_;
};
struct CallInitiatorAndHandler {
CallInitiator initiator;
CallHandler handler;
UnstartedCallHandler unstarted_handler;
};
CallInitiatorAndHandler MakeCall(
ClientMetadataHandle client_initial_metadata,
grpc_event_engine::experimental::EventEngine* event_engine, Arena* arena);
template <typename CallHalf>

@ -29,6 +29,7 @@
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/function_ref.h"
#include "absl/meta/type_traits.h"
#include "absl/strings/numbers.h"
@ -503,6 +504,25 @@ struct WaitForReady {
static std::string DisplayValue(ValueType x);
};
// Annotation added by retry code to indicate a transparent retry.
struct IsTransparentRetry {
static absl::string_view DebugKey() { return "IsTransparentRetry"; }
static constexpr bool kRepeatable = false;
using ValueType = bool;
static std::string DisplayValue(ValueType x) { return x ? "true" : "false"; }
};
// Annotation added by retry or no-retry filters to pass the on-commit
// callback to the load balanced call.
struct LoadBalancingOnCommit {
static absl::string_view DebugKey() { return "LoadBalancingOnCommit"; }
static constexpr bool kRepeatable = false;
using ValueType = absl::AnyInvocable<void()>;
static std::string DisplayValue(const ValueType& x) {
return x == nullptr ? "unset" : "set";
}
};
// Annotation added by a transport to note that server trailing metadata
// is a Trailers-Only response.
struct GrpcTrailersOnly {
@ -1511,6 +1531,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
grpc_core::GrpcCallWasCancelled, grpc_core::WaitForReady,
grpc_core::IsTransparentRetry, grpc_core::LoadBalancingOnCommit,
grpc_core::GrpcTrailersOnly, grpc_core::GrpcTarPit,
grpc_core::GrpcRegisteredMethod GRPC_CUSTOM_CLIENT_METADATA
GRPC_CUSTOM_SERVER_METADATA>;

@ -540,6 +540,7 @@ class FilterStackTransport {
~FilterStackTransport() = default;
};
// FIXME: should this just be an alias for CallDestination?
class ClientTransport {
public:
virtual void StartCall(CallHandler call_handler) = 0;

@ -2806,7 +2806,6 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_destination.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \

@ -2583,7 +2583,6 @@ src/core/lib/transport/batch_builder.cc \
src/core/lib/transport/batch_builder.h \
src/core/lib/transport/bdp_estimator.cc \
src/core/lib/transport/bdp_estimator.h \
src/core/lib/transport/call_destination.h \
src/core/lib/transport/call_filters.cc \
src/core/lib/transport/call_filters.h \
src/core/lib/transport/call_final_info.cc \

Loading…
Cancel
Save