Merge branch 'master' into gpr_assert_removal_in_php

pull/36817/head
tanvi-jagtap 6 months ago
commit 2222f95413
  1. 1
      .github/CODEOWNERS
  2. 40
      BUILD
  3. 8
      CMakeLists.txt
  4. 1
      Makefile
  5. 1
      Package.swift
  6. 8
      build_autogenerated.yaml
  7. 1
      config.m4
  8. 1
      config.w32
  9. 3
      gRPC-C++.podspec
  10. 1
      gRPC-Core.podspec
  11. 1
      grpc.gemspec
  12. 54
      include/grpcpp/generic/async_generic_service.h
  13. 84
      include/grpcpp/generic/callback_generic_service.h
  14. 83
      include/grpcpp/generic/generic_stub.h
  15. 44
      include/grpcpp/generic/generic_stub_callback.h
  16. 125
      include/grpcpp/impl/generic_stub_internal.h
  17. 1
      package.xml
  18. 6
      src/core/BUILD
  19. 7
      src/core/client_channel/client_channel.cc
  20. 60
      src/core/client_channel/config_selector.cc
  21. 31
      src/core/client_channel/config_selector.h
  22. 38
      src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
  23. 10
      src/core/lib/resource_quota/arena.h
  24. 40
      src/core/resolver/xds/xds_resolver.cc
  25. 4
      src/core/xds/grpc/xds_http_fault_filter.cc
  26. 1
      src/core/xds/grpc/xds_http_fault_filter.h
  27. 4
      src/core/xds/grpc/xds_http_filters.h
  28. 4
      src/core/xds/grpc/xds_http_rbac_filter.cc
  29. 1
      src/core/xds/grpc/xds_http_rbac_filter.h
  30. 5
      src/core/xds/grpc/xds_http_stateful_session_filter.cc
  31. 1
      src/core/xds/grpc/xds_http_stateful_session_filter.h
  32. 1
      src/python/grpcio/grpc_core_dependencies.py
  33. 102
      test/core/client_channel/client_channel_test.cc
  34. 1
      test/cpp/end2end/BUILD
  35. 5
      test/cpp/end2end/client_lb_end2end_test.cc
  36. 1
      tools/bazel.rc
  37. 3
      tools/doxygen/Doxyfile.c++
  38. 4
      tools/doxygen/Doxyfile.c++.internal
  39. 1
      tools/doxygen/Doxyfile.core.internal

