From 01907a776771eae2b2b5082fcf6d4ca2bd8386ce Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 9 Oct 2023 10:32:59 -0700 Subject: [PATCH] [ring_hash] add test and make some minor fixes and improvements (#34610) - Fixes support for the same address being present more than once in the address list, which was accidentally broken in #34244. - Change the call attribute to encode the hash as an integer instead of a string. --- CMakeLists.txt | 41 ++++ build_autogenerated.yaml | 18 ++ src/core/BUILD | 1 + .../lb_policy/ring_hash/ring_hash.cc | 49 +++-- .../lb_policy/ring_hash/ring_hash.h | 8 +- .../resolver/xds/xds_resolver.cc | 11 +- test/core/client_channel/lb_policy/BUILD | 18 ++ .../lb_policy/ring_hash_test.cc | 196 ++++++++++++++++++ tools/run_tests/generated/tests.json | 24 +++ 9 files changed, 341 insertions(+), 25 deletions(-) create mode 100644 test/core/client_channel/lb_policy/ring_hash_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 925843e7cf1..03e508a8a7d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1272,6 +1272,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx retry_transparent_not_sent_on_wire_test) add_dependencies(buildtests_cxx retry_unref_before_finish_test) add_dependencies(buildtests_cxx retry_unref_before_recv_test) + add_dependencies(buildtests_cxx ring_hash_test) add_dependencies(buildtests_cxx rls_end2end_test) add_dependencies(buildtests_cxx rls_lb_config_parser_test) add_dependencies(buildtests_cxx round_robin_test) @@ -20692,6 +20693,46 @@ target_link_libraries(retry_unref_before_recv_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(ring_hash_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + test/core/client_channel/lb_policy/ring_hash_test.cc + test/core/event_engine/event_engine_test_utils.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc +) +target_compile_features(ring_hash_test PUBLIC cxx_std_14) +target_include_directories(ring_hash_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(ring_hash_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index fd30f5f7c0e..7f914be517c 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -13941,6 +13941,24 @@ targets: - grpc_authorization_provider - grpc_unsecure - grpc_test_util +- name: ring_hash_test + gtest: true + build: test + language: c++ + headers: + - test/core/client_channel/lb_policy/lb_policy_test_lib.h + - test/core/event_engine/event_engine_test_utils.h + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + src: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto + - test/core/client_channel/lb_policy/ring_hash_test.cc + - test/core/event_engine/event_engine_test_utils.cc + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + deps: + - gtest + - protobuf + - grpc_test_util + uses_polling: false - name: rls_end2end_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 981c6941a2a..3ac22e4dbc9 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4908,6 +4908,7 @@ grpc_cc_library( "lb_policy_registry", "pollset_set", "ref_counted", + "resolved_address", "unique_type_name", "validation_errors", "//:channel_arg_names", diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc index 9d440ca75d6..25bc10f5033 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc @@ -33,7 +33,6 @@ #include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/status/statusor.h" -#include "absl/strings/numbers.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" @@ -63,6 +62,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/delegating_helper.h" #include "src/core/lib/load_balancing/lb_policy.h" @@ -306,15 +306,10 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { auto* call_state = static_cast(args.call_state); auto* hash_attribute = static_cast( call_state->GetCallAttribute(RequestHashAttribute::TypeName())); - absl::string_view hash; - if (hash_attribute != nullptr) { - hash = hash_attribute->request_hash(); - } - uint64_t h; - if (!absl::SimpleAtoi(hash, &h)) { - return PickResult::Fail( - absl::InternalError("ring hash value is not a number")); + if (hash_attribute == nullptr) { + return PickResult::Fail(absl::InternalError("hash attribute not present")); } + uint64_t request_hash = hash_attribute->request_hash(); const auto& ring = ring_->ring(); // Find the index in the ring to use for this RPC. // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c @@ -331,10 +326,10 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) { } uint64_t midval = ring[index].hash; uint64_t midval1 = index == 0 ? 0 : ring[index - 1].hash; - if (h <= midval && h > midval1) { + if (request_hash <= midval && request_hash > midval1) { break; } - if (midval < h) { + if (midval < request_hash) { lowp = index + 1; } else { highp = index - 1; @@ -632,7 +627,37 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) { gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses", this, args.addresses->size()); } - endpoints_ = *std::move(args.addresses); + // De-dup endpoints, taking weight into account. + endpoints_.clear(); + endpoints_.reserve(args.addresses->size()); + std::map endpoint_indices; + size_t num_skipped = 0; + for (size_t i = 0; i < args.addresses->size(); ++i) { + EndpointAddresses& endpoint = (*args.addresses)[i]; + const EndpointAddressSet key(endpoint.addresses()); + auto p = endpoint_indices.emplace(key, i - num_skipped); + if (!p.second) { + // Duplicate endpoint. Combine weights and skip the dup. + EndpointAddresses& prev_endpoint = endpoints_[p.first->second]; + int weight_arg = + endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1); + int prev_weight_arg = + prev_endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { + gpr_log(GPR_INFO, + "[RH %p] merging duplicate endpoint for %s, combined " + "weight %d", + this, key.ToString().c_str(), weight_arg + prev_weight_arg); + } + prev_endpoint = EndpointAddresses( + prev_endpoint.addresses(), + prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT, + weight_arg + prev_weight_arg)); + ++num_skipped; + } else { + endpoints_.push_back(std::move(endpoint)); + } + } } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) { gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s", diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h index 95b248ea1e1..5fee8319aaf 100644 --- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h +++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h @@ -21,8 +21,6 @@ #include -#include "absl/strings/string_view.h" - #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/json/json.h" @@ -37,15 +35,15 @@ class RequestHashAttribute public: static UniqueTypeName TypeName(); - explicit RequestHashAttribute(absl::string_view request_hash) + explicit RequestHashAttribute(uint64_t request_hash) : request_hash_(request_hash) {} - absl::string_view request_hash() const { return request_hash_; } + uint64_t request_hash() const { return request_hash_; } private: UniqueTypeName type() const override { return TypeName(); } - absl::string_view request_hash_; + uint64_t request_hash_; }; // Helper Parsing method to parse ring hash policy configs; for example, ring diff --git a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc index 6f326cde8d6..7d452dce98b 100644 --- a/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc @@ -812,8 +812,8 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( // Rotating the old value prevents duplicate hash rules from cancelling // each other out and preserves all of the entropy const uint64_t old_value = - hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0; - hash = old_value ^ new_hash.value(); + hash.has_value() ? ((*hash << 1) | (*hash >> 63)) : 0; + hash = old_value ^ *new_hash; } // If the policy is a terminal policy and a hash has been generated, // ignore the rest of the hash policies. @@ -833,13 +833,8 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig( } args.service_config_call_data->SetCallAttribute( args.arena->New(cluster->cluster_name())); - std::string hash_string = absl::StrCat(hash.value()); - char* hash_value = - static_cast(args.arena->Alloc(hash_string.size() + 1)); - memcpy(hash_value, hash_string.c_str(), hash_string.size()); - hash_value[hash_string.size()] = '\0'; args.service_config_call_data->SetCallAttribute( - args.arena->New(hash_value)); + args.arena->New(*hash)); args.service_config_call_data->SetCallAttribute( args.arena->ManagedNew(route_config_data_, entry)); diff --git a/test/core/client_channel/lb_policy/BUILD b/test/core/client_channel/lb_policy/BUILD index 26dff1b878a..ca181c2b2e8 100644 --- a/test/core/client_channel/lb_policy/BUILD +++ b/test/core/client_channel/lb_policy/BUILD @@ -234,3 +234,21 @@ grpc_cc_test( "//test/core/util:grpc_test_util", ], ) + +grpc_cc_test( + name = "ring_hash_test", + srcs = ["ring_hash_test.cc"], + external_deps = ["gtest"], + language = "C++", + tags = [ + "lb_unit_test", + ], + uses_event_engine = False, + uses_polling = False, + deps = [ + ":lb_policy_test_lib", + "//src/core:channel_args", + "//src/core:grpc_lb_policy_ring_hash", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/client_channel/lb_policy/ring_hash_test.cc b/test/core/client_channel/lb_policy/ring_hash_test.cc new file mode 100644 index 00000000000..c01a50ed65d --- /dev/null +++ b/test/core/client_channel/lb_policy/ring_hash_test.cc @@ -0,0 +1,196 @@ +// +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include + +#include +#include +#include +#include +#include + +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "absl/types/optional.h" +#include "gtest/gtest.h" + +#define XXH_INLINE_ALL +#include "xxhash.h" + +#include +#include + +#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/load_balancing/lb_policy.h" +#include "src/core/lib/resolver/endpoint_addresses.h" +#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h" +#include "test/core/util/test_config.h" + +namespace grpc_core { +namespace testing { +namespace { + +// TODO(roth): I created this file when I fixed a bug and wrote only a +// very basic test and the test needed for that bug. When we have time, +// we need a lot more tests here to cover all of the policy's functionality. + +class RingHashTest : public LoadBalancingPolicyTest { + protected: + RingHashTest() : LoadBalancingPolicyTest("ring_hash_experimental") {} + + static RefCountedPtr MakeRingHashConfig( + int min_ring_size = 0, int max_ring_size = 0) { + Json::Object fields; + if (min_ring_size > 0) { + fields["minRingSize"] = Json::FromString(absl::StrCat(min_ring_size)); + } + if (max_ring_size > 0) { + fields["maxRingSize"] = Json::FromString(absl::StrCat(max_ring_size)); + } + return MakeConfig(Json::FromArray({Json::FromObject( + {{"ring_hash_experimental", Json::FromObject(fields)}})})); + } + + RequestHashAttribute* MakeHashAttribute(absl::string_view address) { + std::string hash_input = + absl::StrCat(absl::StripPrefix(address, "ipv4:"), "_0"); + uint64_t hash = XXH64(hash_input.data(), hash_input.size(), 0); + attribute_storage_.emplace_back( + std::make_unique(hash)); + return attribute_storage_.back().get(); + } + + std::vector> attribute_storage_; +}; + +TEST_F(RingHashTest, Basic) { + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"}; + EXPECT_EQ( + ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()), + absl::OkStatus()); + auto picker = ExpectState(GRPC_CHANNEL_IDLE); + auto* address0_attribute = MakeHashAttribute(kAddresses[0]); + ExpectPickQueued(picker.get(), {address0_attribute}); + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); + auto* subchannel = FindSubchannel(kAddresses[0]); + ASSERT_NE(subchannel, nullptr); + EXPECT_TRUE(subchannel->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_CONNECTING); + ExpectPickQueued(picker.get(), {address0_attribute}); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + picker = ExpectState(GRPC_CHANNEL_READY); + auto address = ExpectPickComplete(picker.get(), {address0_attribute}); + EXPECT_EQ(address, kAddresses[0]); +} + +TEST_F(RingHashTest, SameAddressListedMultipleTimes) { + const std::array kAddresses = { + "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:441"}; + EXPECT_EQ( + ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()), + absl::OkStatus()); + auto picker = ExpectState(GRPC_CHANNEL_IDLE); + auto* address0_attribute = MakeHashAttribute(kAddresses[0]); + ExpectPickQueued(picker.get(), {address0_attribute}); + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); + auto* subchannel = FindSubchannel(kAddresses[0]); + ASSERT_NE(subchannel, nullptr); + EXPECT_TRUE(subchannel->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_CONNECTING); + ExpectPickQueued(picker.get(), {address0_attribute}); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + picker = ExpectState(GRPC_CHANNEL_READY); + auto address = ExpectPickComplete(picker.get(), {address0_attribute}); + EXPECT_EQ(address, kAddresses[0]); +} + +TEST_F(RingHashTest, MultipleAddressesPerEndpoint) { + constexpr std::array kEndpoint1Addresses = { + "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; + constexpr std::array kEndpoint2Addresses = { + "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"}; + const std::array kEndpoints = { + MakeEndpointAddresses(kEndpoint1Addresses), + MakeEndpointAddresses(kEndpoint2Addresses)}; + EXPECT_EQ( + ApplyUpdate(BuildUpdate(kEndpoints, MakeRingHashConfig()), lb_policy()), + absl::OkStatus()); + auto picker = ExpectState(GRPC_CHANNEL_IDLE); + // Normal connection to first address of the first endpoint. + auto* address0_attribute = MakeHashAttribute(kEndpoint1Addresses[0]); + ExpectPickQueued(picker.get(), {address0_attribute}); + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); + auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]); + ASSERT_NE(subchannel, nullptr); + EXPECT_TRUE(subchannel->ConnectionRequested()); + auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]); + ASSERT_NE(subchannel2, nullptr); + EXPECT_FALSE(subchannel2->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_CONNECTING); + ExpectPickQueued(picker.get(), {address0_attribute}); + subchannel->SetConnectivityState(GRPC_CHANNEL_READY); + picker = ExpectState(GRPC_CHANNEL_READY); + auto address = ExpectPickComplete(picker.get(), {address0_attribute}); + EXPECT_EQ(address, kEndpoint1Addresses[0]); + // Now that connection fails. + subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE); + ExpectReresolutionRequest(); + picker = ExpectState(GRPC_CHANNEL_IDLE); + EXPECT_FALSE(subchannel->ConnectionRequested()); + EXPECT_FALSE(subchannel2->ConnectionRequested()); + // The LB policy will try to reconnect when it gets another pick. + ExpectPickQueued(picker.get(), {address0_attribute}); + WaitForWorkSerializerToFlush(); + WaitForWorkSerializerToFlush(); + EXPECT_TRUE(subchannel->ConnectionRequested()); + subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_CONNECTING); + ExpectPickQueued(picker.get(), {address0_attribute}); + // The connection attempt fails. + subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, + absl::UnavailableError("ugh")); + // The PF child policy will try to connect to the second address for the + // endpoint. + EXPECT_TRUE(subchannel2->ConnectionRequested()); + subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING); + picker = ExpectState(GRPC_CHANNEL_CONNECTING); + ExpectPickQueued(picker.get(), {address0_attribute}); + subchannel2->SetConnectivityState(GRPC_CHANNEL_READY); + picker = ExpectState(GRPC_CHANNEL_READY); + address = ExpectPickComplete(picker.get(), {address0_attribute}); + EXPECT_EQ(address, kEndpoint1Addresses[1]); +} + +} // namespace +} // namespace testing +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index c57c23e9d6c..4ebf1ca5a1e 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -8269,6 +8269,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "ring_hash_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,