grpclb: implement subchannel caching (#27657)

* grpclb: implement subchannel caching

* code review changes

* fix clang tidy

* code review changes
pull/27713/head
Mark D. Roth 3 years ago committed by GitHub
parent 8610a34016
commit 2b813d2bff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 171
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h
  3. 86
      test/cpp/end2end/grpclb_end2end_test.cc

@ -1,20 +1,18 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
/// Implementation of the gRPC LB policy.
///
@ -109,9 +107,7 @@
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
#define GRPC_ARG_GRPCLB_ADDRESS_CLIENT_STATS "grpc.grpclb_address_client_stats"
// Default value, in milliseconds, supplied when reading the
// GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS channel arg; the result is
// added to the current time to pick when a cached subchannel is released.
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000
namespace grpc_core {
@ -234,16 +230,24 @@ class GrpcLb : public LoadBalancingPolicy {
// Subchannel wrapper that carries the LB token and client stats associated
// with the wrapped subchannel, along with a ref to the owning GrpcLb policy.
class SubchannelWrapper : public DelegatingSubchannel {
public:
SubchannelWrapper(RefCountedPtr<SubchannelInterface> subchannel,
std::string lb_token,
RefCountedPtr<GrpcLb> lb_policy, std::string lb_token,
RefCountedPtr<GrpcLbClientStats> client_stats)
: DelegatingSubchannel(std::move(subchannel)),
lb_policy_(std::move(lb_policy)),
lb_token_(std::move(lb_token)),
client_stats_(std::move(client_stats)) {}
// On destruction, hands the wrapped subchannel to the policy's
// deleted-subchannel cache, unless the policy is already shutting down.
~SubchannelWrapper() override {
if (!lb_policy_->shutting_down_) {
lb_policy_->CacheDeletedSubchannelLocked(wrapped_subchannel());
}
}
// Accessors for the per-subchannel LB token and client stats. The stats
// pointer is non-owning and may be null if no stats object was supplied.
const std::string& lb_token() const { return lb_token_; }
GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
private:
// Owning policy; consulted in the destructor to cache the subchannel.
RefCountedPtr<GrpcLb> lb_policy_;
std::string lb_token_;
RefCountedPtr<GrpcLbClientStats> client_stats_;
};
@ -422,6 +426,13 @@ class GrpcLb : public LoadBalancingPolicy {
const grpc_channel_args* args);
void CreateOrUpdateChildPolicyLocked();
// Subchannel caching.
void CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel);
void StartSubchannelCacheTimerLocked();
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
void OnSubchannelCacheTimerLocked(grpc_error_handle error);
// Who the client is trying to communicate with.
std::string server_name_;
// Configurations for the policy.
@ -448,7 +459,7 @@ class GrpcLb : public LoadBalancingPolicy {
// contains a non-NULL lb_call_.
OrphanablePtr<BalancerCallState> lb_calld_;
// Timeout in milliseconds for the LB call. 0 means no deadline.
int lb_call_timeout_ms_ = 0;
const int lb_call_timeout_ms_ = 0;
// Balancer call retry state.
BackOff lb_call_backoff_;
bool retry_timer_callback_pending_ = false;
@ -466,7 +477,7 @@ class GrpcLb : public LoadBalancingPolicy {
// State for fallback-at-startup checks.
// Timeout after startup after which we will go into fallback mode if
// we have not received a serverlist from the balancer.
int fallback_at_startup_timeout_ = 0;
const int fallback_at_startup_timeout_ = 0;
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
@ -475,6 +486,15 @@ class GrpcLb : public LoadBalancingPolicy {
OrphanablePtr<LoadBalancingPolicy> child_policy_;
// Child policy in state READY.
bool child_policy_ready_ = false;
// Deleted subchannel caching.
const grpc_millis subchannel_cache_interval_ms_;
std::map<grpc_millis /*deletion time*/,
std::vector<RefCountedPtr<SubchannelInterface>>>
cached_subchannels_;
grpc_timer subchannel_cache_timer_;
grpc_closure on_subchannel_cache_timer_;
bool subchannel_cache_timer_pending_ = false;
};
//
@ -675,7 +695,8 @@ RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
return MakeRefCounted<SubchannelWrapper>(
parent_->channel_control_helper()->CreateSubchannel(std::move(address),
args),
std::move(lb_token), std::move(client_stats));
parent_->Ref(DEBUG_LOCATION, "SubchannelWrapper"), std::move(lb_token),
std::move(client_stats));
}
void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
@ -1322,9 +1343,21 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
std::string GetServerNameFromChannelArgs(const grpc_channel_args* args) {
const char* server_uri =
grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(server_uri != nullptr);
absl::StatusOr<URI> uri = URI::Parse(server_uri);
GPR_ASSERT(uri.ok() && !uri->path().empty());
return std::string(absl::StripPrefix(uri->path(), "/"));
}
GrpcLb::GrpcLb(Args args)
: LoadBalancingPolicy(std::move(args)),
server_name_(GetServerNameFromChannelArgs(args.args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS, {0, 0, INT_MAX})),
lb_call_backoff_(
BackOff::Options()
.set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
@ -1332,31 +1365,25 @@ GrpcLb::GrpcLb(Args args)
.set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
.set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1000)) {
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
GPR_ASSERT(server_uri != nullptr);
absl::StatusOr<URI> uri = URI::Parse(server_uri);
GPR_ASSERT(uri.ok() && !uri->path().empty());
server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
1000)),
fallback_at_startup_timeout_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS,
{GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})),
subchannel_cache_interval_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS,
{GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS, 0, INT_MAX})) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] Will use '%s' as the server name for LB request.",
this, server_name_.c_str());
}
// Record LB call timeout.
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
// Record fallback-at-startup timeout.
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
nullptr);
}
// Destroys the channel args stored in args_.
GrpcLb::~GrpcLb() {
  grpc_channel_args_destroy(args_);
}
@ -1364,6 +1391,11 @@ GrpcLb::~GrpcLb() { grpc_channel_args_destroy(args_); }
void GrpcLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (subchannel_cache_timer_pending_) {
subchannel_cache_timer_pending_ = false;
grpc_timer_cancel(&subchannel_cache_timer_);
}
cached_subchannels_.clear();
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
}
@ -1680,6 +1712,57 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
child_policy_->UpdateLocked(std::move(update_args));
}
//
// subchannel caching
//
// Stores `subchannel` in cached_subchannels_ under the time at which it
// should be released (now + subchannel_cache_interval_ms_), and starts the
// cache timer if one is not already pending.
void GrpcLb::CacheDeletedSubchannelLocked(
    RefCountedPtr<SubchannelInterface> subchannel) {
  const grpc_millis release_at =
      ExecCtx::Get()->Now() + subchannel_cache_interval_ms_;
  auto& batch = cached_subchannels_[release_at];
  batch.push_back(std::move(subchannel));
  // A timer is already running; it will get to this entry in turn.
  if (subchannel_cache_timer_pending_) return;
  // Take a ref for the timer callback; it is dropped by the Unref() in
  // OnSubchannelCacheTimerLocked().
  Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
  subchannel_cache_timer_pending_ = true;
  StartSubchannelCacheTimerLocked();
}
void GrpcLb::StartSubchannelCacheTimerLocked() {
GPR_ASSERT(!cached_subchannels_.empty());
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first,
&on_subchannel_cache_timer_);
}
void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
auto* self = static_cast<GrpcLb*>(arg);
GRPC_ERROR_REF(error);
self->work_serializer()->Run(
[self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
DEBUG_LOCATION);
}
// Handles expiry of the subchannel cache timer: releases the batch of cached
// subchannels with the earliest deletion time, then either re-arms the timer
// for the next batch or drops the ref held on behalf of the timer.
void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
  // Skip the cache work if the pending flag was cleared (as ShutdownLocked()
  // does) or the callback carries an error (presumably a cancelled timer).
  const bool timer_fired =
      subchannel_cache_timer_pending_ && error == GRPC_ERROR_NONE;
  if (timer_fired) {
    if (!cached_subchannels_.empty()) {
      auto oldest = cached_subchannels_.begin();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
        gpr_log(GPR_INFO,
                "[grpclb %p] removing %" PRIuPTR " subchannels from cache",
                this, oldest->second.size());
      }
      cached_subchannels_.erase(oldest);
    }
    // Entries remain: re-arm for the next one and keep holding the
    // "OnSubchannelCacheTimer" ref for that callback.
    if (!cached_subchannels_.empty()) {
      StartSubchannelCacheTimerLocked();
      return;
    }
    subchannel_cache_timer_pending_ = false;
  }
  Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
  GRPC_ERROR_UNREF(error);
}
//
// factory
//