@ -1,5 +1,6 @@
/bazel/** @veblush @gnossen
/bazel/experiments.yaml
/bazel/rollouts.yaml
/cmake/** @veblush @apolcyn
/src/core/client_channel/** @markdroth
/src/core/ext/transport/chttp2/transport/** @ctiller

40
BUILD

@ -415,7 +415,9 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/create_channel_posix.h",
"include/grpcpp/ext/health_check_service_server_builder_option.h",
"include/grpcpp/generic/async_generic_service.h",
"include/grpcpp/generic/callback_generic_service.h",
"include/grpcpp/generic/generic_stub.h",
"include/grpcpp/generic/generic_stub_callback.h",
"include/grpcpp/grpcpp.h",
"include/grpcpp/health_check_service_interface.h",
"include/grpcpp/impl/call_hook.h",
@ -1260,6 +1262,7 @@ grpc_cc_library(
visibility = ["@grpc:public"],
deps = [
"channel_arg_names",
"generic_stub_internal",
"gpr",
"grpc++_base_unsecure",
"grpc++_codegen_proto",
@ -2470,6 +2473,7 @@ grpc_cc_library(
"channel_stack_builder",
"config",
"exec_ctx",
"generic_stub_internal",
"gpr",
"grpc",
"grpc++_codegen_proto",
@ -2558,6 +2562,7 @@ grpc_cc_library(
"channel_stack_builder",
"config",
"exec_ctx",
"generic_stub_internal",
"gpr",
"grpc_base",
"grpc_core_credentials_header",
@ -2955,6 +2960,41 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "generic_stub_internal",
hdrs = [
"include/grpcpp/impl/generic_stub_internal.h",
],
language = "c++",
deps = [
"grpc++_public_hdrs",
],
)
grpc_cc_library(
name = "generic_stub_callback",
hdrs = [
"include/grpcpp/generic/generic_stub_callback.h",
],
language = "c++",
visibility = ["@grpc:public"],
deps = [
"generic_stub_internal",
],
)
grpc_cc_library(
name = "callback_generic_service",
hdrs = [
"include/grpcpp/generic/callback_generic_service.h",
],
language = "c++",
visibility = ["@grpc:public"],
deps = [
"grpc++_public_hdrs",
],
)
grpc_cc_library(
name = "work_serializer",
srcs = [

8
CMakeLists.txt generated

@ -1853,7 +1853,6 @@ add_library(grpc
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
src/core/client_channel/config_selector.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@ -2946,7 +2945,6 @@ add_library(grpc_unsecure
src/core/client_channel/client_channel_filter.cc
src/core/client_channel/client_channel_plugin.cc
src/core/client_channel/client_channel_service_config.cc
src/core/client_channel/config_selector.cc
src/core/client_channel/dynamic_filters.cc
src/core/client_channel/global_subchannel_pool.cc
src/core/client_channel/load_balanced_call_destination.cc
@ -4255,7 +4253,9 @@ foreach(_hdr
include/grpcpp/ext/health_check_service_server_builder_option.h
include/grpcpp/ext/server_metric_recorder.h
include/grpcpp/generic/async_generic_service.h
include/grpcpp/generic/callback_generic_service.h
include/grpcpp/generic/generic_stub.h
include/grpcpp/generic/generic_stub_callback.h
include/grpcpp/grpcpp.h
include/grpcpp/health_check_service_interface.h
include/grpcpp/impl/call.h
@ -4316,6 +4316,7 @@ foreach(_hdr
include/grpcpp/impl/completion_queue_tag.h
include/grpcpp/impl/create_auth_context.h
include/grpcpp/impl/delegating_channel.h
include/grpcpp/impl/generic_stub_internal.h
include/grpcpp/impl/grpc_library.h
include/grpcpp/impl/intercepted_channel.h
include/grpcpp/impl/interceptor_common.h
@ -4998,7 +4999,9 @@ foreach(_hdr
include/grpcpp/ext/health_check_service_server_builder_option.h
include/grpcpp/ext/server_metric_recorder.h
include/grpcpp/generic/async_generic_service.h
include/grpcpp/generic/callback_generic_service.h
include/grpcpp/generic/generic_stub.h
include/grpcpp/generic/generic_stub_callback.h
include/grpcpp/grpcpp.h
include/grpcpp/health_check_service_interface.h
include/grpcpp/impl/call.h
@ -5059,6 +5062,7 @@ foreach(_hdr
include/grpcpp/impl/completion_queue_tag.h
include/grpcpp/impl/create_auth_context.h
include/grpcpp/impl/delegating_channel.h
include/grpcpp/impl/generic_stub_internal.h
include/grpcpp/impl/grpc_library.h
include/grpcpp/impl/intercepted_channel.h
include/grpcpp/impl/interceptor_common.h

1
Makefile generated

@ -675,7 +675,6 @@ LIBGRPC_SRC = \
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/config_selector.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \

1
Package.swift generated

@ -136,7 +136,6 @@ let package = Package(
"src/core/client_channel/client_channel_plugin.cc",
"src/core/client_channel/client_channel_service_config.cc",
"src/core/client_channel/client_channel_service_config.h",
"src/core/client_channel/config_selector.cc",
"src/core/client_channel/config_selector.h",
"src/core/client_channel/connector.h",
"src/core/client_channel/dynamic_filters.cc",

@ -1258,7 +1258,6 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
- src/core/client_channel/config_selector.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@ -2714,7 +2713,6 @@ libs:
- src/core/client_channel/client_channel_filter.cc
- src/core/client_channel/client_channel_plugin.cc
- src/core/client_channel/client_channel_service_config.cc
- src/core/client_channel/config_selector.cc
- src/core/client_channel/dynamic_filters.cc
- src/core/client_channel/global_subchannel_pool.cc
- src/core/client_channel/load_balanced_call_destination.cc
@ -3715,7 +3713,9 @@ libs:
- include/grpcpp/ext/health_check_service_server_builder_option.h
- include/grpcpp/ext/server_metric_recorder.h
- include/grpcpp/generic/async_generic_service.h
- include/grpcpp/generic/callback_generic_service.h
- include/grpcpp/generic/generic_stub.h
- include/grpcpp/generic/generic_stub_callback.h
- include/grpcpp/grpcpp.h
- include/grpcpp/health_check_service_interface.h
- include/grpcpp/impl/call.h
@ -3776,6 +3776,7 @@ libs:
- include/grpcpp/impl/completion_queue_tag.h
- include/grpcpp/impl/create_auth_context.h
- include/grpcpp/impl/delegating_channel.h
- include/grpcpp/impl/generic_stub_internal.h
- include/grpcpp/impl/grpc_library.h
- include/grpcpp/impl/intercepted_channel.h
- include/grpcpp/impl/interceptor_common.h
@ -4145,7 +4146,9 @@ libs:
- include/grpcpp/ext/health_check_service_server_builder_option.h
- include/grpcpp/ext/server_metric_recorder.h
- include/grpcpp/generic/async_generic_service.h
- include/grpcpp/generic/callback_generic_service.h
- include/grpcpp/generic/generic_stub.h
- include/grpcpp/generic/generic_stub_callback.h
- include/grpcpp/grpcpp.h
- include/grpcpp/health_check_service_interface.h
- include/grpcpp/impl/call.h
@ -4206,6 +4209,7 @@ libs:
- include/grpcpp/impl/completion_queue_tag.h
- include/grpcpp/impl/create_auth_context.h
- include/grpcpp/impl/delegating_channel.h
- include/grpcpp/impl/generic_stub_internal.h
- include/grpcpp/impl/grpc_library.h
- include/grpcpp/impl/intercepted_channel.h
- include/grpcpp/impl/interceptor_common.h

1
config.m4 generated

@ -50,7 +50,6 @@ if test "$PHP_GRPC" != "no"; then
src/core/client_channel/client_channel_filter.cc \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/config_selector.cc \
src/core/client_channel/dynamic_filters.cc \
src/core/client_channel/global_subchannel_pool.cc \
src/core/client_channel/load_balanced_call_destination.cc \

1
config.w32 generated

@ -15,7 +15,6 @@ if (PHP_GRPC != "no") {
"src\\core\\client_channel\\client_channel_filter.cc " +
"src\\core\\client_channel\\client_channel_plugin.cc " +
"src\\core\\client_channel\\client_channel_service_config.cc " +
"src\\core\\client_channel\\config_selector.cc " +
"src\\core\\client_channel\\dynamic_filters.cc " +
"src\\core\\client_channel\\global_subchannel_pool.cc " +
"src\\core\\client_channel\\load_balanced_call_destination.cc " +

3
gRPC-C++.podspec generated

@ -103,7 +103,9 @@ Pod::Spec.new do |s|
'include/grpcpp/ext/health_check_service_server_builder_option.h',
'include/grpcpp/ext/server_metric_recorder.h',
'include/grpcpp/generic/async_generic_service.h',
'include/grpcpp/generic/callback_generic_service.h',
'include/grpcpp/generic/generic_stub.h',
'include/grpcpp/generic/generic_stub_callback.h',
'include/grpcpp/grpcpp.h',
'include/grpcpp/health_check_service_interface.h',
'include/grpcpp/impl/call.h',
@ -160,6 +162,7 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/completion_queue_tag.h',
'include/grpcpp/impl/create_auth_context.h',
'include/grpcpp/impl/delegating_channel.h',
'include/grpcpp/impl/generic_stub_internal.h',
'include/grpcpp/impl/grpc_library.h',
'include/grpcpp/impl/intercepted_channel.h',
'include/grpcpp/impl/interceptor_common.h',

1
gRPC-Core.podspec generated

@ -255,7 +255,6 @@ Pod::Spec.new do |s|
'src/core/client_channel/client_channel_plugin.cc',
'src/core/client_channel/client_channel_service_config.cc',
'src/core/client_channel/client_channel_service_config.h',
'src/core/client_channel/config_selector.cc',
'src/core/client_channel/config_selector.h',
'src/core/client_channel/connector.h',
'src/core/client_channel/dynamic_filters.cc',

1
grpc.gemspec generated

@ -142,7 +142,6 @@ Gem::Specification.new do |s|
s.files += %w( src/core/client_channel/client_channel_plugin.cc )
s.files += %w( src/core/client_channel/client_channel_service_config.cc )
s.files += %w( src/core/client_channel/client_channel_service_config.h )
s.files += %w( src/core/client_channel/config_selector.cc )
s.files += %w( src/core/client_channel/config_selector.h )
s.files += %w( src/core/client_channel/connector.h )
s.files += %w( src/core/client_channel/dynamic_filters.cc )

@ -20,10 +20,9 @@
#define GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H
#include <grpc/support/port_platform.h>
#include <grpcpp/impl/server_callback_handlers.h>
#include <grpcpp/generic/callback_generic_service.h>
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/server_callback.h>
struct grpc_server;
@ -78,57 +77,6 @@ class AsyncGenericService final {
grpc::Server* server_;
};
/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs
/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with
/// ByteBuffer arguments.
using ServerGenericBidiReactor = ServerBidiReactor<ByteBuffer, ByteBuffer>;
class GenericCallbackServerContext final : public grpc::CallbackServerContext {
public:
const std::string& method() const { return method_; }
const std::string& host() const { return host_; }
private:
friend class grpc::Server;
std::string method_;
std::string host_;
};
/// \a CallbackGenericService is the base class for generic services implemented
/// using the callback API and registered through the ServerBuilder using
/// RegisterCallbackGenericService.
class CallbackGenericService {
public:
CallbackGenericService() {}
virtual ~CallbackGenericService() {}
/// The "method handler" for the generic API. This function should be
/// overridden to provide a ServerGenericBidiReactor that implements the
/// application-level interface for this RPC. Unimplemented by default.
virtual ServerGenericBidiReactor* CreateReactor(
GenericCallbackServerContext* /*ctx*/) {
class Reactor : public ServerGenericBidiReactor {
public:
Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); }
void OnDone() override { delete this; }
};
return new Reactor;
}
private:
friend class grpc::Server;
internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>* Handler() {
return new internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>(
[this](grpc::CallbackServerContext* ctx) {
return CreateReactor(static_cast<GenericCallbackServerContext*>(ctx));
});
}
grpc::Server* server_{nullptr};
};
} // namespace grpc
#endif // GRPCPP_GENERIC_ASYNC_GENERIC_SERVICE_H

