Merge pull request #11923 from markdroth/grpclb_drop_protocol_changes

Use new protocol for reporting dropped calls to grpclb balancer.
pull/11958/head
Mark D. Roth 8 years ago committed by GitHub
commit 1d27c66d8e
  1. 1
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
  2. 26
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  3. 79
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
  4. 27
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
  5. 44
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
  6. 2
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
  7. 22
      src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
  8. 48
      src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
  9. 42
      src/proto/grpc/lb/v1/load_balancer.proto
  10. 92
      test/cpp/end2end/grpclb_end2end_test.cc
  11. 16
      test/cpp/grpclb/grpclb_api_test.cc

@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);

@ -416,9 +416,7 @@ struct rr_connectivity_data {
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
return false;
}
if (server->drop) return false;
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked(
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
grpc_grpclb_client_stats_add_call_finished(
server->drop_for_rate_limiting, server->drop_for_load_balancing,
false /* failed_to_send */, false /* known_received */,
wc_arg->client_stats);
grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@ -1309,15 +1304,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
grpc_grpclb_dropped_call_counts *drop_entries =
request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
0 &&
request->client_stats
.num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
request->client_stats.num_calls_finished_known_received == 0;
request->client_stats.num_calls_finished_known_received == 0 &&
(drop_entries == NULL || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@ -1332,7 +1326,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
grpc_grpclb_load_report_request_create(glb_policy->client_stats);
grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {

@ -18,8 +18,11 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@ -29,10 +32,11 @@
struct grpc_grpclb_client_stats {
gpr_refcount refs;
// This field must only be accessed via *_locked() methods.
grpc_grpclb_dropped_call_counts* drop_token_counts;
// These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
gpr_atm num_calls_finished_with_drop_for_rate_limiting;
gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started(
}
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_drop_for_rate_limiting,
bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
if (finished_with_drop_for_rate_limiting) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_drop_for_rate_limiting,
(gpr_atm)1);
}
if (finished_with_drop_for_load_balancing) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_drop_for_load_balancing,
(gpr_atm)1);
}
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished(
}
}
void grpc_grpclb_client_stats_add_call_dropped_locked(
char* token, grpc_grpclb_client_stats* client_stats) {
// Increment num_calls_started and num_calls_finished.
gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
// Record the drop.
if (client_stats->drop_token_counts == NULL) {
client_stats->drop_token_counts =
gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
}
grpc_grpclb_dropped_call_counts* drop_token_counts =
client_stats->drop_token_counts;
for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
++drop_token_counts->token_counts[i].count;
return;
}
}
// Not found, so add a new entry. We double the size of the array each time.
size_t new_num_entries = 2;
while (new_num_entries < drop_token_counts->num_entries + 1) {
new_num_entries *= 2;
}
drop_token_counts->token_counts =
gpr_realloc(drop_token_counts->token_counts,
new_num_entries * sizeof(grpc_grpclb_drop_token_count));
grpc_grpclb_drop_token_count* new_entry =
&drop_token_counts->token_counts[drop_token_counts->num_entries++];
new_entry->token = gpr_strdup(token);
new_entry->count = 1;
}
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
void grpc_grpclb_client_stats_get(
void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
int64_t* num_calls_finished_with_drop_for_rate_limiting,
int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received) {
int64_t* num_calls_finished_known_received,
grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
num_calls_finished_with_drop_for_rate_limiting,
&client_stats->num_calls_finished_with_drop_for_rate_limiting);
atomic_get_and_reset_counter(
num_calls_finished_with_drop_for_load_balancing,
&client_stats->num_calls_finished_with_drop_for_load_balancing);
atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
*drop_token_counts = client_stats->drop_token_counts;
client_stats->drop_token_counts = NULL;
}
void grpc_grpclb_dropped_call_counts_destroy(
grpc_grpclb_dropped_call_counts* drop_entries) {
if (drop_entries != NULL) {
for (size_t i = 0; i < drop_entries->num_entries; ++i) {
gpr_free(drop_entries->token_counts[i].token);
}
gpr_free(drop_entries->token_counts);
gpr_free(drop_entries);
}
}

