From 31773d2c6ad1dffd8bcf8565609f12ff2c254efe Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 13 Dec 2019 09:11:22 -0800 Subject: [PATCH] grpclb: Add support for balancer telling client to enter fallback mode. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 19 ++++++++ .../lb_policy/grpclb/load_balancer_api.cc | 6 +++ .../lb_policy/grpclb/load_balancer_api.h | 2 +- .../src/proto/grpc/lb/v1/load_balancer.upb.c | 18 +++++--- .../src/proto/grpc/lb/v1/load_balancer.upb.h | 34 +++++++++++++++ src/proto/grpc/lb/v1/load_balancer.proto | 7 +++ test/cpp/end2end/grpclb_end2end_test.cc | 43 ++++++++++++++++++- 7 files changed, 122 insertions(+), 7 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5c4ed7516f3..cdcf997e62b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1157,6 +1157,25 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked( } break; } + case response.FALLBACK: { + if (!grpclb_policy->fallback_mode_) { + gpr_log(GPR_INFO, + "[grpclb %p] Entering fallback mode as requested by balancer", + grpclb_policy); + if (grpclb_policy->fallback_at_startup_checks_pending_) { + grpclb_policy->fallback_at_startup_checks_pending_ = false; + grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_); + grpclb_policy->CancelBalancerChannelConnectivityWatchLocked(); + } + grpclb_policy->fallback_mode_ = true; + grpclb_policy->CreateOrUpdateChildPolicyLocked(); + // Reset serverlist, so that if the balancer exits fallback + // mode by sending the same serverlist we were previously + // using, we don't incorrectly ignore it as a duplicate. 
+ grpclb_policy->serverlist_.reset(); + } + break; + } } } grpc_slice_unref_internal(response_slice); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc index f561593ec80..3185b993c41 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc @@ -181,6 +181,12 @@ bool GrpcLbResponseParse(const grpc_slice& encoded_grpc_grpclb_response, } return true; } + // Handle fallback. + if (grpc_lb_v1_LoadBalanceResponse_has_fallback_response(response)) { + result->type = result->FALLBACK; + return true; + } + // Unknown response type. return false; } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 00a7cd303ba..6caa120f587 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -48,7 +48,7 @@ struct GrpcLbServer { }; struct GrpcLbResponse { - enum { INITIAL, SERVERLIST } type; + enum { INITIAL, SERVERLIST, FALLBACK } type; grpc_millis client_stats_report_interval = 0; std::vector<GrpcLbServer> serverlist; }; diff --git a/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c b/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c index 75f07614920..90b7cb7f622 100644 --- a/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c +++ b/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c @@ -71,20 +71,22 @@ const upb_msglayout grpc_lb_v1_ClientStats_msginit = { UPB_SIZE(40, 48), 6, false, }; -static const upb_msglayout *const grpc_lb_v1_LoadBalanceResponse_submsgs[2] = { +static const upb_msglayout *const grpc_lb_v1_LoadBalanceResponse_submsgs[3] = { + &grpc_lb_v1_FallbackResponse_msginit, 
&grpc_lb_v1_InitialLoadBalanceResponse_msginit, &grpc_lb_v1_ServerList_msginit, }; -static const upb_msglayout_field grpc_lb_v1_LoadBalanceResponse__fields[2] = { - {1, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 0, 11, 1}, - {2, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 1, 11, 1}, +static const upb_msglayout_field grpc_lb_v1_LoadBalanceResponse__fields[3] = { + {1, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 1, 11, 1}, + {2, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 2, 11, 1}, + {3, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 0, 11, 1}, }; const upb_msglayout grpc_lb_v1_LoadBalanceResponse_msginit = { &grpc_lb_v1_LoadBalanceResponse_submsgs[0], &grpc_lb_v1_LoadBalanceResponse__fields[0], - UPB_SIZE(8, 16), 2, false, + UPB_SIZE(8, 16), 3, false, }; static const upb_msglayout *const grpc_lb_v1_InitialLoadBalanceResponse_submsgs[1] = { @@ -129,5 +131,11 @@ const upb_msglayout grpc_lb_v1_Server_msginit = { UPB_SIZE(24, 48), 4, false, }; +const upb_msglayout grpc_lb_v1_FallbackResponse_msginit = { + NULL, + NULL, + UPB_SIZE(0, 0), 0, false, +}; + #include "upb/port_undef.inc" diff --git a/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h b/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h index 64845660d17..d6331a5b515 100644 --- a/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h +++ b/src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h @@ -28,6 +28,7 @@ struct grpc_lb_v1_LoadBalanceResponse; struct grpc_lb_v1_InitialLoadBalanceResponse; struct grpc_lb_v1_ServerList; struct grpc_lb_v1_Server; +struct grpc_lb_v1_FallbackResponse; typedef struct grpc_lb_v1_LoadBalanceRequest grpc_lb_v1_LoadBalanceRequest; typedef struct grpc_lb_v1_InitialLoadBalanceRequest grpc_lb_v1_InitialLoadBalanceRequest; typedef struct grpc_lb_v1_ClientStatsPerToken grpc_lb_v1_ClientStatsPerToken; @@ -36,6 +37,7 @@ typedef struct grpc_lb_v1_LoadBalanceResponse grpc_lb_v1_LoadBalanceResponse; typedef struct grpc_lb_v1_InitialLoadBalanceResponse 
grpc_lb_v1_InitialLoadBalanceResponse; typedef struct grpc_lb_v1_ServerList grpc_lb_v1_ServerList; typedef struct grpc_lb_v1_Server grpc_lb_v1_Server; +typedef struct grpc_lb_v1_FallbackResponse grpc_lb_v1_FallbackResponse; extern const upb_msglayout grpc_lb_v1_LoadBalanceRequest_msginit; extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceRequest_msginit; extern const upb_msglayout grpc_lb_v1_ClientStatsPerToken_msginit; @@ -44,6 +46,7 @@ extern const upb_msglayout grpc_lb_v1_LoadBalanceResponse_msginit; extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceResponse_msginit; extern const upb_msglayout grpc_lb_v1_ServerList_msginit; extern const upb_msglayout grpc_lb_v1_Server_msginit; +extern const upb_msglayout grpc_lb_v1_FallbackResponse_msginit; struct google_protobuf_Duration; struct google_protobuf_Timestamp; extern const upb_msglayout google_protobuf_Duration_msginit; @@ -221,6 +224,7 @@ UPB_INLINE char *grpc_lb_v1_LoadBalanceResponse_serialize(const grpc_lb_v1_LoadB typedef enum { grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_initial_response = 1, grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_server_list = 2, + grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_fallback_response = 3, grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_NOT_SET = 0 } grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases; UPB_INLINE grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_case(const grpc_lb_v1_LoadBalanceResponse* msg) { return (grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases)UPB_FIELD_AT(msg, int32_t, UPB_SIZE(4, 8)); } @@ -229,6 +233,8 @@ UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_initial_response(const grpc_l UPB_INLINE const grpc_lb_v1_InitialLoadBalanceResponse* grpc_lb_v1_LoadBalanceResponse_initial_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const 
grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 1, NULL); } UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return _upb_has_oneof_field(msg, UPB_SIZE(4, 8), 2); } UPB_INLINE const grpc_lb_v1_ServerList* grpc_lb_v1_LoadBalanceResponse_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_ServerList*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 2, NULL); } +UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_fallback_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return _upb_has_oneof_field(msg, UPB_SIZE(4, 8), 3); } +UPB_INLINE const grpc_lb_v1_FallbackResponse* grpc_lb_v1_LoadBalanceResponse_fallback_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_FallbackResponse*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 3, NULL); } UPB_INLINE void grpc_lb_v1_LoadBalanceResponse_set_initial_response(grpc_lb_v1_LoadBalanceResponse *msg, grpc_lb_v1_InitialLoadBalanceResponse* value) { UPB_WRITE_ONEOF(msg, grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), value, UPB_SIZE(4, 8), 1); @@ -254,6 +260,18 @@ UPB_INLINE struct grpc_lb_v1_ServerList* grpc_lb_v1_LoadBalanceResponse_mutable_ } return sub; } +UPB_INLINE void grpc_lb_v1_LoadBalanceResponse_set_fallback_response(grpc_lb_v1_LoadBalanceResponse *msg, grpc_lb_v1_FallbackResponse* value) { + UPB_WRITE_ONEOF(msg, grpc_lb_v1_FallbackResponse*, UPB_SIZE(0, 0), value, UPB_SIZE(4, 8), 3); +} +UPB_INLINE struct grpc_lb_v1_FallbackResponse* grpc_lb_v1_LoadBalanceResponse_mutable_fallback_response(grpc_lb_v1_LoadBalanceResponse *msg, upb_arena *arena) { + struct grpc_lb_v1_FallbackResponse* sub = (struct grpc_lb_v1_FallbackResponse*)grpc_lb_v1_LoadBalanceResponse_fallback_response(msg); + if (sub == NULL) { + sub = (struct grpc_lb_v1_FallbackResponse*)upb_msg_new(&grpc_lb_v1_FallbackResponse_msginit, arena); + if (!sub) return NULL; + 
grpc_lb_v1_LoadBalanceResponse_set_fallback_response(msg, sub); + } + return sub; +} /* grpc.lb.v1.InitialLoadBalanceResponse */ @@ -350,6 +368,22 @@ UPB_INLINE void grpc_lb_v1_Server_set_drop(grpc_lb_v1_Server *msg, bool value) { UPB_FIELD_AT(msg, bool, UPB_SIZE(4, 4)) = value; } +/* grpc.lb.v1.FallbackResponse */ + +UPB_INLINE grpc_lb_v1_FallbackResponse *grpc_lb_v1_FallbackResponse_new(upb_arena *arena) { + return (grpc_lb_v1_FallbackResponse *)upb_msg_new(&grpc_lb_v1_FallbackResponse_msginit, arena); +} +UPB_INLINE grpc_lb_v1_FallbackResponse *grpc_lb_v1_FallbackResponse_parse(const char *buf, size_t size, + upb_arena *arena) { + grpc_lb_v1_FallbackResponse *ret = grpc_lb_v1_FallbackResponse_new(arena); + return (ret && upb_decode(buf, size, ret, &grpc_lb_v1_FallbackResponse_msginit, arena)) ? ret : NULL; +} +UPB_INLINE char *grpc_lb_v1_FallbackResponse_serialize(const grpc_lb_v1_FallbackResponse *msg, upb_arena *arena, size_t *len) { + return upb_encode(msg, &grpc_lb_v1_FallbackResponse_msginit, arena, len); +} + + + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto index c9932bcf60b..ccf2efd629e 100644 --- a/src/proto/grpc/lb/v1/load_balancer.proto +++ b/src/proto/grpc/lb/v1/load_balancer.proto @@ -94,6 +94,11 @@ message LoadBalanceResponse { // Contains the list of servers selected by the load balancer. The client // should send requests to these servers in the specified order. ServerList server_list = 2; + + // If this field is set, then the client should eagerly enter fallback + // mode (even if there are existing, healthy connections to backends). + // See go/grpclb-explicit-fallback for more details. 
+ FallbackResponse fallback_response = 3; } } @@ -148,3 +153,5 @@ message Server { reserved 5; } + +message FallbackResponse {} diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 8f0b4cef2ee..6ebcd6f62bd 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -1371,7 +1371,7 @@ TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) { TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) { const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); ResetStub(kFallbackTimeoutMs); - // Return an unreachable balancer and one fallback backend. + // Return one balancer and one fallback backend. std::vector<AddressData> addresses; addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""}); addresses.emplace_back(AddressData{backends_[0]->port_, false, ""}); @@ -1384,6 +1384,47 @@ /* wait_for_ready */ false); } +TEST_F(SingleBalancerTest, FallbackControlledByBalancer_BeforeFirstServerlist) { + const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); + ResetStub(kFallbackTimeoutMs); + // Return one balancer and one fallback backend. + std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""}); + addresses.emplace_back(AddressData{backends_[0]->port_, false, ""}); + SetNextResolution(addresses); + // Balancer explicitly tells client to fallback. + LoadBalanceResponse resp; + resp.mutable_fallback_response(); + ScheduleResponseForBalancer(0, resp, 0); + // Send RPC with deadline less than the fallback timeout and make sure it + // succeeds. + CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000, + /* wait_for_ready */ false); +} + +TEST_F(SingleBalancerTest, FallbackControlledByBalancer_AfterFirstServerlist) { + // Return one balancer and one fallback backend (backend 0). 
+ std::vector<AddressData> addresses; + addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""}); + addresses.emplace_back(AddressData{backends_[0]->port_, false, ""}); + SetNextResolution(addresses); + // Balancer initially sends serverlist, then tells client to fall back, + // then sends the serverlist again. + // The serverlist points to backend 1. + LoadBalanceResponse serverlist_resp = + BalancerServiceImpl::BuildResponseForBackends({backends_[1]->port_}, {}); + LoadBalanceResponse fallback_resp; + fallback_resp.mutable_fallback_response(); + ScheduleResponseForBalancer(0, serverlist_resp, 0); + ScheduleResponseForBalancer(0, fallback_resp, 100); + ScheduleResponseForBalancer(0, serverlist_resp, 100); + // Requests initially go to backend 1, then go to backend 0 in + // fallback mode, then go back to backend 1 when we exit fallback. + WaitForBackend(1); + WaitForBackend(0); + WaitForBackend(1); +} + TEST_F(SingleBalancerTest, BackendsRestart) { SetNextResolutionAllBalancers(); const size_t kNumRpcsPerAddress = 100;