Merge pull request #21452 from markdroth/grpclb_balancer_controls_fallback

grpclb: Add support for balancer telling client to enter fallback mode.
pull/21478/head
Mark D. Roth 5 years ago committed by GitHub
commit 4c34781f45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
  3. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
  4. 18
      src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.c
  5. 34
      src/core/ext/upb-generated/src/proto/grpc/lb/v1/load_balancer.upb.h
  6. 7
      src/proto/grpc/lb/v1/load_balancer.proto
  7. 43
      test/cpp/end2end/grpclb_end2end_test.cc

@ -1157,6 +1157,25 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
} }
break; break;
} }
case response.FALLBACK: {
if (!grpclb_policy->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Entering fallback mode as requested by balancer",
grpclb_policy);
if (grpclb_policy->fallback_at_startup_checks_pending_) {
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
}
grpclb_policy->fallback_mode_ = true;
grpclb_policy->CreateOrUpdateChildPolicyLocked();
// Reset serverlist, so that if the balancer exits fallback
// mode by sending the same serverlist we were previously
// using, we don't incorrectly ignore it as a duplicate.
grpclb_policy->serverlist_.reset();
}
break;
}
} }
} }
grpc_slice_unref_internal(response_slice); grpc_slice_unref_internal(response_slice);

@ -181,6 +181,12 @@ bool GrpcLbResponseParse(const grpc_slice& encoded_grpc_grpclb_response,
} }
return true; return true;
} }
// Handle fallback.
if (grpc_lb_v1_LoadBalanceResponse_has_fallback_response(response)) {
result->type = result->FALLBACK;
return true;
}
// Unknown response type.
return false; return false;
} }

@ -48,7 +48,7 @@ struct GrpcLbServer {
}; };
struct GrpcLbResponse { struct GrpcLbResponse {
enum { INITIAL, SERVERLIST } type; enum { INITIAL, SERVERLIST, FALLBACK } type;
grpc_millis client_stats_report_interval = 0; grpc_millis client_stats_report_interval = 0;
std::vector<GrpcLbServer> serverlist; std::vector<GrpcLbServer> serverlist;
}; };

