From 2b813d2bffeb00f7dd1749b63f9f4c2a4ad65817 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 13 Oct 2021 12:14:00 -0700 Subject: [PATCH] grpclb: implement subchannel caching (#27657) * grpclb: implement subchannel caching * code review changes * fix clang tidy * code review changes --- .../client_channel/lb_policy/grpclb/grpclb.cc | 171 +++++++++++++----- .../client_channel/lb_policy/grpclb/grpclb.h | 4 + test/cpp/end2end/grpclb_end2end_test.cc | 86 +++++++-- 3 files changed, 199 insertions(+), 62 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 777d3f0a819..e00a2f864f7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// /// Implementation of the gRPC LB policy. /// @@ -109,9 +107,7 @@ #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000 - -#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token" -#define GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS "grpc.grpclb_address_client_stats" +#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000 namespace grpc_core { @@ -234,16 +230,24 @@ class GrpcLb : public LoadBalancingPolicy { class SubchannelWrapper : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel, - std::string lb_token, + RefCountedPtr lb_policy, std::string lb_token, RefCountedPtr client_stats) : DelegatingSubchannel(std::move(subchannel)), + lb_policy_(std::move(lb_policy)), lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} + ~SubchannelWrapper() override { + if (!lb_policy_->shutting_down_) { + lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel()); + } + } + const std::string& lb_token() const { return lb_token_; } GrpcLbClientStats* client_stats() const { return client_stats_.get(); } private: + RefCountedPtr lb_policy_; std::string lb_token_; RefCountedPtr client_stats_; }; @@ -422,6 +426,13 @@ class GrpcLb : public LoadBalancingPolicy { const grpc_channel_args* args); void CreateOrUpdateChildPolicyLocked(); + // Subchannel caching. + void CacheDeletedSubchannelLocked( + RefCountedPtr subchannel); + void StartSubchannelCacheTimerLocked(); + static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error); + void OnSubchannelCacheTimerLocked(grpc_error_handle error); + // Who the client is trying to communicate with. std::string server_name_; // Configurations for the policy. @@ -448,7 +459,7 @@ class GrpcLb : public LoadBalancingPolicy { // contains a non-NULL lb_call_. OrphanablePtr lb_calld_; // Timeout in milliseconds for the LB call. 0 means no deadline. - int lb_call_timeout_ms_ = 0; + const int lb_call_timeout_ms_ = 0; // Balancer call retry state. BackOff lb_call_backoff_; bool retry_timer_callback_pending_ = false; @@ -466,7 +477,7 @@ class GrpcLb : public LoadBalancingPolicy { // State for fallback-at-startup checks. // Timeout after startup after which we will go into fallback mode if // we have not received a serverlist from the balancer. - int fallback_at_startup_timeout_ = 0; + const int fallback_at_startup_timeout_ = 0; bool fallback_at_startup_checks_pending_ = false; grpc_timer lb_fallback_timer_; grpc_closure lb_on_fallback_; @@ -475,6 +486,15 @@ class GrpcLb : public LoadBalancingPolicy { OrphanablePtr child_policy_; // Child policy in state READY. bool child_policy_ready_ = false; + + // Deleted subchannel caching. + const grpc_millis subchannel_cache_interval_ms_; + std::map>> + cached_subchannels_; + grpc_timer subchannel_cache_timer_; + grpc_closure on_subchannel_cache_timer_; + bool subchannel_cache_timer_pending_ = false; }; // @@ -675,7 +695,8 @@ RefCountedPtr GrpcLb::Helper::CreateSubchannel( return MakeRefCounted( parent_->channel_control_helper()->CreateSubchannel(std::move(address), args), - std::move(lb_token), std::move(client_stats)); + parent_->Ref(DEBUG_LOCATION, "SubchannelWrapper"), std::move(lb_token), + std::move(client_stats)); } void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, @@ -1322,9 +1343,21 @@ grpc_channel_args* BuildBalancerChannelArgs( // ctor and dtor // +std::string GetServerNameFromChannelArgs(const grpc_channel_args* args) { + const char* server_uri = + grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(server_uri != nullptr); + absl::StatusOr uri = URI::Parse(server_uri); + GPR_ASSERT(uri.ok() && !uri->path().empty()); + return std::string(absl::StripPrefix(uri->path(), "/")); +} + GrpcLb::GrpcLb(Args args) : LoadBalancingPolicy(std::move(args)), + server_name_(GetServerNameFromChannelArgs(args.args)), response_generator_(MakeRefCounted()), + lb_call_timeout_ms_(grpc_channel_args_find_integer( + args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX})), lb_call_backoff_( BackOff::Options() .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * @@ -1332,31 +1365,25 @@ GrpcLb::GrpcLb(Args args) .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * - 1000)) { - // Closure Initialization - GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this, - grpc_schedule_on_exec_ctx); - // Record server name. - const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); - const char* server_uri = grpc_channel_arg_get_string(arg); - GPR_ASSERT(server_uri != nullptr); - absl::StatusOr uri = URI::Parse(server_uri); - GPR_ASSERT(uri.ok() && !uri->path().empty()); - server_name_ = std::string(absl::StripPrefix(uri->path(), "/")); + 1000)), + fallback_at_startup_timeout_(grpc_channel_args_find_integer( + args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS, + {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})), + subchannel_cache_interval_ms_(grpc_channel_args_find_integer( + args.args, GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS, + {GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS, 0, INT_MAX})) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "[grpclb %p] Will use '%s' as the server name for LB request.", this, server_name_.c_str()); } - // Record LB call timeout. - arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); - // Record fallback-at-startup timeout. - arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); - fallback_at_startup_timeout_ = grpc_channel_arg_get_integer( - arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); + // Closure Initialization + GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this, + nullptr); } GrpcLb::~GrpcLb() { grpc_channel_args_destroy(args_); } @@ -1364,6 +1391,11 @@ GrpcLb::~GrpcLb() { grpc_channel_args_destroy(args_); } void GrpcLb::ShutdownLocked() { shutting_down_ = true; lb_calld_.reset(); + if (subchannel_cache_timer_pending_) { + subchannel_cache_timer_pending_ = false; + grpc_timer_cancel(&subchannel_cache_timer_); + } + cached_subchannels_.clear(); if (retry_timer_callback_pending_) { grpc_timer_cancel(&lb_call_retry_timer_); } @@ -1680,6 +1712,57 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() { child_policy_->UpdateLocked(std::move(update_args)); } +// +// subchannel caching +// + +void GrpcLb::CacheDeletedSubchannelLocked( + RefCountedPtr subchannel) { + grpc_millis deletion_time = + ExecCtx::Get()->Now() + subchannel_cache_interval_ms_; + cached_subchannels_[deletion_time].push_back(std::move(subchannel)); + if (!subchannel_cache_timer_pending_) { + Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release(); + subchannel_cache_timer_pending_ = true; + StartSubchannelCacheTimerLocked(); + } +} + +void GrpcLb::StartSubchannelCacheTimerLocked() { + GPR_ASSERT(!cached_subchannels_.empty()); + grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first, + &on_subchannel_cache_timer_); +} + +void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + GRPC_ERROR_REF(error); + self->work_serializer()->Run( + [self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); }, + DEBUG_LOCATION); +} + +void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) { + if (subchannel_cache_timer_pending_ && error == GRPC_ERROR_NONE) { + auto it = cached_subchannels_.begin(); + if (it != cached_subchannels_.end()) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, + "[grpclb %p] removing %" PRIuPTR " subchannels from cache", + this, it->second.size()); + } + cached_subchannels_.erase(it); + } + if (!cached_subchannels_.empty()) { + StartSubchannelCacheTimerLocked(); + return; + } + subchannel_cache_timer_pending_ = false; + } + Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer"); + GRPC_ERROR_UNREF(error); +} + // // factory // diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h index a032b5dbf1d..27e7a3a333f 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h @@ -32,6 +32,10 @@ #define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER \ "grpc.address_is_backend_from_grpclb_load_balancer" +// For use in tests. +#define GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS \ + "grpc.internal.grpclb_subchannel_cache_interval_ms" + namespace grpc_core { extern const char kGrpcLbClientStatsMetadataKey[]; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 072d83f6c4e..a11c684a829 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -1,20 +1,18 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// #include #include @@ -43,6 +41,7 @@ #include #include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/server_address.h" @@ -441,7 +440,8 @@ class GrpclbEnd2endTest : public ::testing::Test { void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } void ResetStub(int fallback_timeout = 0, - const std::string& expected_targets = "") { + const std::string& expected_targets = "", + int subchannel_cache_delay_ms = 0) { ChannelArguments args; if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout); args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, @@ -449,6 +449,10 @@ class GrpclbEnd2endTest : public ::testing::Test { if (!expected_targets.empty()) { args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets); } + if (subchannel_cache_delay_ms > 0) { + args.SetInt(GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS, + subchannel_cache_delay_ms); + } std::ostringstream uri; uri << "fake:///" << kApplicationTargetName_; // TODO(dgq): templatize tests to run everything using both secure and @@ -781,6 +785,52 @@ TEST_F(SingleBalancerTest, Vanilla) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, SubchannelCaching) { + ResetStub(/*fallback_timeout=*/0, /*expected_targets=*/"", + /*subchannel_cache_delay_ms=*/1500); + SetNextResolutionAllBalancers(); + // Initially send all backends. + ScheduleResponseForBalancer( + 0, BuildResponseForBackends(GetBackendPorts(), {}), 0); + // Then remove backends 0 and 1. + ScheduleResponseForBalancer( + 0, BuildResponseForBackends(GetBackendPorts(2), {}), 1000); + // Now re-add backend 1. + ScheduleResponseForBalancer( + 0, BuildResponseForBackends(GetBackendPorts(1), {}), 1000); + // Wait for all backends to come online. + WaitForAllBackends(); + // Send RPCs for long enough to get all responses. + gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(3000); + do { + CheckRpcSendOk(); + } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); + // Backend 0 should have received less traffic than the others. + // Backend 1 would have received less traffic than 2 and 3. + gpr_log(GPR_INFO, "BACKEND 0: %" PRIuPTR " requests", + backends_[0]->service_.request_count()); + EXPECT_GT(backends_[0]->service_.request_count(), 0); + for (size_t i = 1; i < backends_.size(); ++i) { + gpr_log(GPR_INFO, "BACKEND %" PRIuPTR ": %" PRIuPTR " requests", i, + backends_[i]->service_.request_count()); + EXPECT_GT(backends_[i]->service_.request_count(), + backends_[0]->service_.request_count()) + << "backend " << i; + if (i >= 2) { + EXPECT_GT(backends_[i]->service_.request_count(), + backends_[1]->service_.request_count()) + << "backend " << i; + } + } + // Backend 1 should never have lost its connection from the client. + EXPECT_EQ(1UL, backends_[1]->service_.clients().size()); + balancers_[0]->service_.NotifyDoneWithServerlists(); + // The balancer got a single request. + EXPECT_EQ(1U, balancers_[0]->service_.request_count()); + // And sent 3 responses. + EXPECT_EQ(3U, balancers_[0]->service_.response_count()); +} + TEST_F(SingleBalancerTest, ReturnServerStatus) { SetNextResolutionAllBalancers(); ScheduleResponseForBalancer(