@ -0,0 +1,84 @@
//
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H
#define GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H
#include <grpc/support/port_platform.h>
#include <grpcpp/impl/server_callback_handlers.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/server_callback.h>
struct grpc_server;
namespace grpc {
/// \a ServerGenericBidiReactor is the reactor class for bidi streaming RPCs
/// invoked on a CallbackGenericService. It is just a ServerBidi reactor with
/// ByteBuffer arguments.
using ServerGenericBidiReactor = ServerBidiReactor<ByteBuffer, ByteBuffer>;
class GenericCallbackServerContext final : public grpc::CallbackServerContext {
public:
const std::string& method() const { return method_; }
const std::string& host() const { return host_; }
private:
friend class grpc::Server;
std::string method_;
std::string host_;
};
/// \a CallbackGenericService is the base class for generic services implemented
/// using the callback API and registered through the ServerBuilder using
/// RegisterCallbackGenericService.
class CallbackGenericService {
public:
CallbackGenericService() {}
virtual ~CallbackGenericService() {}
/// The "method handler" for the generic API. This function should be
/// overridden to provide a ServerGenericBidiReactor that implements the
/// application-level interface for this RPC. Unimplemented by default.
virtual ServerGenericBidiReactor* CreateReactor(
GenericCallbackServerContext* /*ctx*/) {
class Reactor : public ServerGenericBidiReactor {
public:
Reactor() { this->Finish(Status(StatusCode::UNIMPLEMENTED, "")); }
void OnDone() override { delete this; }
};
return new Reactor;
}
private:
friend class grpc::Server;
internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>* Handler() {
return new internal::CallbackBidiHandler<ByteBuffer, ByteBuffer>(
[this](grpc::CallbackServerContext* ctx) {
return CreateReactor(static_cast<GenericCallbackServerContext*>(ctx));
});
}
grpc::Server* server_{nullptr};
};
} // namespace grpc
#endif // GRPCPP_GENERIC_CALLBACK_GENERIC_SERVICE_H

