From e97d83ca8affab7f17a45929f7d1f8c6b753f122 Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Thu, 1 Aug 2024 15:25:39 -0700 Subject: [PATCH 1/7] [examples] Add route guide callback server/client to CMakeLists.txt (#37369) Closes #37369 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37369 from eugeneo:fix-route-guide-callback-cmake 6c6696f2e5f732e29708ac4df82b2035966216f8 PiperOrigin-RevId: 658558458 --- examples/cpp/route_guide/CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/cpp/route_guide/CMakeLists.txt b/examples/cpp/route_guide/CMakeLists.txt index f3a286e2ab7..90cebda39b1 100644 --- a/examples/cpp/route_guide/CMakeLists.txt +++ b/examples/cpp/route_guide/CMakeLists.txt @@ -72,7 +72,8 @@ target_link_libraries(route_guide_helper # Targets route_guide_(client|server) foreach(_target - route_guide_client route_guide_server) + route_guide_client route_guide_server + route_guide_callback_client route_guide_callback_server) add_executable(${_target} "${_target}.cc") target_link_libraries(${_target} From 3e704c7552e429b8520360e37ba240471e9f6f19 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 1 Aug 2024 17:01:13 -0700 Subject: [PATCH 2/7] [codelab] Add a codelab for gRPC C++ OTel plugin (#37318) Closes #37318 PiperOrigin-RevId: 658586660 --- examples/cpp/otel/codelab/BUILD | 99 +++++++++++ examples/cpp/otel/codelab/CMakeLists.txt | 87 ++++++++++ .../otel/codelab/greeter_callback_client.cc | 138 +++++++++++++++ .../greeter_callback_client_solution.cc | 158 ++++++++++++++++++ .../otel/codelab/greeter_callback_server.cc | 112 +++++++++++++ .../greeter_callback_server_solution.cc | 127 ++++++++++++++ examples/cpp/otel/codelab/util.cc | 70 ++++++++ examples/cpp/otel/codelab/util.h | 31 ++++ 8 files changed, 822 insertions(+) create mode 100644 examples/cpp/otel/codelab/BUILD create mode 100644 examples/cpp/otel/codelab/CMakeLists.txt create mode 100644 examples/cpp/otel/codelab/greeter_callback_client.cc create mode 100644 examples/cpp/otel/codelab/greeter_callback_client_solution.cc create mode 100644 examples/cpp/otel/codelab/greeter_callback_server.cc create mode 100644 examples/cpp/otel/codelab/greeter_callback_server_solution.cc create mode 100644 examples/cpp/otel/codelab/util.cc create mode 100644 examples/cpp/otel/codelab/util.h diff --git a/examples/cpp/otel/codelab/BUILD b/examples/cpp/otel/codelab/BUILD new file mode 100644 index 00000000000..2cc92ae3f4a --- /dev/null +++ b/examples/cpp/otel/codelab/BUILD @@ -0,0 +1,99 @@ +# Copyright 2023 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +licenses(["notice"]) + +package( + default_visibility = ["//examples/cpp:__subpackages__"], +) + +cc_library( + name = "util", + srcs = ["util.cc"], + hdrs = ["util.h"], + defines = ["BAZEL_BUILD"], + deps = [ + "//:grpc++", + "//examples/protos:helloworld_cc_grpc", + "@io_opentelemetry_cpp//sdk/src/metrics", + ], +) + +cc_binary( + name = "greeter_callback_client", + srcs = ["greeter_callback_client.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "util", + "//:grpc++", + "//:grpcpp_otel_plugin", + "//examples/cpp/otel:util", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + "@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", + "@io_opentelemetry_cpp//sdk/src/metrics", + ], +) + +cc_binary( + name = "greeter_callback_server", + srcs = ["greeter_callback_server.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "util", + "//:grpc++", + "//:grpc++_reflection", + "//:grpcpp_otel_plugin", + "//examples/cpp/otel:util", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/strings:str_format", + "@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", + "@io_opentelemetry_cpp//sdk/src/metrics", + ], +) + +cc_binary( + name = "greeter_callback_client_solution", + srcs = ["greeter_callback_client_solution.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "util", + "//:grpc++", + "//:grpcpp_otel_plugin", + "//examples/cpp/otel:util", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + "@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", + "@io_opentelemetry_cpp//sdk/src/metrics", + ], +) + +cc_binary( + name = "greeter_callback_server_solution", + srcs = ["greeter_callback_server_solution.cc"], + defines = ["BAZEL_BUILD"], + deps = [ + "util", + "//:grpc++", + "//:grpc++_reflection", + "//:grpcpp_otel_plugin", + "//examples/cpp/otel:util", + "@com_google_absl//absl/flags:flag", + "@com_google_absl//absl/flags:parse", + "@com_google_absl//absl/strings:str_format", + "@io_opentelemetry_cpp//exporters/prometheus:prometheus_exporter", + "@io_opentelemetry_cpp//sdk/src/metrics", + ], +) diff --git a/examples/cpp/otel/codelab/CMakeLists.txt b/examples/cpp/otel/codelab/CMakeLists.txt new file mode 100644 index 00000000000..6ce38d77a2f --- /dev/null +++ b/examples/cpp/otel/codelab/CMakeLists.txt @@ -0,0 +1,87 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# cmake build file for C++ gRPC OpenTelemetry example. +# Assumes absl, protobuf, prometheus-cpp, opentelemetry-cpp and gRPC (with -DgRPC_BUILD_OPENTELEMETRY_PLUGIN=ON) have been installed using cmake. +# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build +# that automatically builds all the dependencies before building helloworld. + +cmake_minimum_required(VERSION 3.13) + +project(grpc_opentelemetry_example C CXX) + +include(../../cmake/common.cmake) + +# Find prometheus-cpp package +find_package(prometheus-cpp CONFIG REQUIRED) + +# Find opentelemetry-cpp package +find_package(opentelemetry-cpp CONFIG REQUIRED) + +# Proto file +get_filename_component(hw_proto "../../../protos/helloworld.proto" ABSOLUTE) +get_filename_component(hw_proto_path "${hw_proto}" PATH) + +# Generated sources +set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") +set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") +set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") +set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") +add_custom_command( + OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" + COMMAND ${_PROTOBUF_PROTOC} + ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" + --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" + -I "${hw_proto_path}" + --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" + "${hw_proto}" + DEPENDS "${hw_proto}") + +# Include generated *.pb.h files +include_directories("${CMAKE_CURRENT_BINARY_DIR}") +include_directories("${CMAKE_SOURCE_DIR}") + +# hw_grpc_proto +add_library(hw_grpc_proto + ${hw_grpc_srcs} + ${hw_grpc_hdrs} + ${hw_proto_srcs} + ${hw_proto_hdrs}) +target_link_libraries(hw_grpc_proto + ${_REFLECTION} + ${_GRPC_GRPCPP} + ${_PROTOBUF_LIBPROTOBUF}) + +# util +add_library(util + "util.cc") +target_link_libraries(util + hw_grpc_proto + opentelemetry-cpp::metrics + ${_GRPC_GRPCPP} + ${_REFLECTION} + ${_PROTOBUF_LIBPROTOBUF}) + +# Targets greeter_callback_(client|server) greeter_callback_(client|server)_solution +foreach(_target + greeter_callback_client greeter_callback_server greeter_callback_client_solution greeter_callback_server_solution) + add_executable(${_target} "${_target}.cc") + target_link_libraries(${_target} + absl::flags + absl::flags_parse + opentelemetry-cpp::metrics + opentelemetry-cpp::prometheus_exporter + gRPC::grpcpp_otel_plugin + util) +endforeach() diff --git a/examples/cpp/otel/codelab/greeter_callback_client.cc b/examples/cpp/otel/codelab/greeter_callback_client.cc new file mode 100644 index 00000000000..d9eab4e9edd --- /dev/null +++ b/examples/cpp/otel/codelab/greeter_callback_client.cc @@ -0,0 +1,138 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil +// version. Refer +// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042. +#ifndef HAVE_ABSEIL +#define HAVE_ABSEIL +#endif + +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "opentelemetry/exporters/prometheus/exporter_factory.h" +#include "opentelemetry/exporters/prometheus/exporter_options.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" + +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/cpp/otel/codelab/util.h" +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#include "util.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); +ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9465", + "Prometheus exporter endpoint"); + +namespace { + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +class GreeterClient { + public: + GreeterClient(std::shared_ptr channel) + : stub_(Greeter::NewStub(channel)) {} + + // Assembles the client's payload, sends it and presents the response back + // from the server. + std::string SayHello(const std::string& user) { + // Data we are sending to the server. + HelloRequest request; + request.set_name(user); + + // Container for the data we expect from the server. + HelloReply reply; + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // The actual RPC. + std::mutex mu; + std::condition_variable cv; + bool done = false; + Status status; + stub_->async()->SayHello(&context, &request, &reply, + [&mu, &cv, &done, &status](Status s) { + status = std::move(s); + std::lock_guard lock(mu); + done = true; + cv.notify_one(); + }); + + std::unique_lock lock(mu); + while (!done) { + cv.wait(lock); + } + + // Act upon its status. + if (status.ok()) { + return reply.message(); + } else { + std::cout << status.error_code() << ": " << status.error_message() + << std::endl; + return "RPC failed"; + } + } + + private: + std::unique_ptr stub_; +}; + +void RunClient(const std::string& target_str) { + // Instantiate the client. It requires a channel, out of which the actual RPCs + // are created. This channel models a connection to an endpoint specified by + // the argument "--target=" which is the only expected argument. + grpc::ChannelArguments args; + // Continuously send RPCs every second. + while (true) { + GreeterClient greeter(grpc::CreateCustomChannel( + target_str, grpc::InsecureChannelCredentials(), args)); + std::string user("world"); + std::string reply = greeter.SayHello(user); + std::cout << "Greeter received: " << reply << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +} // namespace + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + + // CODELAB HINT : Add code to register gRPC OpenTelemetry plugin here. + + // Continuously send RPCs. + RunClient(absl::GetFlag(FLAGS_target)); + + return 0; +} diff --git a/examples/cpp/otel/codelab/greeter_callback_client_solution.cc b/examples/cpp/otel/codelab/greeter_callback_client_solution.cc new file mode 100644 index 00000000000..a3a8ffe51f1 --- /dev/null +++ b/examples/cpp/otel/codelab/greeter_callback_client_solution.cc @@ -0,0 +1,158 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil +// version. Refer +// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042. +#ifndef HAVE_ABSEIL +#define HAVE_ABSEIL +#endif + +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "opentelemetry/exporters/prometheus/exporter_factory.h" +#include "opentelemetry/exporters/prometheus/exporter_options.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" + +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/cpp/otel/codelab/util.h" +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#include "util.h" +#endif + +ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); +ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9465", + "Prometheus exporter endpoint"); + +namespace { + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +class GreeterClient { + public: + GreeterClient(std::shared_ptr channel) + : stub_(Greeter::NewStub(channel)) {} + + // Assembles the client's payload, sends it and presents the response back + // from the server. + std::string SayHello(const std::string& user) { + // Data we are sending to the server. + HelloRequest request; + request.set_name(user); + + // Container for the data we expect from the server. + HelloReply reply; + + // Context for the client. It could be used to convey extra information to + // the server and/or tweak certain RPC behaviors. + ClientContext context; + + // The actual RPC. + std::mutex mu; + std::condition_variable cv; + bool done = false; + Status status; + stub_->async()->SayHello(&context, &request, &reply, + [&mu, &cv, &done, &status](Status s) { + status = std::move(s); + std::lock_guard lock(mu); + done = true; + cv.notify_one(); + }); + + std::unique_lock lock(mu); + while (!done) { + cv.wait(lock); + } + + // Act upon its status. + if (status.ok()) { + return reply.message(); + } else { + std::cout << status.error_code() << ": " << status.error_message() + << std::endl; + return "RPC failed"; + } + } + + private: + std::unique_ptr stub_; +}; + +void RunClient(const std::string& target_str) { + // Instantiate the client. It requires a channel, out of which the actual RPCs + // are created. This channel models a connection to an endpoint specified by + // the argument "--target=" which is the only expected argument. + grpc::ChannelArguments args; + // Continuously send RPCs every second. + while (true) { + GreeterClient greeter(grpc::CreateCustomChannel( + target_str, grpc::InsecureChannelCredentials(), args)); + std::string user("world"); + std::string reply = greeter.SayHello(user); + std::cout << "Greeter received: " << reply << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +} // namespace + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + // CODELAB HINT : Add code to register OpenTelemetry plugin here. + // Register a global gRPC OpenTelemetry plugin configured with a prometheus + // exporter. + opentelemetry::exporter::metrics::PrometheusExporterOptions opts; + opts.url = absl::GetFlag(FLAGS_prometheus_endpoint); + auto prometheus_exporter = + opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(opts); + auto meter_provider = + std::make_shared(); + // The default histogram boundaries are not granular enough for RPCs. Override + // the "grpc.client.attempt.duration" view as recommended by + // https://github.com/grpc/proposal/blob/master/A66-otel-stats.md. + AddLatencyView(meter_provider.get(), "grpc.client.attempt.duration", "s"); + meter_provider->AddMetricReader(std::move(prometheus_exporter)); + auto status = grpc::OpenTelemetryPluginBuilder() + .SetMeterProvider(std::move(meter_provider)) + .BuildAndRegisterGlobal(); + if (!status.ok()) { + std::cerr << "Failed to register gRPC OpenTelemetry Plugin: " + << status.ToString() << std::endl; + return static_cast(status.code()); + } + + // Continuously send RPCs. + RunClient(absl::GetFlag(FLAGS_target)); + + return 0; +} diff --git a/examples/cpp/otel/codelab/greeter_callback_server.cc b/examples/cpp/otel/codelab/greeter_callback_server.cc new file mode 100644 index 00000000000..bba7c9ce072 --- /dev/null +++ b/examples/cpp/otel/codelab/greeter_callback_server.cc @@ -0,0 +1,112 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil +// version. Refer +// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042. +#ifndef HAVE_ABSEIL +#define HAVE_ABSEIL +#endif + +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_format.h" +#include "opentelemetry/exporters/prometheus/exporter_factory.h" +#include "opentelemetry/exporters/prometheus/exporter_options.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" + +#include +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/cpp/otel/codelab/util.h" +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#include "util.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); +ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9464", + "Prometheus exporter endpoint"); + +namespace { + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +// Logic and data behind the server's behavior. +class GreeterServiceImpl final : public Greeter::CallbackService { + ServerUnaryReactor* SayHello(CallbackServerContext* context, + const HelloRequest* request, + HelloReply* reply) override { + std::string prefix("Hello "); + reply->set_message(prefix + request->name()); + + // CODELAB HINT: This sleep seems suspicious. + std::this_thread::sleep_for(std::chrono::seconds(5)); + + ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; + } +}; + +void RunServer(uint16_t port) { + std::string server_address = absl::StrFormat("0.0.0.0:%d", port); + GreeterServiceImpl service; + + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + // Finally assemble the server. + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +} // namespace + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + + // CODELAB HINT : Add code to register gRPC OpenTelemetry plugin here. + + RunServer(absl::GetFlag(FLAGS_port)); + return 0; +} diff --git a/examples/cpp/otel/codelab/greeter_callback_server_solution.cc b/examples/cpp/otel/codelab/greeter_callback_server_solution.cc new file mode 100644 index 00000000000..a53ab288af4 --- /dev/null +++ b/examples/cpp/otel/codelab/greeter_callback_server_solution.cc @@ -0,0 +1,127 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil +// version. Refer +// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042. +#ifndef HAVE_ABSEIL +#define HAVE_ABSEIL +#endif + +#include +#include +#include + +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/strings/str_format.h" +#include "opentelemetry/exporters/prometheus/exporter_factory.h" +#include "opentelemetry/exporters/prometheus/exporter_options.h" +#include "opentelemetry/sdk/metrics/meter_provider.h" + +#include +#include +#include +#include + +#ifdef BAZEL_BUILD +#include "examples/cpp/otel/codelab/util.h" +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#include "util.h" +#endif + +ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); +ABSL_FLAG(std::string, prometheus_endpoint, "localhost:9464", + "Prometheus exporter endpoint"); + +namespace { + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +// Logic and data behind the server's behavior. +class GreeterServiceImpl final : public Greeter::CallbackService { + ServerUnaryReactor* SayHello(CallbackServerContext* context, + const HelloRequest* request, + HelloReply* reply) override { + std::string prefix("Hello "); + reply->set_message(prefix + request->name()); + + ServerUnaryReactor* reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; + } +}; + +void RunServer(uint16_t port) { + std::string server_address = absl::StrFormat("0.0.0.0:%d", port); + GreeterServiceImpl service; + + grpc::EnableDefaultHealthCheckService(true); + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); + ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + // Finally assemble the server. + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +} // namespace + +int main(int argc, char** argv) { + absl::ParseCommandLine(argc, argv); + // Register a global gRPC OpenTelemetry plugin configured with a prometheus + // exporter. + opentelemetry::exporter::metrics::PrometheusExporterOptions opts; + opts.url = absl::GetFlag(FLAGS_prometheus_endpoint); + auto prometheus_exporter = + opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(opts); + auto meter_provider = + std::make_shared(); + // The default histogram boundaries are not granular enough for RPCs. Override + // the "grpc.server.call.duration" view as recommended by + // https://github.com/grpc/proposal/blob/master/A66-otel-stats.md. + AddLatencyView(meter_provider.get(), "grpc.server.call.duration", "s"); + meter_provider->AddMetricReader(std::move(prometheus_exporter)); + auto status = grpc::OpenTelemetryPluginBuilder() + .SetMeterProvider(std::move(meter_provider)) + .BuildAndRegisterGlobal(); + if (!status.ok()) { + std::cerr << "Failed to register gRPC OpenTelemetry Plugin: " + << status.ToString() << std::endl; + return static_cast(status.code()); + } + RunServer(absl::GetFlag(FLAGS_port)); + return 0; +} diff --git a/examples/cpp/otel/codelab/util.cc b/examples/cpp/otel/codelab/util.cc new file mode 100644 index 00000000000..a8fcce95663 --- /dev/null +++ b/examples/cpp/otel/codelab/util.cc @@ -0,0 +1,70 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil +// version. Refer +// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042. +#ifndef HAVE_ABSEIL +#define HAVE_ABSEIL +#endif + +#include "opentelemetry/sdk/metrics/view/instrument_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/meter_selector_factory.h" +#include "opentelemetry/sdk/metrics/view/view_factory.h" + +#include + +#ifdef BAZEL_BUILD +#include "examples/cpp/otel/codelab/util.h" +#include "examples/protos/helloworld.grpc.pb.h" +#else +#include "helloworld.grpc.pb.h" +#include "util.h" +#endif + +using grpc::CallbackServerContext; +using grpc::Channel; +using grpc::ClientContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::Status; +using helloworld::Greeter; +using helloworld::HelloReply; +using helloworld::HelloRequest; + +void AddLatencyView(opentelemetry::sdk::metrics::MeterProvider* provider, + const std::string& name, const std::string& unit) { + auto histogram_config = std::make_shared< + opentelemetry::sdk::metrics::HistogramAggregationConfig>(); + histogram_config->boundaries_ = { + 0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, + 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, + 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, + 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, + 5, 10, 20, 50, 100}; + provider->AddView( + opentelemetry::sdk::metrics::InstrumentSelectorFactory::Create( + opentelemetry::sdk::metrics::InstrumentType::kHistogram, name, unit), + opentelemetry::sdk::metrics::MeterSelectorFactory::Create( + "grpc-c++", grpc::Version(), ""), + opentelemetry::sdk::metrics::ViewFactory::Create( + name, "", unit, + opentelemetry::sdk::metrics::AggregationType::kHistogram, + std::move(histogram_config))); +} diff --git a/examples/cpp/otel/codelab/util.h b/examples/cpp/otel/codelab/util.h new file mode 100644 index 00000000000..cf140de16f3 --- /dev/null +++ b/examples/cpp/otel/codelab/util.h @@ -0,0 +1,31 @@ +// +// +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#ifndef GRPCPP_EXAMPLES_CPP_OTEL_CODELAB_UTIL_H +#define GRPCPP_EXAMPLES_CPP_OTEL_CODELAB_UTIL_H + +#include + +#include "opentelemetry/sdk/metrics/meter_provider.h" + +// Helper function that adds view for gRPC latency instrument \a name with unit +// \a unit with bucket boundaries that are more useful for RPCs. +void AddLatencyView(opentelemetry::sdk::metrics::MeterProvider* provider, + const std::string& name, const std::string& unit); + +#endif // GRPCPP_EXAMPLES_CPP_OTEL_UTIL_H From 449d1b248fe4d9e7a8b0a5cc482c0b6e58da2e57 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 1 Aug 2024 18:35:51 -0700 Subject: [PATCH 3/7] Create a CircularBuffer to store events generated by latent_see. This bounds the total number of generated events. PiperOrigin-RevId: 658611718 --- CMakeLists.txt | 33 ++++++++ Package.swift | 1 + build_autogenerated.yaml | 32 ++++++++ gRPC-C++.podspec | 2 + gRPC-Core.podspec | 2 + grpc.gemspec | 1 + package.xml | 1 + src/core/BUILD | 15 ++++ src/core/util/latent_see.cc | 14 +++- src/core/util/latent_see.h | 9 ++- src/core/util/ring_buffer.h | 109 +++++++++++++++++++++++++++ test/core/util/BUILD | 13 ++++ test/core/util/ring_buffer_test.cc | 88 +++++++++++++++++++++ tools/doxygen/Doxyfile.c++.internal | 1 + tools/doxygen/Doxyfile.core.internal | 1 + tools/run_tests/generated/tests.json | 24 ++++++ 16 files changed, 343 insertions(+), 3 deletions(-) create mode 100644 src/core/util/ring_buffer.h create mode 100644 test/core/util/ring_buffer_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a49b3eea64..8df7a76bbed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1412,6 +1412,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx retry_transparent_not_sent_on_wire_test) add_dependencies(buildtests_cxx retry_unref_before_finish_test) add_dependencies(buildtests_cxx retry_unref_before_recv_test) + add_dependencies(buildtests_cxx ring_buffer_test) add_dependencies(buildtests_cxx ring_hash_test) add_dependencies(buildtests_cxx rls_end2end_test) add_dependencies(buildtests_cxx rls_lb_config_parser_test) @@ -27274,6 +27275,38 @@ target_link_libraries(retry_unref_before_recv_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(ring_buffer_test + test/core/util/ring_buffer_test.cc +) +target_compile_features(ring_buffer_test PUBLIC cxx_std_14) +target_include_directories(ring_buffer_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(ring_buffer_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/Package.swift b/Package.swift index bba949769a6..d9689ec1700 100644 --- a/Package.swift +++ b/Package.swift @@ -1942,6 +1942,7 @@ let package = Package( "src/core/util/posix/sync.cc", "src/core/util/posix/time.cc", "src/core/util/posix/tmpfile.cc", + "src/core/util/ring_buffer.h", "src/core/util/spinlock.h", "src/core/util/string.cc", "src/core/util/string.h", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b38db40c93b..26f31f868ea 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -1218,6 +1218,7 @@ libs: - src/core/util/json/json_util.h - src/core/util/json/json_writer.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - src/core/util/upb_utils.h - src/core/xds/grpc/certificate_provider_store.h @@ -2705,6 +2706,7 @@ libs: - src/core/util/json/json_reader.h - src/core/util/json/json_writer.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - src/core/util/upb_utils.h - third_party/upb/upb/generated_code_support.h @@ -4755,6 +4757,7 @@ libs: - src/core/util/json/json_reader.h - src/core/util/json/json_writer.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -5484,6 +5487,7 @@ targets: - src/core/lib/promise/seq.h - src/core/lib/promise/wait_set.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/test_wakeup_schedulers.h src: - src/core/lib/debug/trace.cc @@ -6714,6 +6718,7 @@ targets: - src/core/lib/transport/status_conversion.h - src/core/lib/transport/timeout_encoding.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - test/core/promise/poll_matcher.h - third_party/upb/upb/generated_code_support.h @@ -6901,6 +6906,7 @@ targets: - src/core/lib/promise/status_flag.h - src/core/lib/transport/call_state.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/poll_matcher.h src: - src/core/lib/debug/trace.cc @@ -7245,6 +7251,7 @@ targets: - src/core/util/json/json_args.h - src/core/util/json/json_writer.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -7898,6 +7905,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -8693,6 +8701,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -10241,6 +10250,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -10791,6 +10801,7 @@ targets: - src/core/lib/transport/bdp_estimator.h - src/core/lib/transport/http2_errors.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -10907,6 +10918,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - test/core/promise/test_wakeup_schedulers.h - third_party/upb/upb/generated_code_support.h @@ -12491,6 +12503,7 @@ targets: - src/core/lib/promise/poll.h - src/core/lib/promise/seq.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/test_wakeup_schedulers.h src: - src/core/lib/debug/trace.cc @@ -12581,6 +12594,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - test/core/promise/test_context.h - third_party/upb/upb/generated_code_support.h @@ -13053,6 +13067,7 @@ targets: - src/core/lib/promise/poll.h - src/core/lib/promise/seq.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/test_wakeup_schedulers.h src: - src/core/lib/debug/trace.cc @@ -13286,6 +13301,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - test/core/promise/test_wakeup_schedulers.h - third_party/upb/upb/generated_code_support.h @@ -13840,6 +13856,7 @@ targets: - src/core/lib/promise/promise.h - src/core/lib/promise/wait_set.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/poll_matcher.h src: - src/core/lib/debug/trace.cc @@ -14153,6 +14170,7 @@ targets: - src/core/lib/promise/observable.h - src/core/lib/promise/poll.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/poll_matcher.h src: - src/core/lib/debug/trace.cc @@ -14507,6 +14525,7 @@ targets: - src/core/lib/slice/slice_refcount.h - src/core/lib/slice/slice_string_helpers.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - src/core/util/spinlock.h - third_party/upb/upb/generated_code_support.h src: @@ -15072,6 +15091,7 @@ targets: - src/core/lib/promise/promise_mutex.h - src/core/lib/promise/seq.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/test_wakeup_schedulers.h src: - src/core/lib/debug/trace.cc @@ -18044,6 +18064,17 @@ targets: - grpc_authorization_provider - grpc_unsecure - grpc_test_util +- name: ring_buffer_test + gtest: true + build: test + language: c++ + headers: + - src/core/util/ring_buffer.h + src: + - test/core/util/ring_buffer_test.cc + deps: + - gtest + uses_polling: false - name: ring_hash_test gtest: true build: test @@ -20861,6 +20892,7 @@ targets: - src/core/lib/promise/poll.h - src/core/lib/promise/wait_for_callback.h - src/core/util/latent_see.h + - src/core/util/ring_buffer.h - test/core/promise/test_wakeup_schedulers.h src: - src/core/lib/debug/trace.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index f9092fb9bc7..d68374de26c 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -1322,6 +1322,7 @@ Pod::Spec.new do |s| 'src/core/util/json/json_util.h', 'src/core/util/json/json_writer.h', 'src/core/util/latent_see.h', + 'src/core/util/ring_buffer.h', 'src/core/util/spinlock.h', 'src/core/util/string.h', 'src/core/util/time_precise.h', @@ -2605,6 +2606,7 @@ Pod::Spec.new do |s| 'src/core/util/json/json_util.h', 'src/core/util/json/json_writer.h', 'src/core/util/latent_see.h', + 'src/core/util/ring_buffer.h', 'src/core/util/spinlock.h', 'src/core/util/string.h', 'src/core/util/time_precise.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1507f2037c5..ea930572c75 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2058,6 +2058,7 @@ Pod::Spec.new do |s| 'src/core/util/posix/sync.cc', 'src/core/util/posix/time.cc', 'src/core/util/posix/tmpfile.cc', + 'src/core/util/ring_buffer.h', 'src/core/util/spinlock.h', 'src/core/util/string.cc', 'src/core/util/string.h', @@ -3386,6 +3387,7 @@ Pod::Spec.new do |s| 'src/core/util/json/json_util.h', 'src/core/util/json/json_writer.h', 'src/core/util/latent_see.h', + 'src/core/util/ring_buffer.h', 'src/core/util/spinlock.h', 'src/core/util/string.h', 'src/core/util/time_precise.h', diff --git a/grpc.gemspec b/grpc.gemspec index 56e70feda77..b4bf476f2ce 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1944,6 +1944,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/util/posix/sync.cc ) s.files += %w( src/core/util/posix/time.cc ) s.files += %w( src/core/util/posix/tmpfile.cc ) + s.files += %w( src/core/util/ring_buffer.h ) s.files += %w( src/core/util/spinlock.h ) s.files += %w( src/core/util/string.cc ) s.files += %w( src/core/util/string.h ) diff --git a/package.xml b/package.xml index 74e93567698..f027be50534 100644 --- a/package.xml +++ b/package.xml @@ -1926,6 +1926,7 @@ + diff --git a/src/core/BUILD b/src/core/BUILD index 1acce192d0d..d5617e69f3a 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -148,10 +148,25 @@ grpc_cc_library( ], deps = [ "per_cpu", + "ring_buffer", "//:gpr", ], ) +grpc_cc_library( + name = "ring_buffer", + srcs = [], + hdrs = [ + "util/ring_buffer.h", + ], + external_deps = [ + "absl/types:optional", + ], + deps = [ + "//:gpr_platform", + ], +) + grpc_cc_library( name = "transport_fwd", hdrs = [ diff --git a/src/core/util/latent_see.cc b/src/core/util/latent_see.cc index 7234ad5156d..b40c93a8001 100644 --- a/src/core/util/latent_see.cc +++ b/src/core/util/latent_see.cc @@ -15,10 +15,17 @@ #include "src/core/util/latent_see.h" #ifdef GRPC_ENABLE_LATENT_SEE +#include #include #include +#include +#include + +#include "src/core/lib/gprpp/sync.h" +#include "src/core/util/ring_buffer.h" #include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" #include "absl/types/optional.h" namespace grpc_core { @@ -34,7 +41,10 @@ std::string Log::GenerateJson() { std::vector events; for (auto& fragment : fragments_) { MutexLock lock(&fragment.mu); - events.insert(events.end(), fragment.events.begin(), fragment.events.end()); + for (auto it = fragment.events.begin(); it != fragment.events.end(); ++it) { + events.push_back(*it); + } + fragment.events.Clear(); } absl::optional start_time; for (auto& event : events) { @@ -103,7 +113,7 @@ void Log::FlushBin(Bin* bin) { { MutexLock lock(&fragment.mu); for (auto event : bin->events) { - fragment.events.push_back(RecordedEvent{thread_id, batch_id, event}); + fragment.events.Append(RecordedEvent{thread_id, batch_id, event}); } } bin->events.clear(); diff --git a/src/core/util/latent_see.h b/src/core/util/latent_see.h index d5b9e297cdd..ff23798c923 100644 --- a/src/core/util/latent_see.h +++ b/src/core/util/latent_see.h @@ -18,8 +18,12 @@ #include #ifdef GRPC_ENABLE_LATENT_SEE +#include #include #include +#include +#include +#include #include #include @@ -27,6 +31,7 @@ #include "src/core/lib/gprpp/per_cpu.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/util/ring_buffer.h" namespace grpc_core { namespace latent_see { @@ -59,6 +64,7 @@ struct Bin { class Log { public: + static constexpr int kMaxEventsPerCpu = 50000; static Bin* MaybeStartBin(void* owner) { if (bin_ != nullptr) return bin_; Bin* bin = free_bins_.load(std::memory_order_acquire); @@ -120,7 +126,8 @@ class Log { static std::atomic free_bins_; struct Fragment { Mutex mu; - std::vector events ABSL_GUARDED_BY(mu); + grpc_core::RingBuffer events + ABSL_GUARDED_BY(mu); }; PerCpu fragments_{PerCpuOptions()}; }; diff --git a/src/core/util/ring_buffer.h b/src/core/util/ring_buffer.h new file mode 100644 index 00000000000..2789e7d4d32 --- /dev/null +++ b/src/core/util/ring_buffer.h @@ -0,0 +1,109 @@ +#ifndef GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ +#define GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ + +#include + +#include +#include +#include + +#include "absl/types/optional.h" + +namespace grpc_core { + +template +class RingBuffer { + public: + class RingBufferIterator { + public: + using iterator_category = std::forward_iterator_tag; + using value_type = const T; + using pointer = void; + using reference = void; + using difference_type = std::ptrdiff_t; + + RingBufferIterator& operator++() { + if (--size_ <= 0) { + head_ = 0; + size_ = 0; + buffer_ = nullptr; + } else { + head_ = (head_ + 1) % kCapacity; + } + return *this; + } + + RingBufferIterator operator++(int) { + RingBufferIterator tmp(*this); + operator++(); + return tmp; + } + + bool operator==(const RingBufferIterator& rhs) const { + return (buffer_ == rhs.buffer_ && head_ == rhs.head_ && + size_ == rhs.size_); + } + + bool operator!=(const RingBufferIterator& rhs) const { + return !operator==(rhs); + } + + T operator*() { return buffer_->data_[head_]; } + + RingBufferIterator() : buffer_(nullptr), head_(0), size_(0) {}; + RingBufferIterator(const RingBufferIterator& other) = default; + RingBufferIterator(const RingBuffer* buffer) + : buffer_(buffer), head_(buffer->head_), size_(buffer->size_) { + if (!size_) { + buffer_ = nullptr; + } + } + + private: + friend class RingBuffer; + const RingBuffer* buffer_; + int head_ = 0; + int size_ = 0; + }; + + RingBuffer() = default; + + void Append(T data) { + if (size_ < kCapacity) { + data_[size_] = std::move(data); + size_++; + } else { + data_[head_] = std::move(data); + head_ = (head_ + 1) % kCapacity; + } + } + + // Returns the data of the first element in the buffer and removes it from + // the buffer. If the buffer is empty, returns absl::nullopt. + absl::optional PopIfNotEmpty() { + if (!size_) return absl::nullopt; + T data = std::move(data_[head_]); + --size_; + head_ = (head_ + 1) % kCapacity; + return data; + } + + void Clear() { + head_ = 0; + size_ = 0; + } + + RingBufferIterator begin() const { return RingBufferIterator(this); } + + RingBufferIterator end() const { return RingBufferIterator(); } + + private: + friend class RingBufferIterator; + std::array data_; + int head_ = 0; + int size_ = 0; +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ diff --git a/test/core/util/BUILD b/test/core/util/BUILD index 8337de838eb..a21eae8c2d2 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -124,3 +124,16 @@ grpc_cc_test( "//src/core:useful", ], ) + +grpc_cc_test( + name = "ring_buffer_test", + srcs = ["ring_buffer_test.cc"], + external_deps = ["gtest"], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:gpr_platform", + "//src/core:ring_buffer", + ], +) diff --git a/test/core/util/ring_buffer_test.cc b/test/core/util/ring_buffer_test.cc new file mode 100644 index 00000000000..8531c4b9878 --- /dev/null +++ b/test/core/util/ring_buffer_test.cc @@ -0,0 +1,88 @@ +// +// +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include + +#include "gtest/gtest.h" +#include "src/core/util/ring_buffer.h" + +namespace grpc_core { + +constexpr int kBufferCapacity = 1000; + +TEST(RingBufferTest, BufferAppendPopTest) { + RingBuffer buffer; + EXPECT_FALSE(buffer.PopIfNotEmpty().has_value()); + + for (int i = 0; i < (3 * kBufferCapacity)/2; ++i) { + buffer.Append(i); + } + // Pop half of the elements. Elements in [kBufferCapacity / 2, + // kBufferCapacity) are popped. + int j = kBufferCapacity / 2; + for (int i = 0; i < kBufferCapacity / 2; ++i) { + EXPECT_EQ(buffer.PopIfNotEmpty(), j++); + } + EXPECT_EQ(j, kBufferCapacity); + // Iterate over the remaining elements. + for (auto it = buffer.begin(); it != buffer.end(); ++it) { + EXPECT_EQ(*it, j++); + } + // Elements in [kBufferCapacity, (3 * kBufferCapacity) / 2) should be present. + EXPECT_EQ(j, (3 * kBufferCapacity) / 2); + + // Append some more elements. The buffer should now have elements in + // [kBufferCapacity, 2 * kBufferCapacity). + for (int i = 0; i < kBufferCapacity / 2; ++i) { + buffer.Append(j++); + } + // Pop all the elements. + j = kBufferCapacity; + while (true) { + auto ret = buffer.PopIfNotEmpty(); + if (!ret.has_value()) break; + EXPECT_EQ(*ret, j++); + } + EXPECT_EQ(j, 2 * kBufferCapacity); +} + +TEST(RingBufferTest, BufferAppendIterateTest) { + RingBuffer buffer; + for (int i = 0; i < 5 * kBufferCapacity; ++i) { + buffer.Append(i); + int j = std::max(0, i + 1 - kBufferCapacity); + // If i >= kBufferCapacity, check that the buffer contains only the last + // kBufferCapacity elements [i + 1 - kBufferCapacity, i]. Otherwise check + // that the buffer contains all elements from 0 to i. + for (auto it = buffer.begin(); it != buffer.end(); ++it) { + EXPECT_EQ(*it, j++); + } + // Check that j was incremented at each step which implies that all the + // required elements were present in the buffer. + EXPECT_EQ(j, i + 1); + } + buffer.Clear(); + EXPECT_EQ(buffer.begin(), buffer.end()); +} + +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index ca9f1d59939..1f68d763042 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2948,6 +2948,7 @@ src/core/util/posix/string.cc \ src/core/util/posix/sync.cc \ src/core/util/posix/time.cc \ src/core/util/posix/tmpfile.cc \ +src/core/util/ring_buffer.h \ src/core/util/spinlock.h \ src/core/util/string.cc \ src/core/util/string.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index f2df91ec3c9..21e4dd18484 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -2728,6 +2728,7 @@ src/core/util/posix/string.cc \ src/core/util/posix/sync.cc \ src/core/util/posix/time.cc \ src/core/util/posix/tmpfile.cc \ +src/core/util/ring_buffer.h \ src/core/util/spinlock.h \ src/core/util/string.cc \ src/core/util/string.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index e69d28d8ef1..915f587011a 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -9083,6 +9083,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "ring_buffer_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, From 848272c570a7dcde2179cae474cc704486b76a60 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 1 Aug 2024 20:08:40 -0700 Subject: [PATCH 4/7] Add a Sanitize method to metadata map to allow filtering select metadata types from the map. Includes a unit test PiperOrigin-RevId: 658632905 --- src/core/lib/gprpp/table.h | 28 +++++++++ src/core/lib/transport/metadata_batch.h | 41 +++++++++++++ test/core/transport/metadata_map_test.cc | 77 ++++++++++++++++++++++++ 3 files changed, 146 insertions(+) diff --git a/src/core/lib/gprpp/table.h b/src/core/lib/gprpp/table.h index c0f92888b0f..7085bae0357 100644 --- a/src/core/lib/gprpp/table.h +++ b/src/core/lib/gprpp/table.h @@ -334,6 +334,15 @@ class Table { absl::index_sequence()...>()); } + // Iterate through each set field in the table if it exists in Vs, in the + // order of Vs. For each existing field, call the filter function. If the + // function returns true, keep the field. Otherwise, remove the field. + template + void FilterIn(F f) { + FilterInImpl(std::move(f), + absl::index_sequence()...>()); + } + // Count the number of set fields in the table size_t count() const { return present_bits_.count(); } @@ -415,6 +424,18 @@ class Table { } } + // Call (*f)(value) if that value is in the table. + // If the value is present in the table and (*f)(value) returns false, remove + // the value from the table. + template + void FilterIf(F* f) { + if (auto* p = get()) { + if (!(*f)(*p)) { + clear(); + } + } + } + // For each field (element I=0, 1, ...) if that field is present, call its // destructor. template @@ -444,6 +465,13 @@ class Table { table_detail::do_these_things({(CallIf(&f), 1)...}); } + // For each field (element I=0, 1, ...) if that field is present, call f. If + // f returns false, remove the field from the table. + template + void FilterInImpl(F f, absl::index_sequence) { + table_detail::do_these_things({(FilterIf(&f), 1)...}); + } + template void ClearAllImpl(absl::index_sequence) { table_detail::do_these_things({(clear(), 1)...}); diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 3d7779113ab..fab1325b95c 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -1052,6 +1052,26 @@ struct LogWrapper { } }; +// Callable for the table FilterIn -- for each value, call the +// appropriate filter method to determine of the value should be kept or +// removed. +template +struct FilterWrapper { + Filterer filter_fn; + + template ::value, bool> = true> + bool operator()(const Value& /*which*/) { + return filter_fn(Which()); + } + + template ::value, bool> = true> + bool operator()(const Value& /*which*/) { + return true; + } +}; + // Encoder to compute TransportSize class TransportSizeEncoder { public: @@ -1094,6 +1114,16 @@ class UnknownMap { BackingType::const_iterator begin() const { return unknown_.cbegin(); } BackingType::const_iterator end() const { return unknown_.cend(); } + template + void Filter(Filterer* filter_fn) { + unknown_.erase( + std::remove_if(unknown_.begin(), unknown_.end(), + [&](auto& pair) { + return !(*filter_fn)(pair.first.as_string_view()); + }), + unknown_.end()); + } + bool empty() const { return unknown_.empty(); } size_t size() const { return unknown_.size(); } void Clear() { unknown_.clear(); } @@ -1314,6 +1344,17 @@ class MetadataMap { } } + // Filter the metadata map. + // Iterates over all encodable and unknown headers and calls the filter_fn + // for each of them. If the function returns true, the header is kept. + template + void Filter(Filterer filter_fn) { + table_.template FilterIn, + Value...>( + metadata_detail::FilterWrapper{filter_fn}); + unknown_.Filter(&filter_fn); + } + std::string DebugString() const { metadata_detail::DebugStringBuilder builder; Log([&builder](absl::string_view key, absl::string_view value) { diff --git a/test/core/transport/metadata_map_test.cc b/test/core/transport/metadata_map_test.cc index cc977cd3c4e..91d60baab27 100644 --- a/test/core/transport/metadata_map_test.cc +++ b/test/core/transport/metadata_map_test.cc @@ -260,6 +260,83 @@ TEST(DebugStringBuilderTest, TestAllRedacted) { } } +std::vector GetEncodableHeaders() { + return { + // clang-format off + std::string(ContentTypeMetadata::key()), + std::string(EndpointLoadMetricsBinMetadata::key()), + std::string(GrpcAcceptEncodingMetadata::key()), + std::string(GrpcEncodingMetadata::key()), + std::string(GrpcInternalEncodingRequest::key()), + std::string(GrpcLbClientStatsMetadata::key()), + std::string(GrpcMessageMetadata::key()), + std::string(GrpcPreviousRpcAttemptsMetadata::key()), + std::string(GrpcRetryPushbackMsMetadata::key()), + std::string(GrpcServerStatsBinMetadata::key()), + std::string(GrpcStatusMetadata::key()), + std::string(GrpcTagsBinMetadata::key()), + std::string(GrpcTimeoutMetadata::key()), + std::string(GrpcTraceBinMetadata::key()), + std::string(HostMetadata::key()), + std::string(HttpAuthorityMetadata::key()), + std::string(HttpMethodMetadata::key()), + std::string(HttpPathMetadata::key()), + std::string(HttpSchemeMetadata::key()), + std::string(HttpStatusMetadata::key()), + std::string(LbCostBinMetadata::key()), + std::string(LbTokenMetadata::key()), + std::string(TeMetadata::key()), + // clang-format on + }; +} + +template +void AddNonEncodableHeader(grpc_metadata_batch& md, Value value) { + md.Set(NonEncodableHeader(), value); +} + +template +class HeaderFilter { + public: + template + bool operator()(Key) { + return filter_unknown; + } + bool operator()(absl::string_view /*key*/) { return !filter_unknown; } +}; + +TEST(MetadataMapTest, FilterTest) { + grpc_metadata_batch map; + std::vector allow_list_keys = GetEncodableHeaders(); + std::vector unknown_keys = {"unknown_key_1", "unknown_key_2"}; + allow_list_keys.insert(allow_list_keys.end(), unknown_keys.begin(), + unknown_keys.end()); + // Add some encodable and unknown headers + for (const std::string& curr_key : allow_list_keys) { + map.Append(curr_key, Slice::FromStaticString("value1"), + [](absl::string_view /*error*/, const Slice& /*value*/) {}); + } + + // Add 5 non-encodable headers + constexpr int kNumNonEncodableHeaders = 5; + AddNonEncodableHeader(map, true); + AddNonEncodableHeader(map, nullptr); + AddNonEncodableHeader(map, "value1"); + AddNonEncodableHeader(map, "value1"); + AddNonEncodableHeader( + map, GrpcStreamNetworkState::kNotSentOnWire); + + EXPECT_EQ(map.count(), allow_list_keys.size() + kNumNonEncodableHeaders); + // Remove all unknown headers + map.Filter(HeaderFilter()); + EXPECT_EQ(map.count(), allow_list_keys.size() + kNumNonEncodableHeaders - + unknown_keys.size()); + // Remove all encodable headers + map.Filter(HeaderFilter()); + EXPECT_EQ(map.count(), kNumNonEncodableHeaders); +} + } // namespace testing } // namespace grpc_core From 21b10d8c1dafe77dfdd06b52dd84cf79fa471019 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Aug 2024 20:51:22 -0700 Subject: [PATCH 5/7] [useful] Drop bespoke CountTrailingZeros impl (#37084) use `absl::countr_zero` instead Built on #37083 which should be merged first Closes #37084 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37084 from ctiller:less-bespoke-3 8c30e8fdc5387f9acb1cbf0035b4474bf397775d PiperOrigin-RevId: 658642077 --- src/core/lib/promise/party.cc | 6 ++--- src/core/util/useful.h | 24 ----------------- test/core/util/useful_test.cc | 49 ----------------------------------- 3 files changed, 3 insertions(+), 76 deletions(-) diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index cdf6dd10296..c8bde2d198f 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -276,7 +276,7 @@ void Party::RunPartyAndUnref(uint64_t prev_state) { auto wakeup_mask = std::exchange(wakeup_mask_, 0); while (wakeup_mask != 0) { const uint64_t t = LowestOneBit(wakeup_mask); - const int i = CountTrailingZeros(t); + const int i = absl::countr_zero(t); wakeup_mask ^= t; // If the participant is null, skip. // This allows participants to complete whilst wakers still exist @@ -364,7 +364,7 @@ void Party::AddParticipants(Participant** participants, size_t count) { } wakeup_mask |= new_mask; allocated |= new_mask; - slots[i] = CountTrailingZeros(new_mask); + slots[i] = absl::countr_zero(new_mask); } // Try to allocate this slot and take a ref (atomically). // Ref needs to be taken because once we store the participant it could be @@ -406,7 +406,7 @@ void Party::AddParticipant(Participant* participant) { << "No available slots for new participant; allocated=" << allocated << " state=" << state << " wakeup_mask=" << wakeup_mask; allocated |= wakeup_mask; - slot = CountTrailingZeros(wakeup_mask); + slot = absl::countr_zero(wakeup_mask); // Try to allocate this slot and take a ref (atomically). // Ref needs to be taken because once we store the participant it could be // spuriously woken up and unref the party. diff --git a/src/core/util/useful.h b/src/core/util/useful.h index 37a51611901..8007c4ca334 100644 --- a/src/core/util/useful.h +++ b/src/core/util/useful.h @@ -57,30 +57,6 @@ bool GetBit(T i, size_t n) { return (i & (T(1) << n)) != 0; } -#if GRPC_HAS_BUILTIN(__builtin_ctz) -GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline uint32_t CountTrailingZeros( - uint32_t i) { - DCHECK_NE(i, 0u); // __builtin_ctz returns undefined behavior for 0 - return __builtin_ctz(i); -} -GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline uint32_t CountTrailingZeros( - uint64_t i) { - DCHECK_NE(i, 0u); // __builtin_ctz returns undefined behavior for 0 - return __builtin_ctzll(i); -} -#else -GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline uint32_t CountTrailingZeros( - uint32_t i) { - DCHECK_NE(i, 0); // __builtin_ctz returns undefined behavior for 0 - return absl::popcount((i & -i) - 1); -} -GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline uint32_t CountTrailingZeros( - uint64_t i) { - DCHECK_NE(i, 0); // __builtin_ctz returns undefined behavior for 0 - return absl::popcount((i & -i) - 1); -} -#endif - // This function uses operator< to implement a qsort-style comparison, whereby: // if a is smaller than b, a number smaller than 0 is returned. // if a is bigger than b, a number greater than 0 is returned. diff --git a/test/core/util/useful_test.cc b/test/core/util/useful_test.cc index 53067bfb629..506b2da3040 100644 --- a/test/core/util/useful_test.cc +++ b/test/core/util/useful_test.cc @@ -81,55 +81,6 @@ TEST(UsefulTest, RoundUpToPowerOf2) { EXPECT_EQ(RoundUpToPowerOf2(8), 8); } -TEST(UsefulTest, CountTrailingZeros32) { - EXPECT_EQ(CountTrailingZeros(static_cast(1)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(2)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(3)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(4)), 2); - EXPECT_EQ(CountTrailingZeros(static_cast(5)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(6)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(7)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(8)), 3); - EXPECT_EQ(CountTrailingZeros(static_cast(9)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(10)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(11)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(12)), 2); - EXPECT_EQ(CountTrailingZeros(static_cast(13)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(14)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(15)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(16)), 4); - EXPECT_EQ(CountTrailingZeros(static_cast(256)), 8); - EXPECT_EQ(CountTrailingZeros(static_cast(65535)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(65536)), 16); - EXPECT_EQ(CountTrailingZeros(static_cast(0x80000000)), 31); -} - -TEST(UsefulTest, CountTrailingZeros64) { - EXPECT_EQ(CountTrailingZeros(static_cast(1)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(2)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(3)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(4)), 2); - EXPECT_EQ(CountTrailingZeros(static_cast(5)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(6)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(7)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(8)), 3); - EXPECT_EQ(CountTrailingZeros(static_cast(9)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(10)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(11)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(12)), 2); - EXPECT_EQ(CountTrailingZeros(static_cast(13)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(14)), 1); - EXPECT_EQ(CountTrailingZeros(static_cast(15)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(16)), 4); - EXPECT_EQ(CountTrailingZeros(static_cast(256)), 8); - EXPECT_EQ(CountTrailingZeros(static_cast(65535)), 0); - EXPECT_EQ(CountTrailingZeros(static_cast(65536)), 16); - EXPECT_EQ(CountTrailingZeros(static_cast(0x80000000)), 31); - EXPECT_EQ(CountTrailingZeros(static_cast(0x100000000)), 32); - EXPECT_EQ(CountTrailingZeros(static_cast(0x1000000000000)), 48); - EXPECT_EQ(CountTrailingZeros(static_cast(0x8000000000000000)), 63); -} - TEST(UsefulTest, LowestOneBit8) { EXPECT_EQ(LowestOneBit(static_cast(0)), 0); EXPECT_EQ(LowestOneBit(static_cast(1)), 1); From c8891a9542c97bee22c37eee3528b6b833c995f5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Aug 2024 22:02:08 -0700 Subject: [PATCH 6/7] [chttp2] Remove rstpit experiment (#37343) Closes #37343 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37343 from ctiller:rstpit-buhbye 5c75a120c2ec25bd38e27e74da993d10192c7d00 PiperOrigin-RevId: 658659336 --- BUILD | 1 - CMakeLists.txt | 45 ------------- Makefile | 1 - Package.swift | 2 - bazel/experiments.bzl | 4 -- build_autogenerated.yaml | 16 ----- config.m4 | 1 - config.w32 | 1 - gRPC-C++.podspec | 2 - gRPC-Core.podspec | 3 - grpc.gemspec | 2 - package.xml | 2 - src/core/BUILD | 17 ----- .../chttp2/transport/chttp2_transport.cc | 2 - .../ext/transport/chttp2/transport/internal.h | 3 - .../max_concurrent_streams_policy.cc | 45 ------------- .../transport/max_concurrent_streams_policy.h | 67 ------------------- .../ext/transport/chttp2/transport/parsing.cc | 20 ++---- .../ext/transport/chttp2/transport/writing.cc | 4 -- src/core/lib/experiments/experiments.cc | 18 ----- src/core/lib/experiments/experiments.h | 8 --- src/core/lib/experiments/experiments.yaml | 6 -- src/python/grpcio/grpc_core_dependencies.py | 1 - test/core/transport/chttp2/BUILD | 10 --- .../max_concurrent_streams_policy_test.cc | 48 ------------- tools/doxygen/Doxyfile.c++.internal | 2 - tools/doxygen/Doxyfile.core.internal | 2 - tools/run_tests/generated/tests.json | 24 ------- 28 files changed, 7 insertions(+), 350 deletions(-) delete mode 100644 src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc delete mode 100644 src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h delete mode 100644 test/core/transport/chttp2/max_concurrent_streams_policy_test.cc diff --git a/BUILD b/BUILD index 2588e85212e..9e4a4c70b87 100644 --- a/BUILD +++ b/BUILD @@ -4829,7 +4829,6 @@ grpc_cc_library( "//src/core:iomgr_fwd", "//src/core:iomgr_port", "//src/core:match", - "//src/core:max_concurrent_streams_policy", "//src/core:memory_quota", "//src/core:metadata_batch", "//src/core:metadata_info", diff --git a/CMakeLists.txt b/CMakeLists.txt index 8df7a76bbed..2162cdce541 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1261,7 +1261,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx map_pipe_test) add_dependencies(buildtests_cxx match_test) add_dependencies(buildtests_cxx matchers_test) - add_dependencies(buildtests_cxx max_concurrent_streams_policy_test) add_dependencies(buildtests_cxx max_concurrent_streams_test) add_dependencies(buildtests_cxx max_connection_age_test) add_dependencies(buildtests_cxx max_connection_idle_test) @@ -1947,7 +1946,6 @@ add_library(grpc src/core/ext/transport/chttp2/transport/hpack_parser_table.cc src/core/ext/transport/chttp2/transport/http2_settings.cc src/core/ext/transport/chttp2/transport/huffsyms.cc - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc src/core/ext/transport/chttp2/transport/parsing.cc src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc src/core/ext/transport/chttp2/transport/ping_callbacks.cc @@ -3044,7 +3042,6 @@ add_library(grpc_unsecure src/core/ext/transport/chttp2/transport/hpack_parser_table.cc src/core/ext/transport/chttp2/transport/http2_settings.cc src/core/ext/transport/chttp2/transport/huffsyms.cc - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc src/core/ext/transport/chttp2/transport/parsing.cc src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc src/core/ext/transport/chttp2/transport/ping_callbacks.cc @@ -20370,48 +20367,6 @@ target_link_libraries(matchers_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(max_concurrent_streams_policy_test - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc - test/core/transport/chttp2/max_concurrent_streams_policy_test.cc -) -if(WIN32 AND MSVC) - if(BUILD_SHARED_LIBS) - target_compile_definitions(max_concurrent_streams_policy_test - PRIVATE - "GPR_DLL_IMPORTS" - ) - endif() -endif() -target_compile_features(max_concurrent_streams_policy_test PUBLIC cxx_std_14) -target_include_directories(max_concurrent_streams_policy_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(max_concurrent_streams_policy_test - ${_gRPC_ALLTARGETS_LIBRARIES} - gtest - gpr -) - - endif() if(gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 68bb2cdbfaf..5d194c1d104 100644 --- a/Makefile +++ b/Makefile @@ -726,7 +726,6 @@ LIBGRPC_SRC = \ src/core/ext/transport/chttp2/transport/hpack_parser_table.cc \ src/core/ext/transport/chttp2/transport/http2_settings.cc \ src/core/ext/transport/chttp2/transport/huffsyms.cc \ - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \ src/core/ext/transport/chttp2/transport/ping_callbacks.cc \ diff --git a/Package.swift b/Package.swift index d9689ec1700..793720ceda8 100644 --- a/Package.swift +++ b/Package.swift @@ -244,8 +244,6 @@ let package = Package( "src/core/ext/transport/chttp2/transport/huffsyms.h", "src/core/ext/transport/chttp2/transport/internal.h", "src/core/ext/transport/chttp2/transport/legacy_frame.h", - "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc", - "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h", "src/core/ext/transport/chttp2/transport/parsing.cc", "src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc", "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h", diff --git a/bazel/experiments.bzl b/bazel/experiments.bzl index 88bead1ccad..64a197963a1 100644 --- a/bazel/experiments.bzl +++ b/bazel/experiments.bzl @@ -31,7 +31,6 @@ EXPERIMENT_ENABLES = { "peer_state_based_framing": "peer_state_based_framing", "pick_first_new": "pick_first_new", "promise_based_inproc_transport": "promise_based_inproc_transport", - "rstpit": "rstpit", "schedule_cancellation_over_write": "schedule_cancellation_over_write", "server_privacy": "server_privacy", "tcp_frame_size_tuning": "tcp_frame_size_tuning", @@ -60,7 +59,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], @@ -108,7 +106,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], @@ -150,7 +147,6 @@ EXPERIMENTS = { "flow_control_test": [ "multiping", "peer_state_based_framing", - "rstpit", "tcp_frame_size_tuning", "tcp_rcv_lowat", ], diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 26f31f868ea..f424ff9db90 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -290,7 +290,6 @@ libs: - src/core/ext/transport/chttp2/transport/huffsyms.h - src/core/ext/transport/chttp2/transport/internal.h - src/core/ext/transport/chttp2/transport/legacy_frame.h - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h - src/core/ext/transport/chttp2/transport/ping_abuse_policy.h - src/core/ext/transport/chttp2/transport/ping_callbacks.h - src/core/ext/transport/chttp2/transport/ping_rate_policy.h @@ -1320,7 +1319,6 @@ libs: - src/core/ext/transport/chttp2/transport/hpack_parser_table.cc - src/core/ext/transport/chttp2/transport/http2_settings.cc - src/core/ext/transport/chttp2/transport/huffsyms.cc - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc - src/core/ext/transport/chttp2/transport/parsing.cc - src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc - src/core/ext/transport/chttp2/transport/ping_callbacks.cc @@ -2290,7 +2288,6 @@ libs: - src/core/ext/transport/chttp2/transport/huffsyms.h - src/core/ext/transport/chttp2/transport/internal.h - src/core/ext/transport/chttp2/transport/legacy_frame.h - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h - src/core/ext/transport/chttp2/transport/ping_abuse_policy.h - src/core/ext/transport/chttp2/transport/ping_callbacks.h - src/core/ext/transport/chttp2/transport/ping_rate_policy.h @@ -2766,7 +2763,6 @@ libs: - src/core/ext/transport/chttp2/transport/hpack_parser_table.cc - src/core/ext/transport/chttp2/transport/http2_settings.cc - src/core/ext/transport/chttp2/transport/huffsyms.cc - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc - src/core/ext/transport/chttp2/transport/parsing.cc - src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc - src/core/ext/transport/chttp2/transport/ping_callbacks.cc @@ -13388,18 +13384,6 @@ targets: deps: - gtest - grpc_test_util -- name: max_concurrent_streams_policy_test - gtest: true - build: test - language: c++ - headers: - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h - src: - - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc - - test/core/transport/chttp2/max_concurrent_streams_policy_test.cc - deps: - - gtest - - gpr - name: max_concurrent_streams_test gtest: true build: test diff --git a/config.m4 b/config.m4 index f80bc3a2159..00594e8878f 100644 --- a/config.m4 +++ b/config.m4 @@ -101,7 +101,6 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/transport/chttp2/transport/hpack_parser_table.cc \ src/core/ext/transport/chttp2/transport/http2_settings.cc \ src/core/ext/transport/chttp2/transport/huffsyms.cc \ - src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc \ src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \ src/core/ext/transport/chttp2/transport/ping_callbacks.cc \ diff --git a/config.w32 b/config.w32 index 0384064082d..d443a78a90e 100644 --- a/config.w32 +++ b/config.w32 @@ -66,7 +66,6 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\transport\\chttp2\\transport\\hpack_parser_table.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\http2_settings.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\huffsyms.cc " + - "src\\core\\ext\\transport\\chttp2\\transport\\max_concurrent_streams_policy.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\parsing.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\ping_abuse_policy.cc " + "src\\core\\ext\\transport\\chttp2\\transport\\ping_callbacks.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index d68374de26c..7e66f9de672 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -374,7 +374,6 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/legacy_frame.h', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h', 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', @@ -1658,7 +1657,6 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/legacy_frame.h', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h', 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index ea930572c75..db3d4fabede 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -364,8 +364,6 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/legacy_frame.h', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h', 'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h', @@ -2439,7 +2437,6 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/transport/huffsyms.h', 'src/core/ext/transport/chttp2/transport/internal.h', 'src/core/ext/transport/chttp2/transport/legacy_frame.h', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.h', 'src/core/ext/transport/chttp2/transport/ping_callbacks.h', 'src/core/ext/transport/chttp2/transport/ping_rate_policy.h', diff --git a/grpc.gemspec b/grpc.gemspec index b4bf476f2ce..6d0344c4613 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -250,8 +250,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/transport/chttp2/transport/huffsyms.h ) s.files += %w( src/core/ext/transport/chttp2/transport/internal.h ) s.files += %w( src/core/ext/transport/chttp2/transport/legacy_frame.h ) - s.files += %w( src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc ) - s.files += %w( src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h ) s.files += %w( src/core/ext/transport/chttp2/transport/parsing.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc ) s.files += %w( src/core/ext/transport/chttp2/transport/ping_abuse_policy.h ) diff --git a/package.xml b/package.xml index f027be50534..484d0f6a26b 100644 --- a/package.xml +++ b/package.xml @@ -232,8 +232,6 @@ - - diff --git a/src/core/BUILD b/src/core/BUILD index d5617e69f3a..2e69723908a 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -7203,23 +7203,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "max_concurrent_streams_policy", - srcs = [ - "ext/transport/chttp2/transport/max_concurrent_streams_policy.cc", - ], - hdrs = [ - "ext/transport/chttp2/transport/max_concurrent_streams_policy.h", - ], - external_deps = [ - "absl/log:check", - ], - deps = [ - "//:gpr", - "//:gpr_platform", - ], -) - grpc_cc_library( name = "huffsyms", srcs = [ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index cddcecf1b5a..0e64643cbf6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -65,7 +65,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" @@ -547,7 +546,6 @@ static void read_channel_args(grpc_chttp2_transport* t, value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1); if (value >= 0) { t->settings.mutable_local().SetMaxConcurrentStreams(value); - t->max_concurrent_streams_policy.SetTarget(value); } } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) { VLOG(2) << GRPC_ARG_MAX_CONCURRENT_STREAMS diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5aac16542dc..64f398807ba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -52,7 +52,6 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" @@ -383,8 +382,6 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport, grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; grpc_closure retry_initiate_ping_locked; - grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; - /// ping acks size_t ping_ack_count = 0; size_t ping_ack_capacity = 0; diff --git a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc b/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc deleted file mode 100644 index 355e40898a6..00000000000 --- a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" - -#include - -#include "absl/log/check.h" - -#include -#include - -namespace grpc_core { - -void Chttp2MaxConcurrentStreamsPolicy::AddDemerit() { - ++new_demerits_; - ++unacked_demerits_; -} - -void Chttp2MaxConcurrentStreamsPolicy::FlushedSettings() { - sent_demerits_ += std::exchange(new_demerits_, 0); -} - -void Chttp2MaxConcurrentStreamsPolicy::AckLastSend() { - CHECK(unacked_demerits_ >= sent_demerits_); - unacked_demerits_ -= std::exchange(sent_demerits_, 0); -} - -uint32_t Chttp2MaxConcurrentStreamsPolicy::AdvertiseValue() const { - if (target_ < unacked_demerits_) return 0; - return target_ - unacked_demerits_; -} - -} // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h b/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h deleted file mode 100644 index a2a56d2bfe5..00000000000 --- a/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H -#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H - -#include -#include - -#include - -namespace grpc_core { - -class Chttp2MaxConcurrentStreamsPolicy { - public: - // Set the target number of concurrent streams. - // If everything is idle we should advertise this number. - void SetTarget(uint32_t target) { target_ = target; } - - // Add one demerit to the current target. - // We need to do one full settings round trip after this to clear this - // demerit. - // It will reduce our advertised max concurrent streams by one. - void AddDemerit(); - - // Notify the policy that we've sent a settings frame. - // Newly added demerits since the last settings frame was sent will be cleared - // once that settings frame is acknowledged. - void FlushedSettings(); - - // Notify the policy that we've received an acknowledgement for the last - // settings frame we sent. - void AckLastSend(); - - // Returns what we should advertise as max concurrent streams. - uint32_t AdvertiseValue() const; - - private: - uint32_t target_ = std::numeric_limits::max(); - // Demerit flow: - // When we add a demerit, we add to both new & unacked. - // When we flush settings, we move new to sent. - // When we ack settings, we remove what we sent from unacked. - // eg: - // we add 10 demerits - now new=10, sent=0, unacked=10 - // we send settings - now new=0, sent=10, unacked=10 - // we add 5 demerits - now new=5, sent=10, unacked=15 - // we get the settings ack - now new=5, sent=0, unacked=5 - uint32_t new_demerits_ = 0; - uint32_t sent_demerits_ = 0; - uint32_t unacked_demerits_ = 0; -}; - -} // namespace grpc_core - -#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 8eefef09ec3..070294b2cf4 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -55,7 +55,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/lib/backoff/random_early_detection.h" #include "src/core/lib/debug/trace.h" @@ -650,7 +649,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, } else if (GPR_UNLIKELY( t->max_concurrent_streams_overload_protection && t->streams_allocated.load(std::memory_order_relaxed) > - t->max_concurrent_streams_policy.AdvertiseValue())) { + t->settings.local().max_concurrent_streams())) { // We have more streams allocated than we'd like, so apply some pushback // by refusing this stream. ++t->num_pending_induced_frames; @@ -659,13 +658,12 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, GRPC_HTTP2_REFUSED_STREAM, nullptr)); grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); return init_header_skip_frame_parser(t, priority_type, is_eoh); - } else if (GPR_UNLIKELY( - t->stream_map.size() >= - t->max_concurrent_streams_policy.AdvertiseValue() && - grpc_core::RandomEarlyDetection( - t->max_concurrent_streams_policy.AdvertiseValue(), - t->settings.acked().max_concurrent_streams()) - .Reject(t->stream_map.size(), t->bitgen))) { + } else if (GPR_UNLIKELY(t->stream_map.size() >= + t->settings.local().max_concurrent_streams() && + grpc_core::RandomEarlyDetection( + t->settings.local().max_concurrent_streams(), + t->settings.acked().max_concurrent_streams()) + .Reject(t->stream_map.size(), t->bitgen))) { // We are under the limit of max concurrent streams for the current // setting, but are over the next value that will be advertised. // Apply some backpressure by randomly not accepting new streams. @@ -827,9 +825,6 @@ static grpc_error_handle init_rst_stream_parser(grpc_chttp2_transport* t) { s->call_tracer_wrapper.RecordIncomingBytes({9, 0, 0}); t->parser = grpc_chttp2_transport::Parser{ "rst_stream", grpc_chttp2_rst_stream_parser_parse, &t->simple.rst_stream}; - if (!t->is_client && grpc_core::IsRstpitEnabled()) { - t->max_concurrent_streams_policy.AddDemerit(); - } return absl::OkStatus(); } @@ -854,7 +849,6 @@ static grpc_error_handle init_settings_frame_parser(grpc_chttp2_transport* t) { return err; } if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { - t->max_concurrent_streams_policy.AckLastSend(); if (!t->settings.AckLastSend()) { return GRPC_ERROR_CREATE("Received unexpected settings ack"); } diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 1095bfc2c40..029608120cc 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -49,7 +49,6 @@ #include "src/core/ext/transport/chttp2/transport/http2_settings.h" #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/legacy_frame.h" -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/ext/transport/chttp2/transport/write_size_policy.h" @@ -260,8 +259,6 @@ class WriteContext { } void FlushSettings() { - t_->settings.mutable_local().SetMaxConcurrentStreams( - t_->max_concurrent_streams_policy.AdvertiseValue()); auto update = t_->settings.MaybeSendUpdate(); if (update.has_value()) { grpc_core::Http2Frame frame(std::move(*update)); @@ -280,7 +277,6 @@ class WriteContext { }); } t_->flow_control.FlushedSettings(); - t_->max_concurrent_streams_policy.FlushedSettings(); grpc_core::global_stats().IncrementHttp2SettingsWrites(); } } diff --git a/src/core/lib/experiments/experiments.cc b/src/core/lib/experiments/experiments.cc index 70326c37823..5ce7bfb84f4 100644 --- a/src/core/lib/experiments/experiments.cc +++ b/src/core/lib/experiments/experiments.cc @@ -69,10 +69,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -142,8 +138,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -219,10 +213,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -292,8 +282,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, @@ -369,10 +357,6 @@ const char* const additional_constraints_pick_first_new = "{}"; const char* const description_promise_based_inproc_transport = "Use promises for the in-process transport."; const char* const additional_constraints_promise_based_inproc_transport = "{}"; -const char* const description_rstpit = - "On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short " - "duration"; -const char* const additional_constraints_rstpit = "{}"; const char* const description_schedule_cancellation_over_write = "Allow cancellation op to be scheduled over a write"; const char* const additional_constraints_schedule_cancellation_over_write = @@ -442,8 +426,6 @@ const ExperimentMetadata g_experiment_metadata[] = { description_promise_based_inproc_transport, additional_constraints_promise_based_inproc_transport, nullptr, 0, false, false}, - {"rstpit", description_rstpit, additional_constraints_rstpit, nullptr, 0, - false, true}, {"schedule_cancellation_over_write", description_schedule_cancellation_over_write, additional_constraints_schedule_cancellation_over_write, nullptr, 0, false, diff --git a/src/core/lib/experiments/experiments.h b/src/core/lib/experiments/experiments.h index 498557c5118..89f6b6010ae 100644 --- a/src/core/lib/experiments/experiments.h +++ b/src/core/lib/experiments/experiments.h @@ -75,7 +75,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -109,7 +108,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -142,7 +140,6 @@ inline bool IsPeerStateBasedFramingEnabled() { return false; } #define GRPC_EXPERIMENT_IS_INCLUDED_PICK_FIRST_NEW inline bool IsPickFirstNewEnabled() { return true; } inline bool IsPromiseBasedInprocTransportEnabled() { return false; } -inline bool IsRstpitEnabled() { return false; } inline bool IsScheduleCancellationOverWriteEnabled() { return false; } inline bool IsServerPrivacyEnabled() { return false; } inline bool IsTcpFrameSizeTuningEnabled() { return false; } @@ -171,7 +168,6 @@ enum ExperimentIds { kExperimentIdPeerStateBasedFraming, kExperimentIdPickFirstNew, kExperimentIdPromiseBasedInprocTransport, - kExperimentIdRstpit, kExperimentIdScheduleCancellationOverWrite, kExperimentIdServerPrivacy, kExperimentIdTcpFrameSizeTuning, @@ -238,10 +234,6 @@ inline bool IsPickFirstNewEnabled() { inline bool IsPromiseBasedInprocTransportEnabled() { return IsExperimentEnabled(); } -#define GRPC_EXPERIMENT_IS_INCLUDED_RSTPIT -inline bool IsRstpitEnabled() { - return IsExperimentEnabled(); -} #define GRPC_EXPERIMENT_IS_INCLUDED_SCHEDULE_CANCELLATION_OVER_WRITE inline bool IsScheduleCancellationOverWriteEnabled() { return IsExperimentEnabled(); diff --git a/src/core/lib/experiments/experiments.yaml b/src/core/lib/experiments/experiments.yaml index 5d69cf1bc13..5815f2d66f5 100644 --- a/src/core/lib/experiments/experiments.yaml +++ b/src/core/lib/experiments/experiments.yaml @@ -130,12 +130,6 @@ owner: ctiller@google.com test_tags: [] allow_in_fuzzing_config: false # experiment currently crashes if enabled -- name: rstpit - description: - On RST_STREAM on a server, reduce MAX_CONCURRENT_STREAMS for a short duration - expiry: 2024/08/03 - owner: ctiller@google.com - test_tags: [flow_control_test] - name: schedule_cancellation_over_write description: Allow cancellation op to be scheduled over a write expiry: 2024/08/01 diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 75a21995f6f..53b0c28a869 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -75,7 +75,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/hpack_parser_table.cc', 'src/core/ext/transport/chttp2/transport/http2_settings.cc', 'src/core/ext/transport/chttp2/transport/huffsyms.cc', - 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc', 'src/core/ext/transport/chttp2/transport/parsing.cc', 'src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc', 'src/core/ext/transport/chttp2/transport/ping_callbacks.cc', diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index 54e1b521b77..d2cd2a2a27b 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -346,16 +346,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "max_concurrent_streams_policy_test", - srcs = ["max_concurrent_streams_policy_test.cc"], - external_deps = ["gtest"], - language = "C++", - deps = [ - "//src/core:max_concurrent_streams_policy", - ], -) - grpc_cc_test( name = "streams_not_seen_test", srcs = ["streams_not_seen_test.cc"], diff --git a/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc b/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc deleted file mode 100644 index 11b347dbcd8..00000000000 --- a/test/core/transport/chttp2/max_concurrent_streams_policy_test.cc +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" - -#include - -#include "gtest/gtest.h" - -namespace grpc_core { -namespace { - -TEST(MaxConcurrentStreamsPolicyTest, NoOpWorks) { - Chttp2MaxConcurrentStreamsPolicy policy; - policy.SetTarget(100); - EXPECT_EQ(policy.AdvertiseValue(), 100); -} - -TEST(MaxConcurrentStreamsPolicyTest, BasicFlow) { - Chttp2MaxConcurrentStreamsPolicy policy; - policy.SetTarget(100); - EXPECT_EQ(policy.AdvertiseValue(), 100); - policy.AddDemerit(); - EXPECT_EQ(policy.AdvertiseValue(), 99); - policy.FlushedSettings(); - EXPECT_EQ(policy.AdvertiseValue(), 99); - policy.AckLastSend(); - EXPECT_EQ(policy.AdvertiseValue(), 100); -} - -} // namespace -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 1f68d763042..a1ec0b5c168 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1254,8 +1254,6 @@ src/core/ext/transport/chttp2/transport/huffsyms.cc \ src/core/ext/transport/chttp2/transport/huffsyms.h \ src/core/ext/transport/chttp2/transport/internal.h \ src/core/ext/transport/chttp2/transport/legacy_frame.h \ -src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc \ -src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h \ src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 21e4dd18484..ccc8cabeb95 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1022,8 +1022,6 @@ src/core/ext/transport/chttp2/transport/huffsyms.cc \ src/core/ext/transport/chttp2/transport/huffsyms.h \ src/core/ext/transport/chttp2/transport/internal.h \ src/core/ext/transport/chttp2/transport/legacy_frame.h \ -src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc \ -src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h \ src/core/ext/transport/chttp2/transport/parsing.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.cc \ src/core/ext/transport/chttp2/transport/ping_abuse_policy.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 915f587011a..a70b5442420 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -6135,30 +6135,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "max_concurrent_streams_policy_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": true - }, { "args": [], "benchmark": false, From bda6c3adf8d26184a8551dc6cf4902fe0f1d66a4 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 1 Aug 2024 23:28:06 -0700 Subject: [PATCH 7/7] [sanity] fix (#37385) Closes #37385 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37385 from ctiller:san 2d3c1b4a2c0d780ee7e1b32f860999b7806bdfb5 PiperOrigin-RevId: 658680781 --- src/core/util/latent_see.cc | 6 +++--- src/core/util/latent_see.h | 3 +-- src/core/util/ring_buffer.h | 24 +++++++++++++++++++----- test/core/util/ring_buffer_test.cc | 7 ++++--- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/src/core/util/latent_see.cc b/src/core/util/latent_see.cc index b40c93a8001..585a072c5b3 100644 --- a/src/core/util/latent_see.cc +++ b/src/core/util/latent_see.cc @@ -21,13 +21,13 @@ #include #include -#include "src/core/lib/gprpp/sync.h" -#include "src/core/util/ring_buffer.h" - #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/util/ring_buffer.h" + namespace grpc_core { namespace latent_see { diff --git a/src/core/util/latent_see.h b/src/core/util/latent_see.h index ff23798c923..b4f11d2960b 100644 --- a/src/core/util/latent_see.h +++ b/src/core/util/latent_see.h @@ -126,8 +126,7 @@ class Log { static std::atomic free_bins_; struct Fragment { Mutex mu; - grpc_core::RingBuffer events - ABSL_GUARDED_BY(mu); + RingBuffer events ABSL_GUARDED_BY(mu); }; PerCpu fragments_{PerCpuOptions()}; }; diff --git a/src/core/util/ring_buffer.h b/src/core/util/ring_buffer.h index 2789e7d4d32..4831c2202c1 100644 --- a/src/core/util/ring_buffer.h +++ b/src/core/util/ring_buffer.h @@ -1,5 +1,19 @@ -#ifndef GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ -#define GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_UTIL_RING_BUFFER_H +#define GRPC_SRC_CORE_UTIL_RING_BUFFER_H #include @@ -50,9 +64,9 @@ class RingBuffer { T operator*() { return buffer_->data_[head_]; } - RingBufferIterator() : buffer_(nullptr), head_(0), size_(0) {}; + RingBufferIterator() : buffer_(nullptr), head_(0), size_(0){}; RingBufferIterator(const RingBufferIterator& other) = default; - RingBufferIterator(const RingBuffer* buffer) + explicit RingBufferIterator(const RingBuffer* buffer) : buffer_(buffer), head_(buffer->head_), size_(buffer->size_) { if (!size_) { buffer_ = nullptr; @@ -106,4 +120,4 @@ class RingBuffer { } // namespace grpc_core -#endif // GRPC_SRC_CORE_UTIL_RING_BUFFER_H_ +#endif // GRPC_SRC_CORE_UTIL_RING_BUFFER_H diff --git a/test/core/util/ring_buffer_test.cc b/test/core/util/ring_buffer_test.cc index 8531c4b9878..808018d0538 100644 --- a/test/core/util/ring_buffer_test.cc +++ b/test/core/util/ring_buffer_test.cc @@ -16,10 +16,11 @@ // // -#include +#include "src/core/util/ring_buffer.h" #include "gtest/gtest.h" -#include "src/core/util/ring_buffer.h" + +#include namespace grpc_core { @@ -29,7 +30,7 @@ TEST(RingBufferTest, BufferAppendPopTest) { RingBuffer buffer; EXPECT_FALSE(buffer.PopIfNotEmpty().has_value()); - for (int i = 0; i < (3 * kBufferCapacity)/2; ++i) { + for (int i = 0; i < (3 * kBufferCapacity) / 2; ++i) { buffer.Append(i); } // Pop half of the elements. Elements in [kBufferCapacity / 2,