Merge pull request #21413 from markdroth/grpclb_proto_api_cleanup

Clean up APIs for handling grpclb protos
pull/21452/head
Mark D. Roth 5 years ago committed by GitHub
commit 95fe553848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 328
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  2. 203
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc
  3. 85
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
  4. 70
      test/cpp/grpclb/grpclb_api_test.cc

@ -172,8 +172,6 @@ class GrpcLb : public LoadBalancingPolicy {
void ScheduleNextClientLoadReportLocked();
void SendClientLoadReportLocked();
static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
static void MaybeSendClientLoadReport(void* arg, grpc_error* error);
static void ClientLoadReportDone(void* arg, grpc_error* error);
static void OnInitialRequestSent(void* arg, grpc_error* error);
@ -227,14 +225,12 @@ class GrpcLb : public LoadBalancingPolicy {
class Serverlist : public RefCounted<Serverlist> {
public:
// Takes ownership of serverlist.
explicit Serverlist(grpc_grpclb_serverlist* serverlist)
: serverlist_(serverlist) {}
~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
explicit Serverlist(std::vector<GrpcLbServer> serverlist)
: serverlist_(std::move(serverlist)) {}
bool operator==(const Serverlist& other) const;
const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
const std::vector<GrpcLbServer>& serverlist() const { return serverlist_; }
// Returns a text representation suitable for logging.
grpc_core::UniquePtr<char> AsText() const;
@ -257,7 +253,7 @@ class GrpcLb : public LoadBalancingPolicy {
const char* ShouldDrop();
private:
grpc_grpclb_serverlist* serverlist_;
std::vector<GrpcLbServer> serverlist_;
// Guarded by the channel's data plane combiner, NOT the control
// plane combiner. It should not be accessed by anything but the
@ -404,28 +400,26 @@ class GrpcLb : public LoadBalancingPolicy {
//
bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
return serverlist_ == other.serverlist_;
}
void ParseServer(const grpc_grpclb_server* server,
grpc_resolved_address* addr) {
void ParseServer(const GrpcLbServer& server, grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
if (server.drop) return;
const uint16_t netorder_port = grpc_htons((uint16_t)server.port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_server_ip_address& ip = server->ip_address;
if (ip.size == 4) {
if (server.ip_size == 4) {
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
addr4->sin_family = GRPC_AF_INET;
memcpy(&addr4->sin_addr, ip.data, ip.size);
memcpy(&addr4->sin_addr, server.ip_addr, server.ip_size);
addr4->sin_port = netorder_port;
} else if (ip.size == 16) {
} else if (server.ip_size == 16) {
addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
addr6->sin6_family = GRPC_AF_INET6;
memcpy(&addr6->sin6_addr, ip.data, ip.size);
memcpy(&addr6->sin6_addr, server.ip_addr, server.ip_size);
addr6->sin6_port = netorder_port;
}
}
@ -433,10 +427,10 @@ void ParseServer(const grpc_grpclb_server* server,
grpc_core::UniquePtr<char> GrpcLb::Serverlist::AsText() const {
gpr_strvec entries;
gpr_strvec_init(&entries);
for (size_t i = 0; i < serverlist_->num_servers; ++i) {
const auto* server = serverlist_->servers[i];
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
char* ipport;
if (server->drop) {
if (server.drop) {
ipport = gpr_strdup("(drop)");
} else {
grpc_resolved_address addr;
@ -445,7 +439,7 @@ grpc_core::UniquePtr<char> GrpcLb::Serverlist::AsText() const {
}
char* entry;
gpr_asprintf(&entry, " %" PRIuPTR ": %s token=%s\n", i, ipport,
server->load_balance_token);
server.load_balance_token);
gpr_free(ipport);
gpr_strvec_add(&entries, entry);
}
@ -492,23 +486,22 @@ const grpc_arg_pointer_vtable lb_token_arg_vtable = {
const grpc_arg_pointer_vtable client_stats_arg_vtable = {
client_stats_copy, client_stats_destroy, equal_cmp};
bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
if (server->drop) return false;
const grpc_grpclb_server_ip_address& ip = server->ip_address;
if (GPR_UNLIKELY(server->port >> 16 != 0)) {
bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
if (server.drop) return false;
if (GPR_UNLIKELY(server.port >> 16 != 0)) {
if (log) {
gpr_log(GPR_ERROR,
"Invalid port '%d' at index %lu of serverlist. Ignoring.",
server->port, (unsigned long)idx);
server.port, (unsigned long)idx);
}
return false;
}
if (GPR_UNLIKELY(ip.size != 4 && ip.size != 16)) {
if (GPR_UNLIKELY(server.ip_size != 4 && server.ip_size != 16)) {
if (log) {
gpr_log(GPR_ERROR,
"Expected IP to be 4 or 16 bytes, got %d at index %lu of "
"serverlist. Ignoring",
ip.size, (unsigned long)idx);
server.ip_size, (unsigned long)idx);
}
return false;
}
@ -519,20 +512,20 @@ bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
ServerAddressList addresses;
for (size_t i = 0; i < serverlist_->num_servers; ++i) {
const grpc_grpclb_server* server = serverlist_->servers[i];
if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
// Address processing.
grpc_resolved_address addr;
ParseServer(server, &addr);
// LB token processing.
char lb_token[GPR_ARRAY_SIZE(server->load_balance_token) + 1];
if (server->load_balance_token[0] != 0) {
char lb_token[GPR_ARRAY_SIZE(server.load_balance_token) + 1];
if (server.load_balance_token[0] != 0) {
const size_t lb_token_max_length =
GPR_ARRAY_SIZE(server->load_balance_token);
GPR_ARRAY_SIZE(server.load_balance_token);
const size_t lb_token_length =
strnlen(server->load_balance_token, lb_token_max_length);
memcpy(lb_token, server->load_balance_token, lb_token_length);
strnlen(server.load_balance_token, lb_token_max_length);
memcpy(lb_token, server.load_balance_token, lb_token_length);
lb_token[lb_token_length] = '\0';
} else {
char* uri = grpc_sockaddr_to_uri(&addr);
@ -561,18 +554,18 @@ ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
}
bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
if (serverlist_->num_servers == 0) return false;
for (size_t i = 0; i < serverlist_->num_servers; ++i) {
if (!serverlist_->servers[i]->drop) return false;
if (serverlist_.empty()) return false;
for (const GrpcLbServer& server : serverlist_) {
if (!server.drop) return false;
}
return true;
}
const char* GrpcLb::Serverlist::ShouldDrop() {
if (serverlist_->num_servers == 0) return nullptr;
grpc_grpclb_server* server = serverlist_->servers[drop_index_];
drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
return server->drop ? server->load_balance_token : nullptr;
if (serverlist_.empty()) return nullptr;
GrpcLbServer& server = serverlist_[drop_index_];
drop_index_ = (drop_index_ + 1) % serverlist_.size();
return server.drop ? server.load_balance_token : nullptr;
}
//
@ -782,10 +775,8 @@ GrpcLb::BalancerCallState::BalancerCallState(
nullptr, deadline, nullptr);
// Init the LB call request payload.
upb::Arena arena;
grpc_grpclb_request* request =
grpc_grpclb_request_create(grpclb_policy()->server_name_, arena.ptr());
grpc_slice request_payload_slice =
grpc_grpclb_request_encode(request, arena.ptr());
GrpcLbRequestCreate(grpclb_policy()->server_name_, arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
@ -936,33 +927,24 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
}
}
bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
grpc_grpclb_request* request) {
const grpc_lb_v1_ClientStats* cstats =
grpc_lb_v1_LoadBalanceRequest_client_stats(request);
if (cstats == nullptr) {
return true;
}
size_t drop_count;
grpc_lb_v1_ClientStats_calls_finished_with_drop(cstats, &drop_count);
return grpc_lb_v1_ClientStats_num_calls_started(cstats) == 0 &&
grpc_lb_v1_ClientStats_num_calls_finished(cstats) == 0 &&
grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send(
cstats) == 0 &&
grpc_lb_v1_ClientStats_num_calls_finished_known_received(cstats) ==
0 &&
drop_count == 0;
}
void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
// Construct message payload.
GPR_ASSERT(send_message_payload_ == nullptr);
upb::Arena arena;
grpc_grpclb_request* request =
grpc_grpclb_load_report_request_create(client_stats_.get(), arena.ptr());
// Get snapshot of stats.
int64_t num_calls_started;
int64_t num_calls_finished;
int64_t num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_known_received;
std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
client_stats_->Get(&num_calls_started, &num_calls_finished,
&num_calls_finished_with_client_failed_to_send,
&num_calls_finished_known_received, &drop_token_counts);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (LoadReportCountersAreZero(request)) {
if (num_calls_started == 0 && num_calls_finished == 0 &&
num_calls_finished_with_client_failed_to_send == 0 &&
num_calls_finished_known_received == 0 &&
(drop_token_counts == nullptr || drop_token_counts->size() == 0)) {
if (last_client_load_report_counters_were_zero_) {
ScheduleNextClientLoadReportLocked();
return;
@ -971,8 +953,12 @@ void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
} else {
last_client_load_report_counters_were_zero_ = false;
}
grpc_slice request_payload_slice =
grpc_grpclb_request_encode(request, arena.ptr());
// Populate load report.
upb::Arena arena;
grpc_slice request_payload_slice = GrpcLbLoadReportRequestCreate(
num_calls_started, num_calls_finished,
num_calls_finished_with_client_failed_to_send,
num_calls_finished_known_received, drop_token_counts.get(), arena.ptr());
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
@ -1064,107 +1050,10 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
grpc_byte_buffer_reader_destroy(&bbr);
grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
lb_calld->recv_message_payload_ = nullptr;
const grpc_grpclb_initial_response* initial_response;
grpc_grpclb_serverlist* serverlist;
GrpcLbResponse response;
upb::Arena arena;
if (!lb_calld->seen_initial_response_ &&
(initial_response = grpc_grpclb_initial_response_parse(
response_slice, arena.ptr())) != nullptr) {
// Have NOT seen initial response, look for initial response.
const google_protobuf_Duration* client_stats_report_interval =
grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
initial_response);
if (client_stats_report_interval != nullptr) {
lb_calld->client_stats_report_interval_ =
GPR_MAX(GPR_MS_PER_SEC,
grpc_grpclb_duration_to_millis(client_stats_report_interval));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response "
"message; client load reporting interval = %" PRId64
" milliseconds",
grpclb_policy, lb_calld,
lb_calld->client_stats_report_interval_);
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response message; "
"client load reporting NOT enabled",
grpclb_policy, lb_calld);
}
lb_calld->seen_initial_response_ = true;
} else if ((serverlist = grpc_grpclb_response_parse_serverlist(
response_slice)) != nullptr) {
// Have seen initial response, look for serverlist.
GPR_ASSERT(lb_calld->lb_call_ != nullptr);
auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
grpc_core::UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
" servers received:\n%s",
grpclb_policy, lb_calld, serverlist->num_servers,
serverlist_text.get());
}
lb_calld->seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
// Ref held by callback.
lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
lb_calld->ScheduleNextClientLoadReportLocked();
}
// Check if the serverlist differs from the previous one.
if (grpclb_policy->serverlist_ != nullptr &&
*grpclb_policy->serverlist_ == *serverlist_wrapper) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Incoming server list identical to "
"current, ignoring.",
grpclb_policy, lb_calld);
}
} else { // New serverlist.
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
// know that we can reach at least one of the backends in the new
// serverlist. Unfortunately, we can't do that, since we need to
// send the new addresses to the child policy in order to determine
// if they are reachable, and if we don't exit fallback mode now,
// CreateOrUpdateChildPolicyLocked() will use the fallback
// addresses instead of the addresses from the new serverlist.
// However, if we can't reach any of the servers in the new
// serverlist, then the child policy will never switch away from
// the fallback addresses, but the grpclb policy will still think
// that we're not in fallback mode, which means that we won't send
// updates to the child policy when the fallback addresses are
// updated by the resolver. This is sub-optimal, but the only way
// to fix it is to maintain a completely separate child policy for
// fallback mode, and that's more work than we want to put into
// the grpclb implementation at this point, since we're deprecating
// it in favor of the xds policy. We will implement this the
// right way in the xds policy instead.
if (grpclb_policy->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy);
grpclb_policy->fallback_mode_ = false;
}
if (grpclb_policy->fallback_at_startup_checks_pending_) {
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
// GrpcLb instance is destroyed.
grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
grpclb_policy->CreateOrUpdateChildPolicyLocked();
}
} else {
// No valid initial response or serverlist found.
if (!GrpcLbResponseParse(response_slice, arena.ptr(), &response) ||
(response.type == response.INITIAL && lb_calld->seen_initial_response_)) {
char* response_slice_str =
grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
gpr_log(GPR_ERROR,
@ -1172,6 +1061,103 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
"Ignoring.",
grpclb_policy, lb_calld, response_slice_str);
gpr_free(response_slice_str);
} else {
switch (response.type) {
case response.INITIAL: {
if (response.client_stats_report_interval != 0) {
lb_calld->client_stats_report_interval_ =
GPR_MAX(GPR_MS_PER_SEC, response.client_stats_report_interval);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response "
"message; client load reporting interval = %" PRId64
" milliseconds",
grpclb_policy, lb_calld,
lb_calld->client_stats_report_interval_);
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Received initial LB response "
"message; client load reporting NOT enabled",
grpclb_policy, lb_calld);
}
lb_calld->seen_initial_response_ = true;
break;
}
case response.SERVERLIST: {
GPR_ASSERT(lb_calld->lb_call_ != nullptr);
auto serverlist_wrapper =
MakeRefCounted<Serverlist>(std::move(response.serverlist));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
grpc_core::UniquePtr<char> serverlist_text =
serverlist_wrapper->AsText();
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
" servers received:\n%s",
grpclb_policy, lb_calld,
serverlist_wrapper->serverlist().size(),
serverlist_text.get());
}
lb_calld->seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
if (lb_calld->client_stats_report_interval_ > 0 &&
lb_calld->client_stats_ == nullptr) {
lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
// Ref held by callback.
lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
lb_calld->ScheduleNextClientLoadReportLocked();
}
// Check if the serverlist differs from the previous one.
if (grpclb_policy->serverlist_ != nullptr &&
*grpclb_policy->serverlist_ == *serverlist_wrapper) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
"[grpclb %p] lb_calld=%p: Incoming server list identical "
"to current, ignoring.",
grpclb_policy, lb_calld);
}
} else { // New serverlist.
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
// know that we can reach at least one of the backends in the new
// serverlist. Unfortunately, we can't do that, since we need to
// send the new addresses to the child policy in order to determine
// if they are reachable, and if we don't exit fallback mode now,
// CreateOrUpdateChildPolicyLocked() will use the fallback
// addresses instead of the addresses from the new serverlist.
// However, if we can't reach any of the servers in the new
// serverlist, then the child policy will never switch away from
// the fallback addresses, but the grpclb policy will still think
// that we're not in fallback mode, which means that we won't send
// updates to the child policy when the fallback addresses are
// updated by the resolver. This is sub-optimal, but the only way
// to fix it is to maintain a completely separate child policy for
// fallback mode, and that's more work than we want to put into
// the grpclb implementation at this point, since we're deprecating
// it in favor of the xds policy. We will implement this the
// right way in the xds policy instead.
if (grpclb_policy->fallback_mode_) {
gpr_log(GPR_INFO,
"[grpclb %p] Received response from balancer; exiting "
"fallback mode",
grpclb_policy);
grpclb_policy->fallback_mode_ = false;
}
if (grpclb_policy->fallback_at_startup_checks_pending_) {
grpclb_policy->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
// instance will be destroyed either upon the next update or when the
// GrpcLb instance is destroyed.
grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
grpclb_policy->CreateOrUpdateChildPolicyLocked();
}
break;
}
}
}
grpc_slice_unref_internal(response_slice);
if (!grpclb_policy->shutting_down_) {

@ -28,16 +28,38 @@
namespace grpc_core {
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name,
upb_arena* arena) {
grpc_grpclb_request* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
// Field-by-field equality for serverlist entries. Two servers are equal
// iff they carry the same address bytes, port, LB token, and drop flag.
bool GrpcLbServer::operator==(const GrpcLbServer& other) const {
  return ip_size == other.ip_size &&
         memcmp(ip_addr, other.ip_addr, ip_size) == 0 &&
         port == other.port &&
         strncmp(load_balance_token, other.load_balance_token,
                 sizeof(load_balance_token)) == 0 &&
         drop == other.drop;
}
namespace {

// Serializes a LoadBalanceRequest proto into a new slice. The serialized
// bytes live in |arena|; the returned slice owns its own copy.
grpc_slice grpc_grpclb_request_encode(
    const grpc_lb_v1_LoadBalanceRequest* request, upb_arena* arena) {
  size_t serialized_length = 0;
  char* serialized_bytes = grpc_lb_v1_LoadBalanceRequest_serialize(
      request, arena, &serialized_length);
  return grpc_slice_from_copied_buffer(serialized_bytes, serialized_length);
}

}  // namespace
grpc_slice GrpcLbRequestCreate(const char* lb_service_name, upb_arena* arena) {
grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
grpc_lb_v1_InitialLoadBalanceRequest* initial_request =
grpc_lb_v1_LoadBalanceRequest_mutable_initial_request(req, arena);
size_t name_len =
GPR_MIN(strlen(lb_service_name), GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH);
grpc_lb_v1_InitialLoadBalanceRequest_set_name(
initial_request, upb_strview_make(lb_service_name, name_len));
return req;
return grpc_grpclb_request_encode(req, arena);
}
namespace {
@ -50,23 +72,18 @@ void google_protobuf_Timestamp_assign(google_protobuf_Timestamp* timestamp,
} // namespace
grpc_grpclb_request* grpc_grpclb_load_report_request_create(
GrpcLbClientStats* client_stats, upb_arena* arena) {
grpc_grpclb_request* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
grpc_slice GrpcLbLoadReportRequestCreate(
int64_t num_calls_started, int64_t num_calls_finished,
int64_t num_calls_finished_with_client_failed_to_send,
int64_t num_calls_finished_known_received,
const GrpcLbClientStats::DroppedCallCounts* drop_token_counts,
upb_arena* arena) {
grpc_lb_v1_LoadBalanceRequest* req = grpc_lb_v1_LoadBalanceRequest_new(arena);
grpc_lb_v1_ClientStats* req_stats =
grpc_lb_v1_LoadBalanceRequest_mutable_client_stats(req, arena);
google_protobuf_Timestamp_assign(
grpc_lb_v1_ClientStats_mutable_timestamp(req_stats, arena),
gpr_now(GPR_CLOCK_REALTIME));
int64_t num_calls_started;
int64_t num_calls_finished;
int64_t num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_known_received;
std::unique_ptr<GrpcLbClientStats::DroppedCallCounts> drop_token_counts;
client_stats->Get(&num_calls_started, &num_calls_finished,
&num_calls_finished_with_client_failed_to_send,
&num_calls_finished_known_received, &drop_token_counts);
grpc_lb_v1_ClientStats_set_num_calls_started(req_stats, num_calls_started);
grpc_lb_v1_ClientStats_set_num_calls_finished(req_stats, num_calls_finished);
grpc_lb_v1_ClientStats_set_num_calls_finished_with_client_failed_to_send(
@ -75,152 +92,96 @@ grpc_grpclb_request* grpc_grpclb_load_report_request_create(
req_stats, num_calls_finished_known_received);
if (drop_token_counts != nullptr) {
for (size_t i = 0; i < drop_token_counts->size(); ++i) {
GrpcLbClientStats::DropTokenCount& cur = (*drop_token_counts)[i];
const GrpcLbClientStats::DropTokenCount& cur = (*drop_token_counts)[i];
grpc_lb_v1_ClientStatsPerToken* cur_msg =
grpc_lb_v1_ClientStats_add_calls_finished_with_drop(req_stats, arena);
const size_t token_len = strlen(cur.token.get());
char* token = reinterpret_cast<char*>(upb_arena_malloc(arena, token_len));
memcpy(token, cur.token.get(), token_len);
grpc_lb_v1_ClientStatsPerToken_set_load_balance_token(
cur_msg, upb_strview_make(token, token_len));
grpc_lb_v1_ClientStatsPerToken_set_num_calls(cur_msg, cur.count);
}
}
return req;
}
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request,
upb_arena* arena) {
size_t buf_length;
char* buf =
grpc_lb_v1_LoadBalanceRequest_serialize(request, arena, &buf_length);
return grpc_slice_from_copied_buffer(buf, buf_length);
return grpc_grpclb_request_encode(req, arena);
}
const grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
const grpc_slice& encoded_grpc_grpclb_response, upb_arena* arena) {
grpc_lb_v1_LoadBalanceResponse* response =
grpc_lb_v1_LoadBalanceResponse_parse(
reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response)),
GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response), arena);
if (response == nullptr) {
gpr_log(GPR_ERROR, "grpc_lb_v1_LoadBalanceResponse parse error");
return nullptr;
}
return grpc_lb_v1_LoadBalanceResponse_initial_response(response);
}
namespace {
grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
const grpc_slice& encoded_grpc_grpclb_response) {
upb::Arena arena;
grpc_lb_v1_LoadBalanceResponse* response =
grpc_lb_v1_LoadBalanceResponse_parse(
reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response)),
GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response), arena.ptr());
if (response == nullptr) {
gpr_log(GPR_ERROR, "grpc_lb_v1_LoadBalanceResponse parse error");
return nullptr;
}
grpc_grpclb_serverlist* server_list = static_cast<grpc_grpclb_serverlist*>(
gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
// First pass: count number of servers.
bool ParseServerList(const grpc_lb_v1_LoadBalanceResponse& response,
std::vector<GrpcLbServer>* server_list) {
// Determine the number of servers.
const grpc_lb_v1_ServerList* server_list_msg =
grpc_lb_v1_LoadBalanceResponse_server_list(response);
grpc_lb_v1_LoadBalanceResponse_server_list(&response);
if (server_list_msg == nullptr) return false;
size_t server_count = 0;
const grpc_lb_v1_Server* const* servers = nullptr;
if (server_list_msg != nullptr) {
servers = grpc_lb_v1_ServerList_servers(server_list_msg, &server_count);
}
// Second pass: populate servers.
const grpc_lb_v1_Server* const* servers =
grpc_lb_v1_ServerList_servers(server_list_msg, &server_count);
// Populate servers.
if (server_count > 0) {
server_list->servers = static_cast<grpc_grpclb_server**>(
gpr_zalloc(sizeof(grpc_grpclb_server*) * server_count));
server_list->num_servers = server_count;
server_list->reserve(server_count);
for (size_t i = 0; i < server_count; ++i) {
grpc_grpclb_server* cur = server_list->servers[i] =
static_cast<grpc_grpclb_server*>(
gpr_zalloc(sizeof(grpc_grpclb_server)));
GrpcLbServer& cur = *server_list->emplace(server_list->end());
upb_strview address = grpc_lb_v1_Server_ip_address(servers[i]);
if (address.size == 0) {
; // Nothing to do because cur->ip_address is an empty string.
} else if (address.size <= GRPC_GRPCLB_SERVER_IP_ADDRESS_MAX_SIZE) {
cur->ip_address.size = static_cast<int32_t>(address.size);
memcpy(cur->ip_address.data, address.data, address.size);
cur.ip_size = static_cast<int32_t>(address.size);
memcpy(cur.ip_addr, address.data, address.size);
}
cur->port = grpc_lb_v1_Server_port(servers[i]);
cur.port = grpc_lb_v1_Server_port(servers[i]);
upb_strview token = grpc_lb_v1_Server_load_balance_token(servers[i]);
if (token.size == 0) {
; // Nothing to do because cur->load_balance_token is an empty string.
} else if (token.size <= GRPC_GRPCLB_SERVER_LOAD_BALANCE_TOKEN_MAX_SIZE) {
memcpy(cur->load_balance_token, token.data, token.size);
memcpy(cur.load_balance_token, token.data, token.size);
} else {
gpr_log(GPR_ERROR,
"grpc_lb_v1_LoadBalanceResponse has too long token. len=%zu",
token.size);
}
cur->drop = grpc_lb_v1_Server_drop(servers[i]);
cur.drop = grpc_lb_v1_Server_drop(servers[i]);
}
}
return server_list;
return true;
}
void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist) {
if (serverlist == nullptr) {
return;
}
for (size_t i = 0; i < serverlist->num_servers; i++) {
gpr_free(serverlist->servers[i]);
}
gpr_free(serverlist->servers);
gpr_free(serverlist);
// Converts a google.protobuf.Duration proto to a millisecond count,
// truncating the sub-millisecond remainder of the nanos field.
grpc_millis grpc_grpclb_duration_to_millis(
    const google_protobuf_Duration* duration_pb) {
  const int64_t seconds = google_protobuf_Duration_seconds(duration_pb);
  const int32_t nanos = google_protobuf_Duration_nanos(duration_pb);
  return static_cast<grpc_millis>(seconds * GPR_MS_PER_SEC +
                                  nanos / GPR_NS_PER_MS);
}
grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
const grpc_grpclb_serverlist* server_list) {
grpc_grpclb_serverlist* copy = static_cast<grpc_grpclb_serverlist*>(
gpr_zalloc(sizeof(grpc_grpclb_serverlist)));
copy->num_servers = server_list->num_servers;
copy->servers = static_cast<grpc_grpclb_server**>(
gpr_malloc(sizeof(grpc_grpclb_server*) * server_list->num_servers));
for (size_t i = 0; i < server_list->num_servers; i++) {
copy->servers[i] = static_cast<grpc_grpclb_server*>(
gpr_malloc(sizeof(grpc_grpclb_server)));
memcpy(copy->servers[i], server_list->servers[i],
sizeof(grpc_grpclb_server));
}
return copy;
}
} // namespace
bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
const grpc_grpclb_serverlist* rhs) {
if (lhs == nullptr || rhs == nullptr) {
return false;
}
if (lhs->num_servers != rhs->num_servers) {
return false;
bool GrpcLbResponseParse(const grpc_slice& encoded_grpc_grpclb_response,
upb_arena* arena, GrpcLbResponse* result) {
grpc_lb_v1_LoadBalanceResponse* response =
grpc_lb_v1_LoadBalanceResponse_parse(
reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response)),
GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response), arena);
// Handle serverlist responses.
if (ParseServerList(*response, &result->serverlist)) {
result->type = result->SERVERLIST;
return true;
}
for (size_t i = 0; i < lhs->num_servers; i++) {
if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) {
return false;
// Handle initial responses.
auto* initial_response =
grpc_lb_v1_LoadBalanceResponse_initial_response(response);
if (initial_response != nullptr) {
result->type = result->INITIAL;
const google_protobuf_Duration* client_stats_report_interval =
grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
initial_response);
if (client_stats_report_interval != nullptr) {
result->client_stats_report_interval =
grpc_grpclb_duration_to_millis(client_stats_report_interval);
}
return true;
}
return true;
}
bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs,
const grpc_grpclb_server* rhs) {
return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0;
}
grpc_millis grpc_grpclb_duration_to_millis(
const grpc_grpclb_duration* duration_pb) {
return static_cast<grpc_millis>(
google_protobuf_Duration_seconds(duration_pb) * GPR_MS_PER_SEC +
google_protobuf_Duration_nanos(duration_pb) / GPR_NS_PER_MS);
return false;
}
} // namespace grpc_core