@ -32,6 +32,10 @@
#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER \
"grpc.address_is_backend_from_grpclb_load_balancer"
// Channel arg read as an integer number of milliseconds by the grpclb policy
// to set its deleted-subchannel cache interval.
// For use in tests.
#define GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS \
"grpc.internal.grpclb_subchannel_cache_interval_ms"
namespace grpc_core {
extern const char kGrpcLbClientStatsMetadataKey[];

@ -1,20 +1,18 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <deque>
#include <memory>
@ -43,6 +41,7 @@
#include <grpcpp/server_builder.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
@ -441,7 +440,8 @@ class GrpclbEnd2endTest : public ::testing::Test {
void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
void ResetStub(int fallback_timeout = 0,
const std::string& expected_targets = "") {
const std::string& expected_targets = "",
int subchannel_cache_delay_ms = 0) {
ChannelArguments args;
if (fallback_timeout > 0) args.SetGrpclbFallbackTimeout(fallback_timeout);
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
@ -449,6 +449,10 @@ class GrpclbEnd2endTest : public ::testing::Test {
if (!expected_targets.empty()) {
args.SetString(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS, expected_targets);
}
if (subchannel_cache_delay_ms > 0) {
args.SetInt(GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS,
subchannel_cache_delay_ms);
}
std::ostringstream uri;
uri << "fake:///" << kApplicationTargetName_;
// TODO(dgq): templatize tests to run everything using both secure and
@ -781,6 +785,52 @@ TEST_F(SingleBalancerTest, Vanilla) {
EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
}
// Checks subchannel caching: a backend that is dropped from the serverlist
// and then re-added while the cache interval (1500 ms here) is in effect
// should keep its original client connection.
TEST_F(SingleBalancerTest, SubchannelCaching) {
  ResetStub(/*fallback_timeout=*/0, /*expected_targets=*/"",
            /*subchannel_cache_delay_ms=*/1500);
  SetNextResolutionAllBalancers();
  // Initially send all backends.
  ScheduleResponseForBalancer(
      0, BuildResponseForBackends(GetBackendPorts(), {}), 0);
  // Then remove backends 0 and 1.
  ScheduleResponseForBalancer(
      0, BuildResponseForBackends(GetBackendPorts(2), {}), 1000);
  // Now re-add backend 1.
  ScheduleResponseForBalancer(
      0, BuildResponseForBackends(GetBackendPorts(1), {}), 1000);
  // Wait for all backends to come online.
  WaitForAllBackends();
  // Send RPCs for long enough to get all responses.
  gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(3000);
  do {
    CheckRpcSendOk();
  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
  // Backend 0 should have received less traffic than the others.
  // Backend 1 should have received less traffic than backends 2 and 3.
  gpr_log(GPR_INFO, "BACKEND 0: %" PRIuPTR " requests",
          backends_[0]->service_.request_count());
  // Unsigned literal: request_count() is an unsigned count, so comparing it
  // with a plain `0` would mix signed and unsigned operands.
  EXPECT_GT(backends_[0]->service_.request_count(), 0U);
  for (size_t i = 1; i < backends_.size(); ++i) {
    gpr_log(GPR_INFO, "BACKEND %" PRIuPTR ": %" PRIuPTR " requests", i,
            backends_[i]->service_.request_count());
    EXPECT_GT(backends_[i]->service_.request_count(),
              backends_[0]->service_.request_count())
        << "backend " << i;
    if (i >= 2) {
      EXPECT_GT(backends_[i]->service_.request_count(),
                backends_[1]->service_.request_count())
          << "backend " << i;
    }
  }
  // Backend 1 should never have lost its connection from the client.
  EXPECT_EQ(1UL, backends_[1]->service_.clients().size());
  balancers_[0]->service_.NotifyDoneWithServerlists();
  // The balancer got a single request.
  EXPECT_EQ(1U, balancers_[0]->service_.request_count());
  // And sent 3 responses.
  EXPECT_EQ(3U, balancers_[0]->service_.response_count());
}
TEST_F(SingleBalancerTest, ReturnServerStatus) {
SetNextResolutionAllBalancers();
ScheduleResponseForBalancer(

Loading…
Cancel
Save