Automated rollback of commit f45ac2c9e4.

PiperOrigin-RevId: 630539205
pull/36522/head
AJ Heller 7 months ago committed by Copybara-Service
parent 120b0d9ef1
commit 82e5116fb0
  1. 7
      BUILD
  2. 6
      CMakeLists.txt
  3. 1
      Makefile
  4. 1
      Package.swift
  5. 7
      build_autogenerated.yaml
  6. 1
      gRPC-C++.podspec
  7. 1
      gRPC-Core.podspec
  8. 1
      grpc.gemspec
  9. 1
      include/grpc/module.modulemap
  10. 62
      include/grpc/passive_listener.h
  11. 27
      include/grpcpp/passive_listener.h
  12. 1
      include/grpcpp/security/server_credentials.h
  13. 28
      include/grpcpp/server_builder.h
  14. 1
      package.xml
  15. 2
      src/core/BUILD
  16. 7
      src/core/ext/transport/binder/server/binder_server.cc
  17. 7
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.cc
  18. 4
      src/core/ext/transport/chaotic_good/server/chaotic_good_server.h
  19. 287
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  20. 33
      src/core/ext/transport/chttp2/server/chttp2_server.h
  21. 7
      src/core/lib/event_engine/extensions/supports_fd.h
  22. 33
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  23. 2
      src/core/lib/event_engine/posix_engine/posix_engine.h
  24. 14
      src/core/server/server.h
  25. 60
      src/cpp/server/server_builder.cc
  26. 65
      test/core/event_engine/event_engine_test_utils.h
  27. 1
      test/cpp/server/BUILD
  28. 55
      test/cpp/server/server_builder_test.cc
  29. 4
      tools/distrib/check_namespace_qualification.py
  30. 9
      tools/distrib/check_redundant_namespace_qualifiers.py
  31. 2
      tools/doxygen/Doxyfile.c++
  32. 2
      tools/doxygen/Doxyfile.c++.internal
  33. 1
      tools/doxygen/Doxyfile.core
  34. 1
      tools/doxygen/Doxyfile.core.internal