@ -21,6 +21,8 @@
#include <grpc/support/port_platform.h>
#include <vector>
#include <grpc/slice_buffer.h>
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
@ -33,69 +35,38 @@
namespace grpc_core {
typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response;
typedef google_protobuf_Duration grpc_grpclb_duration;
typedef google_protobuf_Timestamp grpc_grpclb_timestamp;
typedef struct {
int32_t size;
char data[GRPC_GRPCLB_SERVER_IP_ADDRESS_MAX_SIZE];
} grpc_grpclb_server_ip_address;
// Contains server information. When the drop field is not true, use the other
// fields.
typedef struct {
grpc_grpclb_server_ip_address ip_address;
struct GrpcLbServer {
int32_t ip_size;
char ip_addr[GRPC_GRPCLB_SERVER_IP_ADDRESS_MAX_SIZE];
int32_t port;
char load_balance_token[GRPC_GRPCLB_SERVER_LOAD_BALANCE_TOKEN_MAX_SIZE];
bool drop;
} grpc_grpclb_server;
typedef struct {
grpc_grpclb_server** servers;
size_t num_servers;
} grpc_grpclb_serverlist;
// NOTE(review): diff fragment — the declarations below are the old C-style
// API removed by this commit; the trailing operator== / "};" lines are the
// added tail of the new GrpcLbServer struct declared above.
/**
* Create a request for a gRPC LB service under \a lb_service_name.
* \a lb_service_name should be alive when returned request is being used.
*/
grpc_grpclb_request* grpc_grpclb_request_create(const char* lb_service_name,
upb_arena* arena);
grpc_grpclb_request* grpc_grpclb_load_report_request_create(
grpc_core::GrpcLbClientStats* client_stats, upb_arena* arena);
/** Protocol Buffers v3-encode \a request */
grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request* request,
upb_arena* arena);
/** Parse (ie, decode) the bytes in \a encoded_grpc_grpclb_response as a \a
* grpc_grpclb_initial_response */
const grpc_grpclb_initial_response* grpc_grpclb_initial_response_parse(
const grpc_slice& encoded_grpc_grpclb_response, upb_arena* arena);
/** Parse the list of servers from an encoded \a grpc_grpclb_response */
grpc_grpclb_serverlist* grpc_grpclb_response_parse_serverlist(
const grpc_slice& encoded_grpc_grpclb_response);
/** Return a copy of \a sl. The caller is responsible for calling \a
* grpc_grpclb_destroy_serverlist on the returned copy. */
grpc_grpclb_serverlist* grpc_grpclb_serverlist_copy(
const grpc_grpclb_serverlist* sl);
bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist* lhs,
const grpc_grpclb_serverlist* rhs);
bool grpc_grpclb_server_equals(const grpc_grpclb_server* lhs,
const grpc_grpclb_server* rhs);
/** Destroy \a serverlist */
void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist* serverlist);
grpc_millis grpc_grpclb_duration_to_millis(
const grpc_grpclb_duration* duration_pb);
// Added: member equality for GrpcLbServer, replacing the free function
// grpc_grpclb_server_equals above (presumably field-wise compare — the
// definition lives in load_balancer_api.cc; confirm there).
bool operator==(const GrpcLbServer& other) const;
};
// A deserialized grpclb LoadBalanceResponse, filled in by
// GrpcLbResponseParse().  Which payload is meaningful depends on \a type:
//   INITIAL    -> client_stats_report_interval (in grpc_millis; 0 if unset)
//   SERVERLIST -> serverlist
struct GrpcLbResponse {
  // Default-initialize the discriminator so a response that was never
  // successfully parsed does not carry an indeterminate value (the other
  // two members already have well-defined defaults).
  enum { INITIAL, SERVERLIST } type = INITIAL;
  grpc_millis client_stats_report_interval = 0;
  std::vector<GrpcLbServer> serverlist;
};
// Creates a serialized grpclb request for \a lb_service_name, using
// \a arena for proto allocation.
grpc_slice GrpcLbRequestCreate(const char* lb_service_name, upb_arena* arena);
// Creates a serialized grpclb load report request carrying the given call
// counters and per-token drop counts, using \a arena for proto allocation.
grpc_slice GrpcLbLoadReportRequestCreate(
int64_t num_calls_started, int64_t num_calls_finished,
int64_t num_calls_finished_with_client_failed_to_send,
int64_t num_calls_finished_known_received,
const GrpcLbClientStats::DroppedCallCounts* drop_token_counts,
upb_arena* arena);
// Deserialize a grpclb response.
// Fills in \a response from \a serialized_response, allocating proto
// storage from \a arena.  Returns true on success.
bool GrpcLbResponseParse(const grpc_slice& serialized_response,
upb_arena* arena, GrpcLbResponse* response);
} // namespace grpc_core

