From 9954dbacfc90766427c101e85c01f7f26b9d9022 Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Wed, 12 Jun 2024 10:50:01 -0700 Subject: [PATCH 1/5] [infra] Remove CODEOWNERS for rollouts.yaml (#36878) There is already an exception for `experiments.yaml`, but any change to that file still adds @veblush and @gnossen to every PR. This is correcting an oversight. See https://github.com/grpc/grpc/pull/36876 --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7998f90b0a0..ee847011d75 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,5 +1,6 @@ /bazel/** @veblush @gnossen /bazel/experiments.yaml +/bazel/rollouts.yaml /cmake/** @veblush @apolcyn /src/core/client_channel/** @markdroth /src/core/ext/transport/chttp2/transport/** @ctiller From fdac9ebfdfcb75de30138a021e1d7a974697d908 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 12 Jun 2024 12:40:11 -0700 Subject: [PATCH 2/5] [thready_tsan] Grab bag of improvements (#36886) - ensure ordering of `OnAccept` and `Shutdown` callbacks from thready event engine (previously these could be reordered and this caused spurious failures) - enable thready_tsan for one C++ e2e test - don't filter thready_tsan for local builds (only CI) Closes #36886 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36886 from ctiller:erm e3b88e7d86aff7707819ddd010850123e730a91a PiperOrigin-RevId: 642702724 --- .../thready_event_engine.cc | 38 ++++++++++++++++--- test/cpp/end2end/BUILD | 1 + tools/bazel.rc | 1 - 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc b/src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc index f448108a390..3be7a0e0b32 100644 --- a/src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc +++ b/src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -22,6 +22,7 @@ #include #include "src/core/lib/gprpp/crash.h" +#include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { @@ -39,20 +40,45 @@ ThreadyEventEngine::CreateListener( absl::AnyInvocable on_shutdown, const EndpointConfig& config, std::unique_ptr memory_allocator_factory) { + struct AcceptState { + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + int pending_accepts_ ABSL_GUARDED_BY(mu_) = 0; + }; + auto accept_state = std::make_shared(); return impl_->CreateListener( - [this, on_accept = std::make_shared( - std::move(on_accept))](std::unique_ptr endpoint, - MemoryAllocator memory_allocator) { + [this, accept_state, + on_accept = std::make_shared( + std::move(on_accept))](std::unique_ptr endpoint, + MemoryAllocator memory_allocator) { + { + grpc_core::MutexLock lock(&accept_state->mu_); + ++accept_state->pending_accepts_; + } Asynchronously( - [on_accept, endpoint = std::move(endpoint), + [on_accept, accept_state, endpoint = std::move(endpoint), memory_allocator = std::move(memory_allocator)]() mutable { (*on_accept)(std::move(endpoint), std::move(memory_allocator)); + { + grpc_core::MutexLock lock(&accept_state->mu_); + --accept_state->pending_accepts_; + if (accept_state->pending_accepts_ == 0) { + accept_state->cv_.Signal(); + } + } }); }, - [this, + [this, accept_state, on_shutdown = std::move(on_shutdown)](absl::Status status) mutable { - Asynchronously([on_shutdown = std::move(on_shutdown), + Asynchronously([accept_state, on_shutdown = std::move(on_shutdown), status = std::move(status)]() mutable { + while (true) { + grpc_core::MutexLock lock(&accept_state->mu_); + if (accept_state->pending_accepts_ == 0) { + break; + } + accept_state->cv_.Wait(&accept_state->mu_); + } on_shutdown(std::move(status)); }); }, diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 6ada7da27c1..4a424354c39 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -112,6 +112,7 @@ grpc_cc_test( tags = [ "cpp_end2end_test", "no_test_ios", + "thready_tsan", ], deps = [ "//:gpr", diff --git a/tools/bazel.rc b/tools/bazel.rc index 36ec153f52a..3d98df37012 100644 --- a/tools/bazel.rc +++ b/tools/bazel.rc @@ -133,7 +133,6 @@ build:thready_tsan --copt=-DGPR_NO_DIRECT_SYSCALLS build:thready_tsan --copt=-DGRPC_TSAN build:thready_tsan --copt=-DGRPC_MAXIMIZE_THREADYNESS build:thready_tsan --linkopt=-fsanitize=thread -build:thready_tsan --test_tag_filters=thready_tsan build:thready_tsan --action_env=TSAN_OPTIONS=suppressions=test/core/test_util/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1 build:tsan_macos --strip=never From 8564f72e8e0334c25c480e0aec1df75bdc1fce14 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 12 Jun 2024 13:18:03 -0700 Subject: [PATCH 3/5] [omg] Avoid MSVC miscompile (#36893) Seems MSVC has trouble compiling this statically initialized lambda in a template; crash goes away when I split it out to be a named function instead. Closes #36893 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36893 from ctiller:edward 4bf9b88797abb214f83996c9fb405cbabbfa7e96 PiperOrigin-RevId: 642714465 --- src/core/lib/resource_quota/arena.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/core/lib/resource_quota/arena.h b/src/core/lib/resource_quota/arena.h index 916dbec4548..5d282e57a4d 100644 --- a/src/core/lib/resource_quota/arena.h +++ b/src/core/lib/resource_quota/arena.h @@ -94,8 +94,13 @@ class ArenaContextTraits : public BaseArenaContextTraits { }; template -const uint16_t ArenaContextTraits::id_ = BaseArenaContextTraits::MakeId( - [](void* ptr) { ArenaContextType::Destroy(static_cast(ptr)); }); +void DestroyArenaContext(void* p) { + ArenaContextType::Destroy(static_cast(p)); +} + +template +const uint16_t ArenaContextTraits::id_ = + BaseArenaContextTraits::MakeId(DestroyArenaContext); template struct IfArray { @@ -283,6 +288,7 @@ class Arena final : public RefCounted::Destroy(static_cast(slot)); } slot = context; + DCHECK_EQ(GetContext(), context); } private: From f7ce3ee9d54e30cf8ce944b8b3c3f945eb856f64 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 12 Jun 2024 15:19:26 -0700 Subject: [PATCH 4/5] [call v3] add dynamic filter support to client channel (#36877) Also made some minor improvements to the `ConfigSelector` API. Closes #36877 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36877 from markdroth:client_channel_v3_dynamic_filters 6a539fe3200dd81b8ca6fd1aa3955ecc8f75666a PiperOrigin-RevId: 642755276 --- CMakeLists.txt | 2 - Makefile | 1 - Package.swift | 1 - build_autogenerated.yaml | 2 - config.m4 | 1 - config.w32 | 1 - gRPC-Core.podspec | 1 - grpc.gemspec | 1 - package.xml | 1 - src/core/BUILD | 6 +- src/core/client_channel/client_channel.cc | 7 +- src/core/client_channel/config_selector.cc | 60 ----------- src/core/client_channel/config_selector.h | 31 +++--- src/core/resolver/xds/xds_resolver.cc | 40 +++++-- src/core/xds/grpc/xds_http_fault_filter.cc | 4 + src/core/xds/grpc/xds_http_fault_filter.h | 1 + src/core/xds/grpc/xds_http_filters.h | 4 + src/core/xds/grpc/xds_http_rbac_filter.cc | 4 + src/core/xds/grpc/xds_http_rbac_filter.h | 1 + .../grpc/xds_http_stateful_session_filter.cc | 5 + .../grpc/xds_http_stateful_session_filter.h | 1 + src/python/grpcio/grpc_core_dependencies.py | 1 - .../client_channel/client_channel_test.cc | 102 +++++++++++++++++- test/cpp/end2end/client_lb_end2end_test.cc | 5 +- tools/doxygen/Doxyfile.c++.internal | 1 - tools/doxygen/Doxyfile.core.internal | 1 - 26 files changed, 180 insertions(+), 105 deletions(-) delete mode 100644 src/core/client_channel/config_selector.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index c379a81ef28..00d7dec71a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1853,7 +1853,6 @@ add_library(grpc src/core/client_channel/client_channel_filter.cc src/core/client_channel/client_channel_plugin.cc src/core/client_channel/client_channel_service_config.cc - src/core/client_channel/config_selector.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc src/core/client_channel/load_balanced_call_destination.cc @@ -2946,7 +2945,6 @@ add_library(grpc_unsecure src/core/client_channel/client_channel_filter.cc src/core/client_channel/client_channel_plugin.cc src/core/client_channel/client_channel_service_config.cc - src/core/client_channel/config_selector.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc src/core/client_channel/load_balanced_call_destination.cc diff --git a/Makefile b/Makefile index 1d39b0f27e9..aa1295bfa64 100644 --- a/Makefile +++ b/Makefile @@ -675,7 +675,6 @@ LIBGRPC_SRC = \ src/core/client_channel/client_channel_filter.cc \ src/core/client_channel/client_channel_plugin.cc \ src/core/client_channel/client_channel_service_config.cc \ - src/core/client_channel/config_selector.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/load_balanced_call_destination.cc \ diff --git a/Package.swift b/Package.swift index 706e261a442..903fceaa748 100644 --- a/Package.swift +++ b/Package.swift @@ -136,7 +136,6 @@ let package = Package( "src/core/client_channel/client_channel_plugin.cc", "src/core/client_channel/client_channel_service_config.cc", "src/core/client_channel/client_channel_service_config.h", - "src/core/client_channel/config_selector.cc", "src/core/client_channel/config_selector.h", "src/core/client_channel/connector.h", "src/core/client_channel/dynamic_filters.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index fa157a9fbae..dad0979993b 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1258,7 +1258,6 @@ libs: - src/core/client_channel/client_channel_filter.cc - src/core/client_channel/client_channel_plugin.cc - src/core/client_channel/client_channel_service_config.cc - - src/core/client_channel/config_selector.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc - src/core/client_channel/load_balanced_call_destination.cc @@ -2714,7 +2713,6 @@ libs: - src/core/client_channel/client_channel_filter.cc - src/core/client_channel/client_channel_plugin.cc - src/core/client_channel/client_channel_service_config.cc - - src/core/client_channel/config_selector.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc - src/core/client_channel/load_balanced_call_destination.cc diff --git a/config.m4 b/config.m4 index 7a6ce84d3d7..6b3d155e3fa 100644 --- a/config.m4 +++ b/config.m4 @@ -50,7 +50,6 @@ if test "$PHP_GRPC" != "no"; then src/core/client_channel/client_channel_filter.cc \ src/core/client_channel/client_channel_plugin.cc \ src/core/client_channel/client_channel_service_config.cc \ - src/core/client_channel/config_selector.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/load_balanced_call_destination.cc \ diff --git a/config.w32 b/config.w32 index 657bdf0b37f..030bffccc0b 100644 --- a/config.w32 +++ b/config.w32 @@ -15,7 +15,6 @@ if (PHP_GRPC != "no") { "src\\core\\client_channel\\client_channel_filter.cc " + "src\\core\\client_channel\\client_channel_plugin.cc " + "src\\core\\client_channel\\client_channel_service_config.cc " + - "src\\core\\client_channel\\config_selector.cc " + "src\\core\\client_channel\\dynamic_filters.cc " + "src\\core\\client_channel\\global_subchannel_pool.cc " + "src\\core\\client_channel\\load_balanced_call_destination.cc " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1116516edbd..f23ac164ff6 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -255,7 +255,6 @@ Pod::Spec.new do |s| 'src/core/client_channel/client_channel_plugin.cc', 'src/core/client_channel/client_channel_service_config.cc', 'src/core/client_channel/client_channel_service_config.h', - 'src/core/client_channel/config_selector.cc', 'src/core/client_channel/config_selector.h', 'src/core/client_channel/connector.h', 'src/core/client_channel/dynamic_filters.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 91416b2ac1f..92494c37b7d 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -142,7 +142,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/client_channel/client_channel_plugin.cc ) s.files += %w( src/core/client_channel/client_channel_service_config.cc ) s.files += %w( src/core/client_channel/client_channel_service_config.h ) - s.files += %w( src/core/client_channel/config_selector.cc ) s.files += %w( src/core/client_channel/config_selector.h ) s.files += %w( src/core/client_channel/connector.h ) s.files += %w( src/core/client_channel/dynamic_filters.cc ) diff --git a/package.xml b/package.xml index 523bc3aa67b..3d88d93e590 100644 --- a/package.xml +++ b/package.xml @@ -124,7 +124,6 @@ - diff --git a/src/core/BUILD b/src/core/BUILD index 76d3eb99400..f6b6b13ea10 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3295,9 +3295,6 @@ grpc_cc_library( grpc_cc_library( name = "config_selector", - srcs = [ - "client_channel/config_selector.cc", - ], hdrs = [ "client_channel/config_selector.h", ], @@ -3313,9 +3310,11 @@ grpc_cc_library( "channel_fwd", "client_channel_internal_header", "grpc_service_config", + "interception_chain", "metadata_batch", "ref_counted", "slice", + "unique_type_name", "useful", "//:gpr_public_hdrs", "//:grpc_public_hdrs", @@ -5215,6 +5214,7 @@ grpc_cc_library( "grpc_tls_credentials", "grpc_transport_chttp2_client_connector", "init_internally", + "interception_chain", "iomgr_fwd", "json", "json_args", diff --git a/src/core/client_channel/client_channel.cc b/src/core/client_channel/client_channel.cc index 004f87bbba0..87a66c75d91 100644 --- a/src/core/client_channel/client_channel.cc +++ b/src/core/client_channel/client_channel.cc @@ -1195,14 +1195,17 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() { } CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder( GRPC_CLIENT_CHANNEL, builder); - // TODO(roth): add filters returned by config selector - // Create call destination. + // Add filters returned by the config selector (e.g., xDS HTTP filters). + config_selector->AddFilters(builder); + // TODO(roth, ctiller): When we implement the retry interceptor, that + // needs to be added *after* the filters added by the config selector. const bool enable_retries = !channel_args_.WantMinimalStack() && channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true); if (enable_retries) { Crash("call v3 stack does not yet support retries"); } + // Create call destination. auto top_of_stack_call_destination = builder.Build(call_destination_); // Send result to data plane. if (!top_of_stack_call_destination.ok()) { diff --git a/src/core/client_channel/config_selector.cc b/src/core/client_channel/config_selector.cc deleted file mode 100644 index 9264e4c1618..00000000000 --- a/src/core/client_channel/config_selector.cc +++ /dev/null @@ -1,60 +0,0 @@ -// -// Copyright 2020 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include - -#include "src/core/client_channel/config_selector.h" - -#include "src/core/lib/channel/channel_args.h" -#include "src/core/util/useful.h" - -namespace grpc_core { - -namespace { - -void* ConfigSelectorArgCopy(void* p) { - ConfigSelector* config_selector = static_cast(p); - config_selector->Ref().release(); - return p; -} - -void ConfigSelectorArgDestroy(void* p) { - ConfigSelector* config_selector = static_cast(p); - config_selector->Unref(); -} - -int ConfigSelectorArgCmp(void* p, void* q) { return QsortCompare(p, q); } - -const grpc_arg_pointer_vtable kChannelArgVtable = { - ConfigSelectorArgCopy, ConfigSelectorArgDestroy, ConfigSelectorArgCmp}; - -} // namespace - -grpc_arg ConfigSelector::MakeChannelArg() const { - return grpc_channel_arg_pointer_create( - const_cast(GRPC_ARG_CONFIG_SELECTOR), - const_cast(this), &kChannelArgVtable); -} - -RefCountedPtr ConfigSelector::GetFromChannelArgs( - const grpc_channel_args& args) { - ConfigSelector* config_selector = - grpc_channel_args_find_pointer(&args, - GRPC_ARG_CONFIG_SELECTOR); - return config_selector != nullptr ? config_selector->Ref() : nullptr; -} - -} // namespace grpc_core diff --git a/src/core/client_channel/config_selector.h b/src/core/client_channel/config_selector.h index 0a0a6d54c80..d049ae95836 100644 --- a/src/core/client_channel/config_selector.h +++ b/src/core/client_channel/config_selector.h @@ -35,8 +35,10 @@ #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice.h" +#include "src/core/lib/transport/interception_chain.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/service_config/service_config.h" #include "src/core/util/useful.h" @@ -50,34 +52,32 @@ namespace grpc_core { // MethodConfig and provide input to LB policies on a per-call basis. class ConfigSelector : public RefCounted { public: - struct GetCallConfigArgs { - grpc_metadata_batch* initial_metadata; - Arena* arena; - ClientChannelServiceConfigCallData* service_config_call_data; - }; - ~ConfigSelector() override = default; - virtual const char* name() const = 0; + virtual UniqueTypeName name() const = 0; static bool Equals(const ConfigSelector* cs1, const ConfigSelector* cs2) { if (cs1 == nullptr) return cs2 == nullptr; if (cs2 == nullptr) return false; - if (strcmp(cs1->name(), cs2->name()) != 0) return false; + if (cs1->name() != cs2->name()) return false; return cs1->Equals(cs2); } // The channel will call this when the resolver returns a new ConfigSelector // to determine what set of dynamic filters will be configured. + virtual void AddFilters(InterceptionChainBuilder& /*builder*/) {} + // TODO(roth): Remove this once the legacy filter stack goes away. virtual std::vector GetFilters() { return {}; } - // Returns the call config to use for the call, or a status to fail - // the call with. + // Gets the configuration for the call and stores it in service config + // call data. + struct GetCallConfigArgs { + grpc_metadata_batch* initial_metadata; + Arena* arena; + ClientChannelServiceConfigCallData* service_config_call_data; + }; virtual absl::Status GetCallConfig(GetCallConfigArgs args) = 0; - grpc_arg MakeChannelArg() const; - static RefCountedPtr GetFromChannelArgs( - const grpc_channel_args& args); static absl::string_view ChannelArgName() { return GRPC_ARG_CONFIG_SELECTOR; } static int ChannelArgsCompare(const ConfigSelector* a, const ConfigSelector* b) { @@ -101,7 +101,10 @@ class DefaultConfigSelector final : public ConfigSelector { DCHECK(service_config_ != nullptr); } - const char* name() const override { return "default"; } + UniqueTypeName name() const override { + static UniqueTypeName::Factory kFactory("default"); + return kFactory.Create(); + } absl::Status GetCallConfig(GetCallConfigArgs args) override { Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata()); diff --git a/src/core/resolver/xds/xds_resolver.cc b/src/core/resolver/xds/xds_resolver.cc index 5f582bc63f8..286f4e0dedd 100644 --- a/src/core/resolver/xds/xds_resolver.cc +++ b/src/core/resolver/xds/xds_resolver.cc @@ -270,7 +270,10 @@ class XdsResolver final : public Resolver { RefCountedPtr route_config_data); ~XdsConfigSelector() override; - const char* name() const override { return "XdsConfigSelector"; } + UniqueTypeName name() const override { + static UniqueTypeName::Factory kFactory("XdsConfigSelector"); + return kFactory.Create(); + } bool Equals(const ConfigSelector* other) const override { const auto* other_xds = static_cast(other); @@ -281,14 +284,14 @@ class XdsResolver final : public Resolver { absl::Status GetCallConfig(GetCallConfigArgs args) override; - std::vector GetFilters() override { - return filters_; - } + void AddFilters(InterceptionChainBuilder& builder) override; + + std::vector GetFilters() override; private: RefCountedPtr resolver_; RefCountedPtr route_config_data_; - std::vector filters_; + std::vector filters_; }; class XdsRouteStateAttributeImpl final : public XdsRouteStateAttribute { @@ -641,12 +644,9 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector( http_filter_registry.GetFilterForType( http_filter.config.config_proto_type_name); CHECK_NE(filter_impl, nullptr); - // Add C-core filter to list. - if (filter_impl->channel_filter() != nullptr) { - filters_.push_back(filter_impl->channel_filter()); - } + // Add filter to list. + filters_.push_back(filter_impl); } - filters_.push_back(&ClusterSelectionFilter::kFilter); } XdsResolver::XdsConfigSelector::~XdsConfigSelector() { @@ -799,6 +799,26 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( return absl::OkStatus(); } +void XdsResolver::XdsConfigSelector::AddFilters( + InterceptionChainBuilder& builder) { + for (const XdsHttpFilterImpl* filter : filters_) { + filter->AddFilter(builder); + } + builder.Add(); +} + +std::vector +XdsResolver::XdsConfigSelector::GetFilters() { + std::vector filters; + for (const XdsHttpFilterImpl* filter : filters_) { + if (filter->channel_filter() != nullptr) { + filters.push_back(filter->channel_filter()); + } + } + filters.push_back(&ClusterSelectionFilter::kFilter); + return filters; +} + // // XdsResolver::XdsRouteStateAttributeImpl // diff --git a/src/core/xds/grpc/xds_http_fault_filter.cc b/src/core/xds/grpc/xds_http_fault_filter.cc index 48156bd8cc3..0c65e13deb2 100644 --- a/src/core/xds/grpc/xds_http_fault_filter.cc +++ b/src/core/xds/grpc/xds_http_fault_filter.cc @@ -214,6 +214,10 @@ XdsHttpFaultFilter::GenerateFilterConfigOverride( return GenerateFilterConfig(context, std::move(extension), errors); } +void XdsHttpFaultFilter::AddFilter(InterceptionChainBuilder& builder) const { + builder.Add(); +} + const grpc_channel_filter* XdsHttpFaultFilter::channel_filter() const { return &FaultInjectionFilter::kFilter; } diff --git a/src/core/xds/grpc/xds_http_fault_filter.h b/src/core/xds/grpc/xds_http_fault_filter.h index cd6dfd78691..ff2394c7a28 100644 --- a/src/core/xds/grpc/xds_http_fault_filter.h +++ b/src/core/xds/grpc/xds_http_fault_filter.h @@ -44,6 +44,7 @@ class XdsHttpFaultFilter final : public XdsHttpFilterImpl { absl::optional GenerateFilterConfigOverride( const XdsResourceType::DecodeContext& context, XdsExtension extension, ValidationErrors* errors) const override; + void AddFilter(InterceptionChainBuilder& builder) const override; const grpc_channel_filter* channel_filter() const override; ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override; absl::StatusOr GenerateServiceConfig( diff --git a/src/core/xds/grpc/xds_http_filters.h b/src/core/xds/grpc/xds_http_filters.h index b61690fd74d..6e5cda52b57 100644 --- a/src/core/xds/grpc/xds_http_filters.h +++ b/src/core/xds/grpc/xds_http_filters.h @@ -35,6 +35,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_fwd.h" #include "src/core/lib/gprpp/validation_errors.h" +#include "src/core/lib/transport/interception_chain.h" #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" #include "src/core/xds/grpc/xds_common_types.h" @@ -96,6 +97,8 @@ class XdsHttpFilterImpl { ValidationErrors* errors) const = 0; // C-core channel filter implementation. + virtual void AddFilter(InterceptionChainBuilder& builder) const = 0; + // TODO(roth): Remove this once the legacy filter stack goes away. virtual const grpc_channel_filter* channel_filter() const = 0; // Modifies channel args that may affect service config parsing (not @@ -135,6 +138,7 @@ class XdsHttpRouterFilter final : public XdsHttpFilterImpl { absl::optional GenerateFilterConfigOverride( const XdsResourceType::DecodeContext& context, XdsExtension extension, ValidationErrors* errors) const override; + void AddFilter(InterceptionChainBuilder& /*builder*/) const override {} const grpc_channel_filter* channel_filter() const override { return nullptr; } absl::StatusOr GenerateServiceConfig( const FilterConfig& /*hcm_filter_config*/, diff --git a/src/core/xds/grpc/xds_http_rbac_filter.cc b/src/core/xds/grpc/xds_http_rbac_filter.cc index ee81008bd90..6ba09042c88 100644 --- a/src/core/xds/grpc/xds_http_rbac_filter.cc +++ b/src/core/xds/grpc/xds_http_rbac_filter.cc @@ -564,6 +564,10 @@ XdsHttpRbacFilter::GenerateFilterConfigOverride( return FilterConfig{OverrideConfigProtoName(), std::move(rbac_json)}; } +void XdsHttpRbacFilter::AddFilter(InterceptionChainBuilder& builder) const { + builder.Add(); +} + const grpc_channel_filter* XdsHttpRbacFilter::channel_filter() const { return &RbacFilter::kFilterVtable; } diff --git a/src/core/xds/grpc/xds_http_rbac_filter.h b/src/core/xds/grpc/xds_http_rbac_filter.h index f95fc5d8693..ad357a978e2 100644 --- a/src/core/xds/grpc/xds_http_rbac_filter.h +++ b/src/core/xds/grpc/xds_http_rbac_filter.h @@ -44,6 +44,7 @@ class XdsHttpRbacFilter final : public XdsHttpFilterImpl { absl::optional GenerateFilterConfigOverride( const XdsResourceType::DecodeContext& context, XdsExtension extension, ValidationErrors* errors) const override; + void AddFilter(InterceptionChainBuilder& builder) const override; const grpc_channel_filter* channel_filter() const override; ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override; absl::StatusOr GenerateServiceConfig( diff --git a/src/core/xds/grpc/xds_http_stateful_session_filter.cc b/src/core/xds/grpc/xds_http_stateful_session_filter.cc index 698e1776361..7b0b05219cd 100644 --- a/src/core/xds/grpc/xds_http_stateful_session_filter.cc +++ b/src/core/xds/grpc/xds_http_stateful_session_filter.cc @@ -194,6 +194,11 @@ XdsHttpStatefulSessionFilter::GenerateFilterConfigOverride( Json::FromObject(std::move(config))}; } +void XdsHttpStatefulSessionFilter::AddFilter( + InterceptionChainBuilder& builder) const { + builder.Add(); +} + const grpc_channel_filter* XdsHttpStatefulSessionFilter::channel_filter() const { return &StatefulSessionFilter::kFilter; diff --git a/src/core/xds/grpc/xds_http_stateful_session_filter.h b/src/core/xds/grpc/xds_http_stateful_session_filter.h index b734b0fb135..4df7a04a5d8 100644 --- a/src/core/xds/grpc/xds_http_stateful_session_filter.h +++ b/src/core/xds/grpc/xds_http_stateful_session_filter.h @@ -44,6 +44,7 @@ class XdsHttpStatefulSessionFilter final : public XdsHttpFilterImpl { absl::optional GenerateFilterConfigOverride( const XdsResourceType::DecodeContext& context, XdsExtension extension, ValidationErrors* errors) const override; + void AddFilter(InterceptionChainBuilder& builder) const override; const grpc_channel_filter* channel_filter() const override; ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override; absl::StatusOr GenerateServiceConfig( diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index ae570fb582a..8b24084ed2b 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -24,7 +24,6 @@ CORE_SOURCE_FILES = [ 'src/core/client_channel/client_channel_filter.cc', 'src/core/client_channel/client_channel_plugin.cc', 'src/core/client_channel/client_channel_service_config.cc', - 'src/core/client_channel/config_selector.cc', 'src/core/client_channel/dynamic_filters.cc', 'src/core/client_channel/global_subchannel_pool.cc', 'src/core/client_channel/load_balanced_call_destination.cc', diff --git a/test/core/client_channel/client_channel_test.cc b/test/core/client_channel/client_channel_test.cc index 1aa8c644816..a280eae8a71 100644 --- a/test/core/client_channel/client_channel_test.cc +++ b/test/core/client_channel/client_channel_test.cc @@ -23,7 +23,9 @@ #include #include "src/core/lib/address_utils/parse_address.h" +#include "src/core/lib/channel/promise_based_filter.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/service_config/service_config_impl.h" #include "test/core/call/yodel/yodel_test.h" namespace grpc_core { @@ -77,11 +79,21 @@ class ClientChannelTest : public YodelTest { } Resolver::Result MakeSuccessfulResolutionResult( - absl::string_view endpoint_address) { + absl::string_view endpoint_address, + absl::StatusOr> service_config = nullptr, + RefCountedPtr config_selector = nullptr) { Resolver::Result result; grpc_resolved_address address; CHECK(grpc_parse_uri(URI::Parse(endpoint_address).value(), &address)); result.addresses = EndpointAddressesList({EndpointAddresses{address, {}}}); + result.service_config = std::move(service_config); + if (config_selector != nullptr) { + CHECK(result.service_config.ok()) + << "channel does not use ConfigSelector without service config"; + CHECK(*result.service_config != nullptr) + << "channel does not use ConfigSelector without service config"; + result.args = ChannelArgs().SetObject(std::move(config_selector)); + } return result; } @@ -268,6 +280,94 @@ CLIENT_CHANNEL_TEST(StartCall) { WaitForAllPendingWork(); } +// A filter that adds metadata foo=bar. +class TestFilter : public ImplementChannelFilter { + public: + class Call { + public: + void OnClientInitialMetadata(ClientMetadata& md) { + md.Append("foo", Slice::FromStaticString("bar"), + [](absl::string_view error, const Slice&) { + FAIL() << "error encoding metadata: " << error; + }); + } + + static const NoInterceptor OnClientToServerMessage; + static const NoInterceptor OnClientToServerHalfClose; + static const NoInterceptor OnServerInitialMetadata; + static const NoInterceptor OnServerToClientMessage; + static const NoInterceptor OnServerTrailingMetadata; + static const NoInterceptor OnFinalize; + }; + + static absl::StatusOr> Create( + const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/) { + return std::make_unique(); + } +}; + +const NoInterceptor TestFilter::Call::OnClientToServerMessage; +const NoInterceptor TestFilter::Call::OnClientToServerHalfClose; +const NoInterceptor TestFilter::Call::OnServerInitialMetadata; +const NoInterceptor TestFilter::Call::OnServerToClientMessage; +const NoInterceptor TestFilter::Call::OnServerTrailingMetadata; +const NoInterceptor TestFilter::Call::OnFinalize; + +// A config selector that adds TestFilter as a dynamic filter. +class TestConfigSelector : public ConfigSelector { + public: + UniqueTypeName name() const override { + static UniqueTypeName::Factory kFactory("test"); + return kFactory.Create(); + } + + void AddFilters(InterceptionChainBuilder& builder) override { + builder.Add(); + } + + absl::Status GetCallConfig(GetCallConfigArgs /*args*/) override { + return absl::OkStatus(); + } + + // Any instance of this class will behave the same, so all comparisons + // are true. + bool Equals(const ConfigSelector* /*other*/) const override { return true; } +}; + +CLIENT_CHANNEL_TEST(ConfigSelectorWithDynamicFilters) { + auto& channel = InitChannel(ChannelArgs()); + auto call = MakeCallPair(MakeClientInitialMetadata(), channel.event_engine(), + channel.call_arena_allocator()->MakeArena()); + channel.StartCall(std::move(call.handler)); + auto service_config = ServiceConfigImpl::Create(ChannelArgs(), "{}"); + ASSERT_TRUE(service_config.ok()); + QueueNameResolutionResult(MakeSuccessfulResolutionResult( + "ipv4:127.0.0.1:1234", std::move(service_config), + MakeRefCounted())); + auto call_handler = TickUntilCallStarted(); + SpawnTestSeq( + call_handler, "check_initial_metadata", + [call_handler]() mutable { + return call_handler.PullClientInitialMetadata(); + }, + [](ValueOrFailure md) { + EXPECT_TRUE(md.ok()); + if (md.ok()) { + std::string buffer; + auto value = (*md)->GetStringValue("foo", &buffer); + EXPECT_TRUE(value.has_value()); + if (value.has_value()) EXPECT_EQ(*value, "bar"); + } + return Empty{}; + }); + SpawnTestSeq(call.initiator, "cancel", + [call_initiator = call.initiator]() mutable { + call_initiator.Cancel(); + return Empty{}; + }); + WaitForAllPendingWork(); +} + // TODO(ctiller, roth): MANY more test cases // - Resolver returns an error for the initial result, then returns a valid // result. diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index cc0a6bbccfa..ea6156b545d 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -2947,7 +2947,10 @@ TEST_F(ControlPlaneStatusRewritingTest, RewritesFromConfigSelector) { public: explicit FailConfigSelector(absl::Status status) : status_(std::move(status)) {} - const char* name() const override { return "FailConfigSelector"; } + grpc_core::UniqueTypeName name() const override { + static grpc_core::UniqueTypeName::Factory kFactory("FailConfigSelector"); + return kFactory.Create(); + } bool Equals(const ConfigSelector* other) const override { return status_ == static_cast(other)->status_; } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 49d88672e31..c3797ac0f86 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1104,7 +1104,6 @@ src/core/client_channel/client_channel_internal.h \ src/core/client_channel/client_channel_plugin.cc \ src/core/client_channel/client_channel_service_config.cc \ src/core/client_channel/client_channel_service_config.h \ -src/core/client_channel/config_selector.cc \ src/core/client_channel/config_selector.h \ src/core/client_channel/connector.h \ src/core/client_channel/dynamic_filters.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 3cb1e1de44c..64d33400ab2 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -907,7 +907,6 @@ src/core/client_channel/client_channel_internal.h \ src/core/client_channel/client_channel_plugin.cc \ src/core/client_channel/client_channel_service_config.cc \ src/core/client_channel/client_channel_service_config.h \ -src/core/client_channel/config_selector.cc \ src/core/client_channel/config_selector.h \ src/core/client_channel/connector.h \ src/core/client_channel/dynamic_filters.cc \ From 568fbfff8cc0c637b2dfdf5211d5731dca63689c Mon Sep 17 00:00:00 2001 From: Yousuk Seung Date: Wed, 12 Jun 2024 16:19:03 -0700 Subject: [PATCH 5/5] [generic API] separate callback from cq in generic stub/service (#36447) See https://github.com/grpc/proposal/pull/426 Closes #36447 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36447 from yousukseung:generic-stub-service-refactor 1cc0cbdc4b74450f631115fefc254976383eb816 PiperOrigin-RevId: 642774012 --- BUILD | 40 ++++++ CMakeLists.txt | 6 + build_autogenerated.yaml | 6 + gRPC-C++.podspec | 3 + .../grpcpp/generic/async_generic_service.h | 54 +------- .../grpcpp/generic/callback_generic_service.h | 84 ++++++++++++ include/grpcpp/generic/generic_stub.h | 83 ++---------- .../grpcpp/generic/generic_stub_callback.h | 44 ++++++ include/grpcpp/impl/generic_stub_internal.h | 125 ++++++++++++++++++ tools/doxygen/Doxyfile.c++ | 3 + tools/doxygen/Doxyfile.c++.internal | 3 + 11 files changed, 326 insertions(+), 125 deletions(-) create mode 100644 include/grpcpp/generic/callback_generic_service.h create mode 100644 include/grpcpp/generic/generic_stub_callback.h create mode 100644 include/grpcpp/impl/generic_stub_internal.h diff --git a/BUILD b/BUILD index ec8bed94f53..7892cd46496 100644 --- a/BUILD +++ b/BUILD @@ -415,7 +415,9 @@ GRPCXX_PUBLIC_HDRS = [ "include/grpcpp/create_channel_posix.h", "include/grpcpp/ext/health_check_service_server_builder_option.h", "include/grpcpp/generic/async_generic_service.h", + "include/grpcpp/generic/callback_generic_service.h", "include/grpcpp/generic/generic_stub.h", + "include/grpcpp/generic/generic_stub_callback.h", "include/grpcpp/grpcpp.h", "include/grpcpp/health_check_service_interface.h", "include/grpcpp/impl/call_hook.h", @@ -1260,6 +1262,7 @@ grpc_cc_library( visibility = ["@grpc:public"], deps = [ "channel_arg_names", + "generic_stub_internal", "gpr", "grpc++_base_unsecure", "grpc++_codegen_proto", @@ -2470,6 +2473,7 @@ grpc_cc_library( "channel_stack_builder", "config", "exec_ctx", + "generic_stub_internal", "gpr", "grpc", "grpc++_codegen_proto", @@ -2558,6 +2562,7 @@ grpc_cc_library( "channel_stack_builder", "config", "exec_ctx", + "generic_stub_internal", "gpr", "grpc_base", "grpc_core_credentials_header", @@ -2955,6 +2960,41 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "generic_stub_internal", + hdrs = [ + "include/grpcpp/impl/generic_stub_internal.h", + ], + language = "c++", + deps = [ + "grpc++_public_hdrs", + ], +) + +grpc_cc_library( + name = "generic_stub_callback", + hdrs = [ + "include/grpcpp/generic/generic_stub_callback.h", + ], + language = "c++", + visibility = ["@grpc:public"], + deps = [ + "generic_stub_internal", + ], +) + +grpc_cc_library( + name = "callback_generic_service", + hdrs = [ + "include/grpcpp/generic/callback_generic_service.h", + ], + language = "c++", + visibility = ["@grpc:public"], + deps = [ + "grpc++_public_hdrs", + ], +) + grpc_cc_library( name = "work_serializer", srcs = [ diff --git a/CMakeLists.txt b/CMakeLists.txt index 00d7dec71a4..4708ab40ded 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4253,7 +4253,9 @@ foreach(_hdr include/grpcpp/ext/health_check_service_server_builder_option.h include/grpcpp/ext/server_metric_recorder.h include/grpcpp/generic/async_generic_service.h + include/grpcpp/generic/callback_generic_service.h include/grpcpp/generic/generic_stub.h + include/grpcpp/generic/generic_stub_callback.h include/grpcpp/grpcpp.h include/grpcpp/health_check_service_interface.h include/grpcpp/impl/call.h @@ -4314,6 +4316,7 @@ foreach(_hdr include/grpcpp/impl/completion_queue_tag.h include/grpcpp/impl/create_auth_context.h include/grpcpp/impl/delegating_channel.h + include/grpcpp/impl/generic_stub_internal.h include/grpcpp/impl/grpc_library.h include/grpcpp/impl/intercepted_channel.h include/grpcpp/impl/interceptor_common.h @@ -4996,7 +4999,9 @@ foreach(_hdr include/grpcpp/ext/health_check_service_server_builder_option.h include/grpcpp/ext/server_metric_recorder.h include/grpcpp/generic/async_generic_service.h + include/grpcpp/generic/callback_generic_service.h include/grpcpp/generic/generic_stub.h + include/grpcpp/generic/generic_stub_callback.h include/grpcpp/grpcpp.h include/grpcpp/health_check_service_interface.h include/grpcpp/impl/call.h @@ -5057,6 +5062,7 @@ foreach(_hdr include/grpcpp/impl/completion_queue_tag.h include/grpcpp/impl/create_auth_context.h include/grpcpp/impl/delegating_channel.h + include/grpcpp/impl/generic_stub_internal.h include/grpcpp/impl/grpc_library.h include/grpcpp/impl/intercepted_channel.h include/grpcpp/impl/interceptor_common.h diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index dad0979993b..8b8cad64122 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3713,7 +3713,9 @@ libs: - include/grpcpp/ext/health_check_service_server_builder_option.h - include/grpcpp/ext/server_metric_recorder.h - include/grpcpp/generic/async_generic_service.h + - include/grpcpp/generic/callback_generic_service.h - include/grpcpp/generic/generic_stub.h + - include/grpcpp/generic/generic_stub_callback.h - include/grpcpp/grpcpp.h - include/grpcpp/health_check_service_interface.h - include/grpcpp/impl/call.h @@ -3774,6 +3776,7 @@ libs: - include/grpcpp/impl/completion_queue_tag.h - include/grpcpp/impl/create_auth_context.h - include/grpcpp/impl/delegating_channel.h + - include/grpcpp/impl/generic_stub_internal.h - include/grpcpp/impl/grpc_library.h - include/grpcpp/impl/intercepted_channel.h - include/grpcpp/impl/interceptor_common.h @@ -4143,7 +4146,9 @@ libs: - include/grpcpp/ext/health_check_service_server_builder_option.h - include/grpcpp/ext/server_metric_recorder.h - include/grpcpp/generic/async_generic_service.h + - include/grpcpp/generic/callback_generic_service.h - include/grpcpp/generic/generic_stub.h + - include/grpcpp/generic/generic_stub_callback.h - include/grpcpp/grpcpp.h - include/grpcpp/health_check_service_interface.h - include/grpcpp/impl/call.h @@ -4204,6 +4209,7 @@ libs: - include/grpcpp/impl/completion_queue_tag.h - include/grpcpp/impl/create_auth_context.h - include/grpcpp/impl/delegating_channel.h + - include/grpcpp/impl/generic_stub_internal.h - include/grpcpp/impl/grpc_library.h - include/grpcpp/impl/intercepted_channel.h - include/grpcpp/impl/interceptor_common.h diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index dcbb22b52a1..842fef4262a 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -103,7 +103,9 @@ Pod::Spec.new do |s| 'include/grpcpp/ext/health_check_service_server_builder_option.h', 'include/grpcpp/ext/server_metric_recorder.h', 'include/grpcpp/generic/async_generic_service.h', + 'include/grpcpp/generic/callback_generic_service.h', 'include/grpcpp/generic/generic_stub.h', + 'include/grpcpp/generic/generic_stub_callback.h', 'include/grpcpp/grpcpp.h', 'include/grpcpp/health_check_service_interface.h', 'include/grpcpp/impl/call.h', @@ -160,6 +162,7 @@ Pod::Spec.new do |s| 'include/grpcpp/impl/completion_queue_tag.h', 'include/grpcpp/impl/create_auth_context.h', 'include/grpcpp/impl/delegating_channel.h', + 'include/grpcpp/impl/generic_stub_internal.h', 'include/grpcpp/impl/grpc_library.h', 'include/grpcpp/impl/intercepted_channel.h', 'include/grpcpp/impl/interceptor_common.h', diff --git a/include/grpcpp/generic/async_generic_service.h b/include/grpcpp/generic/async_generic_service.h index 77525d13b2a..68013caf223 100644 --- a/include/grpcpp/generic/async_generic_service.h +++ b/include/grpcpp/generic/async_generic_service.h @@ -20,10 +20,9 @@ #define GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H #include -#include +#include #include #include -#include struct grpc_server; @@ -78,57 +77,6 @@ class AsyncGenericService final { grpc::Server* server_; }; -/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs -/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with -/// ByteBuffer arguments. -using ServerGenericBidiReactor = ServerBidiReactor; - -class GenericCallbackServerContext final : public grpc::CallbackServerContext { - public: - const std::string& method() const { return method_; } - const std::string& host() const { return host_; } - - private: - friend class grpc::Server; - - std::string method_; - std::string host_; -}; - -/// \a CallbackGenericService is the base class for generic services implemented -/// using the callback API and registered through the ServerBuilder using -/// RegisterCallbackGenericService. -class CallbackGenericService { - public: - CallbackGenericService() {} - virtual ~CallbackGenericService() {} - - /// The "method handler" for the generic API. This function should be - /// overridden to provide a ServerGenericBidiReactor that implements the - /// application-level interface for this RPC. Unimplemented by default. - virtual ServerGenericBidiReactor* CreateReactor( - GenericCallbackServerContext* /*ctx*/) { - class Reactor : public ServerGenericBidiReactor { - public: - Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } - void OnDone() override { delete this; } - }; - return new Reactor; - } - - private: - friend class grpc::Server; - - internal::CallbackBidiHandler* Handler() { - return new internal::CallbackBidiHandler( - [this](grpc::CallbackServerContext* ctx) { - return CreateReactor(static_cast(ctx)); - }); - } - - grpc::Server* server_{nullptr}; -}; - } // namespace grpc #endif // GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H diff --git a/include/grpcpp/generic/callback_generic_service.h b/include/grpcpp/generic/callback_generic_service.h new file mode 100644 index 00000000000..db080a79e7d --- /dev/null +++ b/include/grpcpp/generic/callback_generic_service.h @@ -0,0 +1,84 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H +#define GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H + +#include +#include +#include +#include + +struct grpc_server; + +namespace grpc { + +/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs +/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with +/// ByteBuffer arguments. +using ServerGenericBidiReactor = ServerBidiReactor; + +class GenericCallbackServerContext final : public grpc::CallbackServerContext { + public: + const std::string& method() const { return method_; } + const std::string& host() const { return host_; } + + private: + friend class grpc::Server; + + std::string method_; + std::string host_; +}; + +/// \a CallbackGenericService is the base class for generic services implemented +/// using the callback API and registered through the ServerBuilder using +/// RegisterCallbackGenericService. +class CallbackGenericService { + public: + CallbackGenericService() {} + virtual ~CallbackGenericService() {} + + /// The "method handler" for the generic API. This function should be + /// overridden to provide a ServerGenericBidiReactor that implements the + /// application-level interface for this RPC. Unimplemented by default. + virtual ServerGenericBidiReactor* CreateReactor( + GenericCallbackServerContext* /*ctx*/) { + class Reactor : public ServerGenericBidiReactor { + public: + Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); } + void OnDone() override { delete this; } + }; + return new Reactor; + } + + private: + friend class grpc::Server; + + internal::CallbackBidiHandler* Handler() { + return new internal::CallbackBidiHandler( + [this](grpc::CallbackServerContext* ctx) { + return CreateReactor(static_cast(ctx)); + }); + } + + grpc::Server* server_{nullptr}; +}; + +} // namespace grpc + +#endif // GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H diff --git a/include/grpcpp/generic/generic_stub.h b/include/grpcpp/generic/generic_stub.h index a44232bc519..bc06c602d76 100644 --- a/include/grpcpp/generic/generic_stub.h +++ b/include/grpcpp/generic/generic_stub.h @@ -19,15 +19,12 @@ #ifndef GRPCPP_GENERIC_GENERIC_STUB_H #define GRPCPP_GENERIC_GENERIC_STUB_H -#include - #include +#include #include #include #include #include -#include -#include #include namespace grpc { @@ -42,10 +39,12 @@ typedef ClientAsyncResponseReader GenericClientAsyncResponseReader; /// by name. In practice, the Request and Response types should be basic /// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf). template -class TemplatedGenericStub final { +class TemplatedGenericStub final + : public internal::TemplatedGenericStubCallbackInternal { public: - explicit TemplatedGenericStub(std::shared_ptr channel) - : channel_(channel) {} + using internal::TemplatedGenericStubCallbackInternal< + RequestType, ResponseType>::TemplatedGenericStubCallbackInternal; /// Setup a call to a named method \a method using \a context, but don't /// start it. Let it be started explicitly with StartCall and a tag. @@ -74,6 +73,9 @@ class TemplatedGenericStub final { context, request)); } + using internal::TemplatedGenericStubCallbackInternal< + RequestType, ResponseType>::PrepareUnaryCall; + /// DEPRECATED for multi-threaded use /// Begin a call to a named method \a method using \a context. /// A tag \a tag will be delivered to \a cq when the call has been started @@ -87,72 +89,9 @@ class TemplatedGenericStub final { true, tag); } - /// Setup and start a unary call to a named method \a method using - /// \a context and specifying the \a request and \a response buffers. - void UnaryCall(ClientContext* context, const std::string& method, - StubOptions options, const RequestType* request, - ResponseType* response, - std::function on_completion) { - UnaryCallInternal(context, method, options, request, response, - std::move(on_completion)); - } - - /// Setup a unary call to a named method \a method using - /// \a context and specifying the \a request and \a response buffers. - /// Like any other reactor-based RPC, it will not be activated until - /// StartCall is invoked on its reactor. - void PrepareUnaryCall(ClientContext* context, const std::string& method, - StubOptions options, const RequestType* request, - ResponseType* response, ClientUnaryReactor* reactor) { - PrepareUnaryCallInternal(context, method, options, request, response, - reactor); - } - - /// Setup a call to a named method \a method using \a context and tied to - /// \a reactor . Like any other bidi streaming RPC, it will not be activated - /// until StartCall is invoked on its reactor. - void PrepareBidiStreamingCall( - ClientContext* context, const std::string& method, StubOptions options, - ClientBidiReactor* reactor) { - PrepareBidiStreamingCallInternal(context, method, options, reactor); - } - private: - std::shared_ptr channel_; - - void UnaryCallInternal(ClientContext* context, const std::string& method, - StubOptions options, const RequestType* request, - ResponseType* response, - std::function on_completion) { - internal::CallbackUnaryCall( - channel_.get(), - grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(), - grpc::internal::RpcMethod::NORMAL_RPC), - context, request, response, std::move(on_completion)); - } - - void PrepareUnaryCallInternal(ClientContext* context, - const std::string& method, StubOptions options, - const RequestType* request, - ResponseType* response, - ClientUnaryReactor* reactor) { - internal::ClientCallbackUnaryFactory::Create( - channel_.get(), - grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(), - grpc::internal::RpcMethod::NORMAL_RPC), - context, request, response, reactor); - } - - void PrepareBidiStreamingCallInternal( - ClientContext* context, const std::string& method, StubOptions options, - ClientBidiReactor* reactor) { - internal::ClientCallbackReaderWriterFactory:: - Create(channel_.get(), - grpc::internal::RpcMethod( - method.c_str(), options.suffix_for_stats(), - grpc::internal::RpcMethod::BIDI_STREAMING), - context, reactor); - } + using internal::TemplatedGenericStubCallbackInternal::channel_; std::unique_ptr> CallInternal(grpc::ChannelInterface* channel, ClientContext* context, diff --git a/include/grpcpp/generic/generic_stub_callback.h b/include/grpcpp/generic/generic_stub_callback.h new file mode 100644 index 00000000000..c039e2a4433 --- /dev/null +++ b/include/grpcpp/generic/generic_stub_callback.h @@ -0,0 +1,44 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H +#define GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H + +#include +#include + +namespace grpc { + +/// Generic stubs provide a type-unaware interface to call gRPC methods +/// by name. In practice, the Request and Response types should be basic +/// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf). +template +class TemplatedGenericStubCallback final + : public internal::TemplatedGenericStubCallbackInternal { + public: + using internal::TemplatedGenericStubCallbackInternal< + RequestType, ResponseType>::TemplatedGenericStubCallbackInternal; +}; + +typedef TemplatedGenericStubCallback + GenericStubCallback; + +} // namespace grpc + +#endif // GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H diff --git a/include/grpcpp/impl/generic_stub_internal.h b/include/grpcpp/impl/generic_stub_internal.h new file mode 100644 index 00000000000..ba8c929d8fe --- /dev/null +++ b/include/grpcpp/impl/generic_stub_internal.h @@ -0,0 +1,125 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H +#define GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H + +#include + +#include +#include +#include +#include +#include +#include + +namespace grpc { + +template +class TemplatedGenericStub; +template +class TemplatedGenericStubCallback; + +namespace internal { + +/// Generic stubs provide a type-unaware interface to call gRPC methods +/// by name. In practice, the Request and Response types should be basic +/// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf). +template +class TemplatedGenericStubCallbackInternal { + public: + explicit TemplatedGenericStubCallbackInternal( + std::shared_ptr channel) + : channel_(channel) {} + + /// Setup and start a unary call to a named method \a method using + /// \a context and specifying the \a request and \a response buffers. + void UnaryCall(ClientContext* context, const std::string& method, + StubOptions options, const RequestType* request, + ResponseType* response, + std::function on_completion) { + UnaryCallInternal(context, method, options, request, response, + std::move(on_completion)); + } + + /// Setup a unary call to a named method \a method using + /// \a context and specifying the \a request and \a response buffers. + /// Like any other reactor-based RPC, it will not be activated until + /// StartCall is invoked on its reactor. + void PrepareUnaryCall(ClientContext* context, const std::string& method, + StubOptions options, const RequestType* request, + ResponseType* response, ClientUnaryReactor* reactor) { + PrepareUnaryCallInternal(context, method, options, request, response, + reactor); + } + + /// Setup a call to a named method \a method using \a context and tied to + /// \a reactor . Like any other bidi streaming RPC, it will not be activated + /// until StartCall is invoked on its reactor. + void PrepareBidiStreamingCall( + ClientContext* context, const std::string& method, StubOptions options, + ClientBidiReactor* reactor) { + PrepareBidiStreamingCallInternal(context, method, options, reactor); + } + + private: + template + friend class grpc::TemplatedGenericStub; + template + friend class grpc::TemplatedGenericStubCallback; + std::shared_ptr channel_; + + void UnaryCallInternal(ClientContext* context, const std::string& method, + StubOptions options, const RequestType* request, + ResponseType* response, + std::function on_completion) { + internal::CallbackUnaryCall( + channel_.get(), + grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(), + grpc::internal::RpcMethod::NORMAL_RPC), + context, request, response, std::move(on_completion)); + } + + void PrepareUnaryCallInternal(ClientContext* context, + const std::string& method, StubOptions options, + const RequestType* request, + ResponseType* response, + ClientUnaryReactor* reactor) { + internal::ClientCallbackUnaryFactory::Create( + channel_.get(), + grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(), + grpc::internal::RpcMethod::NORMAL_RPC), + context, request, response, reactor); + } + + void PrepareBidiStreamingCallInternal( + ClientContext* context, const std::string& method, StubOptions options, + ClientBidiReactor* reactor) { + internal::ClientCallbackReaderWriterFactory:: + Create(channel_.get(), + grpc::internal::RpcMethod( + method.c_str(), options.suffix_for_stats(), + grpc::internal::RpcMethod::BIDI_STREAMING), + context, reactor); + } +}; + +} // namespace internal +} // namespace grpc + +#endif // GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 01b0f97652a..edf845fb46a 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -965,7 +965,9 @@ include/grpcpp/ext/call_metric_recorder.h \ include/grpcpp/ext/health_check_service_server_builder_option.h \ include/grpcpp/ext/server_metric_recorder.h \ include/grpcpp/generic/async_generic_service.h \ +include/grpcpp/generic/callback_generic_service.h \ include/grpcpp/generic/generic_stub.h \ +include/grpcpp/generic/generic_stub_callback.h \ include/grpcpp/grpcpp.h \ include/grpcpp/health_check_service_interface.h \ include/grpcpp/impl/call.h \ @@ -1026,6 +1028,7 @@ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/completion_queue_tag.h \ include/grpcpp/impl/create_auth_context.h \ include/grpcpp/impl/delegating_channel.h \ +include/grpcpp/impl/generic_stub_internal.h \ include/grpcpp/impl/grpc_library.h \ include/grpcpp/impl/intercepted_channel.h \ include/grpcpp/impl/interceptor_common.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index c3797ac0f86..3bceffda967 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -965,7 +965,9 @@ include/grpcpp/ext/call_metric_recorder.h \ include/grpcpp/ext/health_check_service_server_builder_option.h \ include/grpcpp/ext/server_metric_recorder.h \ include/grpcpp/generic/async_generic_service.h \ +include/grpcpp/generic/callback_generic_service.h \ include/grpcpp/generic/generic_stub.h \ +include/grpcpp/generic/generic_stub_callback.h \ include/grpcpp/grpcpp.h \ include/grpcpp/health_check_service_interface.h \ include/grpcpp/impl/call.h \ @@ -1026,6 +1028,7 @@ include/grpcpp/impl/codegen/time.h \ include/grpcpp/impl/completion_queue_tag.h \ include/grpcpp/impl/create_auth_context.h \ include/grpcpp/impl/delegating_channel.h \ +include/grpcpp/impl/generic_stub_internal.h \ include/grpcpp/impl/grpc_library.h \ include/grpcpp/impl/intercepted_channel.h \ include/grpcpp/impl/interceptor_common.h \