@ -71,20 +71,22 @@ const upb_msglayout grpc_lb_v1_ClientStats_msginit = {
UPB_SIZE(40, 48), 6, false, UPB_SIZE(40, 48), 6, false,
}; };
static const upb_msglayout *const grpc_lb_v1_LoadBalanceResponse_submsgs[2] = { static const upb_msglayout *const grpc_lb_v1_LoadBalanceResponse_submsgs[3] = {
&grpc_lb_v1_FallbackResponse_msginit,
&grpc_lb_v1_InitialLoadBalanceResponse_msginit, &grpc_lb_v1_InitialLoadBalanceResponse_msginit,
&grpc_lb_v1_ServerList_msginit, &grpc_lb_v1_ServerList_msginit,
}; };
static const upb_msglayout_field grpc_lb_v1_LoadBalanceResponse__fields[2] = { static const upb_msglayout_field grpc_lb_v1_LoadBalanceResponse__fields[3] = {
{1, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 0, 11, 1}, {1, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 1, 11, 1},
{2, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 1, 11, 1}, {2, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 2, 11, 1},
{3, UPB_SIZE(0, 0), UPB_SIZE(-5, -9), 0, 11, 1},
}; };
const upb_msglayout grpc_lb_v1_LoadBalanceResponse_msginit = { const upb_msglayout grpc_lb_v1_LoadBalanceResponse_msginit = {
&grpc_lb_v1_LoadBalanceResponse_submsgs[0], &grpc_lb_v1_LoadBalanceResponse_submsgs[0],
&grpc_lb_v1_LoadBalanceResponse__fields[0], &grpc_lb_v1_LoadBalanceResponse__fields[0],
UPB_SIZE(8, 16), 2, false, UPB_SIZE(8, 16), 3, false,
}; };
static const upb_msglayout *const grpc_lb_v1_InitialLoadBalanceResponse_submsgs[1] = { static const upb_msglayout *const grpc_lb_v1_InitialLoadBalanceResponse_submsgs[1] = {
@ -129,5 +131,11 @@ const upb_msglayout grpc_lb_v1_Server_msginit = {
UPB_SIZE(24, 48), 4, false, UPB_SIZE(24, 48), 4, false,
}; };
const upb_msglayout grpc_lb_v1_FallbackResponse_msginit = {
NULL,
NULL,
UPB_SIZE(0, 0), 0, false,
};
#include "upb/port_undef.inc" #include "upb/port_undef.inc"

@ -28,6 +28,7 @@ struct grpc_lb_v1_LoadBalanceResponse;
struct grpc_lb_v1_InitialLoadBalanceResponse; struct grpc_lb_v1_InitialLoadBalanceResponse;
struct grpc_lb_v1_ServerList; struct grpc_lb_v1_ServerList;
struct grpc_lb_v1_Server; struct grpc_lb_v1_Server;
struct grpc_lb_v1_FallbackResponse;
typedef struct grpc_lb_v1_LoadBalanceRequest grpc_lb_v1_LoadBalanceRequest; typedef struct grpc_lb_v1_LoadBalanceRequest grpc_lb_v1_LoadBalanceRequest;
typedef struct grpc_lb_v1_InitialLoadBalanceRequest grpc_lb_v1_InitialLoadBalanceRequest; typedef struct grpc_lb_v1_InitialLoadBalanceRequest grpc_lb_v1_InitialLoadBalanceRequest;
typedef struct grpc_lb_v1_ClientStatsPerToken grpc_lb_v1_ClientStatsPerToken; typedef struct grpc_lb_v1_ClientStatsPerToken grpc_lb_v1_ClientStatsPerToken;
@ -36,6 +37,7 @@ typedef struct grpc_lb_v1_LoadBalanceResponse grpc_lb_v1_LoadBalanceResponse;
typedef struct grpc_lb_v1_InitialLoadBalanceResponse grpc_lb_v1_InitialLoadBalanceResponse; typedef struct grpc_lb_v1_InitialLoadBalanceResponse grpc_lb_v1_InitialLoadBalanceResponse;
typedef struct grpc_lb_v1_ServerList grpc_lb_v1_ServerList; typedef struct grpc_lb_v1_ServerList grpc_lb_v1_ServerList;
typedef struct grpc_lb_v1_Server grpc_lb_v1_Server; typedef struct grpc_lb_v1_Server grpc_lb_v1_Server;
typedef struct grpc_lb_v1_FallbackResponse grpc_lb_v1_FallbackResponse;
extern const upb_msglayout grpc_lb_v1_LoadBalanceRequest_msginit; extern const upb_msglayout grpc_lb_v1_LoadBalanceRequest_msginit;
extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceRequest_msginit; extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceRequest_msginit;
extern const upb_msglayout grpc_lb_v1_ClientStatsPerToken_msginit; extern const upb_msglayout grpc_lb_v1_ClientStatsPerToken_msginit;
@ -44,6 +46,7 @@ extern const upb_msglayout grpc_lb_v1_LoadBalanceResponse_msginit;
extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceResponse_msginit; extern const upb_msglayout grpc_lb_v1_InitialLoadBalanceResponse_msginit;
extern const upb_msglayout grpc_lb_v1_ServerList_msginit; extern const upb_msglayout grpc_lb_v1_ServerList_msginit;
extern const upb_msglayout grpc_lb_v1_Server_msginit; extern const upb_msglayout grpc_lb_v1_Server_msginit;
extern const upb_msglayout grpc_lb_v1_FallbackResponse_msginit;
struct google_protobuf_Duration; struct google_protobuf_Duration;
struct google_protobuf_Timestamp; struct google_protobuf_Timestamp;
extern const upb_msglayout google_protobuf_Duration_msginit; extern const upb_msglayout google_protobuf_Duration_msginit;
@ -221,6 +224,7 @@ UPB_INLINE char *grpc_lb_v1_LoadBalanceResponse_serialize(const grpc_lb_v1_LoadB
typedef enum { typedef enum {
grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_initial_response = 1, grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_initial_response = 1,
grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_server_list = 2, grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_server_list = 2,
grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_fallback_response = 3,
grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_NOT_SET = 0 grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_NOT_SET = 0
} grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases; } grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases;
UPB_INLINE grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_case(const grpc_lb_v1_LoadBalanceResponse* msg) { return (grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases)UPB_FIELD_AT(msg, int32_t, UPB_SIZE(4, 8)); } UPB_INLINE grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_case(const grpc_lb_v1_LoadBalanceResponse* msg) { return (grpc_lb_v1_LoadBalanceResponse_load_balance_response_type_oneofcases)UPB_FIELD_AT(msg, int32_t, UPB_SIZE(4, 8)); }
@ -229,6 +233,8 @@ UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_initial_response(const grpc_l
UPB_INLINE const grpc_lb_v1_InitialLoadBalanceResponse* grpc_lb_v1_LoadBalanceResponse_initial_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 1, NULL); } UPB_INLINE const grpc_lb_v1_InitialLoadBalanceResponse* grpc_lb_v1_LoadBalanceResponse_initial_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 1, NULL); }
UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return _upb_has_oneof_field(msg, UPB_SIZE(4, 8), 2); } UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return _upb_has_oneof_field(msg, UPB_SIZE(4, 8), 2); }
UPB_INLINE const grpc_lb_v1_ServerList* grpc_lb_v1_LoadBalanceResponse_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_ServerList*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 2, NULL); } UPB_INLINE const grpc_lb_v1_ServerList* grpc_lb_v1_LoadBalanceResponse_server_list(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_ServerList*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 2, NULL); }
UPB_INLINE bool grpc_lb_v1_LoadBalanceResponse_has_fallback_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return _upb_has_oneof_field(msg, UPB_SIZE(4, 8), 3); }
UPB_INLINE const grpc_lb_v1_FallbackResponse* grpc_lb_v1_LoadBalanceResponse_fallback_response(const grpc_lb_v1_LoadBalanceResponse *msg) { return UPB_READ_ONEOF(msg, const grpc_lb_v1_FallbackResponse*, UPB_SIZE(0, 0), UPB_SIZE(4, 8), 3, NULL); }
UPB_INLINE void grpc_lb_v1_LoadBalanceResponse_set_initial_response(grpc_lb_v1_LoadBalanceResponse *msg, grpc_lb_v1_InitialLoadBalanceResponse* value) { UPB_INLINE void grpc_lb_v1_LoadBalanceResponse_set_initial_response(grpc_lb_v1_LoadBalanceResponse *msg, grpc_lb_v1_InitialLoadBalanceResponse* value) {
UPB_WRITE_ONEOF(msg, grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), value, UPB_SIZE(4, 8), 1); UPB_WRITE_ONEOF(msg, grpc_lb_v1_InitialLoadBalanceResponse*, UPB_SIZE(0, 0), value, UPB_SIZE(4, 8), 1);
@ -254,6 +260,18 @@ UPB_INLINE struct grpc_lb_v1_ServerList* grpc_lb_v1_LoadBalanceResponse_mutable_
} }
return sub; return sub;
} }
UPB_INLINE void grpc_lb_v1_LoadBalanceResponse_set_fallback_response(grpc_lb_v1_LoadBalanceResponse *msg, grpc_lb_v1_FallbackResponse* value) {
UPB_WRITE_ONEOF(msg, grpc_lb_v1_FallbackResponse*, UPB_SIZE(0, 0), value, UPB_SIZE(4, 8), 3);
}
UPB_INLINE struct grpc_lb_v1_FallbackResponse* grpc_lb_v1_LoadBalanceResponse_mutable_fallback_response(grpc_lb_v1_LoadBalanceResponse *msg, upb_arena *arena) {
struct grpc_lb_v1_FallbackResponse* sub = (struct grpc_lb_v1_FallbackResponse*)grpc_lb_v1_LoadBalanceResponse_fallback_response(msg);
if (sub == NULL) {
sub = (struct grpc_lb_v1_FallbackResponse*)upb_msg_new(&grpc_lb_v1_FallbackResponse_msginit, arena);
if (!sub) return NULL;
grpc_lb_v1_LoadBalanceResponse_set_fallback_response(msg, sub);
}
return sub;
}
/* grpc.lb.v1.InitialLoadBalanceResponse */ /* grpc.lb.v1.InitialLoadBalanceResponse */
@ -350,6 +368,22 @@ UPB_INLINE void grpc_lb_v1_Server_set_drop(grpc_lb_v1_Server *msg, bool value) {
UPB_FIELD_AT(msg, bool, UPB_SIZE(4, 4)) = value; UPB_FIELD_AT(msg, bool, UPB_SIZE(4, 4)) = value;
} }
/* grpc.lb.v1.FallbackResponse */
UPB_INLINE grpc_lb_v1_FallbackResponse *grpc_lb_v1_FallbackResponse_new(upb_arena *arena) {
return (grpc_lb_v1_FallbackResponse *)upb_msg_new(&grpc_lb_v1_FallbackResponse_msginit, arena);
}
UPB_INLINE grpc_lb_v1_FallbackResponse *grpc_lb_v1_FallbackResponse_parse(const char *buf, size_t size,
upb_arena *arena) {
grpc_lb_v1_FallbackResponse *ret = grpc_lb_v1_FallbackResponse_new(arena);
return (ret && upb_decode(buf, size, ret, &grpc_lb_v1_FallbackResponse_msginit, arena)) ? ret : NULL;
}
UPB_INLINE char *grpc_lb_v1_FallbackResponse_serialize(const grpc_lb_v1_FallbackResponse *msg, upb_arena *arena, size_t *len) {
return upb_encode(msg, &grpc_lb_v1_FallbackResponse_msginit, arena, len);
}
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */
#endif #endif