@@ -45,18 +45,17 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
}
// Test helper: converts the packed binary IP stored in a server entry
// (4 bytes = IPv4, 16 bytes = IPv6) back into its textual form.
// NOTE(review): diff fragment — old (pb_ip-based) and new
// (GrpcLbServer-based) lines are interleaved below.
grpc::string PackedStringToIp(
const grpc_core::grpc_grpclb_server_ip_address& pb_ip) {
grpc::string PackedStringToIp(const grpc_core::GrpcLbServer& server) {
// 46 bytes is large enough for any textual IPv6 address
// (INET6_ADDRSTRLEN).
char ip_str[46] = {0};
int af = -1;
if (pb_ip.size == 4) {
if (server.ip_size == 4) {
af = AF_INET;
} else if (pb_ip.size == 16) {
} else if (server.ip_size == 16) {
af = AF_INET6;
} else {
// Any other length means a malformed test fixture.
abort();
}
GPR_ASSERT(inet_ntop(af, (void*)pb_ip.data, ip_str, 46) != nullptr);
GPR_ASSERT(inet_ntop(af, (void*)server.ip_addr, ip_str, 46) != nullptr);
return ip_str;
}
@@ -64,9 +63,8 @@ TEST_F(GrpclbTest, CreateRequest) {
// Body fragment of TEST_F(GrpclbTest, CreateRequest): serializes a request
// through the C-core API, then parses it back with the C++ proto library to
// validate the encoding.  NOTE(review): diff fragment — the old
// create+encode call pair and the new single GrpcLbRequestCreate call are
// both shown.
const grpc::string service_name = "AServiceName";
LoadBalanceRequest request;
upb::Arena arena;
grpc_core::grpc_grpclb_request* c_req =
grpc_core::grpc_grpclb_request_create(service_name.c_str(), arena.ptr());
grpc_slice slice = grpc_core::grpc_grpclb_request_encode(c_req, arena.ptr());
grpc_slice slice =
grpc_core::GrpcLbRequestCreate(service_name.c_str(), arena.ptr());
const int num_bytes_written = GRPC_SLICE_LENGTH(slice);
EXPECT_GT(num_bytes_written, 0);
// Round-trip: the C++ proto must accept the C-core serialization.
request.ParseFromArray(GRPC_SLICE_START_PTR(slice), num_bytes_written);
@@ -75,34 +73,29 @@ TEST_F(GrpclbTest, CreateRequest) {
}
// NOTE(review): diff fragment — old and new test bodies are interleaved.
TEST_F(GrpclbTest, ParseInitialResponse) {
// Construct response to parse.
LoadBalanceResponse response;
auto* initial_response = response.mutable_initial_response();
auto* client_stats_report_interval =
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
// Old fixture used 456 ns; the new one uses 456000000 ns (= 456 ms) so the
// whole interval is 123 s + 456 ms = 123456 ms below.
client_stats_report_interval->set_nanos(456);
client_stats_report_interval->set_nanos(456000000);
const grpc::string encoded_response = response.SerializeAsString();
grpc_slice encoded_slice =
grpc_slice_from_copied_string(encoded_response.c_str());
// Test parsing.
grpc_core::GrpcLbResponse resp;
upb::Arena arena;
// Old API (removed): parse with raw upb accessors and check each proto
// field individually.
const grpc_core::grpc_grpclb_initial_response* c_initial_response =
grpc_core::grpc_grpclb_initial_response_parse(encoded_slice, arena.ptr());
upb_strview load_balancer_delegate =
grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate(
c_initial_response);
EXPECT_EQ(load_balancer_delegate.size, 0);
const google_protobuf_Duration* report_interval =
grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval(
c_initial_response);
EXPECT_EQ(google_protobuf_Duration_seconds(report_interval), 123);
EXPECT_EQ(google_protobuf_Duration_nanos(report_interval), 456);
// New API: single parse entry point yielding a GrpcLbResponse with the
// interval already converted to milliseconds.
ASSERT_TRUE(
grpc_core::GrpcLbResponseParse(encoded_slice, arena.ptr(), &resp));
grpc_slice_unref(encoded_slice);
EXPECT_EQ(resp.type, resp.INITIAL);
EXPECT_EQ(resp.client_stats_report_interval, 123456);
EXPECT_EQ(resp.serverlist.size(), 0);
}
// NOTE(review): diff fragment — old and new test bodies are interleaved, and
// the elided hunk gap below hides the lines that set up the first server
// (127.0.0.1:12345, token "rate_limting" — sic, typo is part of the fixture),
// as the expectations further down show.
TEST_F(GrpclbTest, ParseResponseServerList) {
// Construct response to parse.
LoadBalanceResponse response;
auto* serverlist = response.mutable_server_list();
auto* server = serverlist->add_servers();
@ -115,26 +108,25 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
server->set_port(54321);
server->set_load_balance_token("load_balancing");
server->set_drop(true);
const grpc::string encoded_response = response.SerializeAsString();
const grpc_slice encoded_slice = grpc_slice_from_copied_buffer(
encoded_response.data(), encoded_response.size());
// Old API (removed): parse into a heap-allocated C serverlist and check
// each pointer-accessed entry.
grpc_core::grpc_grpclb_serverlist* c_serverlist =
grpc_core::grpc_grpclb_response_parse_serverlist(encoded_slice);
ASSERT_EQ(c_serverlist->num_servers, 2ul);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting");
EXPECT_TRUE(c_serverlist->servers[0]->drop);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
EXPECT_TRUE(c_serverlist->servers[1]->drop);
// Test parsing.
grpc_core::GrpcLbResponse resp;
upb::Arena arena;
// New API: same expectations, but via the value-typed
// std::vector<GrpcLbServer> in GrpcLbResponse — no manual destroy needed.
ASSERT_TRUE(
grpc_core::GrpcLbResponseParse(encoded_slice, arena.ptr(), &resp));
grpc_slice_unref(encoded_slice);
grpc_grpclb_destroy_serverlist(c_serverlist);
EXPECT_EQ(resp.type, resp.SERVERLIST);
EXPECT_EQ(resp.serverlist.size(), 2);
EXPECT_EQ(PackedStringToIp(resp.serverlist[0]), "127.0.0.1");
EXPECT_EQ(resp.serverlist[0].port, 12345);
EXPECT_STREQ(resp.serverlist[0].load_balance_token, "rate_limting");
EXPECT_TRUE(resp.serverlist[0].drop);
EXPECT_EQ(PackedStringToIp(resp.serverlist[1]), "10.0.0.1");
EXPECT_EQ(resp.serverlist[1].port, 54321);
EXPECT_STREQ(resp.serverlist[1].load_balance_token, "load_balancing");
EXPECT_TRUE(resp.serverlist[1].drop);
}
} // namespace

Loading…
Cancel
Save