[ring_hash] add test and make some minor fixes and improvements (#34610)

- Fixes support for the same address being present more than once in the
address list, which was accidentally broken in #34244.
- Change the call attribute to encode the hash as an integer instead of
a string.
pull/34630/head
Mark D. Roth 1 year ago committed by GitHub
parent bb6a6faa69
commit 01907a7767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      CMakeLists.txt
  2. 18
      build_autogenerated.yaml
  3. 1
      src/core/BUILD
  4. 49
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
  5. 8
      src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h
  6. 11
      src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
  7. 18
      test/core/client_channel/lb_policy/BUILD
  8. 196
      test/core/client_channel/lb_policy/ring_hash_test.cc
  9. 24
      tools/run_tests/generated/tests.json

41
CMakeLists.txt generated

@ -1272,6 +1272,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx retry_transparent_not_sent_on_wire_test)
add_dependencies(buildtests_cxx retry_unref_before_finish_test)
add_dependencies(buildtests_cxx retry_unref_before_recv_test)
add_dependencies(buildtests_cxx ring_hash_test)
add_dependencies(buildtests_cxx rls_end2end_test)
add_dependencies(buildtests_cxx rls_lb_config_parser_test)
add_dependencies(buildtests_cxx round_robin_test)
@ -20692,6 +20693,46 @@ target_link_libraries(retry_unref_before_recv_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(ring_hash_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/client_channel/lb_policy/ring_hash_test.cc
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
)
target_compile_features(ring_hash_test PUBLIC cxx_std_14)
target_include_directories(ring_hash_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(ring_hash_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)

@ -13941,6 +13941,24 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
- name: ring_hash_test
gtest: true
build: test
language: c++
headers:
- test/core/client_channel/lb_policy/lb_policy_test_lib.h
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/client_channel/lb_policy/ring_hash_test.cc
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
deps:
- gtest
- protobuf
- grpc_test_util
uses_polling: false
- name: rls_end2end_test
gtest: true
build: test

@ -4908,6 +4908,7 @@ grpc_cc_library(
"lb_policy_registry",
"pollset_set",
"ref_counted",
"resolved_address",
"unique_type_name",
"validation_errors",
"//:channel_arg_names",

@ -33,7 +33,6 @@
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
@ -63,6 +62,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/delegating_helper.h"
#include "src/core/lib/load_balancing/lb_policy.h"
@ -306,15 +306,10 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
auto* hash_attribute = static_cast<RequestHashAttribute*>(
call_state->GetCallAttribute(RequestHashAttribute::TypeName()));
absl::string_view hash;
if (hash_attribute != nullptr) {
hash = hash_attribute->request_hash();
}
uint64_t h;
if (!absl::SimpleAtoi(hash, &h)) {
return PickResult::Fail(
absl::InternalError("ring hash value is not a number"));
if (hash_attribute == nullptr) {
return PickResult::Fail(absl::InternalError("hash attribute not present"));
}
uint64_t request_hash = hash_attribute->request_hash();
const auto& ring = ring_->ring();
// Find the index in the ring to use for this RPC.
// Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c
@ -331,10 +326,10 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
}
uint64_t midval = ring[index].hash;
uint64_t midval1 = index == 0 ? 0 : ring[index - 1].hash;
if (h <= midval && h > midval1) {
if (request_hash <= midval && request_hash > midval1) {
break;
}
if (midval < h) {
if (midval < request_hash) {
lowp = index + 1;
} else {
highp = index - 1;
@ -632,7 +627,37 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
endpoints_ = *std::move(args.addresses);
// De-dup endpoints, taking weight into account.
endpoints_.clear();
endpoints_.reserve(args.addresses->size());
std::map<EndpointAddressSet, size_t> endpoint_indices;
size_t num_skipped = 0;
for (size_t i = 0; i < args.addresses->size(); ++i) {
EndpointAddresses& endpoint = (*args.addresses)[i];
const EndpointAddressSet key(endpoint.addresses());
auto p = endpoint_indices.emplace(key, i - num_skipped);
if (!p.second) {
// Duplicate endpoint. Combine weights and skip the dup.
EndpointAddresses& prev_endpoint = endpoints_[p.first->second];
int weight_arg =
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
int prev_weight_arg =
prev_endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] merging duplicate endpoint for %s, combined "
"weight %d",
this, key.ToString().c_str(), weight_arg + prev_weight_arg);
}
prev_endpoint = EndpointAddresses(
prev_endpoint.addresses(),
prev_endpoint.args().Set(GRPC_ARG_ADDRESS_WEIGHT,
weight_arg + prev_weight_arg));
++num_skipped;
} else {
endpoints_.push_back(std::move(endpoint));
}
}
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",

@ -21,8 +21,6 @@
#include <stdint.h>
#include "absl/strings/string_view.h"
#include "src/core/lib/gprpp/unique_type_name.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/json/json.h"
@ -37,15 +35,15 @@ class RequestHashAttribute
public:
static UniqueTypeName TypeName();
explicit RequestHashAttribute(absl::string_view request_hash)
explicit RequestHashAttribute(uint64_t request_hash)
: request_hash_(request_hash) {}
absl::string_view request_hash() const { return request_hash_; }
uint64_t request_hash() const { return request_hash_; }
private:
UniqueTypeName type() const override { return TypeName(); }
absl::string_view request_hash_;
uint64_t request_hash_;
};
// Helper Parsing method to parse ring hash policy configs; for example, ring

@ -812,8 +812,8 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
// Rotating the old value prevents duplicate hash rules from cancelling
// each other out and preserves all of the entropy
const uint64_t old_value =
hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
hash = old_value ^ new_hash.value();
hash.has_value() ? ((*hash << 1) | (*hash >> 63)) : 0;
hash = old_value ^ *new_hash;
}
// If the policy is a terminal policy and a hash has been generated,
// ignore the rest of the hash policies.
@ -833,13 +833,8 @@ absl::Status XdsResolver::XdsConfigSelector::GetCallConfig(
}
args.service_config_call_data->SetCallAttribute(
args.arena->New<XdsClusterAttribute>(cluster->cluster_name()));
std::string hash_string = absl::StrCat(hash.value());
char* hash_value =
static_cast<char*>(args.arena->Alloc(hash_string.size() + 1));
memcpy(hash_value, hash_string.c_str(), hash_string.size());
hash_value[hash_string.size()] = '\0';
args.service_config_call_data->SetCallAttribute(
args.arena->New<RequestHashAttribute>(hash_value));
args.arena->New<RequestHashAttribute>(*hash));
args.service_config_call_data->SetCallAttribute(
args.arena->ManagedNew<XdsRouteStateAttributeImpl>(route_config_data_,
entry));