@ -94,6 +94,11 @@ message LoadBalanceResponse {
// Contains the list of servers selected by the load balancer. The client // Contains the list of servers selected by the load balancer. The client
// should send requests to these servers in the specified order. // should send requests to these servers in the specified order.
ServerList server_list = 2; ServerList server_list = 2;
// If this field is set, then the client should eagerly enter fallback
// mode (even if there are existing, healthy connections to backends).
// See go/grpclb-explicit-fallback for more details.
FallbackResponse fallback_response = 3;
} }
} }
@ -148,3 +153,5 @@ message Server {
reserved 5; reserved 5;
} }
message FallbackResponse {}

@ -1371,7 +1371,7 @@ TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerChannelFails) {
TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) { TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor(); const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs); ResetStub(kFallbackTimeoutMs);
// Return an unreachable balancer and one fallback backend. // Return one balancer and one fallback backend.
std::vector<AddressData> addresses; std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""}); addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
addresses.emplace_back(AddressData{backends_[0]->port_, false, ""}); addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
@ -1384,6 +1384,47 @@ TEST_F(SingleBalancerTest, FallbackEarlyWhenBalancerCallFails) {
/* wait_for_ready */ false); /* wait_for_ready */ false);
} }
TEST_F(SingleBalancerTest, FallbackControlledByBalancer_BeforeFirstServerlist) {
const int kFallbackTimeoutMs = 10000 * grpc_test_slowdown_factor();
ResetStub(kFallbackTimeoutMs);
// Return one balancer and one fallback backend.
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
SetNextResolution(addresses);
// Balancer explicitly tells client to fallback.
LoadBalanceResponse resp;
resp.mutable_fallback_response();
ScheduleResponseForBalancer(0, resp, 0);
// Send RPC with deadline less than the fallback timeout and make sure it
// succeeds.
CheckRpcSendOk(/* times */ 1, /* timeout_ms */ 1000,
/* wait_for_ready */ false);
}
TEST_F(SingleBalancerTest, FallbackControlledByBalancer_AfterFirstServerlist) {
// Return one balancer and one fallback backend (backend 0).
std::vector<AddressData> addresses;
addresses.emplace_back(AddressData{balancers_[0]->port_, true, ""});
addresses.emplace_back(AddressData{backends_[0]->port_, false, ""});
SetNextResolution(addresses);
// Balancer initially sends serverlist, then tells client to fall back,
// then sends the serverlist again.
// The serverlist points to backend 1.
LoadBalanceResponse serverlist_resp =
BalancerServiceImpl::BuildResponseForBackends({backends_[1]->port_}, {});
LoadBalanceResponse fallback_resp;
fallback_resp.mutable_fallback_response();
ScheduleResponseForBalancer(0, serverlist_resp, 0);
ScheduleResponseForBalancer(0, fallback_resp, 100);
ScheduleResponseForBalancer(0, serverlist_resp, 100);
// Requests initially go to backend 1, then go to backend 0 in
// fallback mode, then go back to backend 1 when we exit fallback.
WaitForBackend(1);
WaitForBackend(0);
WaitForBackend(1);
}
TEST_F(SingleBalancerTest, BackendsRestart) { TEST_F(SingleBalancerTest, BackendsRestart) {
SetNextResolutionAllBalancers(); SetNextResolutionAllBalancers();
const size_t kNumRpcsPerAddress = 100; const size_t kNumRpcsPerAddress = 100;

Loading…
Cancel
Save