@ -296,7 +296,6 @@ GRPC_PUBLIC_HDRS = [
"include/grpc/grpc_posix.h", "include/grpc/grpc_posix.h",
"include/grpc/grpc_security.h", "include/grpc/grpc_security.h",
"include/grpc/grpc_security_constants.h", "include/grpc/grpc_security_constants.h",
"include/grpc/passive_listener.h",
"include/grpc/slice.h", "include/grpc/slice.h",
"include/grpc/slice_buffer.h", "include/grpc/slice_buffer.h",
"include/grpc/status.h", "include/grpc/status.h",
@ -458,7 +457,6 @@ GRPCXX_PUBLIC_HDRS = [
"include/grpcpp/impl/service_type.h", "include/grpcpp/impl/service_type.h",
"include/grpcpp/impl/status.h", "include/grpcpp/impl/status.h",
"include/grpcpp/impl/sync.h", "include/grpcpp/impl/sync.h",
"include/grpcpp/passive_listener.h",
"include/grpcpp/resource_quota.h", "include/grpcpp/resource_quota.h",
"include/grpcpp/security/audit_logging.h", "include/grpcpp/security/audit_logging.h",
"include/grpcpp/security/tls_crl_provider.h", "include/grpcpp/security/tls_crl_provider.h",
@ -884,7 +882,7 @@ grpc_cc_library(
grpc_cc_library( grpc_cc_library(
name = "grpc_public_hdrs", name = "grpc_public_hdrs",
hdrs = GRPC_PUBLIC_HDRS + GRPC_PUBLIC_EVENT_ENGINE_HDRS, hdrs = GRPC_PUBLIC_HDRS,
external_deps = [ external_deps = [
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",
@ -2508,7 +2506,6 @@ grpc_cc_library(
"//src/core:grpc_backend_metric_provider", "//src/core:grpc_backend_metric_provider",
"//src/core:grpc_crl_provider", "//src/core:grpc_crl_provider",
"//src/core:grpc_service_config", "//src/core:grpc_service_config",
"//src/core:grpc_transport_chttp2_server",
"//src/core:grpc_transport_inproc", "//src/core:grpc_transport_inproc",
"//src/core:json", "//src/core:json",
"//src/core:json_reader", "//src/core:json_reader",
@ -2567,7 +2564,6 @@ grpc_cc_library(
"grpc_security_base", "grpc_security_base",
"grpc_service_config_impl", "grpc_service_config_impl",
"grpc_trace", "grpc_trace",
"grpc_transport_chttp2",
"grpc_unsecure", "grpc_unsecure",
"grpcpp_backend_metric_recorder", "grpcpp_backend_metric_recorder",
"grpcpp_call_metric_recorder", "grpcpp_call_metric_recorder",
@ -2589,7 +2585,6 @@ grpc_cc_library(
"//src/core:grpc_backend_metric_provider", "//src/core:grpc_backend_metric_provider",
"//src/core:grpc_insecure_credentials", "//src/core:grpc_insecure_credentials",
"//src/core:grpc_service_config", "//src/core:grpc_service_config",
"//src/core:grpc_transport_chttp2_server",
"//src/core:grpc_transport_inproc", "//src/core:grpc_transport_inproc",
"//src/core:ref_counted", "//src/core:ref_counted",
"//src/core:resource_quota", "//src/core:resource_quota",

6
CMakeLists.txt generated

@ -2746,7 +2746,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h include/grpc/impl/slice_type.h
include/grpc/load_reporting.h include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h include/grpc/slice.h
include/grpc/slice_buffer.h include/grpc/slice_buffer.h
include/grpc/status.h include/grpc/status.h
@ -3447,7 +3446,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h include/grpc/impl/slice_type.h
include/grpc/load_reporting.h include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h include/grpc/slice.h
include/grpc/slice_buffer.h include/grpc/slice_buffer.h
include/grpc/status.h include/grpc/status.h
@ -4316,7 +4314,6 @@ foreach(_hdr
include/grpcpp/impl/service_type.h include/grpcpp/impl/service_type.h
include/grpcpp/impl/status.h include/grpcpp/impl/status.h
include/grpcpp/impl/sync.h include/grpcpp/impl/sync.h
include/grpcpp/passive_listener.h
include/grpcpp/resource_quota.h include/grpcpp/resource_quota.h
include/grpcpp/security/audit_logging.h include/grpcpp/security/audit_logging.h
include/grpcpp/security/auth_context.h include/grpcpp/security/auth_context.h
@ -5057,7 +5054,6 @@ foreach(_hdr
include/grpcpp/impl/service_type.h include/grpcpp/impl/service_type.h
include/grpcpp/impl/status.h include/grpcpp/impl/status.h
include/grpcpp/impl/sync.h include/grpcpp/impl/sync.h
include/grpcpp/passive_listener.h
include/grpcpp/resource_quota.h include/grpcpp/resource_quota.h
include/grpcpp/security/audit_logging.h include/grpcpp/security/audit_logging.h
include/grpcpp/security/auth_context.h include/grpcpp/security/auth_context.h
@ -5508,7 +5504,6 @@ foreach(_hdr
include/grpc/impl/propagation_bits.h include/grpc/impl/propagation_bits.h
include/grpc/impl/slice_type.h include/grpc/impl/slice_type.h
include/grpc/load_reporting.h include/grpc/load_reporting.h
include/grpc/passive_listener.h
include/grpc/slice.h include/grpc/slice.h
include/grpc/slice_buffer.h include/grpc/slice_buffer.h
include/grpc/status.h include/grpc/status.h
@ -26837,7 +26832,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/xds/v3/orca_load_report.grpc.pb.h
test/core/event_engine/event_engine_test_utils.cc
test/core/test_util/cmdline.cc test/core/test_util/cmdline.cc
test/core/test_util/fuzzer_util.cc test/core/test_util/fuzzer_util.cc
test/core/test_util/grpc_profiler.cc test/core/test_util/grpc_profiler.cc

1
Makefile generated

@ -1775,7 +1775,6 @@ PUBLIC_HEADERS_C += \
include/grpc/impl/propagation_bits.h \ include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \ include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \ include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \ include/grpc/slice.h \
include/grpc/slice_buffer.h \ include/grpc/slice_buffer.h \
include/grpc/status.h \ include/grpc/status.h \

1
Package.swift generated

@ -93,7 +93,6 @@ let package = Package(
"include/grpc/impl/propagation_bits.h", "include/grpc/impl/propagation_bits.h",
"include/grpc/impl/slice_type.h", "include/grpc/impl/slice_type.h",
"include/grpc/load_reporting.h", "include/grpc/load_reporting.h",
"include/grpc/passive_listener.h",
"include/grpc/slice.h", "include/grpc/slice.h",
"include/grpc/slice_buffer.h", "include/grpc/slice_buffer.h",
"include/grpc/status.h", "include/grpc/status.h",

@ -198,7 +198,6 @@ libs:
- include/grpc/impl/propagation_bits.h - include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h - include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h - include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h - include/grpc/slice.h
- include/grpc/slice_buffer.h - include/grpc/slice_buffer.h
- include/grpc/status.h - include/grpc/status.h
@ -2183,7 +2182,6 @@ libs:
- include/grpc/impl/propagation_bits.h - include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h - include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h - include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h - include/grpc/slice.h
- include/grpc/slice_buffer.h - include/grpc/slice_buffer.h
- include/grpc/status.h - include/grpc/status.h
@ -3788,7 +3786,6 @@ libs:
- include/grpcpp/impl/service_type.h - include/grpcpp/impl/service_type.h
- include/grpcpp/impl/status.h - include/grpcpp/impl/status.h
- include/grpcpp/impl/sync.h - include/grpcpp/impl/sync.h
- include/grpcpp/passive_listener.h
- include/grpcpp/resource_quota.h - include/grpcpp/resource_quota.h
- include/grpcpp/security/audit_logging.h - include/grpcpp/security/audit_logging.h
- include/grpcpp/security/auth_context.h - include/grpcpp/security/auth_context.h
@ -4216,7 +4213,6 @@ libs:
- include/grpcpp/impl/service_type.h - include/grpcpp/impl/service_type.h
- include/grpcpp/impl/status.h - include/grpcpp/impl/status.h
- include/grpcpp/impl/sync.h - include/grpcpp/impl/sync.h
- include/grpcpp/passive_listener.h
- include/grpcpp/resource_quota.h - include/grpcpp/resource_quota.h
- include/grpcpp/security/audit_logging.h - include/grpcpp/security/audit_logging.h
- include/grpcpp/security/auth_context.h - include/grpcpp/security/auth_context.h
@ -4365,7 +4361,6 @@ libs:
- include/grpc/impl/propagation_bits.h - include/grpc/impl/propagation_bits.h
- include/grpc/impl/slice_type.h - include/grpc/impl/slice_type.h
- include/grpc/load_reporting.h - include/grpc/load_reporting.h
- include/grpc/passive_listener.h
- include/grpc/slice.h - include/grpc/slice.h
- include/grpc/slice_buffer.h - include/grpc/slice_buffer.h
- include/grpc/status.h - include/grpc/status.h
@ -17787,7 +17782,6 @@ targets:
build: test build: test
language: c++ language: c++
headers: headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/test_util/cmdline.h - test/core/test_util/cmdline.h
- test/core/test_util/evaluate_args_test_util.h - test/core/test_util/evaluate_args_test_util.h
- test/core/test_util/fuzzer_util.h - test/core/test_util/fuzzer_util.h
@ -17803,7 +17797,6 @@ targets:
- src/proto/grpc/testing/echo_messages.proto - src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto - src/proto/grpc/testing/simple_messages.proto
- src/proto/grpc/testing/xds/v3/orca_load_report.proto - src/proto/grpc/testing/xds/v3/orca_load_report.proto
- test/core/event_engine/event_engine_test_utils.cc
- test/core/test_util/cmdline.cc - test/core/test_util/cmdline.cc
- test/core/test_util/fuzzer_util.cc - test/core/test_util/fuzzer_util.cc
- test/core/test_util/grpc_profiler.cc - test/core/test_util/grpc_profiler.cc

1
gRPC-C++.podspec generated

@ -176,7 +176,6 @@ Pod::Spec.new do |s|
'include/grpcpp/impl/service_type.h', 'include/grpcpp/impl/service_type.h',
'include/grpcpp/impl/status.h', 'include/grpcpp/impl/status.h',
'include/grpcpp/impl/sync.h', 'include/grpcpp/impl/sync.h',
'include/grpcpp/passive_listener.h',
'include/grpcpp/resource_quota.h', 'include/grpcpp/resource_quota.h',
'include/grpcpp/security/audit_logging.h', 'include/grpcpp/security/audit_logging.h',
'include/grpcpp/security/auth_context.h', 'include/grpcpp/security/auth_context.h',

1
gRPC-Core.podspec generated

@ -168,7 +168,6 @@ Pod::Spec.new do |s|
'include/grpc/impl/propagation_bits.h', 'include/grpc/impl/propagation_bits.h',
'include/grpc/impl/slice_type.h', 'include/grpc/impl/slice_type.h',
'include/grpc/load_reporting.h', 'include/grpc/load_reporting.h',
'include/grpc/passive_listener.h',
'include/grpc/slice.h', 'include/grpc/slice.h',
'include/grpc/slice_buffer.h', 'include/grpc/slice_buffer.h',
'include/grpc/status.h', 'include/grpc/status.h',

1
grpc.gemspec generated

@ -99,7 +99,6 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/impl/propagation_bits.h ) s.files += %w( include/grpc/impl/propagation_bits.h )
s.files += %w( include/grpc/impl/slice_type.h ) s.files += %w( include/grpc/impl/slice_type.h )
s.files += %w( include/grpc/load_reporting.h ) s.files += %w( include/grpc/load_reporting.h )
s.files += %w( include/grpc/passive_listener.h )
s.files += %w( include/grpc/slice.h ) s.files += %w( include/grpc/slice.h )
s.files += %w( include/grpc/slice_buffer.h ) s.files += %w( include/grpc/slice_buffer.h )
s.files += %w( include/grpc/status.h ) s.files += %w( include/grpc/status.h )

@ -38,7 +38,6 @@ header "byte_buffer.h"
header "impl/propagation_bits.h" header "impl/propagation_bits.h"
header "impl/slice_type.h" header "impl/slice_type.h"
header "load_reporting.h" header "load_reporting.h"
header "passive_listener.h"
header "slice.h" header "slice.h"
header "slice_buffer.h" header "slice_buffer.h"
header "status.h" header "status.h"

@ -1,62 +0,0 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PASSIVE_LISTENER_H
#define GRPC_PASSIVE_LISTENER_H
#include <memory>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
// #include <grpc/support/port_platform.h>
namespace grpc_core {
class Server;
namespace experimental {
class PassiveListenerImpl;
/// -- EXPERIMENTAL API --
/// Interface for used for Server Endpoint injection.
class PassiveListener {
public:
virtual ~PassiveListener() = default;
/// -- EXPERIMENTAL API --
///
/// Takes an Endpoint for an established connection, and treats it as if the
/// connection had been accepted by the server.
///
/// The server must be started before endpoints can be accepted.
virtual absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) = 0;
/// -- EXPERIMENTAL API --
///
/// Takes a connected file descriptor, and treats it as if the server had
/// accepted the connection itself.
///
/// Returns a failure status if the server's active EventEngine does not
/// support Endpoint creation from fds.
virtual absl::Status AcceptConnectedFd(int fd) = 0;
};
} // namespace experimental
} // namespace grpc_core
absl::Status grpc_server_add_passive_listener(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
#endif /* GRPC_PASSIVE_LISTENER_H */

@ -1,27 +0,0 @@
// Copyright 2024 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPCPP_PASSIVE_LISTENER_H
#define GRPCPP_PASSIVE_LISTENER_H
#include <grpc/passive_listener.h>
namespace grpc {
namespace experimental {
using grpc_core::experimental::PassiveListener;
} // namespace experimental
} // namespace grpc
#endif // GRPCPP_PASSIVE_LISTENER_H

@ -84,7 +84,6 @@ class ServerCredentials : private grpc::internal::GrpcLibrary {
// Needed for access to AddPortToServer. // Needed for access to AddPortToServer.
friend class Server; friend class Server;
// Needed for access to c_creds_. // Needed for access to c_creds_.
friend class ServerBuilder;
friend std::shared_ptr<ServerCredentials> grpc::XdsServerCredentials( friend std::shared_ptr<ServerCredentials> grpc::XdsServerCredentials(
const std::shared_ptr<ServerCredentials>& fallback_credentials); const std::shared_ptr<ServerCredentials>& fallback_credentials);

@ -25,17 +25,13 @@
#include <vector> #include <vector>
#include <grpc/compression.h> #include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/passive_listener.h>
#include <grpc/support/cpu.h> #include <grpc/support/cpu.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/workaround_list.h> #include <grpc/support/workaround_list.h>
#include <grpcpp/impl/channel_argument_option.h> #include <grpcpp/impl/channel_argument_option.h>
#include <grpcpp/impl/server_builder_option.h> #include <grpcpp/impl/server_builder_option.h>
#include <grpcpp/impl/server_builder_plugin.h> #include <grpcpp/impl/server_builder_plugin.h>
#include <grpcpp/passive_listener.h>
#include <grpcpp/security/authorization_policy_provider.h> #include <grpcpp/security/authorization_policy_provider.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/support/config.h> #include <grpcpp/support/config.h>
#include <grpcpp/support/server_interceptor.h> #include <grpcpp/support/server_interceptor.h>
@ -295,18 +291,6 @@ class ServerBuilder {
void EnableCallMetricRecording( void EnableCallMetricRecording(
experimental::ServerMetricRecorder* server_metric_recorder = nullptr); experimental::ServerMetricRecorder* server_metric_recorder = nullptr);
// Creates a passive listener for Server Endpoint injection.
///
/// \a PasiveListener lets applications provide pre-established connections
/// to gRPC Servers. The server will behave as if it accepted the connection
/// itself on its own listening addresses.
///
/// This can be called multiple times to create passive listeners with
/// different server credentials.
ServerBuilder& AddPassiveListener(
std::shared_ptr<grpc::ServerCredentials> creds,
std::unique_ptr<grpc::experimental::PassiveListener>& passive_listener);
private: private:
ServerBuilder* builder_; ServerBuilder* builder_;
}; };
@ -380,17 +364,6 @@ class ServerBuilder {
private: private:
friend class grpc::testing::ServerBuilderPluginTest; friend class grpc::testing::ServerBuilderPluginTest;
struct UnstartedPassiveListener {
std::weak_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener;
std::shared_ptr<grpc::ServerCredentials> credentials;
UnstartedPassiveListener(
std::weak_ptr<grpc_core::experimental::PassiveListenerImpl> listener,
std::shared_ptr<grpc::ServerCredentials> creds)
: passive_listener(std::move(listener)),
credentials(std::move(creds)) {}
};
struct SyncServerSettings { struct SyncServerSettings {
SyncServerSettings() SyncServerSettings()
: num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
@ -415,7 +388,6 @@ class ServerBuilder {
std::vector<std::unique_ptr<grpc::ServerBuilderOption>> options_; std::vector<std::unique_ptr<grpc::ServerBuilderOption>> options_;
std::vector<std::unique_ptr<NamedService>> services_; std::vector<std::unique_ptr<NamedService>> services_;
std::vector<Port> ports_; std::vector<Port> ports_;
std::vector<UnstartedPassiveListener> unstarted_passive_listeners_;
SyncServerSettings sync_server_settings_; SyncServerSettings sync_server_settings_;

1
package.xml generated

@ -81,7 +81,6 @@
<file baseinstalldir="/" name="include/grpc/impl/propagation_bits.h" role="src" /> <file baseinstalldir="/" name="include/grpc/impl/propagation_bits.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/slice_type.h" role="src" /> <file baseinstalldir="/" name="include/grpc/impl/slice_type.h" role="src" />
<file baseinstalldir="/" name="include/grpc/load_reporting.h" role="src" /> <file baseinstalldir="/" name="include/grpc/load_reporting.h" role="src" />
<file baseinstalldir="/" name="include/grpc/passive_listener.h" role="src" />
<file baseinstalldir="/" name="include/grpc/slice.h" role="src" /> <file baseinstalldir="/" name="include/grpc/slice.h" role="src" />
<file baseinstalldir="/" name="include/grpc/slice_buffer.h" role="src" /> <file baseinstalldir="/" name="include/grpc/slice_buffer.h" role="src" />
<file baseinstalldir="/" name="include/grpc/status.h" role="src" /> <file baseinstalldir="/" name="include/grpc/status.h" role="src" />

@ -6868,8 +6868,6 @@ grpc_cc_library(
"connection_quota", "connection_quota",
"error", "error",
"error_utils", "error_utils",
"event_engine_extensions",
"event_engine_query_extensions",
"grpc_insecure_credentials", "grpc_insecure_credentials",
"handshaker_registry", "handshaker_registry",
"iomgr_fwd", "iomgr_fwd",

@ -159,7 +159,7 @@ class BinderServerListener : public Server::ListenerInterface {
on_destroy_done_ = on_destroy_done; on_destroy_done_ = on_destroy_done;
} }
void Orphan() override { Unref(); } void Orphan() override { delete this; }
~BinderServerListener() override { ~BinderServerListener() override {
ExecCtx::Get()->Flush(); ExecCtx::Get()->Flush();
@ -239,8 +239,9 @@ bool AddBinderPort(const std::string& addr, grpc_server* server,
} }
std::string conn_id = addr.substr(kBinderUriScheme.size()); std::string conn_id = addr.substr(kBinderUriScheme.size());
Server* core_server = Server::FromC(server); Server* core_server = Server::FromC(server);
core_server->AddListener(MakeOrphanable<BinderServerListener>( core_server->AddListener(
core_server, conn_id, std::move(factory), security_policy)); OrphanablePtr<Server::ListenerInterface>(new BinderServerListener(
core_server, conn_id, std::move(factory), security_policy)));
return true; return true;
} }

@ -103,8 +103,8 @@ absl::StatusOr<int> ChaoticGoodServerListener::Bind(
str.ok() ? str->c_str() : str.status().ToString().c_str()); str.ok() ? str->c_str() : str.status().ToString().c_str());
} }
EventEngine::Listener::AcceptCallback accept_cb = EventEngine::Listener::AcceptCallback accept_cb =
[self = RefAsSubclass<ChaoticGoodServerListener>()]( [self = Ref()](std::unique_ptr<EventEngine::Endpoint> ep,
std::unique_ptr<EventEngine::Endpoint> ep, MemoryAllocator) { MemoryAllocator) {
ExecCtx exec_ctx; ExecCtx exec_ctx;
MutexLock lock(&self->mu_); MutexLock lock(&self->mu_);
if (self->shutdown_) return; if (self->shutdown_) return;
@ -149,8 +149,7 @@ absl::Status ChaoticGoodServerListener::StartListening() {
ChaoticGoodServerListener::ActiveConnection::ActiveConnection( ChaoticGoodServerListener::ActiveConnection::ActiveConnection(
RefCountedPtr<ChaoticGoodServerListener> listener, RefCountedPtr<ChaoticGoodServerListener> listener,
std::unique_ptr<EventEngine::Endpoint> endpoint) std::unique_ptr<EventEngine::Endpoint> endpoint)
: memory_allocator_(listener->memory_allocator_), : memory_allocator_(listener->memory_allocator_), listener_(listener) {
listener_(std::move(listener)) {
handshaking_state_ = MakeRefCounted<HandshakingState>(Ref()); handshaking_state_ = MakeRefCounted<HandshakingState>(Ref());
handshaking_state_->Start(std::move(endpoint)); handshaking_state_->Start(std::move(endpoint));
} }

@ -49,7 +49,9 @@
namespace grpc_core { namespace grpc_core {
namespace chaotic_good { namespace chaotic_good {
class ChaoticGoodServerListener final : public Server::ListenerInterface { class ChaoticGoodServerListener final
: public Server::ListenerInterface,
public RefCounted<ChaoticGoodServerListener> {
public: public:
static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() { static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() {
return [bitgen = absl::BitGen()]() mutable { return [bitgen = absl::BitGen()]() mutable {

@ -41,7 +41,6 @@
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/grpc_posix.h> #include <grpc/grpc_posix.h>
#include <grpc/impl/channel_arg_names.h> #include <grpc/impl/channel_arg_names.h>
#include <grpc/passive_listener.h>
#include <grpc/slice_buffer.h> #include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
@ -58,8 +57,6 @@
#include "src/core/lib/config/core_configuration.h" #include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/event_engine/extensions/supports_fd.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/ref_counted_ptr.h"
@ -69,7 +66,6 @@
#include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h"
#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/resolve_address.h"
@ -96,11 +92,9 @@
#endif // GPR_SUPPORT_CHANNELS_FROM_FD #endif // GPR_SUPPORT_CHANNELS_FROM_FD
namespace grpc_core { namespace grpc_core {
namespace {
using grpc_event_engine::experimental::ChannelArgsEndpointConfig; using ::grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::EventEngineSupportsFdExtension;
using grpc_event_engine::experimental::QueryExtension;
const char kUnixUriPrefix[] = "unix:"; const char kUnixUriPrefix[] = "unix:";
const char kUnixAbstractUriPrefix[] = "unix-abstract:"; const char kUnixAbstractUriPrefix[] = "unix-abstract:";
@ -117,23 +111,14 @@ class Chttp2ServerListener : public Server::ListenerInterface {
Server* server, const char* name, const ChannelArgs& args, Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier); Chttp2ServerArgsModifier args_modifier);
static Chttp2ServerListener* CreateForPassiveListener(
Server* server, const ChannelArgs& args,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener);
// Do not instantiate directly. Use one of the factory methods above. // Do not instantiate directly. Use one of the factory methods above.
Chttp2ServerListener(Server* server, const ChannelArgs& args, Chttp2ServerListener(Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, Chttp2ServerArgsModifier args_modifier);
grpc_server_config_fetcher* config_fetcher,
std::shared_ptr<experimental::PassiveListenerImpl>
passive_listener = nullptr);
~Chttp2ServerListener() override; ~Chttp2ServerListener() override;
void Start(Server* server, void Start(Server* server,
const std::vector<grpc_pollset*>* pollsets) override; const std::vector<grpc_pollset*>* pollsets) override;
void AcceptConnectedEndpoint(std::unique_ptr<EventEngine::Endpoint> endpoint);
channelz::ListenSocketNode* channelz_listen_socket_node() const override { channelz::ListenSocketNode* channelz_listen_socket_node() const override {
return channelz_listen_socket_.get(); return channelz_listen_socket_.get();
} }
@ -143,8 +128,6 @@ class Chttp2ServerListener : public Server::ListenerInterface {
void Orphan() override; void Orphan() override;
private: private:
friend class experimental::PassiveListenerImpl;
class ConfigFetcherWatcher class ConfigFetcherWatcher
: public grpc_server_config_fetcher::WatcherInterface { : public grpc_server_config_fetcher::WatcherInterface {
public: public:
@ -251,8 +234,34 @@ class Chttp2ServerListener : public Server::ListenerInterface {
static void DestroyListener(Server* /*server*/, void* arg, static void DestroyListener(Server* /*server*/, void* arg,
grpc_closure* destroy_done); grpc_closure* destroy_done);
Server* const server_ = nullptr; // The interface required by RefCountedPtr<> has been manually implemented
grpc_tcp_server* tcp_server_ = nullptr; // here to take a ref on tcp_server_ instead. Note that, the handshaker
// needs tcp_server_ to exist for the lifetime of the handshake since it's
// needed by acceptor. Sharing refs between the listener and tcp_server_ is
// just an optimization to avoid taking additional refs on the listener,
// since TcpServerShutdownComplete already holds a ref to the listener.
void IncrementRefCount() { grpc_tcp_server_ref(tcp_server_); }
void IncrementRefCount(const DebugLocation& /* location */,
const char* /* reason */) {
IncrementRefCount();
}
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref() {
IncrementRefCount();
return RefCountedPtr<Chttp2ServerListener>(this);
}
GRPC_MUST_USE_RESULT RefCountedPtr<Chttp2ServerListener> Ref(
const DebugLocation& /* location */, const char* /* reason */) {
return Ref();
}
void Unref() { grpc_tcp_server_unref(tcp_server_); }
void Unref(const DebugLocation& /* location */, const char* /* reason */) {
Unref();
}
Server* const server_;
grpc_tcp_server* tcp_server_;
grpc_resolved_address resolved_address_; grpc_resolved_address resolved_address_;
Chttp2ServerArgsModifier const args_modifier_; Chttp2ServerArgsModifier const args_modifier_;
ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
@ -275,10 +284,6 @@ class Chttp2ServerListener : public Server::ListenerInterface {
RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_; RefCountedPtr<channelz::ListenSocketNode> channelz_listen_socket_;
MemoryQuotaRefPtr memory_quota_; MemoryQuotaRefPtr memory_quota_;
ConnectionQuotaRefPtr connection_quota_; ConnectionQuotaRefPtr connection_quota_;
grpc_server_config_fetcher* config_fetcher_ = nullptr;
// TODO(yashykt): consider using absl::variant<> to minimize memory usage for
// disjoint cases where different fields are used.
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener_;
}; };
// //
@ -375,17 +380,13 @@ Chttp2ServerListener::ActiveConnection::HandshakingState::HandshakingState(
handshake_mgr_(MakeRefCounted<HandshakeManager>()), handshake_mgr_(MakeRefCounted<HandshakeManager>()),
deadline_(GetConnectionDeadline(args)), deadline_(GetConnectionDeadline(args)),
interested_parties_(grpc_pollset_set_create()) { interested_parties_(grpc_pollset_set_create()) {
if (accepting_pollset != nullptr) { grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
grpc_pollset_set_add_pollset(interested_parties_, accepting_pollset_);
}
CoreConfiguration::Get().handshaker_registry().AddHandshakers( CoreConfiguration::Get().handshaker_registry().AddHandshakers(
HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get()); HANDSHAKER_SERVER, args, interested_parties_, handshake_mgr_.get());
} }
Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() { Chttp2ServerListener::ActiveConnection::HandshakingState::~HandshakingState() {
if (accepting_pollset_ != nullptr) { grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
grpc_pollset_set_del_pollset(interested_parties_, accepting_pollset_);
}
grpc_pollset_set_destroy(interested_parties_); grpc_pollset_set_destroy(interested_parties_);
gpr_free(acceptor_); gpr_free(acceptor_);
} }
@ -707,82 +708,83 @@ void Chttp2ServerListener::ActiveConnection::OnDrainGraceTimeExpiry() {
grpc_error_handle Chttp2ServerListener::Create( grpc_error_handle Chttp2ServerListener::Create(
Server* server, grpc_resolved_address* addr, const ChannelArgs& args, Server* server, grpc_resolved_address* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, int* port_num) { Chttp2ServerArgsModifier args_modifier, int* port_num) {
// Create Chttp2ServerListener. Chttp2ServerListener* listener = nullptr;
OrphanablePtr<Chttp2ServerListener> listener = // The bulk of this method is inside of a lambda to make cleanup
MakeOrphanable<Chttp2ServerListener>(server, args, args_modifier, // easier without using goto.
server->config_fetcher()); grpc_error_handle error = [&]() {
// The tcp_server will be unreffed when the listener is orphaned, which could grpc_error_handle error;
// be at the end of this function if the listener was not added to the // Create Chttp2ServerListener.
// server's set of listeners. listener = new Chttp2ServerListener(server, args, args_modifier);
grpc_error_handle error = grpc_tcp_server_create( error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), &listener->tcp_server_shutdown_complete_,
OnAccept, listener.get(), &listener->tcp_server_); grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
if (!error.ok()) return error; OnAccept, listener, &listener->tcp_server_);
if (listener->config_fetcher_ != nullptr) {
listener->resolved_address_ = *addr;
// TODO(yashykt): Consider binding so as to be able to return the port
// number.
} else {
error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
if (!error.ok()) return error; if (!error.ok()) return error;
} if (server->config_fetcher() != nullptr) {
// Create channelz node. listener->resolved_address_ = *addr;
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ) // TODO(yashykt): Consider binding so as to be able to return the port
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) { // number.
auto string_address = grpc_sockaddr_to_uri(addr); } else {
if (!string_address.ok()) { error = grpc_tcp_server_add_port(listener->tcp_server_, addr, port_num);
return GRPC_ERROR_CREATE(string_address.status().ToString()); if (!error.ok()) return error;
}
// Create channelz node.
if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
.value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
auto string_address = grpc_sockaddr_to_uri(addr);
if (!string_address.ok()) {
return GRPC_ERROR_CREATE(string_address.status().ToString());
}
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
*string_address,
absl::StrCat("chttp2 listener ", *string_address));
}
// Register with the server only upon success
server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return absl::OkStatus();
}();
if (!error.ok()) {
if (listener != nullptr) {
if (listener->tcp_server_ != nullptr) {
// listener is deleted when tcp_server_ is shutdown.
grpc_tcp_server_unref(listener->tcp_server_);
} else {
delete listener;
}
} }
listener->channelz_listen_socket_ =
MakeRefCounted<channelz::ListenSocketNode>(
*string_address, absl::StrCat("chttp2 listener ", *string_address));
} }
// Register with the server only upon success return error;
server->AddListener(std::move(listener));
return absl::OkStatus();
} }
grpc_error_handle Chttp2ServerListener::CreateWithAcceptor( grpc_error_handle Chttp2ServerListener::CreateWithAcceptor(
Server* server, const char* name, const ChannelArgs& args, Server* server, const char* name, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier) { Chttp2ServerArgsModifier args_modifier) {
auto listener = MakeOrphanable<Chttp2ServerListener>( Chttp2ServerListener* listener =
server, args, args_modifier, server->config_fetcher()); new Chttp2ServerListener(server, args, args_modifier);
grpc_error_handle error = grpc_tcp_server_create( grpc_error_handle error = grpc_tcp_server_create(
&listener->tcp_server_shutdown_complete_, ChannelArgsEndpointConfig(args), &listener->tcp_server_shutdown_complete_,
OnAccept, listener.get(), &listener->tcp_server_); grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
if (!error.ok()) return error; OnAccept, listener, &listener->tcp_server_);
if (!error.ok()) {
delete listener;
return error;
}
// TODO(yangg) channelz // TODO(yangg) channelz
TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name); TcpServerFdHandler** arg_val = args.GetPointer<TcpServerFdHandler*>(name);
*arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_); *arg_val = grpc_tcp_server_create_fd_handler(listener->tcp_server_);
server->AddListener(std::move(listener)); server->AddListener(OrphanablePtr<Server::ListenerInterface>(listener));
return absl::OkStatus(); return absl::OkStatus();
} }
Chttp2ServerListener* Chttp2ServerListener::CreateForPassiveListener(
Server* server, const ChannelArgs& args,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener) {
// TODO(hork): figure out how to handle channelz in this case
auto listener = MakeOrphanable<Chttp2ServerListener>(
server, args, /*args_modifier=*/
[](const ChannelArgs& args, grpc_error_handle*) { return args; }, nullptr,
std::move(passive_listener));
auto listener_ptr = listener.get();
server->AddListener(std::move(listener));
return listener_ptr;
}
Chttp2ServerListener::Chttp2ServerListener( Chttp2ServerListener::Chttp2ServerListener(
Server* server, const ChannelArgs& args, Server* server, const ChannelArgs& args,
Chttp2ServerArgsModifier args_modifier, Chttp2ServerArgsModifier args_modifier)
grpc_server_config_fetcher* config_fetcher,
std::shared_ptr<experimental::PassiveListenerImpl> passive_listener)
: server_(server), : server_(server),
args_modifier_(args_modifier), args_modifier_(args_modifier),
args_(args), args_(args),
memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()), memory_quota_(args.GetObject<ResourceQuota>()->memory_quota()),
connection_quota_(MakeRefCounted<ConnectionQuota>()), connection_quota_(MakeRefCounted<ConnectionQuota>()) {
config_fetcher_(config_fetcher),
passive_listener_(std::move(passive_listener)) {
auto max_allowed_incoming_connections = auto max_allowed_incoming_connections =
args.GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS); args.GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS);
if (max_allowed_incoming_connections.has_value()) { if (max_allowed_incoming_connections.has_value()) {
@ -797,9 +799,6 @@ Chttp2ServerListener::~Chttp2ServerListener() {
// Flush queued work before destroying handshaker factory, since that // Flush queued work before destroying handshaker factory, since that
// may do a synchronous unref. // may do a synchronous unref.
ExecCtx::Get()->Flush(); ExecCtx::Get()->Flush();
if (passive_listener_ != nullptr) {
passive_listener_->ListenerDestroyed();
}
if (on_destroy_done_ != nullptr) { if (on_destroy_done_ != nullptr) {
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus()); ExecCtx::Run(DEBUG_LOCATION, on_destroy_done_, absl::OkStatus());
ExecCtx::Get()->Flush(); ExecCtx::Get()->Flush();
@ -809,11 +808,10 @@ Chttp2ServerListener::~Chttp2ServerListener() {
// Server callback: start listening on our ports // Server callback: start listening on our ports
void Chttp2ServerListener::Start( void Chttp2ServerListener::Start(
Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) { Server* /*server*/, const std::vector<grpc_pollset*>* /* pollsets */) {
if (config_fetcher_ != nullptr) { if (server_->config_fetcher() != nullptr) {
auto watcher = std::make_unique<ConfigFetcherWatcher>( auto watcher = std::make_unique<ConfigFetcherWatcher>(Ref());
RefAsSubclass<Chttp2ServerListener>());
config_fetcher_watcher_ = watcher.get(); config_fetcher_watcher_ = watcher.get();
config_fetcher_->StartWatch( server_->config_fetcher()->StartWatch(
grpc_sockaddr_to_string(&resolved_address_, false).value(), grpc_sockaddr_to_string(&resolved_address_, false).value(),
std::move(watcher)); std::move(watcher));
} else { } else {
@ -827,9 +825,7 @@ void Chttp2ServerListener::Start(
} }
void Chttp2ServerListener::StartListening() { void Chttp2ServerListener::StartListening() {
if (tcp_server_ != nullptr) { grpc_tcp_server_start(tcp_server_, &server_->pollsets());
grpc_tcp_server_start(tcp_server_, &server_->pollsets());
}
} }
void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) { void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
@ -837,12 +833,6 @@ void Chttp2ServerListener::SetOnDestroyDone(grpc_closure* on_destroy_done) {
on_destroy_done_ = on_destroy_done; on_destroy_done_ = on_destroy_done;
} }
void Chttp2ServerListener::AcceptConnectedEndpoint(
std::unique_ptr<EventEngine::Endpoint> endpoint) {
OnAccept(this, grpc_event_engine_endpoint_create(std::move(endpoint)),
/*accepting_pollset=*/nullptr, /*acceptor=*/nullptr);
}
void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp, void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
grpc_pollset* accepting_pollset, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) { grpc_tcp_server_acceptor* acceptor) {
@ -867,7 +857,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
endpoint_cleanup(error); endpoint_cleanup(error);
return; return;
} }
if (self->config_fetcher_ != nullptr) { if (self->server_->config_fetcher() != nullptr) {
if (connection_manager == nullptr) { if (connection_manager == nullptr) {
grpc_error_handle error = GRPC_ERROR_CREATE( grpc_error_handle error = GRPC_ERROR_CREATE(
"No ConnectionManager configured. Closing connection."); "No ConnectionManager configured. Closing connection.");
@ -908,7 +898,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
// heap-use-after-free issues where `Ref()` is invoked when the ref of // heap-use-after-free issues where `Ref()` is invoked when the ref of
// tcp_server_ has already reached 0. (Ref() implementation of // tcp_server_ has already reached 0. (Ref() implementation of
// Chttp2ServerListener is grpc_tcp_server_ref().) // Chttp2ServerListener is grpc_tcp_server_ref().)
listener_ref = self->RefAsSubclass<Chttp2ServerListener>(); listener_ref = self->Ref();
self->connections_.emplace(connection.get(), std::move(connection)); self->connections_.emplace(connection.get(), std::move(connection));
} }
} }
@ -923,7 +913,7 @@ void Chttp2ServerListener::TcpServerShutdownComplete(
void* arg, grpc_error_handle /*error*/) { void* arg, grpc_error_handle /*error*/) {
Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg); Chttp2ServerListener* self = static_cast<Chttp2ServerListener*>(arg);
self->channelz_listen_socket_.reset(); self->channelz_listen_socket_.reset();
self->Unref(); delete self;
} }
// Server callback: destroy the tcp listener (so we don't generate further // Server callback: destroy the tcp listener (so we don't generate further
@ -932,8 +922,7 @@ void Chttp2ServerListener::Orphan() {
// Cancel the watch before shutting down so as to avoid holding a ref to the // Cancel the watch before shutting down so as to avoid holding a ref to the
// listener in the watcher. // listener in the watcher.
if (config_fetcher_watcher_ != nullptr) { if (config_fetcher_watcher_ != nullptr) {
GPR_ASSERT(config_fetcher_ != nullptr); server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
config_fetcher_->CancelWatch(config_fetcher_watcher_);
} }
std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections; std::map<ActiveConnection*, OrphanablePtr<ActiveConnection>> connections;
grpc_tcp_server* tcp_server; grpc_tcp_server* tcp_server;
@ -951,14 +940,12 @@ void Chttp2ServerListener::Orphan() {
} }
tcp_server = tcp_server_; tcp_server = tcp_server_;
} }
if (tcp_server != nullptr) { grpc_tcp_server_shutdown_listeners(tcp_server);
grpc_tcp_server_shutdown_listeners(tcp_server); grpc_tcp_server_unref(tcp_server);
grpc_tcp_server_unref(tcp_server);
} else {
Unref();
}
} }
} // namespace
// //
// Chttp2ServerAddPort() // Chttp2ServerAddPort()
// //
@ -1059,50 +1046,6 @@ ChannelArgs ModifyArgsForConnection(const ChannelArgs& args,
} }
} // namespace } // namespace
namespace experimental {
absl::Status PassiveListenerImpl::AcceptConnectedEndpoint(
std::unique_ptr<EventEngine::Endpoint> endpoint) {
GPR_ASSERT(server_ != nullptr);
RefCountedPtr<Chttp2ServerListener> listener;
{
MutexLock lock(&mu_);
if (listener_ != nullptr) {
listener =
listener_->RefIfNonZero().TakeAsSubclass<Chttp2ServerListener>();
}
}
if (listener == nullptr) {
return absl::UnavailableError("passive listener already shut down");
}
ExecCtx exec_ctx;
listener->AcceptConnectedEndpoint(std::move(endpoint));
return absl::OkStatus();
}
absl::Status PassiveListenerImpl::AcceptConnectedFd(int fd) {
GPR_ASSERT(server_ != nullptr);
ExecCtx exec_ctx;
auto& args = server_->channel_args();
auto* supports_fd = QueryExtension<EventEngineSupportsFdExtension>(
/*engine=*/args.GetObjectRef<EventEngine>().get());
if (supports_fd == nullptr) {
return absl::UnimplementedError(
"The server's EventEngine does not support adding endpoints from "
"connected file descriptors.");
}
auto endpoint =
supports_fd->CreateEndpointFromFd(fd, ChannelArgsEndpointConfig(args));
return AcceptConnectedEndpoint(std::move(endpoint));
}
void PassiveListenerImpl::ListenerDestroyed() {
MutexLock lock(&mu_);
listener_ = nullptr;
}
} // namespace experimental
} // namespace grpc_core } // namespace grpc_core
int grpc_server_add_http2_port(grpc_server* server, const char* addr, int grpc_server_add_http2_port(grpc_server* server, const char* addr,
@ -1200,31 +1143,3 @@ void grpc_server_add_channel_from_fd(grpc_server* /* server */, int /* fd */,
} }
#endif // GPR_SUPPORT_CHANNELS_FROM_FD #endif // GPR_SUPPORT_CHANNELS_FROM_FD
absl::Status grpc_server_add_passive_listener(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener) {
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_add_passive_listener(server=%p, credentials=%p)",
2, (server, credentials));
// Create security context.
if (credentials == nullptr) {
return absl::UnavailableError(
"No credentials specified for passive listener");
}
auto sc = credentials->create_security_connector(grpc_core::ChannelArgs());
if (sc == nullptr) {
return absl::UnavailableError(
absl::StrCat("Unable to create secure server with credentials of type ",
credentials->type().name()));
}
auto args = server->channel_args()
.SetObject(credentials->Ref())
.SetObject(std::move(sc));
passive_listener->listener_ =
grpc_core::Chttp2ServerListener::CreateForPassiveListener(
server, args, passive_listener);
passive_listener->server_ = server->Ref();
return absl::OkStatus();
}

@ -21,7 +21,6 @@
#include <functional> #include <functional>
#include <grpc/passive_listener.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -43,38 +42,6 @@ grpc_error_handle Chttp2ServerAddPort(
Server* server, const char* addr, const ChannelArgs& args, Server* server, const char* addr, const ChannelArgs& args,
Chttp2ServerArgsModifier connection_args_modifier, int* port_num); Chttp2ServerArgsModifier connection_args_modifier, int* port_num);
class Chttp2ServerListener;
namespace experimental {
// An implementation of the public C++ passive listener interface.
// The server builder holds a weak_ptr to one of these objects, and the
// application owns the instance.
class PassiveListenerImpl final : public PassiveListener {
public:
absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) override ABSL_LOCKS_EXCLUDED(mu_);
absl::Status AcceptConnectedFd(GRPC_UNUSED int fd) override
ABSL_LOCKS_EXCLUDED(mu_);
void ListenerDestroyed() ABSL_LOCKS_EXCLUDED(mu_);
private:
// note: the grpc_core::Server redundant namespace qualification is
// required for older gcc versions.
friend absl::Status(::grpc_server_add_passive_listener)(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
Mutex mu_;
// Data members will be populated when initialized.
RefCountedPtr<Server> server_;
Chttp2ServerListener* listener_;
};
} // namespace experimental
} // namespace grpc_core } // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H

@ -112,13 +112,6 @@ class EventEngineSupportsFdExtension {
int fd, const EndpointConfig& config, int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) = 0; MemoryAllocator memory_allocator) = 0;
/// Creates an EventEngine::Endpoint from an fd which is already assumed to be
/// connected to a remote peer. See \a CreatePosixEndpointFromFd for details.
/// This has the same behavior, but the \a memory_allocator is taken from the
/// EndpointConfig's resource quota.
virtual std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) = 0;
/// Called when the posix listener has accepted a new client connection. /// Called when the posix listener has accepted a new client connection.
/// \a listener_fd - The listening socket fd that accepted the new client /// \a listener_fd - The listening socket fd that accepted the new client
/// connection. /// connection.

@ -25,6 +25,7 @@
#include "absl/cleanup/cleanup.h" #include "absl/cleanup/cleanup.h"
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/strings/match.h" #include "absl/strings/match.h"
#include "absl/strings/str_cat.h" #include "absl/strings/str_cat.h"
@ -142,7 +143,7 @@ void AsyncConnect::OnWritable(absl::Status status)
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep; absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep;
mu_.Lock(); mu_.Lock();
GPR_ASSERT(fd_ != nullptr); CHECK_NE(fd_, nullptr);
fd = std::exchange(fd_, nullptr); fd = std::exchange(fd_, nullptr);
bool connect_cancelled = connect_cancelled_; bool connect_cancelled = connect_cancelled_;
if (fd->IsHandleShutdown() && status.ok()) { if (fd->IsHandleShutdown() && status.ok()) {
@ -334,7 +335,7 @@ PosixEnginePollerManager::PosixEnginePollerManager(
poller_state_(PollerState::kExternal), poller_state_(PollerState::kExternal),
executor_(nullptr), executor_(nullptr),
trigger_shutdown_called_(false) { trigger_shutdown_called_(false) {
GPR_DEBUG_ASSERT(poller_ != nullptr); DCHECK_NE(poller_, nullptr);
} }
void PosixEnginePollerManager::Run( void PosixEnginePollerManager::Run(
@ -351,7 +352,7 @@ void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
} }
void PosixEnginePollerManager::TriggerShutdown() { void PosixEnginePollerManager::TriggerShutdown() {
GPR_DEBUG_ASSERT(trigger_shutdown_called_ == false); DCHECK(trigger_shutdown_called_ == false);
trigger_shutdown_called_ = true; trigger_shutdown_called_ = true;
// If the poller is external, dont try to shut it down. Otherwise // If the poller is external, dont try to shut it down. Otherwise
// set poller state to PollerState::kShuttingDown. // set poller state to PollerState::kShuttingDown.
@ -467,7 +468,7 @@ PosixEventEngine::~PosixEventEngine() {
this, HandleToString(handle).c_str()); this, HandleToString(handle).c_str());
} }
} }
GPR_ASSERT(GPR_LIKELY(known_handles_.empty())); CHECK(GPR_LIKELY(known_handles_.empty()));
} }
timer_manager_->Shutdown(); timer_manager_->Shutdown();
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
@ -592,7 +593,7 @@ bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
auto it = shard->pending_connections.find(connection_handle); auto it = shard->pending_connections.find(connection_handle);
if (it != shard->pending_connections.end()) { if (it != shard->pending_connections.end()) {
ac = it->second; ac = it->second;
GPR_ASSERT(ac != nullptr); CHECK_NE(ac, nullptr);
// Trying to acquire ac->mu here would could cause a deadlock because // Trying to acquire ac->mu here would could cause a deadlock because
// the OnWritable method tries to acquire the two mutexes used // the OnWritable method tries to acquire the two mutexes used
// here in the reverse order. But we dont need to acquire ac->mu before // here in the reverse order. But we dont need to acquire ac->mu before
@ -639,7 +640,7 @@ EventEngine::ConnectionHandle PosixEventEngine::Connect(
const EndpointConfig& args, MemoryAllocator memory_allocator, const EndpointConfig& args, MemoryAllocator memory_allocator,
Duration timeout) { Duration timeout) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
GPR_ASSERT(poller_manager_ != nullptr); CHECK_NE(poller_manager_, nullptr);
PosixTcpOptions options = TcpOptionsFromEndpointConfig(args); PosixTcpOptions options = TcpOptionsFromEndpointConfig(args);
absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult> socket = absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult> socket =
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(options, addr); PosixSocketWrapper::CreateAndPrepareTcpClientSocket(options, addr);
@ -661,9 +662,9 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd,
const EndpointConfig& config, const EndpointConfig& config,
MemoryAllocator memory_allocator) { MemoryAllocator memory_allocator) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
GPR_ASSERT(fd > 0); DCHECK_GT(fd, 0);
PosixEventPoller* poller = poller_manager_->Poller(); PosixEventPoller* poller = poller_manager_->Poller();
GPR_DEBUG_ASSERT(poller != nullptr); DCHECK_NE(poller, nullptr);
EventHandle* handle = EventHandle* handle =
poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors()); poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors());
return CreatePosixEndpoint(handle, nullptr, shared_from_this(), return CreatePosixEndpoint(handle, nullptr, shared_from_this(),
@ -676,22 +677,6 @@ PosixEventEngine::CreatePosixEndpointFromFd(int fd,
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
} }
std::unique_ptr<EventEngine::Endpoint> PosixEventEngine::CreateEndpointFromFd(
int fd, const EndpointConfig& config) {
auto options = TcpOptionsFromEndpointConfig(config);
MemoryAllocator allocator;
if (options.memory_allocator_factory != nullptr) {
return CreatePosixEndpointFromFd(
fd, config,
options.memory_allocator_factory->CreateMemoryAllocator(
absl::StrCat("allocator:", fd)));
}
return CreatePosixEndpointFromFd(
fd, config,
options.resource_quota->memory_quota()->CreateMemoryAllocator(
absl::StrCat("allocator:", fd)));
}
absl::StatusOr<std::unique_ptr<EventEngine::Listener>> absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreateListener( PosixEventEngine::CreateListener(
Listener::AcceptCallback on_accept, Listener::AcceptCallback on_accept,

@ -172,8 +172,6 @@ class PosixEventEngine final : public PosixEventEngineWithFdSupport,
std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd( std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd(
int fd, const EndpointConfig& config, int fd, const EndpointConfig& config,
MemoryAllocator memory_allocator) override; MemoryAllocator memory_allocator) override;
std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd(
int fd, const EndpointConfig& config) override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener( absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
Listener::AcceptCallback on_accept, Listener::AcceptCallback on_accept,

@ -39,7 +39,6 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/passive_listener.h>
#include <grpc/slice.h> #include <grpc/slice.h>
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
@ -75,9 +74,6 @@
"grpc.server.max_pending_requests_hard_limit" "grpc.server.max_pending_requests_hard_limit"
namespace grpc_core { namespace grpc_core {
namespace experimental {
class PassiveListenerImpl;
} // namespace experimental
extern TraceFlag grpc_server_channel_trace; extern TraceFlag grpc_server_channel_trace;
@ -116,7 +112,7 @@ class Server : public ServerInterface,
/// Interface for listeners. /// Interface for listeners.
/// Implementations must override the Orphan() method, which should stop /// Implementations must override the Orphan() method, which should stop
/// listening and initiate destruction of the listener. /// listening and initiate destruction of the listener.
class ListenerInterface : public InternallyRefCounted<ListenerInterface> { class ListenerInterface : public Orphanable {
public: public:
~ListenerInterface() override = default; ~ListenerInterface() override = default;
@ -216,14 +212,6 @@ class Server : public ServerInterface,
void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
private: private:
// note: the grpc_core::Server redundant namespace qualification is
// required for older gcc versions.
// TODO(yashykt): eliminate this friend statement as part of your upcoming
// server listener refactoring.
friend absl::Status(::grpc_server_add_passive_listener)(
grpc_core::Server* server, grpc_server_credentials* credentials,
std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
passive_listener);
struct RequestedCall; struct RequestedCall;
class RequestMatcherInterface; class RequestMatcherInterface;

@ -31,7 +31,6 @@
#include <grpc/impl/channel_arg_names.h> #include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/compression_types.h> #include <grpc/impl/compression_types.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h> #include <grpc/support/sync.h>
#include <grpc/support/workaround_list.h> #include <grpc/support/workaround_list.h>
#include <grpcpp/completion_queue.h> #include <grpcpp/completion_queue.h>
@ -48,38 +47,11 @@
#include <grpcpp/support/channel_arguments.h> #include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/server_interceptor.h> #include <grpcpp/support/server_interceptor.h>
#include "src/core/ext/transport/chttp2/server/chttp2_server.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h" #include "src/core/lib/gpr/useful.h"
#include "src/core/server/server.h"
#include "src/cpp/server/external_connection_acceptor_impl.h" #include "src/cpp/server/external_connection_acceptor_impl.h"
namespace grpc { namespace grpc {
namespace {
// A PIMPL wrapper class that owns the only ref to the passive listener
// implementation. This is returned to the application.
class PassiveListenerOwner final
: public grpc_core::experimental::PassiveListener {
public:
explicit PassiveListenerOwner(std::shared_ptr<PassiveListener> listener)
: listener_(std::move(listener)) {}
absl::Status AcceptConnectedEndpoint(
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
endpoint) override {
return listener_->AcceptConnectedEndpoint(std::move(endpoint));
}
absl::Status AcceptConnectedFd(int fd) override {
return listener_->AcceptConnectedFd(fd);
}
private:
std::shared_ptr<PassiveListener> listener_;
};
} // namespace
static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>* static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
g_plugin_factory_list; g_plugin_factory_list;
@ -253,18 +225,6 @@ ServerBuilder& ServerBuilder::SetResourceQuota(
return *this; return *this;
} }
ServerBuilder& ServerBuilder::experimental_type::AddPassiveListener(
std::shared_ptr<grpc::ServerCredentials> creds,
std::unique_ptr<experimental::PassiveListener>& passive_listener) {
auto core_passive_listener =
std::make_shared<grpc_core::experimental::PassiveListenerImpl>();
builder_->unstarted_passive_listeners_.emplace_back(core_passive_listener,
std::move(creds));
passive_listener =
std::make_unique<PassiveListenerOwner>(std::move(core_passive_listener));
return *builder_;
}
ServerBuilder& ServerBuilder::AddListeningPort( ServerBuilder& ServerBuilder::AddListeningPort(
const std::string& addr_uri, std::shared_ptr<ServerCredentials> creds, const std::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
int* selected_port) { int* selected_port) {
@ -438,26 +398,6 @@ std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
cq->RegisterServer(server.get()); cq->RegisterServer(server.get());
} }
for (auto& unstarted_listener : unstarted_passive_listeners_) {
has_frequently_polled_cqs = true;
auto passive_listener = unstarted_listener.passive_listener.lock();
auto* core_server = grpc_core::Server::FromC(server->c_server());
if (passive_listener != nullptr) {
auto* creds = unstarted_listener.credentials->c_creds();
if (creds == nullptr) {
gpr_log(GPR_ERROR, "Credentials missing for PassiveListener");
return nullptr;
}
auto success = grpc_server_add_passive_listener(
core_server, creds, std::move(passive_listener));
if (!success.ok()) {
gpr_log(GPR_ERROR, "Failed to create a passive listener: %s",
success.ToString().c_str());
return nullptr;
}
}
}
if (!has_frequently_polled_cqs) { if (!has_frequently_polled_cqs) {
gpr_log(GPR_ERROR, gpr_log(GPR_ERROR,
"At least one of the completion queues must be frequently polled"); "At least one of the completion queues must be frequently polled");

@ -19,7 +19,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <thread>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
#include <vector> #include <vector>
@ -160,70 +159,6 @@ class NotifyOnDelete {
grpc_core::Notification* signal_; grpc_core::Notification* signal_;
}; };
// An endpoint implementation that supports Read and Write via std::threads.
// Passing a grpc_core::Notification will allow owners to know when all
// in-flight callbacks have been run, and all endpoint state has been destroyed.
class ThreadedNoopEndpoint : public EventEngine::Endpoint {
public:
explicit ThreadedNoopEndpoint(grpc_core::Notification* destroyed)
: state_(std::make_shared<EndpointState>(destroyed)) {}
~ThreadedNoopEndpoint() override {
std::thread deleter([state = state_]() {
CleanupThread(state->read);
CleanupThread(state->write);
});
deleter.detach();
}
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* /* args */) override {
buffer->Clear();
CleanupThread(state_->read);
state_->read = new std::thread([cb = std::move(on_read)]() mutable {
cb(absl::UnknownError("test"));
});
return false;
}
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /* args */) override {
data->Clear();
CleanupThread(state_->write);
state_->write = new std::thread([cb = std::move(on_writable)]() mutable {
cb(absl::UnknownError("test"));
});
return false;
}
const EventEngine::ResolvedAddress& GetPeerAddress() const override {
return peer_;
}
const EventEngine::ResolvedAddress& GetLocalAddress() const override {
return local_;
}
private:
struct EndpointState {
explicit EndpointState(grpc_core::Notification* deleter)
: delete_notifier_(deleter) {}
std::thread* read = nullptr;
std::thread* write = nullptr;
NotifyOnDelete delete_notifier_;
};
static void CleanupThread(std::thread* thd) {
if (thd != nullptr) {
thd->join();
delete thd;
}
}
std::shared_ptr<EndpointState> state_;
EventEngine::ResolvedAddress peer_;
EventEngine::ResolvedAddress local_;
};
} // namespace experimental } // namespace experimental
} // namespace grpc_event_engine } // namespace grpc_event_engine

@ -28,7 +28,6 @@ grpc_cc_test(
deps = [ deps = [
"//:grpc++_unsecure", "//:grpc++_unsecure",
"//src/proto/grpc/testing:echo_proto", "//src/proto/grpc/testing:echo_proto",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/test_util:grpc_test_util_base", "//test/core/test_util:grpc_test_util_base",
"//test/core/test_util:grpc_test_util_unsecure", "//test/core/test_util:grpc_test_util_unsecure",
], ],

@ -16,19 +16,14 @@
// //
// //
#include <sys/socket.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpcpp/server.h> #include <grpcpp/server.h>
#include <grpcpp/server_builder.h> #include <grpcpp/server_builder.h>
#include <grpcpp/support/config.h> #include <grpcpp/support/config.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/event_engine/event_engine_test_utils.h"
#include "test/core/test_util/port.h" #include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h" #include "test/core/test_util/test_config.h"
@ -88,56 +83,6 @@ TEST_F(ServerBuilderTest, CreateServerRepeatedPortWithDisallowedReusePort) {
nullptr); nullptr);
} }
TEST_F(ServerBuilderTest, AddPassiveListener) {
std::unique_ptr<experimental::PassiveListener> passive_listener;
auto server =
ServerBuilder()
.experimental()
.AddPassiveListener(InsecureServerCredentials(), passive_listener)
.BuildAndStart();
server->Shutdown();
}
TEST_F(ServerBuilderTest, PassiveListenerAcceptConnectedFd) {
std::unique_ptr<experimental::PassiveListener> passive_listener;
ServerBuilder builder;
auto cq = builder.AddCompletionQueue();
// TODO(hork): why is the service necessary? Queue isn't drained otherwise.
auto server =
builder.RegisterService(&g_service)
.experimental()
.AddPassiveListener(InsecureServerCredentials(), passive_listener)
.BuildAndStart();
ASSERT_NE(server.get(), nullptr);
#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
int fd = socket(AF_INET, SOCK_STREAM, 0);
auto accept_status = passive_listener->AcceptConnectedFd(fd);
ASSERT_TRUE(accept_status.ok()) << accept_status;
#else
int fd = -1;
auto accept_status = passive_listener->AcceptConnectedFd(fd);
ASSERT_FALSE(accept_status.ok()) << accept_status;
#endif
server->Shutdown();
}
TEST_F(ServerBuilderTest, PassiveListenerAcceptConnectedEndpoint) {
std::unique_ptr<experimental::PassiveListener> passive_listener;
auto server =
ServerBuilder()
.experimental()
.AddPassiveListener(InsecureServerCredentials(), passive_listener)
.BuildAndStart();
grpc_core::Notification endpoint_destroyed;
auto success = passive_listener->AcceptConnectedEndpoint(
std::make_unique<grpc_event_engine::experimental::ThreadedNoopEndpoint>(
&endpoint_destroyed));
ASSERT_TRUE(success.ok())
<< "AcceptConnectedEndpoint failure: " << success.ToString();
endpoint_destroyed.WaitForNotification();
server->Shutdown();
}
} // namespace } // namespace
} // namespace grpc } // namespace grpc

@ -77,10 +77,6 @@ IGNORED_FILES = [
"src/core/lib/gprpp/global_config_env.h", "src/core/lib/gprpp/global_config_env.h",
"src/core/lib/profiling/timers.h", "src/core/lib/profiling/timers.h",
"src/core/lib/gprpp/crash.h", "src/core/lib/gprpp/crash.h",
# The grpc_core::Server redundant namespace qualification is required for
# older gcc versions.
"src/core/ext/transport/chttp2/server/chttp2_server.h",
"src/core/server/server.h",
] ]
# find our home # find our home

@ -21,13 +21,6 @@ import os
import re import re
import sys import sys
IGNORED_FILES = [
# note: the grpc_core::Server redundant namespace qualification is required
# for older gcc versions.
"src/core/ext/transport/chttp2/server/chttp2_server.h",
"src/core/server/server.h",
]
def find_closing_mustache(contents, initial_depth): def find_closing_mustache(contents, initial_depth):
"""Find the closing mustache for a given number of open mustaches.""" """Find the closing mustache for a given number of open mustaches."""
@ -173,8 +166,6 @@ for config in _CONFIGURATION:
for file in files: for file in files:
if file.endswith(".cc") or file.endswith(".h"): if file.endswith(".cc") or file.endswith(".h"):
path = os.path.join(root, file) path = os.path.join(root, file)
if path in IGNORED_FILES:
continue
try: try:
with open(path) as f: with open(path) as f:
contents = f.read() contents = f.read()

@ -928,7 +928,6 @@ include/grpc/impl/grpc_types.h \
include/grpc/impl/propagation_bits.h \ include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \ include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \ include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \ include/grpc/slice.h \
include/grpc/slice_buffer.h \ include/grpc/slice_buffer.h \
include/grpc/status.h \ include/grpc/status.h \
@ -1041,7 +1040,6 @@ include/grpcpp/impl/server_initializer.h \
include/grpcpp/impl/service_type.h \ include/grpcpp/impl/service_type.h \
include/grpcpp/impl/status.h \ include/grpcpp/impl/status.h \
include/grpcpp/impl/sync.h \ include/grpcpp/impl/sync.h \
include/grpcpp/passive_listener.h \
include/grpcpp/resource_quota.h \ include/grpcpp/resource_quota.h \
include/grpcpp/security/audit_logging.h \ include/grpcpp/security/audit_logging.h \
include/grpcpp/security/auth_context.h \ include/grpcpp/security/auth_context.h \

@ -928,7 +928,6 @@ include/grpc/impl/grpc_types.h \
include/grpc/impl/propagation_bits.h \ include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \ include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \ include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \ include/grpc/slice.h \
include/grpc/slice_buffer.h \ include/grpc/slice_buffer.h \
include/grpc/status.h \ include/grpc/status.h \
@ -1041,7 +1040,6 @@ include/grpcpp/impl/server_initializer.h \
include/grpcpp/impl/service_type.h \ include/grpcpp/impl/service_type.h \
include/grpcpp/impl/status.h \ include/grpcpp/impl/status.h \
include/grpcpp/impl/sync.h \ include/grpcpp/impl/sync.h \
include/grpcpp/passive_listener.h \
include/grpcpp/resource_quota.h \ include/grpcpp/resource_quota.h \
include/grpcpp/security/audit_logging.h \ include/grpcpp/security/audit_logging.h \
include/grpcpp/security/auth_context.h \ include/grpcpp/security/auth_context.h \

@ -861,7 +861,6 @@ include/grpc/impl/grpc_types.h \
include/grpc/impl/propagation_bits.h \ include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \ include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \ include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \ include/grpc/slice.h \
include/grpc/slice_buffer.h \ include/grpc/slice_buffer.h \
include/grpc/status.h \ include/grpc/status.h \

@ -861,7 +861,6 @@ include/grpc/impl/grpc_types.h \
include/grpc/impl/propagation_bits.h \ include/grpc/impl/propagation_bits.h \
include/grpc/impl/slice_type.h \ include/grpc/impl/slice_type.h \
include/grpc/load_reporting.h \ include/grpc/load_reporting.h \
include/grpc/passive_listener.h \
include/grpc/slice.h \ include/grpc/slice.h \
include/grpc/slice_buffer.h \ include/grpc/slice_buffer.h \
include/grpc/status.h \ include/grpc/status.h \

Loading…
Cancel
Save