@ -25,6 +25,16 @@
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
typedef struct {
char* token;
int64_t count;
} grpc_grpclb_drop_token_count;
typedef struct {
grpc_grpclb_drop_token_count* token_counts;
size_t num_entries;
} grpc_grpclb_dropped_call_counts;
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
bool finished_with_drop_for_rate_limiting,
bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_get(
// This method is not thread-safe; caller must synchronize.
void grpc_grpclb_client_stats_add_call_dropped_locked(
char* token, grpc_grpclb_client_stats* client_stats);
// This method is not thread-safe; caller must synchronize.
void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
int64_t* num_calls_finished_with_drop_for_rate_limiting,
int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
int64_t* num_calls_finished_known_received);
int64_t* num_calls_finished_known_received,
grpc_grpclb_dropped_call_counts** drop_token_counts);
void grpc_grpclb_dropped_call_counts_destroy(
grpc_grpclb_dropped_call_counts* drop_entries);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/

@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp,
timestamp_pb->nanos = timestamp.tv_nsec;
}
grpc_grpclb_request *grpc_grpclb_load_report_request_create(
static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
void *const *arg) {
char *str = *arg;
if (!pb_encode_tag_for_field(stream, field)) return false;
return pb_encode_string(stream, (uint8_t *)str, strlen(str));
}
static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
void *const *arg) {
grpc_grpclb_dropped_call_counts *drop_entries = *arg;
if (drop_entries == NULL) return true;
for (size_t i = 0; i < drop_entries->num_entries; ++i) {
if (!pb_encode_tag_for_field(stream, field)) return false;
grpc_lb_v1_ClientStatsPerToken drop_message;
drop_message.load_balance_token.funcs.encode = encode_string;
drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
drop_message.has_num_calls = true;
drop_message.num_calls = drop_entries->token_counts[i].count;
if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
&drop_message)) {
return false;
}
}
return true;
}
grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create(
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
grpc_grpclb_client_stats_get(
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
&req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
&req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
&req->client_stats.num_calls_finished_known_received);
&req->client_stats.num_calls_finished_known_received,
(grpc_grpclb_dropped_call_counts **)&req->client_stats
.calls_finished_with_drop.arg);
return req;
}
@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
}
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
if (request->has_client_stats) {
grpc_grpclb_dropped_call_counts *drop_entries =
request->client_stats.calls_finished_with_drop.arg;
grpc_grpclb_dropped_call_counts_destroy(drop_entries);
}
gpr_free(request);
}

@ -44,7 +44,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
grpc_grpclb_request *grpc_grpclb_load_report_request_create(
grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */

@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
PB_LAST_FIELD
};
const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
PB_LAST_FIELD
};
@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
const pb_field_t grpc_lb_v1_Server_fields[6] = {
const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
PB_LAST_FIELD
};
@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif

@ -14,6 +14,13 @@ extern "C" {
#endif
/* Struct definitions */
typedef struct _grpc_lb_v1_ClientStatsPerToken {
pb_callback_t load_balance_token;
bool has_num_calls;
int64_t num_calls;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
} grpc_lb_v1_ClientStatsPerToken;
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
bool has_drop_for_rate_limiting;
bool drop_for_rate_limiting;
bool has_drop_for_load_balancing;
bool drop_for_load_balancing;
bool has_drop;
bool drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t num_calls_started;
bool has_num_calls_finished;
int64_t num_calls_finished;
bool has_num_calls_finished_with_drop_for_rate_limiting;
int64_t num_calls_finished_with_drop_for_rate_limiting;
bool has_num_calls_finished_with_drop_for_load_balancing;
int64_t num_calls_finished_with_drop_for_load_balancing;
bool has_num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_with_client_failed_to_send;
bool has_num_calls_finished_known_received;
int64_t num_calls_finished_known_received;
pb_callback_t calls_finished_with_drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
#define grpc_lb_v1_Server_drop_tag 4
#define grpc_lb_v1_Timestamp_seconds_tag 1
#define grpc_lb_v1_Timestamp_nanos_tag 2
#define grpc_lb_v1_ClientStats_timestamp_tag 1
#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3];
extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
extern const pb_field_t grpc_lb_v1_Server_fields[6];
extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
#define grpc_lb_v1_Timestamp_size 22
#define grpc_lb_v1_LoadBalanceRequest_size 226
#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
#define grpc_lb_v1_ClientStats_size 90
/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
#define grpc_lb_v1_Server_size 85
#define grpc_lb_v1_Server_size 83
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID

@ -67,6 +67,15 @@ message InitialLoadBalanceRequest {
string name = 1;
}
// Contains the number of calls finished for a particular load balance token.
message ClientStatsPerToken {
// See Server.load_balance_token.
string load_balance_token = 1;
// The total number of RPCs that finished associated with the token.
int64 num_calls = 2;
}
// Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats.
message ClientStats {
@ -79,20 +88,17 @@ message ClientStats {
// The total number of RPCs that finished.
int64 num_calls_finished = 3;
// The total number of RPCs that were dropped by the client because of rate
// limiting.
int64 num_calls_finished_with_drop_for_rate_limiting = 4;
// The total number of RPCs that were dropped by the client because of load
// balancing.
int64 num_calls_finished_with_drop_for_load_balancing = 5;
// The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received
// by a server.
int64 num_calls_finished_known_received = 7;
// The list of dropped calls.
repeated ClientStatsPerToken calls_finished_with_drop = 8;
reserved 4, 5;
}
message LoadBalanceResponse {
@ -134,10 +140,8 @@ message ServerList {
Duration expiration_interval = 3;
}
// Contains server information. When none of the [drop_for_*] fields are true,
// use the other fields. When drop_for_rate_limiting is true, ignore all other
// fields. Use drop_for_load_balancing only when it is true and
// drop_for_rate_limiting is false.
// Contains server information. When the drop field is not true, use the other
// fields.
message Server {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
@ -149,16 +153,16 @@ message Server {
// An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to
// allow the backend to report load to the gRPC LB system.
// allow the backend to report load to the gRPC LB system. The token is also
// used in client stats for reporting dropped calls.
//
// Its length is variable but less than 50 bytes.
string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client
// for rate limiting.
bool drop_for_rate_limiting = 4;
// Indicates whether this particular request should be dropped by the client.
// If the request is dropped, there will be a corresponding entry in
// ClientStats.calls_finished_with_drop.
bool drop = 4;
// Indicates whether this particular request should be dropped by the client
// for load balancing.
bool drop_for_load_balancing = 5;
reserved 5;
}

@ -45,6 +45,7 @@ extern "C" {
#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
// TODO(dgq): Other scenarios in need of testing:
@ -142,22 +143,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
struct ClientStats {
size_t num_calls_started = 0;
size_t num_calls_finished = 0;
size_t num_calls_finished_with_drop_for_rate_limiting = 0;
size_t num_calls_finished_with_drop_for_load_balancing = 0;
size_t num_calls_finished_with_client_failed_to_send = 0;
size_t num_calls_finished_known_received = 0;
std::map<grpc::string, size_t> drop_token_counts;
ClientStats& operator+=(const ClientStats& other) {
num_calls_started += other.num_calls_started;
num_calls_finished += other.num_calls_finished;
num_calls_finished_with_drop_for_rate_limiting +=
other.num_calls_finished_with_drop_for_rate_limiting;
num_calls_finished_with_drop_for_load_balancing +=
other.num_calls_finished_with_drop_for_load_balancing;
num_calls_finished_with_client_failed_to_send +=
other.num_calls_finished_with_client_failed_to_send;
num_calls_finished_known_received +=
other.num_calls_finished_known_received;
for (const auto& p : other.drop_token_counts) {
drop_token_counts[p.first] += p.second;
}
return *this;
}
};
@ -218,17 +217,17 @@ class BalancerServiceImpl : public BalancerService {
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
request.client_stats().num_calls_finished();
client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
request.client_stats()
.num_calls_finished_with_drop_for_rate_limiting();
client_stats_.num_calls_finished_with_drop_for_load_balancing +=
request.client_stats()
.num_calls_finished_with_drop_for_load_balancing();
client_stats_.num_calls_finished_with_client_failed_to_send +=
request.client_stats()
.num_calls_finished_with_client_failed_to_send();
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
for (const auto& drop_token_count :
request.client_stats().calls_finished_with_drop()) {
client_stats_
.drop_token_counts[drop_token_count.load_balance_token()] +=
drop_token_count.num_calls();
}
load_report_cond_.notify_one();
}
done:
@ -252,16 +251,15 @@ class BalancerServiceImpl : public BalancerService {
}
static LoadBalanceResponse BuildResponseForBackends(
const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
int num_drops_for_load_balancing) {
const std::vector<int>& backend_ports,
const std::map<grpc::string, size_t>& drop_token_counts) {
LoadBalanceResponse response;
for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
auto* server = response.mutable_server_list()->add_servers();
server->set_drop_for_rate_limiting(true);
}
for (int i = 0; i < num_drops_for_load_balancing; ++i) {
auto* server = response.mutable_server_list()->add_servers();
server->set_drop_for_load_balancing(true);
for (const auto& drop_token_count : drop_token_counts) {
for (size_t i = 0; i < drop_token_count.second; ++i) {
auto* server = response.mutable_server_list()->add_servers();
server->set_drop(true);
server->set_load_balance_token(drop_token_count.first);
}
}
for (const int& backend_port : backend_ports) {
auto* server = response.mutable_server_list()->add_servers();
@ -499,7 +497,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
@ -538,7 +536,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
const auto t0 = system_clock::now();
@ -580,11 +578,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
// Send a serverlist right away.
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// ... and the same one a bit later.
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
// Send num_backends/2 requests.
@ -648,10 +646,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
0);
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -726,10 +723,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
const std::vector<int> second_backend{GetBackendPorts()[0]};
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
0);
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -809,10 +805,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
0);
1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@ -901,7 +896,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@ -936,7 +932,9 @@ TEST_F(SingleBalancerTest, Drop) {
TEST_F(SingleBalancerTest, DropAllFirst) {
// All registered addresses are marked as "drop".
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0);
0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
0);
const auto& statuses_and_responses = SendRpc(kMessage_, 1);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
@ -947,10 +945,12 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
TEST_F(SingleBalancerTest, DropAll) {
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000);
0, BalancerServiceImpl::BuildResponseForBackends(
{}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
1000);
// First call succeeds.
auto statuses_and_responses = SendRpc(kMessage_, 1);
@ -980,7 +980,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Send 100 RPCs per server.
const auto& statuses_and_responses =
@ -1009,17 +1009,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
const size_t kNumRpcsPerAddress = 3;
ScheduleResponseForBalancer(
0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
0, BalancerServiceImpl::BuildResponseForBackends(
GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@ -1056,13 +1056,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
client_stats.num_calls_finished);
EXPECT_EQ(kNumRpcsPerAddress * 2,
client_stats.num_calls_finished_with_drop_for_rate_limiting);
EXPECT_EQ(kNumRpcsPerAddress,
client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
EXPECT_THAT(client_stats.drop_token_counts,
::testing::ElementsAre(
::testing::Pair("load_balancing", kNumRpcsPerAddress),
::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
}
} // namespace

@ -91,13 +91,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
auto* server = serverlist->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
server->set_drop_for_rate_limiting(true);
server->set_drop_for_load_balancing(false);
server->set_load_balance_token("rate_limting");
server->set_drop(true);
server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
server->set_drop_for_rate_limiting(false);
server->set_drop_for_load_balancing(true);
server->set_load_balance_token("load_balancing");
server->set_drop(true);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
@ -112,14 +112,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting");
EXPECT_TRUE(c_serverlist->servers[0]->drop);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
EXPECT_TRUE(c_serverlist->servers[1]->drop);
EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);

Loading…
Cancel
Save