@ -234,3 +234,21 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "ring_hash_test",
srcs = ["ring_hash_test.cc"],
external_deps = ["gtest"],
language = "C++",
tags = [
"lb_unit_test",
],
uses_event_engine = False,
uses_polling = False,
deps = [
":lb_policy_test_lib",
"//src/core:channel_args",
"//src/core:grpc_lb_policy_ring_hash",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,196 @@
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <stdint.h>
#include <algorithm>
#include <array>
#include <memory>
#include <string>
#include <vector>
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "gtest/gtest.h"
#define XXH_INLINE_ALL
#include "xxhash.h"
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/resolver/endpoint_addresses.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
// TODO(roth): I created this file when I fixed a bug and wrote only a
// very basic test and the test needed for that bug. When we have time,
// we need a lot more tests here to cover all of the policy's functionality.
class RingHashTest : public LoadBalancingPolicyTest {
protected:
RingHashTest() : LoadBalancingPolicyTest("ring_hash_experimental") {}
static RefCountedPtr<LoadBalancingPolicy::Config> MakeRingHashConfig(
int min_ring_size = 0, int max_ring_size = 0) {
Json::Object fields;
if (min_ring_size > 0) {
fields["minRingSize"] = Json::FromString(absl::StrCat(min_ring_size));
}
if (max_ring_size > 0) {
fields["maxRingSize"] = Json::FromString(absl::StrCat(max_ring_size));
}
return MakeConfig(Json::FromArray({Json::FromObject(
{{"ring_hash_experimental", Json::FromObject(fields)}})}));
}
RequestHashAttribute* MakeHashAttribute(absl::string_view address) {
std::string hash_input =
absl::StrCat(absl::StripPrefix(address, "ipv4:"), "_0");
uint64_t hash = XXH64(hash_input.data(), hash_input.size(), 0);
attribute_storage_.emplace_back(
std::make_unique<RequestHashAttribute>(hash));
return attribute_storage_.back().get();
}
std::vector<std::unique_ptr<RequestHashAttribute>> attribute_storage_;
};
TEST_F(RingHashTest, Basic) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
EXPECT_EQ(
ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
absl::OkStatus());
auto picker = ExpectState(GRPC_CHANNEL_IDLE);
auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
ExpectPickQueued(picker.get(), {address0_attribute});
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_CONNECTING);
ExpectPickQueued(picker.get(), {address0_attribute});
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
auto address = ExpectPickComplete(picker.get(), {address0_attribute});
EXPECT_EQ(address, kAddresses[0]);
}
TEST_F(RingHashTest, SameAddressListedMultipleTimes) {
const std::array<absl::string_view, 3> kAddresses = {
"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:441"};
EXPECT_EQ(
ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
absl::OkStatus());
auto picker = ExpectState(GRPC_CHANNEL_IDLE);
auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
ExpectPickQueued(picker.get(), {address0_attribute});
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
auto* subchannel = FindSubchannel(kAddresses[0]);
ASSERT_NE(subchannel, nullptr);
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_CONNECTING);
ExpectPickQueued(picker.get(), {address0_attribute});
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
auto address = ExpectPickComplete(picker.get(), {address0_attribute});
EXPECT_EQ(address, kAddresses[0]);
}
TEST_F(RingHashTest, MultipleAddressesPerEndpoint) {
constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
"ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
"ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
const std::array<EndpointAddresses, 2> kEndpoints = {
MakeEndpointAddresses(kEndpoint1Addresses),
MakeEndpointAddresses(kEndpoint2Addresses)};
EXPECT_EQ(
ApplyUpdate(BuildUpdate(kEndpoints, MakeRingHashConfig()), lb_policy()),
absl::OkStatus());
auto picker = ExpectState(GRPC_CHANNEL_IDLE);
// Normal connection to first address of the first endpoint.
auto* address0_attribute = MakeHashAttribute(kEndpoint1Addresses[0]);
ExpectPickQueued(picker.get(), {address0_attribute});
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]);
ASSERT_NE(subchannel, nullptr);
EXPECT_TRUE(subchannel->ConnectionRequested());
auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]);
ASSERT_NE(subchannel2, nullptr);
EXPECT_FALSE(subchannel2->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_CONNECTING);
ExpectPickQueued(picker.get(), {address0_attribute});
subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
auto address = ExpectPickComplete(picker.get(), {address0_attribute});
EXPECT_EQ(address, kEndpoint1Addresses[0]);
// Now that connection fails.
subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
ExpectReresolutionRequest();
picker = ExpectState(GRPC_CHANNEL_IDLE);
EXPECT_FALSE(subchannel->ConnectionRequested());
EXPECT_FALSE(subchannel2->ConnectionRequested());
// The LB policy will try to reconnect when it gets another pick.
ExpectPickQueued(picker.get(), {address0_attribute});
WaitForWorkSerializerToFlush();
WaitForWorkSerializerToFlush();
EXPECT_TRUE(subchannel->ConnectionRequested());
subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_CONNECTING);
ExpectPickQueued(picker.get(), {address0_attribute});
// The connection attempt fails.
subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
absl::UnavailableError("ugh"));
// The PF child policy will try to connect to the second address for the
// endpoint.
EXPECT_TRUE(subchannel2->ConnectionRequested());
subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
picker = ExpectState(GRPC_CHANNEL_CONNECTING);
ExpectPickQueued(picker.get(), {address0_attribute});
subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
picker = ExpectState(GRPC_CHANNEL_READY);
address = ExpectPickComplete(picker.get(), {address0_attribute});
EXPECT_EQ(address, kEndpoint1Addresses[1]);
}
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(&argc, argv);
return RUN_ALL_TESTS();
}

@ -8269,6 +8269,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "ring_hash_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save