@ -19,15 +19,12 @@
#ifndef GRPCPP_GENERIC_GENERIC_STUB_H
#define GRPCPP_GENERIC_GENERIC_STUB_H
#include <functional>
#include <grpcpp/client_context.h>
#include <grpcpp/impl/generic_stub_internal.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/status.h>
#include <grpcpp/support/stub_options.h>
namespace grpc {
@ -42,10 +39,12 @@ typedef ClientAsyncResponseReader<ByteBuffer> GenericClientAsyncResponseReader;
/// by name. In practice, the Request and Response types should be basic
/// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf).
template <class RequestType, class ResponseType>
class TemplatedGenericStub final {
class TemplatedGenericStub final
: public internal::TemplatedGenericStubCallbackInternal<RequestType,
ResponseType> {
public:
explicit TemplatedGenericStub(std::shared_ptr<grpc::ChannelInterface> channel)
: channel_(channel) {}
using internal::TemplatedGenericStubCallbackInternal<
RequestType, ResponseType>::TemplatedGenericStubCallbackInternal;
/// Setup a call to a named method \a method using \a context, but don't
/// start it. Let it be started explicitly with StartCall and a tag.
@ -74,6 +73,9 @@ class TemplatedGenericStub final {
context, request));
}
using internal::TemplatedGenericStubCallbackInternal<
RequestType, ResponseType>::PrepareUnaryCall;
/// DEPRECATED for multi-threaded use
/// Begin a call to a named method \a method using \a context.
/// A tag \a tag will be delivered to \a cq when the call has been started
@ -87,72 +89,9 @@ class TemplatedGenericStub final {
true, tag);
}
/// Setup and start a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
void UnaryCall(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
UnaryCallInternal(context, method, options, request, response,
std::move(on_completion));
}
/// Setup a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
/// Like any other reactor-based RPC, it will not be activated until
/// StartCall is invoked on its reactor.
void PrepareUnaryCall(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response, ClientUnaryReactor* reactor) {
PrepareUnaryCallInternal(context, method, options, request, response,
reactor);
}
/// Setup a call to a named method \a method using \a context and tied to
/// \a reactor . Like any other bidi streaming RPC, it will not be activated
/// until StartCall is invoked on its reactor.
void PrepareBidiStreamingCall(
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
PrepareBidiStreamingCallInternal(context, method, options, reactor);
}
private:
std::shared_ptr<grpc::ChannelInterface> channel_;
void UnaryCallInternal(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
internal::CallbackUnaryCall(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, std::move(on_completion));
}
void PrepareUnaryCallInternal(ClientContext* context,
const std::string& method, StubOptions options,
const RequestType* request,
ResponseType* response,
ClientUnaryReactor* reactor) {
internal::ClientCallbackUnaryFactory::Create<RequestType, ResponseType>(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, reactor);
}
void PrepareBidiStreamingCallInternal(
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
internal::ClientCallbackReaderWriterFactory<RequestType, ResponseType>::
Create(channel_.get(),
grpc::internal::RpcMethod(
method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::BIDI_STREAMING),
context, reactor);
}
using internal::TemplatedGenericStubCallbackInternal<RequestType,
ResponseType>::channel_;
std::unique_ptr<ClientAsyncReaderWriter<RequestType, ResponseType>>
CallInternal(grpc::ChannelInterface* channel, ClientContext* context,

@ -0,0 +1,44 @@
//
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H
#define GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H
#include <grpcpp/impl/generic_stub_internal.h>
#include <grpcpp/support/byte_buffer.h>
namespace grpc {
/// Generic stubs provide a type-unaware interface to call gRPC methods
/// by name. In practice, the Request and Response types should be basic
/// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf).
template <class RequestType, class ResponseType>
class TemplatedGenericStubCallback final
: public internal::TemplatedGenericStubCallbackInternal<RequestType,
ResponseType> {
public:
using internal::TemplatedGenericStubCallbackInternal<
RequestType, ResponseType>::TemplatedGenericStubCallbackInternal;
};
typedef TemplatedGenericStubCallback<grpc::ByteBuffer, grpc::ByteBuffer>
GenericStubCallback;
} // namespace grpc
#endif // GRPCPP_GENERIC_GENERIC_STUB_CALLBACK_H

@ -0,0 +1,125 @@
//
//
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H
#define GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H
#include <functional>
#include <grpcpp/client_context.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/status.h>
#include <grpcpp/support/stub_options.h>
namespace grpc {
template <class RequestType, class ResponseType>
class TemplatedGenericStub;
template <class RequestType, class ResponseType>
class TemplatedGenericStubCallback;
namespace internal {
/// Generic stubs provide a type-unaware interface to call gRPC methods
/// by name. In practice, the Request and Response types should be basic
/// types like grpc::ByteBuffer or proto::MessageLite (the base protobuf).
template <class RequestType, class ResponseType>
class TemplatedGenericStubCallbackInternal {
public:
explicit TemplatedGenericStubCallbackInternal(
std::shared_ptr<grpc::ChannelInterface> channel)
: channel_(channel) {}
/// Setup and start a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
void UnaryCall(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
UnaryCallInternal(context, method, options, request, response,
std::move(on_completion));
}
/// Setup a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
/// Like any other reactor-based RPC, it will not be activated until
/// StartCall is invoked on its reactor.
void PrepareUnaryCall(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response, ClientUnaryReactor* reactor) {
PrepareUnaryCallInternal(context, method, options, request, response,
reactor);
}
/// Setup a call to a named method \a method using \a context and tied to
/// \a reactor . Like any other bidi streaming RPC, it will not be activated
/// until StartCall is invoked on its reactor.
void PrepareBidiStreamingCall(
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
PrepareBidiStreamingCallInternal(context, method, options, reactor);
}
private:
template <class Req, class Resp>
friend class grpc::TemplatedGenericStub;
template <class Req, class Resp>
friend class grpc::TemplatedGenericStubCallback;
std::shared_ptr<grpc::ChannelInterface> channel_;
void UnaryCallInternal(ClientContext* context, const std::string& method,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
internal::CallbackUnaryCall(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, std::move(on_completion));
}
void PrepareUnaryCallInternal(ClientContext* context,
const std::string& method, StubOptions options,
const RequestType* request,
ResponseType* response,
ClientUnaryReactor* reactor) {
internal::ClientCallbackUnaryFactory::Create<RequestType, ResponseType>(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, reactor);
}
void PrepareBidiStreamingCallInternal(
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
internal::ClientCallbackReaderWriterFactory<RequestType, ResponseType>::
Create(channel_.get(),
grpc::internal::RpcMethod(
method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::BIDI_STREAMING),
context, reactor);
}
};
} // namespace internal
} // namespace grpc
#endif // GRPCPP_IMPL_GENERIC_STUB_INTERNAL_H

1
package.xml generated

@ -124,7 +124,6 @@
<file baseinstalldir="/" name="src/core/client_channel/client_channel_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel_service_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/client_channel_service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/config_selector.cc" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/config_selector.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/connector.h" role="src" />
<file baseinstalldir="/" name="src/core/client_channel/dynamic_filters.cc" role="src" />

@ -3295,9 +3295,6 @@ grpc_cc_library(
grpc_cc_library(
name = "config_selector",
srcs = [
"client_channel/config_selector.cc",
],
hdrs = [
"client_channel/config_selector.h",
],
@ -3313,9 +3310,11 @@ grpc_cc_library(
"channel_fwd",
"client_channel_internal_header",
"grpc_service_config",
"interception_chain",
"metadata_batch",
"ref_counted",
"slice",
"unique_type_name",
"useful",
"//:gpr_public_hdrs",
"//:grpc_public_hdrs",
@ -5215,6 +5214,7 @@ grpc_cc_library(
"grpc_tls_credentials",
"grpc_transport_chttp2_client_connector",
"init_internally",
"interception_chain",
"iomgr_fwd",
"json",
"json_args",

@ -1195,14 +1195,17 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
}
CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
GRPC_CLIENT_CHANNEL, builder);
// TODO(roth): add filters returned by config selector
// Create call destination.
// Add filters returned by the config selector (e.g., xDS HTTP filters).
config_selector->AddFilters(builder);
// TODO(roth, ctiller): When we implement the retry interceptor, that
// needs to be added *after* the filters added by the config selector.
const bool enable_retries =
!channel_args_.WantMinimalStack() &&
channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
if (enable_retries) {
Crash("call v3 stack does not yet support retries");
}
// Create call destination.
auto top_of_stack_call_destination = builder.Build(call_destination_);
// Send result to data plane.
if (!top_of_stack_call_destination.ok()) {

@ -1,60 +0,0 @@
//
// Copyright 2020 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/support/port_platform.h>
#include "src/core/client_channel/config_selector.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/util/useful.h"
namespace grpc_core {
namespace {
void* ConfigSelectorArgCopy(void* p) {
ConfigSelector* config_selector = static_cast<ConfigSelector*>(p);
config_selector->Ref().release();
return p;
}
void ConfigSelectorArgDestroy(void* p) {
ConfigSelector* config_selector = static_cast<ConfigSelector*>(p);
config_selector->Unref();
}
int ConfigSelectorArgCmp(void* p, void* q) { return QsortCompare(p, q); }
const grpc_arg_pointer_vtable kChannelArgVtable = {
ConfigSelectorArgCopy, ConfigSelectorArgDestroy, ConfigSelectorArgCmp};
} // namespace
grpc_arg ConfigSelector::MakeChannelArg() const {
return grpc_channel_arg_pointer_create(
const_cast<char*>(GRPC_ARG_CONFIG_SELECTOR),
const_cast<ConfigSelector*>(this), &kChannelArgVtable);
}
RefCountedPtr<ConfigSelector> ConfigSelector::GetFromChannelArgs(
const grpc_channel_args& args) {
ConfigSelector* config_selector =
grpc_channel_args_find_pointer<ConfigSelector>(&args,
GRPC_ARG_CONFIG_SELECTOR);
return config_selector != nullptr ? config_selector->Ref() : nullptr;
}
} // namespace grpc_core

@ -35,8 +35,10 @@
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/interception_chain.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/service_config/service_config.h"
#include "src/core/util/useful.h"
@ -50,34 +52,32 @@ namespace grpc_core {
// MethodConfig and provide input to LB policies on a per-call basis.
class ConfigSelector : public RefCounted<ConfigSelector> {
public:
struct GetCallConfigArgs {
grpc_metadata_batch* initial_metadata;
Arena* arena;
ClientChannelServiceConfigCallData* service_config_call_data;
};
~ConfigSelector() override = default;
virtual const char* name() const = 0;
virtual UniqueTypeName name() const = 0;
static bool Equals(const ConfigSelector* cs1, const ConfigSelector* cs2) {
if (cs1 == nullptr) return cs2 == nullptr;
if (cs2 == nullptr) return false;
if (strcmp(cs1->name(), cs2->name()) != 0) return false;
if (cs1->name() != cs2->name()) return false;
return cs1->Equals(cs2);
}
// The channel will call this when the resolver returns a new ConfigSelector
// to determine what set of dynamic filters will be configured.
virtual void AddFilters(InterceptionChainBuilder& /*builder*/) {}
// TODO(roth): Remove this once the legacy filter stack goes away.
virtual std::vector<const grpc_channel_filter*> GetFilters() { return {}; }
// Returns the call config to use for the call, or a status to fail
// the call with.
// Gets the configuration for the call and stores it in service config
// call data.
struct GetCallConfigArgs {
grpc_metadata_batch* initial_metadata;
Arena* arena;
ClientChannelServiceConfigCallData* service_config_call_data;
};
virtual absl::Status GetCallConfig(GetCallConfigArgs args) = 0;
grpc_arg MakeChannelArg() const;
static RefCountedPtr<ConfigSelector> GetFromChannelArgs(
const grpc_channel_args& args);
static absl::string_view ChannelArgName() { return GRPC_ARG_CONFIG_SELECTOR; }
static int ChannelArgsCompare(const ConfigSelector* a,
const ConfigSelector* b) {
@ -101,7 +101,10 @@ class DefaultConfigSelector final : public ConfigSelector {
DCHECK(service_config_ != nullptr);
}
const char* name() const override { return "default"; }
UniqueTypeName name() const override {
static UniqueTypeName::Factory kFactory("default");
return kFactory.Create();
}
absl::Status GetCallConfig(GetCallConfigArgs args) override {
Slice* path = args.initial_metadata->get_pointer(HttpPathMetadata());

@ -22,6 +22,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
@ -39,20 +40,45 @@ ThreadyEventEngine::CreateListener(
absl::AnyInvocable<void(absl::Status)> on_shutdown,
const EndpointConfig& config,
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
struct AcceptState {
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
int pending_accepts_ ABSL_GUARDED_BY(mu_) = 0;
};
auto accept_state = std::make_shared<AcceptState>();
return impl_->CreateListener(
[this, on_accept = std::make_shared<Listener::AcceptCallback>(
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint,
MemoryAllocator memory_allocator) {
[this, accept_state,
on_accept = std::make_shared<Listener::AcceptCallback>(
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint,
MemoryAllocator memory_allocator) {
{
grpc_core::MutexLock lock(&accept_state->mu_);
++accept_state->pending_accepts_;
}
Asynchronously(
[on_accept, endpoint = std::move(endpoint),
[on_accept, accept_state, endpoint = std::move(endpoint),
memory_allocator = std::move(memory_allocator)]() mutable {
(*on_accept)(std::move(endpoint), std::move(memory_allocator));
{
grpc_core::MutexLock lock(&accept_state->mu_);
--accept_state->pending_accepts_;
if (accept_state->pending_accepts_ == 0) {
accept_state->cv_.Signal();
}
}
});
},
[this,
[this, accept_state,
on_shutdown = std::move(on_shutdown)](absl::Status status) mutable {
Asynchronously([on_shutdown = std::move(on_shutdown),
Asynchronously([accept_state, on_shutdown = std::move(on_shutdown),
status = std::move(status)]() mutable {
while (true) {
grpc_core::MutexLock lock(&accept_state->mu_);
if (accept_state->pending_accepts_ == 0) {
break;
}
accept_state->cv_.Wait(&accept_state->mu_);
}
on_shutdown(std::move(status));
});
},

@ -94,8 +94,13 @@ class ArenaContextTraits : public BaseArenaContextTraits {
};
template <typename T>
const uint16_t ArenaContextTraits<T>::id_ = BaseArenaContextTraits::MakeId(
[](void* ptr) { ArenaContextType<T>::Destroy(static_cast<T*>(ptr)); });
void DestroyArenaContext(void* p) {
ArenaContextType<T>::Destroy(static_cast<T*>(p));
}
template <typename T>
const uint16_t ArenaContextTraits<T>::id_ =
BaseArenaContextTraits::MakeId(DestroyArenaContext<T>);
template <typename T, typename A, typename B>
struct IfArray {
@ -283,6 +288,7 @@ class Arena final : public RefCounted<Arena, NonPolymorphicRefCount,
ArenaContextType<T>::Destroy(static_cast<T*>(slot));
}
slot = context;
DCHECK_EQ(GetContext<T>(), context);
}
private:

@ -270,7 +270,10 @@ class XdsResolver final : public Resolver {
RefCountedPtr<RouteConfigData> route_config_data);
~XdsConfigSelector() override;
const char* name() const override { return "XdsConfigSelector"; }
UniqueTypeName name() const override {
static UniqueTypeName::Factory kFactory("XdsConfigSelector");
return kFactory.Create();
}
bool Equals(const ConfigSelector* other) const override {
const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
@ -281,14 +284,14 @@ class XdsResolver final : public Resolver {
absl::Status GetCallConfig(GetCallConfigArgs args) override;
std::vector<const grpc_channel_filter*> GetFilters() override {
return filters_;
}
void AddFilters(InterceptionChainBuilder& builder) override;
std::vector<const grpc_channel_filter*> GetFilters() override;
private:
RefCountedPtr<XdsResolver> resolver_;
RefCountedPtr<RouteConfigData> route_config_data_;
std::vector<const grpc_channel_filter*> filters_;
std::vector<const XdsHttpFilterImpl*> filters_;
};
class XdsRouteStateAttributeImpl final : public XdsRouteStateAttribute {
@ -641,12 +644,9 @@ XdsResolver::XdsConfigSelector::XdsConfigSelector(
http_filter_registry.GetFilterForType(
http_filter.config.config_proto_type_name);
CHECK_NE(filter_impl, nullptr);
// Add C-core filter to list.
if (filter_impl->channel_filter() != nullptr) {
filters_.push_back(filter_impl->channel_filter());
}
// Add filter to list.
filters_.push_back(filter_impl);
}
filters_.push_back(&ClusterSelectionFilter::kFilter);
}
XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
@ -799,6 +799,26 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
return absl::OkStatus();
}
void XdsResolver::XdsConfigSelector::AddFilters(
InterceptionChainBuilder& builder) {
for (const XdsHttpFilterImpl* filter : filters_) {
filter->AddFilter(builder);
}
builder.Add<ClusterSelectionFilter>();
}
std::vector<const grpc_channel_filter*>
XdsResolver::XdsConfigSelector::GetFilters() {
std::vector<const grpc_channel_filter*> filters;
for (const XdsHttpFilterImpl* filter : filters_) {
if (filter->channel_filter() != nullptr) {
filters.push_back(filter->channel_filter());
}
}
filters.push_back(&ClusterSelectionFilter::kFilter);
return filters;
}
//
// XdsResolver::XdsRouteStateAttributeImpl
//

@ -214,6 +214,10 @@ XdsHttpFaultFilter::GenerateFilterConfigOverride(
return GenerateFilterConfig(context, std::move(extension), errors);
}
void XdsHttpFaultFilter::AddFilter(InterceptionChainBuilder& builder) const {
builder.Add<FaultInjectionFilter>();
}
const grpc_channel_filter* XdsHttpFaultFilter::channel_filter() const {
return &FaultInjectionFilter::kFilter;
}

@ -44,6 +44,7 @@ class XdsHttpFaultFilter final : public XdsHttpFilterImpl {
absl::optional<FilterConfig> GenerateFilterConfigOverride(
const XdsResourceType::DecodeContext& context, XdsExtension extension,
ValidationErrors* errors) const override;
void AddFilter(InterceptionChainBuilder& builder) const override;
const grpc_channel_filter* channel_filter() const override;
ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override;
absl::StatusOr<ServiceConfigJsonEntry> GenerateServiceConfig(

@ -35,6 +35,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/transport/interception_chain.h"
#include "src/core/util/json/json.h"
#include "src/core/util/json/json_writer.h"
#include "src/core/xds/grpc/xds_common_types.h"
@ -96,6 +97,8 @@ class XdsHttpFilterImpl {
ValidationErrors* errors) const = 0;
// C-core channel filter implementation.
virtual void AddFilter(InterceptionChainBuilder& builder) const = 0;
// TODO(roth): Remove this once the legacy filter stack goes away.
virtual const grpc_channel_filter* channel_filter() const = 0;
// Modifies channel args that may affect service config parsing (not
@ -135,6 +138,7 @@ class XdsHttpRouterFilter final : public XdsHttpFilterImpl {
absl::optional<FilterConfig> GenerateFilterConfigOverride(
const XdsResourceType::DecodeContext& context, XdsExtension extension,
ValidationErrors* errors) const override;
void AddFilter(InterceptionChainBuilder& /*builder*/) const override {}
const grpc_channel_filter* channel_filter() const override { return nullptr; }
absl::StatusOr<ServiceConfigJsonEntry> GenerateServiceConfig(
const FilterConfig& /*hcm_filter_config*/,

@ -564,6 +564,10 @@ XdsHttpRbacFilter::GenerateFilterConfigOverride(
return FilterConfig{OverrideConfigProtoName(), std::move(rbac_json)};
}
void XdsHttpRbacFilter::AddFilter(InterceptionChainBuilder& builder) const {
builder.Add<RbacFilter>();
}
const grpc_channel_filter* XdsHttpRbacFilter::channel_filter() const {
return &RbacFilter::kFilterVtable;
}

@ -44,6 +44,7 @@ class XdsHttpRbacFilter final : public XdsHttpFilterImpl {
absl::optional<FilterConfig> GenerateFilterConfigOverride(
const XdsResourceType::DecodeContext& context, XdsExtension extension,
ValidationErrors* errors) const override;
void AddFilter(InterceptionChainBuilder& builder) const override;
const grpc_channel_filter* channel_filter() const override;
ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override;
absl::StatusOr<ServiceConfigJsonEntry> GenerateServiceConfig(

@ -194,6 +194,11 @@ XdsHttpStatefulSessionFilter::GenerateFilterConfigOverride(
Json::FromObject(std::move(config))};
}
void XdsHttpStatefulSessionFilter::AddFilter(
InterceptionChainBuilder& builder) const {
builder.Add<StatefulSessionFilter>();
}
const grpc_channel_filter* XdsHttpStatefulSessionFilter::channel_filter()
const {
return &StatefulSessionFilter::kFilter;

@ -44,6 +44,7 @@ class XdsHttpStatefulSessionFilter final : public XdsHttpFilterImpl {
absl::optional<FilterConfig> GenerateFilterConfigOverride(
const XdsResourceType::DecodeContext& context, XdsExtension extension,
ValidationErrors* errors) const override;
void AddFilter(InterceptionChainBuilder& builder) const override;
const grpc_channel_filter* channel_filter() const override;
ChannelArgs ModifyChannelArgs(const ChannelArgs& args) const override;
absl::StatusOr<ServiceConfigJsonEntry> GenerateServiceConfig(

@ -24,7 +24,6 @@ CORE_SOURCE_FILES = [
'src/core/client_channel/client_channel_filter.cc',
'src/core/client_channel/client_channel_plugin.cc',
'src/core/client_channel/client_channel_service_config.cc',
'src/core/client_channel/config_selector.cc',
'src/core/client_channel/dynamic_filters.cc',
'src/core/client_channel/global_subchannel_pool.cc',
'src/core/client_channel/load_balanced_call_destination.cc',

@ -23,7 +23,9 @@
#include <grpc/grpc.h>
#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/service_config/service_config_impl.h"
#include "test/core/call/yodel/yodel_test.h"
namespace grpc_core {
@ -77,11 +79,21 @@ class ClientChannelTest : public YodelTest {
}
Resolver::Result MakeSuccessfulResolutionResult(
absl::string_view endpoint_address) {
absl::string_view endpoint_address,
absl::StatusOr<RefCountedPtr<ServiceConfig>> service_config = nullptr,
RefCountedPtr<ConfigSelector> config_selector = nullptr) {
Resolver::Result result;
grpc_resolved_address address;
CHECK(grpc_parse_uri(URI::Parse(endpoint_address).value(), &address));
result.addresses = EndpointAddressesList({EndpointAddresses{address, {}}});
result.service_config = std::move(service_config);
if (config_selector != nullptr) {
CHECK(result.service_config.ok())
<< "channel does not use ConfigSelector without service config";
CHECK(*result.service_config != nullptr)
<< "channel does not use ConfigSelector without service config";
result.args = ChannelArgs().SetObject(std::move(config_selector));
}
return result;
}
@ -268,6 +280,94 @@ CLIENT_CHANNEL_TEST(StartCall) {
WaitForAllPendingWork();
}
// A filter that adds metadata foo=bar.
class TestFilter : public ImplementChannelFilter<TestFilter> {
public:
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& md) {
md.Append("foo", Slice::FromStaticString("bar"),
[](absl::string_view error, const Slice&) {
FAIL() << "error encoding metadata: " << error;
});
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnClientToServerHalfClose;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
};
static absl::StatusOr<std::unique_ptr<TestFilter>> Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/) {
return std::make_unique<TestFilter>();
}
};
const NoInterceptor TestFilter::Call::OnClientToServerMessage;
const NoInterceptor TestFilter::Call::OnClientToServerHalfClose;
const NoInterceptor TestFilter::Call::OnServerInitialMetadata;
const NoInterceptor TestFilter::Call::OnServerToClientMessage;
const NoInterceptor TestFilter::Call::OnServerTrailingMetadata;
const NoInterceptor TestFilter::Call::OnFinalize;
// A config selector that adds TestFilter as a dynamic filter.
class TestConfigSelector : public ConfigSelector {
public:
UniqueTypeName name() const override {
static UniqueTypeName::Factory kFactory("test");
return kFactory.Create();
}
void AddFilters(InterceptionChainBuilder& builder) override {
builder.Add<TestFilter>();
}
absl::Status GetCallConfig(GetCallConfigArgs /*args*/) override {
return absl::OkStatus();
}
// Any instance of this class will behave the same, so all comparisons
// are true.
bool Equals(const ConfigSelector* /*other*/) const override { return true; }
};
CLIENT_CHANNEL_TEST(ConfigSelectorWithDynamicFilters) {
auto& channel = InitChannel(ChannelArgs());
auto call = MakeCallPair(MakeClientInitialMetadata(), channel.event_engine(),
channel.call_arena_allocator()->MakeArena());
channel.StartCall(std::move(call.handler));
auto service_config = ServiceConfigImpl::Create(ChannelArgs(), "{}");
ASSERT_TRUE(service_config.ok());
QueueNameResolutionResult(MakeSuccessfulResolutionResult(
"ipv4:127.0.0.1:1234", std::move(service_config),
MakeRefCounted<TestConfigSelector>()));
auto call_handler = TickUntilCallStarted();
SpawnTestSeq(
call_handler, "check_initial_metadata",
[call_handler]() mutable {
return call_handler.PullClientInitialMetadata();
},
[](ValueOrFailure<ClientMetadataHandle> md) {
EXPECT_TRUE(md.ok());
if (md.ok()) {
std::string buffer;
auto value = (*md)->GetStringValue("foo", &buffer);
EXPECT_TRUE(value.has_value());
if (value.has_value()) EXPECT_EQ(*value, "bar");
}
return Empty{};
});
SpawnTestSeq(call.initiator, "cancel",
[call_initiator = call.initiator]() mutable {
call_initiator.Cancel();
return Empty{};
});
WaitForAllPendingWork();
}
// TODO(ctiller, roth): MANY more test cases
// - Resolver returns an error for the initial result, then returns a valid
// result.

@ -112,6 +112,7 @@ grpc_cc_test(
tags = [
"cpp_end2end_test",
"no_test_ios",
"thready_tsan",
],
deps = [
"//:gpr",

@ -2947,7 +2947,10 @@ TEST_F(ControlPlaneStatusRewritingTest, RewritesFromConfigSelector) {
public:
explicit FailConfigSelector(absl::Status status)
: status_(std::move(status)) {}
const char* name() const override { return "FailConfigSelector"; }
grpc_core::UniqueTypeName name() const override {
static grpc_core::UniqueTypeName::Factory kFactory("FailConfigSelector");
return kFactory.Create();
}
bool Equals(const ConfigSelector* other) const override {
return status_ == static_cast<const FailConfigSelector*>(other)->status_;
}

@ -133,7 +133,6 @@ build:thready_tsan --copt=-DGPR_NO_DIRECT_SYSCALLS
build:thready_tsan --copt=-DGRPC_TSAN
build:thready_tsan --copt=-DGRPC_MAXIMIZE_THREADYNESS
build:thready_tsan --linkopt=-fsanitize=thread
build:thready_tsan --test_tag_filters=thready_tsan
build:thready_tsan --action_env=TSAN_OPTIONS=suppressions=test/core/test_util/tsan_suppressions.txt:halt_on_error=1:second_deadlock_stack=1
build:tsan_macos --strip=never

@ -965,7 +965,9 @@ include/grpcpp/ext/call_metric_recorder.h \
include/grpcpp/ext/health_check_service_server_builder_option.h \
include/grpcpp/ext/server_metric_recorder.h \
include/grpcpp/generic/async_generic_service.h \
include/grpcpp/generic/callback_generic_service.h \
include/grpcpp/generic/generic_stub.h \
include/grpcpp/generic/generic_stub_callback.h \
include/grpcpp/grpcpp.h \
include/grpcpp/health_check_service_interface.h \
include/grpcpp/impl/call.h \
@ -1026,6 +1028,7 @@ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/completion_queue_tag.h \
include/grpcpp/impl/create_auth_context.h \
include/grpcpp/impl/delegating_channel.h \
include/grpcpp/impl/generic_stub_internal.h \
include/grpcpp/impl/grpc_library.h \
include/grpcpp/impl/intercepted_channel.h \
include/grpcpp/impl/interceptor_common.h \

@ -965,7 +965,9 @@ include/grpcpp/ext/call_metric_recorder.h \
include/grpcpp/ext/health_check_service_server_builder_option.h \
include/grpcpp/ext/server_metric_recorder.h \
include/grpcpp/generic/async_generic_service.h \
include/grpcpp/generic/callback_generic_service.h \
include/grpcpp/generic/generic_stub.h \
include/grpcpp/generic/generic_stub_callback.h \
include/grpcpp/grpcpp.h \
include/grpcpp/health_check_service_interface.h \
include/grpcpp/impl/call.h \
@ -1026,6 +1028,7 @@ include/grpcpp/impl/codegen/time.h \
include/grpcpp/impl/completion_queue_tag.h \
include/grpcpp/impl/create_auth_context.h \
include/grpcpp/impl/delegating_channel.h \
include/grpcpp/impl/generic_stub_internal.h \
include/grpcpp/impl/grpc_library.h \
include/grpcpp/impl/intercepted_channel.h \
include/grpcpp/impl/interceptor_common.h \
@ -1104,7 +1107,6 @@ src/core/client_channel/client_channel_internal.h \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/client_channel_service_config.h \
src/core/client_channel/config_selector.cc \
src/core/client_channel/config_selector.h \
src/core/client_channel/connector.h \
src/core/client_channel/dynamic_filters.cc \

@ -907,7 +907,6 @@ src/core/client_channel/client_channel_internal.h \
src/core/client_channel/client_channel_plugin.cc \
src/core/client_channel/client_channel_service_config.cc \
src/core/client_channel/client_channel_service_config.h \
src/core/client_channel/config_selector.cc \
src/core/client_channel/config_selector.h \
src/core/client_channel/connector.h \
src/core/client_channel/dynamic_filters.cc \

Loading…
Cancel
Save