mirror of https://github.com/grpc/grpc.git
Merge branch 'master' of https://github.com/grpc/grpc
commit
3a48d602c4
31 changed files with 2653 additions and 951 deletions
@ -1,253 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/combiner.h" |
||||
#include "src/core/lib/iomgr/sockaddr_utils.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
|
||||
// Drops the reference that `sd` holds on its subchannel, if it holds one.
//
// When `sd->subchannel` is non-null this unrefs it (tagged with `reason`),
// nulls the pointer, resets `sd->connected_subchannel`, and destroys the
// per-address user data through `sd->user_data_vtable`. When the pointer is
// already null the call does nothing.
void grpc_lb_subchannel_data_unref_subchannel(grpc_lb_subchannel_data* sd,
                                              const char* reason) {
  // Nothing to release.
  if (sd->subchannel == nullptr) return;
  grpc_lb_subchannel_list* const owner = sd->subchannel_list;
  if (owner->tracer->enabled()) {
    // The entry's index is its offset within the owning list's array.
    const size_t index = static_cast<size_t>(sd - owner->subchannels);
    gpr_log(GPR_INFO,
            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
            " (subchannel %p): unreffing subchannel",
            owner->tracer->name(), owner->policy, owner, index,
            owner->num_subchannels, sd->subchannel);
  }
  GRPC_SUBCHANNEL_UNREF(sd->subchannel, reason);
  sd->subchannel = nullptr;
  sd->connected_subchannel.reset();
  if (sd->user_data == nullptr) return;
  // User data can only be destroyed through the vtable that produced it.
  GPR_ASSERT(sd->user_data_vtable != nullptr);
  sd->user_data_vtable->destroy(sd->user_data);
  sd->user_data = nullptr;
}
||||
|
||||
// Requests a connectivity-state notification for the subchannel of `sd`.
//
// Marks the notification as pending and registers
// `sd->connectivity_changed_closure` with the subchannel; the state is
// passed through `sd->pending_connectivity_state_unsafe`.
void grpc_lb_subchannel_data_start_connectivity_watch(
    grpc_lb_subchannel_data* sd) {
  grpc_lb_subchannel_list* const owner = sd->subchannel_list;
  if (owner->tracer->enabled()) {
    const size_t index = static_cast<size_t>(sd - owner->subchannels);
    const char* const from_state =
        grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe);
    gpr_log(GPR_INFO,
            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
            " (subchannel %p): requesting connectivity change "
            "notification (from %s)",
            owner->tracer->name(), owner->policy, owner, index,
            owner->num_subchannels, sd->subchannel, from_state);
  }
  sd->connectivity_notification_pending = true;
  grpc_subchannel_notify_on_state_change(
      sd->subchannel, owner->policy->interested_parties(),
      &sd->pending_connectivity_state_unsafe,
      &sd->connectivity_changed_closure);
}
||||
|
||||
// Records that no connectivity notification is pending for `sd` any more.
//
// Only the bookkeeping flag is cleared here; a notification must currently
// be pending (asserted).
void grpc_lb_subchannel_data_stop_connectivity_watch(
    grpc_lb_subchannel_data* sd) {
  grpc_lb_subchannel_list* const owner = sd->subchannel_list;
  if (owner->tracer->enabled()) {
    const size_t index = static_cast<size_t>(sd - owner->subchannels);
    gpr_log(GPR_INFO,
            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
            " (subchannel %p): stopping connectivity watch",
            owner->tracer->name(), owner->policy, owner, index,
            owner->num_subchannels, sd->subchannel);
  }
  GPR_ASSERT(sd->connectivity_notification_pending);
  sd->connectivity_notification_pending = false;
}
||||
|
||||
// Builds a subchannel list for policy `p` with one entry per address in
// `addresses` for which a subchannel could be created.
//
// Addresses whose subchannel creation fails are skipped, so the resulting
// `num_subchannels` may be smaller than `addresses->num_addresses`. Every
// entry starts out in GRPC_CHANNEL_IDLE and gets a closure that runs
// `connectivity_changed_cb` on the scheduler of `combiner`. The returned
// list is zero-initialized and carries a refcount of 1.
grpc_lb_subchannel_list* grpc_lb_subchannel_list_create(
    grpc_core::LoadBalancingPolicy* p, grpc_core::TraceFlag* tracer,
    const grpc_lb_addresses* addresses, grpc_combiner* combiner,
    grpc_client_channel_factory* client_channel_factory,
    const grpc_channel_args& args, grpc_iomgr_cb_func connectivity_changed_cb) {
  grpc_lb_subchannel_list* list = static_cast<grpc_lb_subchannel_list*>(
      gpr_zalloc(sizeof(grpc_lb_subchannel_list)));
  if (tracer->enabled()) {
    gpr_log(GPR_INFO,
            "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
            tracer->name(), p, list, addresses->num_addresses);
  }
  list->policy = p;
  list->tracer = tracer;
  gpr_ref_init(&list->refcount, 1);
  // Room for one entry per address; entries are filled densely, so any
  // unused tail stays zeroed.
  list->subchannels = static_cast<grpc_lb_subchannel_data*>(
      gpr_zalloc(addresses->num_addresses * sizeof(grpc_lb_subchannel_data)));
  // We need to remove the LB addresses in order to be able to compare the
  // subchannel keys of subchannels from a different batch of addresses.
  static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
                                         GRPC_ARG_LB_ADDRESSES};
  size_t num_created = 0;
  for (size_t i = 0; i < addresses->num_addresses; i++) {
    // If there were any balancer, we would have chosen grpclb policy instead.
    GPR_ASSERT(!addresses->addresses[i].is_balancer);
    auto* const address = &addresses->addresses[i].address;
    // Channel args for this subchannel: the caller's args minus the
    // address-related keys, plus this entry's own address.
    grpc_subchannel_args sc_args;
    memset(&sc_args, 0, sizeof(sc_args));
    grpc_arg addr_arg = grpc_create_subchannel_address_arg(address);
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1);
    gpr_free(addr_arg.value.string);
    sc_args.args = new_args;
    grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
        client_channel_factory, &sc_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) {
      // Subchannel could not be created: skip this address.
      if (tracer->enabled()) {
        char* address_uri = grpc_sockaddr_to_uri(address);
        gpr_log(GPR_INFO,
                "[%s %p] could not create subchannel for address uri %s, "
                "ignoring",
                tracer->name(), p, address_uri);
        gpr_free(address_uri);
      }
      continue;
    }
    if (tracer->enabled()) {
      char* address_uri = grpc_sockaddr_to_uri(address);
      gpr_log(GPR_INFO,
              "[%s %p] subchannel list %p index %" PRIuPTR
              ": Created subchannel %p for address uri %s",
              tracer->name(), p, list, num_created, subchannel, address_uri);
      gpr_free(address_uri);
    }
    grpc_lb_subchannel_data* sd = &list->subchannels[num_created];
    ++num_created;
    sd->subchannel_list = list;
    sd->subchannel = subchannel;
    GRPC_CLOSURE_INIT(&sd->connectivity_changed_closure,
                      connectivity_changed_cb, sd,
                      grpc_combiner_scheduler(combiner));
    // We assume that the current state is IDLE. If not, we'll get a
    // callback telling us that.
    sd->prev_connectivity_state = GRPC_CHANNEL_IDLE;
    sd->curr_connectivity_state = GRPC_CHANNEL_IDLE;
    sd->pending_connectivity_state_unsafe = GRPC_CHANNEL_IDLE;
    sd->user_data_vtable = addresses->user_data_vtable;
    if (sd->user_data_vtable != nullptr) {
      sd->user_data =
          sd->user_data_vtable->copy(addresses->addresses[i].user_data);
    }
  }
  list->num_subchannels = num_created;
  list->num_idle = num_created;
  return list;
}
||||
|
||||
// Releases every subchannel still referenced by `subchannel_list`, then
// frees the entry array and the list itself.
static void subchannel_list_destroy(grpc_lb_subchannel_list* subchannel_list) {
  if (subchannel_list->tracer->enabled()) {
    gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p",
            subchannel_list->tracer->name(), subchannel_list->policy,
            subchannel_list);
  }
  for (size_t idx = 0; idx < subchannel_list->num_subchannels; ++idx) {
    grpc_lb_subchannel_data_unref_subchannel(
        &subchannel_list->subchannels[idx], "subchannel_list_destroy");
  }
  gpr_free(subchannel_list->subchannels);
  gpr_free(subchannel_list);
}
||||
|
||||
// Takes an additional reference on `subchannel_list`. `reason` is used only
// in the trace log line.
void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list,
                                 const char* reason) {
  gpr_ref_non_zero(&subchannel_list->refcount);
  if (!subchannel_list->tracer->enabled()) return;
  // The count is read back after the increment, so "old" is derived from it.
  const gpr_atm new_count =
      gpr_atm_acq_load(&subchannel_list->refcount.count);
  gpr_log(GPR_INFO, "[%s %p] subchannel_list %p REF %lu->%lu (%s)",
          subchannel_list->tracer->name(), subchannel_list->policy,
          subchannel_list, static_cast<unsigned long>(new_count - 1),
          static_cast<unsigned long>(new_count), reason);
}
||||
|
||||
// Drops one reference on `subchannel_list` and destroys the list when that
// was the last one. `reason` is used only in the trace log line.
void grpc_lb_subchannel_list_unref(grpc_lb_subchannel_list* subchannel_list,
                                   const char* reason) {
  const bool was_last_ref = gpr_unref(&subchannel_list->refcount);
  if (subchannel_list->tracer->enabled()) {
    // The count is read back after the decrement, so "old" is derived from it.
    const gpr_atm new_count =
        gpr_atm_acq_load(&subchannel_list->refcount.count);
    gpr_log(GPR_INFO, "[%s %p] subchannel_list %p UNREF %lu->%lu (%s)",
            subchannel_list->tracer->name(), subchannel_list->policy,
            subchannel_list, static_cast<unsigned long>(new_count + 1),
            static_cast<unsigned long>(new_count), reason);
  }
  if (!was_last_ref) return;
  subchannel_list_destroy(subchannel_list);
}
||||
|
||||
// Cancels the connectivity watch of `sd` by calling
// grpc_subchannel_notify_on_state_change() with null interested parties and
// a null state pointer for the entry's closure. `reason` is used only in
// the trace log line.
static void subchannel_data_cancel_connectivity_watch(
    grpc_lb_subchannel_data* sd, const char* reason) {
  grpc_lb_subchannel_list* const owner = sd->subchannel_list;
  if (owner->tracer->enabled()) {
    const size_t index = static_cast<size_t>(sd - owner->subchannels);
    gpr_log(GPR_INFO,
            "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
            " (subchannel %p): canceling connectivity watch (%s)",
            owner->tracer->name(), owner->policy, owner, index,
            owner->num_subchannels, sd->subchannel, reason);
  }
  grpc_subchannel_notify_on_state_change(sd->subchannel, nullptr, nullptr,
                                         &sd->connectivity_changed_closure);
}
||||
|
||||
// Marks `subchannel_list` as shutting down, releases or cancels every
// entry, and finally drops one reference on the list.
//
// Must be called at most once per list (asserted via `shutting_down`).
void grpc_lb_subchannel_list_shutdown_and_unref(
    grpc_lb_subchannel_list* subchannel_list, const char* reason) {
  if (subchannel_list->tracer->enabled()) {
    gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p (%s)",
            subchannel_list->tracer->name(), subchannel_list->policy,
            subchannel_list, reason);
  }
  GPR_ASSERT(!subchannel_list->shutting_down);
  subchannel_list->shutting_down = true;
  for (size_t idx = 0; idx < subchannel_list->num_subchannels; ++idx) {
    grpc_lb_subchannel_data* entry = &subchannel_list->subchannels[idx];
    if (entry->connectivity_notification_pending) {
      // A notification is pending: cancel it. The callback is responsible
      // for unreffing the subchannel.
      subchannel_data_cancel_connectivity_watch(entry, reason);
      continue;
    }
    // No pending notification: unref the subchannel directly.
    if (entry->subchannel != nullptr) {
      grpc_lb_subchannel_data_unref_subchannel(entry, reason);
    }
  }
  grpc_lb_subchannel_list_unref(subchannel_list, reason);
}
@ -0,0 +1,273 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <cstdlib> |
||||
#include <set> |
||||
#include <unordered_map> |
||||
#include <vector> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_data_store.h" |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
// Some helper functions.
|
||||
namespace { |
||||
|
||||
// Removes `value` from the set stored under `key` in `map`.
//
// If the key exists and its set is empty after the removal, the key-set
// pair is erased from the map as well. Returns true if `value` was present
// and has been removed; returns false if the key is missing or the set did
// not contain `value`.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  const auto entry = map.find(key);
  if (entry == map.end()) return false;
  std::set<V>& values = entry->second;
  const bool removed = values.erase(value) != 0;
  // Never leave an empty set behind for this key.
  if (values.empty()) map.erase(entry);
  return removed;
}
||||
|
||||
// Detaches and returns the set stored under `key` in `map`.
//
// The key-set pair is removed from the map and the set is handed to the
// caller. If the key is not present the map is left untouched and an empty
// set is returned.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  std::set<V> extracted;
  const auto entry = map.find(key);
  if (entry != map.end()) {
    // Steal the contents before the node is destroyed.
    extracted = std::move(entry->second);
    map.erase(entry);
  }
  return extracted;
}
||||
|
||||
// Returns a pointer to an element of `container` chosen with std::rand().
//
// The container must not be empty (asserted). The pointer refers to the
// element inside `container`, so it is only usable while that element
// stays alive.
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  GPR_ASSERT(!container.empty());
  const auto chosen =
      std::next(container.begin(), std::rand() % container.size());
  return &*chosen;
}
||||
|
||||
} // namespace
|
||||
|
||||
void PerBalancerStore::MergeRow(const LoadRecordKey& key, |
||||
const LoadRecordValue& value) { |
||||
// During suspension, the load data received will be dropped.
|
||||
if (!suspended_) { |
||||
load_record_map_[key].MergeFrom(value); |
||||
gpr_log(GPR_DEBUG, |
||||
"[PerBalancerStore %p] Load data merged (Key: %s, Value: %s).", |
||||
this, key.ToString().c_str(), value.ToString().c_str()); |
||||
} else { |
||||
gpr_log(GPR_DEBUG, |
||||
"[PerBalancerStore %p] Load data dropped (Key: %s, Value: %s).", |
||||
this, key.ToString().c_str(), value.ToString().c_str()); |
||||
} |
||||
// We always keep track of num_calls_in_progress_, so that when this
|
||||
// store is resumed, we still have a correct value of
|
||||
// num_calls_in_progress_.
|
||||
GPR_ASSERT(static_cast<int64_t>(num_calls_in_progress_) + |
||||
value.GetNumCallsInProgressDelta() >= |
||||
0); |
||||
num_calls_in_progress_ += value.GetNumCallsInProgressDelta(); |
||||
} |
||||
|
||||
// Puts the store into the suspended state and discards all load records
// collected so far.
void PerBalancerStore::Suspend() {
  load_record_map_.clear();
  suspended_ = true;
  gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Suspended.", this);
}
||||
|
||||
// Leaves the suspended state; subsequent MergeRow() calls record load data
// again.
void PerBalancerStore::Resume() {
  suspended_ = false;
  gpr_log(GPR_DEBUG, "[PerBalancerStore %p] Resumed.", this);
}
||||
|
||||
// Returns the current number of in-progress calls and remembers it as the
// last reported value. Must not be called on a suspended store (asserted).
uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
  GPR_ASSERT(!suspended_);
  const uint64_t reported = num_calls_in_progress_;
  last_reported_num_calls_in_progress_ = reported;
  return reported;
}
||||
|
||||
// Registers a new reporting stream identified by `lb_id` for `load_key`.
//
// A store is set up for the new LB ID. If that balancer is now the only one
// receiving reports, it adopts every other existing store (resuming each
// one). If its store is the only store on this host, a store for
// kInvalidLbId is additionally created and immediately closed. `lb_id` must
// not be kInvalidLbId itself (asserted).
void PerHostStore::ReportStreamCreated(const grpc::string& lb_id,
                                       const grpc::string& load_key) {
  GPR_ASSERT(lb_id != kInvalidLbId);
  SetUpForNewLbId(lb_id, load_key);
  // Prior to this one, there was no load balancer receiving report, so we may
  // have unassigned orphaned stores to assign to this new balancer.
  // TODO(juanlishen): If the load key of this new stream is the same with
  // some previously adopted orphan store, we may want to take the orphan to
  // this stream. Need to discuss with LB team.
  const bool is_sole_receiver = assigned_stores_.size() == 1;
  if (is_sole_receiver) {
    for (const auto& entry : per_balancer_stores_) {
      // The store that was just created is not an orphan.
      if (entry.first == lb_id) continue;
      PerBalancerStore* const orphan = entry.second.get();
      orphan->Resume();
      AssignOrphanedStore(orphan, lb_id);
    }
  }
  // The first connected balancer will adopt the kInvalidLbId.
  const bool is_first_store = per_balancer_stores_.size() == 1;
  if (is_first_store) {
    SetUpForNewLbId(kInvalidLbId, "");
    ReportStreamClosed(kInvalidLbId);
  }
}
||||
|
||||
// Handles the closing of the reporting stream identified by `lb_id`.
//
// The LB ID is removed from the receivers of its load key, and every store
// that was assigned to it is handed to another receiving balancer:
// preferably one with the same load key, otherwise any remaining one. When
// no receiver is left the store is suspended. `lb_id` must be known to this
// host store and must be registered as a receiver (both asserted).
void PerHostStore::ReportStreamClosed(const grpc::string& lb_id) {
  auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
  GPR_ASSERT(it_store_for_gone_lb != per_balancer_stores_.end());
  // Remove this closed stream from our records.
  GPR_ASSERT(UnorderedMapOfSetEraseKeyValue(
      load_key_to_receiving_lb_ids_, it_store_for_gone_lb->second->load_key(),
      lb_id));
  // The stores that were assigned to this balancer are orphaned now. They
  // should be re-assigned to other balancers which are still receiving reports.
  const std::set<PerBalancerStore*> orphans =
      UnorderedMapOfSetExtract(assigned_stores_, lb_id);
  for (PerBalancerStore* orphan : orphans) {
    const auto same_key_receivers =
        load_key_to_receiving_lb_ids_.find(orphan->load_key());
    if (same_key_receivers != load_key_to_receiving_lb_ids_.end()) {
      // First choice: an active balancer with the same load key.
      AssignOrphanedStore(orphan, *RandomElement(same_key_receivers->second));
    } else if (!assigned_stores_.empty()) {
      // Second choice: any of the remaining active balancers.
      AssignOrphanedStore(orphan, RandomElement(assigned_stores_)->first);
    } else {
      // Load data for an LB ID that can't be assigned to any stream should
      // be dropped.
      orphan->Suspend();
    }
  }
}
||||
|
||||
// Looks up the store created for `lb_id`.
//
// Returns a non-owning pointer to the store, or nullptr if no store exists
// for that LB ID. The map is searched once and the iterator is reused,
// rather than hashing and probing for the same key twice.
PerBalancerStore* PerHostStore::FindPerBalancerStore(
    const grpc::string& lb_id) const {
  const auto it = per_balancer_stores_.find(lb_id);
  if (it == per_balancer_stores_.end()) return nullptr;
  return it->second.get();
}
||||
|
||||
const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores( |
||||
const grpc::string& lb_id) const { |
||||
auto it = assigned_stores_.find(lb_id); |
||||
if (it == assigned_stores_.end()) return nullptr; |
||||
return &(it->second); |
||||
} |
||||
|
||||
// Adds `orphaned_store` to the set of stores assigned to `new_receiver`.
// `new_receiver` must already have an entry in assigned_stores_ (asserted).
void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
                                       const grpc::string& new_receiver) {
  auto it = assigned_stores_.find(new_receiver);
  GPR_ASSERT(it != assigned_stores_.end());
  std::set<PerBalancerStore*>& receiver_stores = it->second;
  receiver_stores.insert(orphaned_store);
  gpr_log(GPR_INFO,
          "[PerHostStore %p] Re-assigned orphaned store (%p) with original LB"
          " ID of %s to new receiver %s",
          this, orphaned_store, orphaned_store->lb_id().c_str(),
          new_receiver.c_str());
}
||||
|
||||
// Creates the store for a new `lb_id`, assigns that store to the LB ID
// itself, and records the LB ID as a receiver for `load_key`. The LB ID
// must not be known yet (asserted).
void PerHostStore::SetUpForNewLbId(const grpc::string& lb_id,
                                   const grpc::string& load_key) {
  // The top-level caller (i.e., LoadReportService) should guarantee the
  // lb_id is unique for each reporting stream.
  GPR_ASSERT(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
  GPR_ASSERT(assigned_stores_.find(lb_id) == assigned_stores_.end());
  load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
  std::unique_ptr<PerBalancerStore> new_store(
      new PerBalancerStore(lb_id, load_key));
  // assigned_stores_ only observes the store; ownership goes to
  // per_balancer_stores_ below.
  PerBalancerStore* const observed = new_store.get();
  assigned_stores_[lb_id] = {observed};
  per_balancer_stores_[lb_id] = std::move(new_store);
}
||||
|
||||
// Finds the store for `lb_id` on host `hostname`. Returns nullptr if the
// host is unknown or the host has no store for that LB ID.
PerBalancerStore* LoadDataStore::FindPerBalancerStore(
    const string& hostname, const string& lb_id) const {
  const auto host_entry = per_host_stores_.find(hostname);
  if (host_entry == per_host_stores_.end()) return nullptr;
  return host_entry->second.FindPerBalancerStore(lb_id);
}
||||
|
||||
// Merges a load record into the store for (`hostname`, `key.lb_id()`).
//
// If no such store exists, the record itself is not kept; only the
// in-progress call count of the unknown LB ID is tracked, and the tracker
// is removed once that count returns to zero.
void LoadDataStore::MergeRow(const grpc::string& hostname,
                             const LoadRecordKey& key,
                             const LoadRecordValue& value) {
  PerBalancerStore* const known_store =
      FindPerBalancerStore(hostname, key.lb_id());
  if (known_store != nullptr) {
    known_store->MergeRow(key, value);
    return;
  }
  // Unknown LB ID. Track it until its number of in-progress calls drops to
  // zero.
  const int64_t delta = value.GetNumCallsInProgressDelta();
  if (delta == 0) return;
  auto tracker = unknown_balancer_id_trackers_.find(key.lb_id());
  if (tracker == unknown_balancer_id_trackers_.end()) {
    gpr_log(
        GPR_DEBUG,
        "[LoadDataStore %p] Start tracking unknown balancer (lb_id_: %s).",
        this, key.lb_id().c_str());
    unknown_balancer_id_trackers_.insert(
        {key.lb_id(), static_cast<uint64_t>(delta)});
    return;
  }
  tracker->second += delta;
  if (tracker->second == 0) {
    unknown_balancer_id_trackers_.erase(tracker);
    gpr_log(GPR_DEBUG,
            "[LoadDataStore %p] Stop tracking unknown balancer (lb_id_: %s).",
            this, key.lb_id().c_str());
  }
}
||||
|
||||
const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores( |
||||
const grpc::string& hostname, const grpc::string& lb_id) { |
||||
auto it = per_host_stores_.find(hostname); |
||||
if (it == per_host_stores_.end()) return nullptr; |
||||
return it->second.GetAssignedStores(lb_id); |
||||
} |
||||
|
||||
// Forwards the stream-created event to the store of `hostname`. The host
// store is created on first use by the map's operator[].
void LoadDataStore::ReportStreamCreated(const grpc::string& hostname,
                                        const grpc::string& lb_id,
                                        const grpc::string& load_key) {
  PerHostStore& host_store = per_host_stores_[hostname];
  host_store.ReportStreamCreated(lb_id, load_key);
}
||||
|
||||
// Forwards the stream-closed event to the store of `hostname`, which must
// already exist (asserted).
void LoadDataStore::ReportStreamClosed(const grpc::string& hostname,
                                       const grpc::string& lb_id) {
  auto it_per_host_store = per_host_stores_.find(hostname);
  GPR_ASSERT(it_per_host_store != per_host_stores_.end());
  PerHostStore& host_store = it_per_host_store->second;
  host_store.ReportStreamClosed(lb_id);
}
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
@ -0,0 +1,339 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H |
||||
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <memory> |
||||
#include <set> |
||||
#include <unordered_map> |
||||
|
||||
#include <grpc/support/log.h> |
||||
#include <grpcpp/impl/codegen/config.h> |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
// Sentinel LB ID string used for the store that does not belong to a real
// balancer.
constexpr char kInvalidLbId[] = "<INVALID_LBID_238dsb234890rb>";
// NOTE(review): presumably the length of a generated LB ID; it is not used
// in the visible part of this file -- confirm against the callers.
constexpr uint8_t kLbIdLen{8};
||||
|
||||
// The load data storage is organized in hierarchy. The LoadDataStore is the
|
||||
// top-level data store. In LoadDataStore, for each host we keep a
|
||||
// PerHostStore, in which for each balancer we keep a PerBalancerStore. Each
|
||||
// PerBalancerStore maintains a map of load records, mapping from LoadRecordKey
|
||||
// to LoadRecordValue. The LoadRecordValue contains a map of customized call
|
||||
// metrics, mapping from a call metric name to the CallMetricValue.
|
||||
|
||||
// Accumulated value of one customized call metric: how many calls reported
// the metric and the sum of the values they reported.
class CallMetricValue {
 public:
  // Both counters default to zero.
  explicit CallMetricValue(uint64_t num_calls = 0,
                           double total_metric_value = 0)
      : num_calls_(num_calls), total_metric_value_(total_metric_value) {}

  // Adds the counters of `other` onto this value.
  void MergeFrom(CallMetricValue other) {
    total_metric_value_ += other.total_metric_value_;
    num_calls_ += other.num_calls_;
  }

  // Getters.
  uint64_t num_calls() const { return num_calls_; }
  double total_metric_value() const { return total_metric_value_; }

 private:
  // The number of calls that finished with this metric.
  uint64_t num_calls_ = 0;
  // The sum of metric values across all the calls that finished with this
  // metric.
  double total_metric_value_ = 0;
};
||||
|
||||
// The key of a load record.
|
||||
class LoadRecordKey { |
||||
public: |
||||
explicit LoadRecordKey(grpc::string lb_id, grpc::string lb_tag, |
||||
grpc::string user_id, grpc::string client_ip_hex) |
||||
: lb_id_(std::move(lb_id)), |
||||
lb_tag_(std::move(lb_tag)), |
||||
user_id_(std::move(user_id)), |
||||
client_ip_hex_(std::move(client_ip_hex)) {} |
||||
|
||||
grpc::string ToString() const { |
||||
return "[lb_id_=" + lb_id_ + ", lb_tag_=" + lb_tag_ + |
||||
", user_id_=" + user_id_ + ", client_ip_hex_=" + client_ip_hex_ + |
||||
"]"; |
||||
} |
||||
|
||||
bool operator==(const LoadRecordKey& other) const { |
||||
return lb_id_ == other.lb_id_ && lb_tag_ == other.lb_tag_ && |
||||
user_id_ == other.user_id_ && client_ip_hex_ == other.client_ip_hex_; |
||||
} |
||||
|
||||
// Getters.
|
||||
const grpc::string& lb_id() const { return lb_id_; } |
||||
const grpc::string& lb_tag() const { return lb_tag_; } |
||||
const grpc::string& user_id() const { return user_id_; } |
||||
const grpc::string& client_ip_hex() const { return client_ip_hex_; } |
||||
|
||||
struct Hasher { |
||||
void hash_combine(size_t* seed, const grpc::string& k) const { |
||||
*seed ^= std::hash<grpc::string>()(k) + 0x9e3779b9 + (*seed << 6) + |
||||
(*seed >> 2); |
||||
} |
||||
|
||||
size_t operator()(const LoadRecordKey& k) const { |
||||
size_t h = 0; |
||||
hash_combine(&h, k.lb_id_); |
||||
hash_combine(&h, k.lb_tag_); |
||||
hash_combine(&h, k.user_id_); |
||||
hash_combine(&h, k.client_ip_hex_); |
||||
return h; |
||||
} |
||||
}; |
||||
|
||||
private: |
||||
grpc::string lb_id_; |
||||
grpc::string lb_tag_; |
||||
grpc::string user_id_; |
||||
grpc::string client_ip_hex_; |
||||
}; |
||||
|
||||
// The value of a load record.
|
||||
class LoadRecordValue { |
||||
public: |
||||
explicit LoadRecordValue(uint64_t start_count = 0, uint64_t ok_count = 0, |
||||
uint64_t error_count = 0, double bytes_sent = 0, |
||||
double bytes_recv = 0, double latency_ms = 0) |
||||
: start_count_(start_count), |
||||
ok_count_(ok_count), |
||||
error_count_(error_count), |
||||
bytes_sent_(bytes_sent), |
||||
bytes_recv_(bytes_recv), |
||||
latency_ms_(latency_ms) {} |
||||
|
||||
void MergeFrom(const LoadRecordValue& other) { |
||||
start_count_ += other.start_count_; |
||||
ok_count_ += other.ok_count_; |
||||
error_count_ += other.error_count_; |
||||
bytes_sent_ += other.bytes_sent_; |
||||
bytes_recv_ += other.bytes_recv_; |
||||
latency_ms_ += other.latency_ms_; |
||||
for (const auto& p : other.call_metrics_) { |
||||
const grpc::string& key = p.first; |
||||
const CallMetricValue& value = p.second; |
||||
call_metrics_[key].MergeFrom(value); |
||||
} |
||||
} |
||||
|
||||
int64_t GetNumCallsInProgressDelta() const { |
||||
return static_cast<int64_t>(start_count_ - ok_count_ - error_count_); |
||||
} |
||||
|
||||
grpc::string ToString() const { |
||||
return "[start_count_=" + grpc::to_string(start_count_) + |
||||
", ok_count_=" + grpc::to_string(ok_count_) + |
||||
", error_count_=" + grpc::to_string(error_count_) + |
||||
", bytes_sent_=" + grpc::to_string(bytes_sent_) + |
||||
", bytes_recv_=" + grpc::to_string(bytes_recv_) + |
||||
", latency_ms_=" + grpc::to_string(latency_ms_) + "]"; |
||||
} |
||||
|
||||
bool InsertCallMetric(const grpc::string& metric_name, |
||||
const CallMetricValue& metric_value) { |
||||
return call_metrics_.insert({metric_name, metric_value}).second; |
||||
} |
||||
|
||||
// Getters.
|
||||
uint64_t start_count() const { return start_count_; } |
||||
uint64_t ok_count() const { return ok_count_; } |
||||
uint64_t error_count() const { return error_count_; } |
||||
double bytes_sent() const { return bytes_sent_; } |
||||
double bytes_recv() const { return bytes_recv_; } |
||||
double latency_ms() const { return latency_ms_; } |
||||
const std::unordered_map<grpc::string, CallMetricValue>& call_metrics() |
||||
const { |
||||
return call_metrics_; |
||||
} |
||||
|
||||
private: |
||||
uint64_t start_count_ = 0; |
||||
uint64_t ok_count_ = 0; |
||||
uint64_t error_count_ = 0; |
||||
double bytes_sent_ = 0; |
||||
double bytes_recv_ = 0; |
||||
double latency_ms_ = 0; |
||||
std::unordered_map<grpc::string, CallMetricValue> call_metrics_; |
||||
}; |
||||
|
||||
// Stores the data associated with a particular LB ID.
|
||||
class PerBalancerStore { |
||||
public: |
||||
using LoadRecordMap = |
||||
std::unordered_map<LoadRecordKey, LoadRecordValue, LoadRecordKey::Hasher>; |
||||
|
||||
PerBalancerStore(grpc::string lb_id, grpc::string load_key) |
||||
: lb_id_(std::move(lb_id)), load_key_(std::move(load_key)) {} |
||||
|
||||
// Merge a load record with the given key and value if the store is not
|
||||
// suspended.
|
||||
void MergeRow(const LoadRecordKey& key, const LoadRecordValue& value); |
||||
|
||||
// Suspend this store, so that no detailed load data will be recorded.
|
||||
void Suspend(); |
||||
// Resume this store from suspension.
|
||||
void Resume(); |
||||
// Is this store suspended or not?
|
||||
bool IsSuspended() const { return suspended_; } |
||||
|
||||
bool IsNumCallsInProgressChangedSinceLastReport() const { |
||||
return num_calls_in_progress_ != last_reported_num_calls_in_progress_; |
||||
} |
||||
|
||||
uint64_t GetNumCallsInProgressForReport(); |
||||
|
||||
grpc::string ToString() { |
||||
return "[PerBalancerStore lb_id_=" + lb_id_ + " load_key_=" + load_key_ + |
||||
"]"; |
||||
} |
||||
|
||||
void ClearLoadRecordMap() { load_record_map_.clear(); } |
||||
|
||||
// Getters.
|
||||
const grpc::string& lb_id() const { return lb_id_; } |
||||
const grpc::string& load_key() const { return load_key_; } |
||||
const LoadRecordMap& load_record_map() const { return load_record_map_; } |
||||
|
||||
private: |
||||
grpc::string lb_id_; |
||||
// TODO(juanlishen): Use bytestring protobuf type?
|
||||
grpc::string load_key_; |
||||
LoadRecordMap load_record_map_; |
||||
uint64_t num_calls_in_progress_ = 0; |
||||
uint64_t last_reported_num_calls_in_progress_ = 0; |
||||
bool suspended_ = false; |
||||
}; |
||||
|
||||
// Stores the data associated with a particular host.
|
||||
class PerHostStore { |
||||
public: |
||||
// When a report stream is created, a PerBalancerStore is created for the
|
||||
// LB ID (guaranteed unique) associated with that stream. If it is the only
|
||||
// active store, adopt all the orphaned stores. If it is the first created
|
||||
// store, adopt the store of kInvalidLbId.
|
||||
void ReportStreamCreated(const grpc::string& lb_id, |
||||
const grpc::string& load_key); |
||||
|
||||
// When a report stream is closed, the PerBalancerStores assigned to the
|
||||
// associate LB ID need to be re-assigned to other active balancers,
|
||||
// ideally with the same load key. If there is no active balancer, we have
|
||||
// to suspend those stores and drop the incoming load data until they are
|
||||
// resumed.
|
||||
void ReportStreamClosed(const grpc::string& lb_id); |
||||
|
||||
// Returns null if not found. Caller doesn't own the returned store.
|
||||
PerBalancerStore* FindPerBalancerStore(const grpc::string& lb_id) const; |
||||
|
||||
// Returns null if lb_id is not found. The returned pointer points to the
|
||||
// underlying data structure, which is not owned by the caller.
|
||||
const std::set<PerBalancerStore*>* GetAssignedStores( |
||||
const grpc::string& lb_id) const; |
||||
|
||||
private: |
||||
// Creates a PerBalancerStore for the given LB ID, assigns the store to
|
||||
// itself, and records the LB ID to the load key.
|
||||
void SetUpForNewLbId(const grpc::string& lb_id, const grpc::string& load_key); |
||||
|
||||
void AssignOrphanedStore(PerBalancerStore* orphaned_store, |
||||
const grpc::string& new_receiver); |
||||
|
||||
std::unordered_map<grpc::string, std::set<grpc::string>> |
||||
load_key_to_receiving_lb_ids_; |
||||
|
||||
// Key: LB ID. The key set includes all the LB IDs that have been
|
||||
// allocated for reporting streams so far.
|
||||
// Value: the unique pointer to the PerBalancerStore of the LB ID.
|
||||
std::unordered_map<grpc::string, std::unique_ptr<PerBalancerStore>> |
||||
per_balancer_stores_; |
||||
|
||||
// Key: LB ID. The key set includes the LB IDs of the balancers that are
|
||||
// currently receiving report.
|
||||
// Value: the set of raw pointers to the PerBalancerStores assigned to the LB
|
||||
// ID. Note that the sets in assigned_stores_ form a division of the value set
|
||||
// of per_balancer_stores_.
|
||||
std::unordered_map<grpc::string, std::set<PerBalancerStore*>> |
||||
assigned_stores_; |
||||
}; |
||||
|
||||
// Thread-unsafe two-level bookkeeper of all the load data.
|
||||
// Note: We never remove any store objects from this class, as per the
|
||||
// current spec. That's because premature removal of the store objects
|
||||
// may lead to loss of critical information, e.g., mapping from lb_id to
|
||||
// load_key, and the number of in-progress calls. Such loss will cause
|
||||
// information inconsistency when the balancer is re-connected. Keeping
|
||||
// all the stores should be fine for PerHostStore, since we assume there
|
||||
// should only be a few hostnames. But it's a potential problem for
|
||||
// PerBalancerStore.
|
||||
class LoadDataStore { |
||||
public: |
||||
// Returns null if not found. Caller doesn't own the returned store.
|
||||
PerBalancerStore* FindPerBalancerStore(const grpc::string& hostname, |
||||
const grpc::string& lb_id) const; |
||||
|
||||
// Returns null if hostname or lb_id is not found. The returned pointer points
|
||||
// to the underlying data structure, which is not owned by the caller.
|
||||
const std::set<PerBalancerStore*>* GetAssignedStores(const string& hostname, |
||||
const string& lb_id); |
||||
|
||||
// If a PerBalancerStore can be found by the hostname and LB ID in
|
||||
// LoadRecordKey, the load data will be merged to that store. Otherwise,
|
||||
// only track the number of the in-progress calls for this unknown LB ID.
|
||||
void MergeRow(const grpc::string& hostname, const LoadRecordKey& key, |
||||
const LoadRecordValue& value); |
||||
|
||||
// Is the given lb_id a tracked unknown LB ID (i.e., the LB ID was associated
|
||||
// with some received load data but unknown to this load data store)?
|
||||
bool IsTrackedUnknownBalancerId(const grpc::string& lb_id) const { |
||||
return unknown_balancer_id_trackers_.find(lb_id) != |
||||
unknown_balancer_id_trackers_.end(); |
||||
} |
||||
|
||||
// Wrapper around PerHostStore::ReportStreamCreated.
|
||||
void ReportStreamCreated(const grpc::string& hostname, |
||||
const grpc::string& lb_id, |
||||
const grpc::string& load_key); |
||||
|
||||
// Wrapper around PerHostStore::ReportStreamClosed.
|
||||
void ReportStreamClosed(const grpc::string& hostname, |
||||
const grpc::string& lb_id); |
||||
|
||||
private: |
||||
// Buffered data that was fetched from Census but hasn't been sent to
|
||||
// balancer. We need to keep this data ourselves because Census will
|
||||
// delete the data once it's returned.
|
||||
std::unordered_map<grpc::string, PerHostStore> per_host_stores_; |
||||
|
||||
// Tracks the number of in-progress calls for each unknown LB ID.
|
||||
std::unordered_map<grpc::string, uint64_t> unknown_balancer_id_trackers_; |
||||
}; |
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_DATA_STORE_H
|
@ -0,0 +1,31 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
licenses(["notice"]) # Apache v2 |
||||
|
||||
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package") |
||||
|
||||
grpc_package(name = "test/cpp/server/load_reporter") |
||||
|
||||
# Unit test for the load reporter's load data store.
grpc_cc_test(
    name = "lb_load_data_store_test",
    srcs = ["load_data_store_test.cc"],
    external_deps = ["gtest"],
    deps = [
        "//:lb_load_data_store",
        "//test/core/util:grpc_test_util",
    ],
)
@ -0,0 +1,481 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/impl/codegen/port_platform.h> |
||||
|
||||
#include <set> |
||||
#include <vector> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_data_store.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
using ::grpc::load_reporter::CallMetricValue; |
||||
using ::grpc::load_reporter::LoadDataStore; |
||||
using ::grpc::load_reporter::LoadRecordKey; |
||||
using ::grpc::load_reporter::LoadRecordValue; |
||||
using ::grpc::load_reporter::PerBalancerStore; |
||||
using ::grpc::load_reporter::kInvalidLbId; |
||||
|
||||
class LoadDataStoreTest : public ::testing::Test { |
||||
public: |
||||
LoadDataStoreTest() |
||||
: kKey1(kLbId1, kLbTag1, kUser1, kClientIp1), |
||||
kKey2(kLbId2, kLbTag2, kUser2, kClientIp2) {} |
||||
|
||||
// Check whether per_balancer_stores contains a store which was originally
|
||||
// created for <hostname, lb_id, and load_key>.
|
||||
bool PerBalancerStoresContains( |
||||
const LoadDataStore& load_data_store, |
||||
const std::set<PerBalancerStore*>* per_balancer_stores, |
||||
const grpc::string hostname, const grpc::string lb_id, |
||||
const grpc::string load_key) { |
||||
auto original_per_balancer_store = |
||||
load_data_store.FindPerBalancerStore(hostname, lb_id); |
||||
EXPECT_NE(original_per_balancer_store, nullptr); |
||||
EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id); |
||||
EXPECT_EQ(original_per_balancer_store->load_key(), load_key); |
||||
for (auto per_balancer_store : *per_balancer_stores) { |
||||
if (per_balancer_store == original_per_balancer_store) { |
||||
return true; |
||||
} |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
grpc::string FormatLbId(size_t index) { |
||||
return "kLbId" + std::to_string(index); |
||||
} |
||||
|
||||
const grpc::string kHostname1 = "kHostname1"; |
||||
const grpc::string kHostname2 = "kHostname2"; |
||||
const grpc::string kLbId1 = "kLbId1"; |
||||
const grpc::string kLbId2 = "kLbId2"; |
||||
const grpc::string kLbId3 = "kLbId3"; |
||||
const grpc::string kLbId4 = "kLbId4"; |
||||
const grpc::string kLoadKey1 = "kLoadKey1"; |
||||
const grpc::string kLoadKey2 = "kLoadKey2"; |
||||
const grpc::string kLbTag1 = "kLbTag1"; |
||||
const grpc::string kLbTag2 = "kLbTag2"; |
||||
const grpc::string kUser1 = "kUser1"; |
||||
const grpc::string kUser2 = "kUser2"; |
||||
const grpc::string kClientIp1 = "00"; |
||||
const grpc::string kClientIp2 = "02"; |
||||
const grpc::string kMetric1 = "kMetric1"; |
||||
const grpc::string kMetric2 = "kMetric2"; |
||||
const LoadRecordKey kKey1; |
||||
const LoadRecordKey kKey2; |
||||
}; |
||||
|
||||
using PerBalancerStoreTest = LoadDataStoreTest; |
||||
|
||||
// A store created for a new report stream is assigned to its own LB ID.
TEST_F(LoadDataStoreTest, AssignToSelf) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  EXPECT_TRUE(PerBalancerStoresContains(
      load_data_store, load_data_store.GetAssignedStores(kHostname1, kLbId1),
      kHostname1, kLbId1, kLoadKey1));
}
||||
|
||||
// Orphaned stores move to a balancer with the same load key when one exists,
// otherwise to another balancer of the same host, and never across hosts.
TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  load_data_store.ReportStreamCreated(kHostname1, kLbId2, kLoadKey1);
  load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
  load_data_store.ReportStreamCreated(kHostname2, kLbId4, kLoadKey1);

  // Step 1: close the second stream. Its store goes to kLbId1, which shares
  // the same load key.
  load_data_store.ReportStreamClosed(kHostname1, kLbId2);
  auto stores_of_lb_1 = load_data_store.GetAssignedStores(kHostname1, kLbId1);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_1,
                                        kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_1,
                                        kHostname1, kLbId2, kLoadKey1));

  // Step 2: close the first stream. No LB with the same load key is left, so
  // the orphaned stores go to kLbId3, which serves the same host.
  load_data_store.ReportStreamClosed(kHostname1, kLbId1);
  auto stores_of_lb_3 = load_data_store.GetAssignedStores(kHostname1, kLbId3);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_3,
                                        kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_3,
                                        kHostname1, kLbId2, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_3,
                                        kHostname1, kLbId3, kLoadKey2));

  // Step 3: close the third stream. The first host has no active LB now.
  // kLbId4 is active but belongs to the second host, so it will NOT adopt the
  // orphaned stores.
  load_data_store.ReportStreamClosed(kHostname1, kLbId3);
  auto stores_of_lb_4 = load_data_store.GetAssignedStores(kHostname2, kLbId4);
  EXPECT_FALSE(PerBalancerStoresContains(load_data_store, stores_of_lb_4,
                                         kHostname1, kLbId1, kLoadKey1));
  EXPECT_FALSE(PerBalancerStoresContains(load_data_store, stores_of_lb_4,
                                         kHostname1, kLbId2, kLoadKey1));
  EXPECT_FALSE(PerBalancerStoresContains(load_data_store, stores_of_lb_4,
                                         kHostname1, kLbId3, kLoadKey2));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, stores_of_lb_4,
                                        kHostname2, kLbId4, kLoadKey1));
}
||||
|
||||
// Once an orphaned store has been assigned to a balancer, it stays with that
// balancer until that balancer's own stream is closed.
TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
  LoadDataStore load_data_store;
  std::set<grpc::string> active_lb_ids;
  const size_t num_lb_ids = 1000;
  for (size_t i = 0; i < num_lb_ids; ++i) {
    load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
    active_lb_ids.insert(FormatLbId(i));
  }
  const grpc::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
  load_data_store.ReportStreamClosed(kHostname1, orphaned_lb_id);
  active_lb_ids.erase(orphaned_lb_id);
  // Find which LB is assigned the orphaned store.
  grpc::string assigned_lb_id;
  for (const auto& lb_id : active_lb_ids) {
    if (PerBalancerStoresContains(
            load_data_store,
            load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
            orphaned_lb_id, kLoadKey1)) {
      assigned_lb_id = lb_id;
      break;
    }
  }
  // Fatal on purpose: the rest of the test would look up the stores of an
  // empty LB ID, which is not a valid balancer.
  ASSERT_FALSE(assigned_lb_id.empty());
  // Close 10 more streams, skipping the assigned_lb_id. The assignment of
  // orphaned_lb_id shouldn't change.
  for (size_t i = 0; i < 10; ++i) {
    grpc::string lb_id_to_close;
    for (const auto& lb_id : active_lb_ids) {
      if (lb_id != assigned_lb_id) {
        // Copy before leaving the loop; the set element is erased below.
        lb_id_to_close = lb_id;
        break;
      }
    }
    ASSERT_FALSE(lb_id_to_close.empty());
    load_data_store.ReportStreamClosed(kHostname1, lb_id_to_close);
    active_lb_ids.erase(lb_id_to_close);
    EXPECT_TRUE(PerBalancerStoresContains(
        load_data_store,
        load_data_store.GetAssignedStores(kHostname1, assigned_lb_id),
        kHostname1, orphaned_lb_id, kLoadKey1));
  }
  // Close the assigned_lb_id; orphaned_lb_id will be re-assigned again, to
  // exactly one of the remaining active balancers.
  load_data_store.ReportStreamClosed(kHostname1, assigned_lb_id);
  active_lb_ids.erase(assigned_lb_id);
  size_t orphaned_lb_id_occurences = 0;
  for (const auto& lb_id : active_lb_ids) {
    if (PerBalancerStoresContains(
            load_data_store,
            load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
            orphaned_lb_id, kLoadKey1)) {
      orphaned_lb_id_occurences++;
    }
  }
  EXPECT_EQ(orphaned_lb_id_occurences, 1U);
}
||||
|
||||
// When a host loses every report stream its stores are suspended; they are
// resumed and handed to the new balancer once a stream is created again.
TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  load_data_store.ReportStreamCreated(kHostname2, kLbId2, kLoadKey1);
  auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
  auto store_invalid_lb_id_1 =
      load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
  // Both stores are dereferenced throughout the test, so a missing store must
  // stop the test here rather than crash it.
  ASSERT_NE(store_lb_id_1, nullptr);
  ASSERT_NE(store_invalid_lb_id_1, nullptr);
  EXPECT_FALSE(store_lb_id_1->IsSuspended());
  EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
  // Disconnect all the streams of the first host.
  load_data_store.ReportStreamClosed(kHostname1, kLbId1);
  // All the stores of that host are suspended.
  EXPECT_TRUE(store_lb_id_1->IsSuspended());
  EXPECT_TRUE(store_invalid_lb_id_1->IsSuspended());
  // Detailed load data won't be kept when the PerBalancerStore is suspended.
  store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
  EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
  // The stores for different hosts won't mix, even if the load key is the same.
  auto assigned_to_lb_id_2 =
      load_data_store.GetAssignedStores(kHostname2, kLbId2);
  ASSERT_NE(assigned_to_lb_id_2, nullptr);
  EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
                                        kHostname2, kLbId2, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
                                        kHostname2, kInvalidLbId, ""));
  // A new stream is created for the first host.
  load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
  // The stores for the first host are resumed.
  EXPECT_FALSE(store_lb_id_1->IsSuspended());
  EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
  store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
  EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
  EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
  // The resumed stores are assigned to the new LB.
  auto assigned_to_lb_id_3 =
      load_data_store.GetAssignedStores(kHostname1, kLbId3);
  ASSERT_NE(assigned_to_lb_id_3, nullptr);
  EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kLbId1, kLoadKey1));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kInvalidLbId, ""));
  EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
                                        kHostname1, kLbId3, kLoadKey2));
}
||||
|
||||
// Each <hostname, LB ID> pair gets its own store, and each host additionally
// gets its own store for kInvalidLbId.
TEST_F(LoadDataStoreTest, OneStorePerLbId) {
  LoadDataStore load_data_store;
  // Nothing exists before any stream is created.
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId1), nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId),
            nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);

  // The first stream brings two stores into existence: one for the stream
  // itself and one for kInvalidLbId.
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  auto host1_lb1_store =
      load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
  auto host1_invalid_store =
      load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
  EXPECT_NE(host1_lb1_store, nullptr);
  EXPECT_NE(host1_invalid_store, nullptr);
  EXPECT_NE(host1_lb1_store, host1_invalid_store);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);

  // The second stream, on another host, does the same for that host.
  load_data_store.ReportStreamCreated(kHostname2, kLbId3, kLoadKey1);
  auto host2_lb3_store =
      load_data_store.FindPerBalancerStore(kHostname2, kLbId3);
  auto host2_invalid_store =
      load_data_store.FindPerBalancerStore(kHostname2, kInvalidLbId);
  EXPECT_NE(host2_lb3_store, nullptr);
  EXPECT_NE(host2_invalid_store, nullptr);
  EXPECT_NE(host2_lb3_store, host2_invalid_store);

  // The PerBalancerStores created for different hosts are independent.
  EXPECT_NE(host2_lb3_store, host1_invalid_store);
  EXPECT_NE(host2_invalid_store, host1_invalid_store);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
}
||||
|
||||
// Every store (including the one of kInvalidLbId) is assigned to exactly one
// of the balancers that are still active.
TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
  LoadDataStore load_data_store;
  const size_t num_create = 100;
  const size_t num_close = 50;
  for (size_t i = 0; i < num_create; ++i) {
    load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
  }
  for (size_t i = 0; i < num_close; ++i) {
    load_data_store.ReportStreamClosed(kHostname1, FormatLbId(i));
  }
  std::set<grpc::string> reported_lb_ids;
  for (size_t i = num_close; i < num_create; ++i) {
    const auto* assigned_stores =
        load_data_store.GetAssignedStores(kHostname1, FormatLbId(i));
    // An active balancer must have an assignment set; stop instead of
    // dereferencing null if it doesn't.
    ASSERT_NE(assigned_stores, nullptr);
    for (auto assigned_store : *assigned_stores) {
      // insert().second is false when the LB ID was already reported by
      // another balancer, i.e. the store was assigned twice.
      EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
    }
  }
  // Add one for kInvalidLbId.
  EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
  EXPECT_NE(reported_lb_ids.find(kInvalidLbId), reported_lb_ids.end());
}
||||
|
||||
// Load data for an LB ID without a store only has its in-progress calls
// tracked, and the tracking ends once all of those calls have finished.
TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
  LoadDataStore load_data_store;
  load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
  // Keys carrying LB IDs for which no report stream is ever created.
  const LoadRecordKey unknown_key_2(kLbId2, kLbTag1, kUser1, kClientIp1);
  const LoadRecordKey unknown_key_3(kLbId3, kLbTag1, kUser1, kClientIp1);
  // Merge data for a known LB ID.
  LoadRecordValue v1(192);
  load_data_store.MergeRow(kHostname1, kKey1, v1);
  // Merge data for unknown LB IDs.
  LoadRecordValue v2(23);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  load_data_store.MergeRow(kHostname1, unknown_key_2, v2);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  LoadRecordValue v3(952);
  load_data_store.MergeRow(kHostname2, unknown_key_3, v3);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
  // The data kept for a known LB ID is correct.
  auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
  ASSERT_NE(store_lb_id_1, nullptr);
  // load_record_map() returns a reference to the store's own map, so this
  // view also reflects the rows merged later in the test.
  const auto& records = store_lb_id_1->load_record_map();
  ASSERT_EQ(records.size(), 1U);
  // Guard the find() results that are dereferenced below.
  ASSERT_TRUE(records.find(kKey1) != records.end());
  EXPECT_EQ(records.find(kKey1)->second.start_count(), v1.start_count());
  EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
  // No PerBalancerStore created for Unknown LB ID.
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId2), nullptr);
  EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
  // End all the started RPCs for kLbId1.
  LoadRecordValue v4(0, v1.start_count());
  load_data_store.MergeRow(kHostname1, kKey1, v4);
  ASSERT_EQ(records.size(), 1U);
  ASSERT_TRUE(records.find(kKey1) != records.end());
  EXPECT_EQ(records.find(kKey1)->second.start_count(), v1.start_count());
  EXPECT_EQ(records.find(kKey1)->second.ok_count(), v4.ok_count());
  EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId1));
  // End all the started RPCs for kLbId2.
  LoadRecordValue v5(0, v2.start_count());
  load_data_store.MergeRow(kHostname1, unknown_key_2, v5);
  EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
  // End some of the started RPCs for kLbId3.
  LoadRecordValue v6(0, v3.start_count() / 2);
  load_data_store.MergeRow(kHostname2, unknown_key_3, v6);
  EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
}
||||
|
||||
// A suspended store holds no detailed load records and drops the ones merged
// into it, yet the in-progress call count keeps being maintained.
TEST_F(PerBalancerStoreTest, Suspend) {
  PerBalancerStore store(kLbId1, kLoadKey1);
  // Number of detailed load records currently held by the store.
  auto record_count = [&store] { return store.load_record_map().size(); };
  EXPECT_FALSE(store.IsSuspended());

  // First suspension.
  store.Suspend();
  EXPECT_TRUE(store.IsSuspended());
  EXPECT_EQ(0U, record_count());
  // Rows merged into a suspended store are not kept.
  LoadRecordValue v1(139, 19);
  store.MergeRow(kKey1, v1);
  EXPECT_EQ(0U, record_count());

  // First resumption.
  store.Resume();
  EXPECT_FALSE(store.IsSuspended());
  EXPECT_EQ(0U, record_count());
  // Rows merged after resuming are kept.
  LoadRecordValue v2(23, 0, 51);
  store.MergeRow(kKey1, v2);
  EXPECT_EQ(1U, record_count());

  // Second suspension: the record kept so far is gone.
  store.Suspend();
  EXPECT_TRUE(store.IsSuspended());
  EXPECT_EQ(0U, record_count());
  // Rows merged into a suspended store are not kept.
  LoadRecordValue v3(62, 11);
  store.MergeRow(kKey1, v3);
  EXPECT_EQ(0U, record_count());

  // Second resumption.
  store.Resume();
  EXPECT_FALSE(store.IsSuspended());
  EXPECT_EQ(0U, record_count());
  // Rows merged after resuming are kept.
  LoadRecordValue v4(225, 98);
  store.MergeRow(kKey1, v4);
  EXPECT_EQ(1U, record_count());

  // The in-progress count is maintained regardless of suspension.
  EXPECT_EQ(store.GetNumCallsInProgressForReport(),
            v1.start_count() - v1.ok_count() + v2.start_count() -
                v2.error_count() + v3.start_count() - v3.ok_count() +
                v4.start_count() - v4.ok_count());
}
||||
|
||||
// Rows merged under the same key are summed field by field, rows under
// different keys stay separate, and the in-progress count follows every merge.
TEST_F(PerBalancerStoreTest, DataAggregation) {
  PerBalancerStore store(kLbId1, kLoadKey1);
  // Build the values to merge.
  LoadRecordValue v1(992, 34, 13, 234.0, 164.0, 173467.38);
  v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
  LoadRecordValue v2(4842, 213, 9, 393.0, 974.0, 1345.2398);
  v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
  v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
  // v3 leaves the number of in-progress RPCs unchanged (start == ok + error).
  LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
  v3.InsertCallMetric(kMetric1, CallMetricValue(61, 3465.0));
  v3.InsertCallMetric(kMetric2, CallMetricValue(13, 672.0));

  // Freshly constructed store: nothing in progress, nothing to report.
  uint64_t expected_in_progress = 0;
  EXPECT_FALSE(store.IsNumCallsInProgressChangedSinceLastReport());
  EXPECT_EQ(store.GetNumCallsInProgressForReport(), expected_in_progress);

  // Merging v1 changes the count; fetching the report clears the flag.
  store.MergeRow(kKey1, v1);
  EXPECT_TRUE(store.IsNumCallsInProgressChangedSinceLastReport());
  expected_in_progress += (v1.start_count() - v1.ok_count() - v1.error_count());
  EXPECT_EQ(store.GetNumCallsInProgressForReport(), expected_in_progress);
  EXPECT_FALSE(store.IsNumCallsInProgressChangedSinceLastReport());

  // Same for v2, merged under a different key.
  store.MergeRow(kKey2, v2);
  EXPECT_TRUE(store.IsNumCallsInProgressChangedSinceLastReport());
  expected_in_progress += (v2.start_count() - v2.ok_count() - v2.error_count());
  EXPECT_EQ(store.GetNumCallsInProgressForReport(), expected_in_progress);
  EXPECT_FALSE(store.IsNumCallsInProgressChangedSinceLastReport());

  // Merging v3 leaves the count where it was.
  store.MergeRow(kKey1, v3);
  EXPECT_FALSE(store.IsNumCallsInProgressChangedSinceLastReport());
  EXPECT_EQ(store.GetNumCallsInProgressForReport(), expected_in_progress);

  // kKey1 holds the field-wise sum of v1 and v3.
  LoadRecordValue value_for_key1 = store.load_record_map().find(kKey1)->second;
  EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
  EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
  EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
  EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
  EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
  EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
  auto&& metrics_key1 = value_for_key1.call_metrics();
  auto&& metrics_v1 = v1.call_metrics();
  auto&& metrics_v3 = v3.call_metrics();
  EXPECT_EQ(metrics_key1.size(), 2U);
  EXPECT_EQ(metrics_key1.find(kMetric1)->second.num_calls(),
            metrics_v1.find(kMetric1)->second.num_calls() +
                metrics_v3.find(kMetric1)->second.num_calls());
  EXPECT_EQ(metrics_key1.find(kMetric1)->second.total_metric_value(),
            metrics_v1.find(kMetric1)->second.total_metric_value() +
                metrics_v3.find(kMetric1)->second.total_metric_value());
  // Only v3 carries kMetric2 for this key.
  EXPECT_EQ(metrics_key1.find(kMetric2)->second.num_calls(),
            metrics_v3.find(kMetric2)->second.num_calls());
  EXPECT_EQ(metrics_key1.find(kMetric2)->second.total_metric_value(),
            metrics_v3.find(kMetric2)->second.total_metric_value());

  // kKey2 holds v2 alone, so the aggregation is trivially v2 itself.
  LoadRecordValue value_for_key2 = store.load_record_map().find(kKey2)->second;
  EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
  EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
  EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
  EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
  EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
  EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
  auto&& metrics_key2 = value_for_key2.call_metrics();
  auto&& metrics_v2 = v2.call_metrics();
  EXPECT_EQ(metrics_key2.size(), 2U);
  EXPECT_EQ(metrics_key2.find(kMetric1)->second.num_calls(),
            metrics_v2.find(kMetric1)->second.num_calls());
  EXPECT_EQ(metrics_key2.find(kMetric1)->second.total_metric_value(),
            metrics_v2.find(kMetric1)->second.total_metric_value());
  EXPECT_EQ(metrics_key2.find(kMetric2)->second.num_calls(),
            metrics_v2.find(kMetric2)->second.num_calls());
  EXPECT_EQ(metrics_key2.find(kMetric2)->second.total_metric_value(),
            metrics_v2.find(kMetric2)->second.total_metric_value());
}
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc_test_init(argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -0,0 +1,182 @@ |
||||
#!/usr/bin/env python |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Uploads RBE results to BigQuery""" |
||||
|
||||
import argparse |
||||
import os |
||||
import json |
||||
import sys |
||||
import urllib2 |
||||
import uuid |
||||
|
||||
gcp_utils_dir = os.path.abspath( |
||||
os.path.join(os.path.dirname(__file__), '../../gcp/utils')) |
||||
sys.path.append(gcp_utils_dir) |
||||
import big_query_utils |
||||
|
||||
_DATASET_ID = 'jenkins_test_results' |
||||
_DESCRIPTION = 'Test results from master RBE builds on Kokoro' |
||||
# 90 days in milliseconds |
||||
_EXPIRATION_MS = 90 * 24 * 60 * 60 * 1000 |
||||
_PARTITION_TYPE = 'DAY' |
||||
_PROJECT_ID = 'grpc-testing' |
||||
_RESULTS_SCHEMA = [ |
||||
('job_name', 'STRING', 'Name of Kokoro job'), |
||||
('build_id', 'INTEGER', 'Build ID of Kokoro job'), |
||||
('build_url', 'STRING', 'URL of Kokoro build'), |
||||
('test_target', 'STRING', 'Bazel target path'), |
||||
('test_case', 'STRING', 'Name of test case'), |
||||
('result', 'STRING', 'Test or build result'), |
||||
('timestamp', 'TIMESTAMP', 'Timestamp of test run'), |
||||
] |
||||
_TABLE_ID = 'rbe_test_results' |
||||
|
||||
|
||||
def _get_api_key(): |
||||
"""Returns string with API key to access ResultStore. |
||||
Intended to be used in Kokoro envrionment.""" |
||||
api_key_directory = os.getenv('KOKORO_GFILE_DIR') |
||||
api_key_file = os.path.join(api_key_directory, 'resultstore_api_key') |
||||
assert os.path.isfile(api_key_file), 'Must add --api_key arg if not on ' \ |
||||
'Kokoro or Kokoro envrionment is not set up properly.' |
||||
with open(api_key_file, 'r') as f: |
||||
return f.read().replace('\n', '') |
||||
|
||||
|
||||
def _get_invocation_id(): |
||||
"""Returns String of Bazel invocation ID. Intended to be used in |
||||
Kokoro envirionment.""" |
||||
bazel_id_directory = os.getenv('KOKORO_ARTIFACTS_DIR') |
||||
bazel_id_file = os.path.join(bazel_id_directory, 'bazel_invocation_ids') |
||||
assert os.path.isfile(bazel_id_file), 'bazel_invocation_ids file, written ' \ |
||||
'by bazel_wrapper.py, expected but not found.' |
||||
with open(bazel_id_file, 'r') as f: |
||||
return f.read().replace('\n', '') |
||||
|
||||
|
||||
def _upload_results_to_bq(rows):
    """Inserts test result rows into the RBE results BigQuery table.

    Calls big_query_utils.create_partitioned_table for the destination table
    and then tries to insert the rows, making at most three attempts. If the
    last attempt also fails, the process exits with status 1.

    Args:
      rows: A list of dictionaries containing data for each row to insert
    """
    bq = big_query_utils.create_big_query()
    big_query_utils.create_partitioned_table(
        bq,
        _PROJECT_ID,
        _DATASET_ID,
        _TABLE_ID,
        _RESULTS_SCHEMA,
        _DESCRIPTION,
        partition_type=_PARTITION_TYPE,
        expiration_ms=_EXPIRATION_MS)

    attempts_left = 3
    while True:
        inserted = big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
                                               _TABLE_ID, rows)
        if inserted:
            return
        attempts_left -= 1
        if attempts_left == 0:
            print('Error uploading result to bigquery, all attempts failed.')
            sys.exit(1)
        print('Error uploading result to bigquery, will retry.')
||||
|
||||
|
||||
def _get_resultstore_data(api_key, invocation_id):
    """Returns list of action dictionaries by querying ResultStore API.

    Args:
      api_key: String of ResultStore API key
      invocation_id: String of ResultStore invocation ID to get results from

    Returns:
      A list with the 'actions' entries of every response page, in the order
      the pages were returned.
    """
    all_actions = []
    page_token = ''
    # ResultStore's API returns data on a limited number of tests. When we exceed
    # that limit, the 'nextPageToken' field is included in the response and must
    # be sent with the next request to get subsequent data, so keep requesting
    # until the 'nextPageToken' field is omitted.
    while True:
        req = urllib2.Request(
            url=
            'https://resultstore.googleapis.com/v2/invocations/%s/targets/-/configuredTargets/-/actions?key=%s&pageToken=%s'
            % (invocation_id, api_key, page_token),
            headers={
                'Content-Type': 'application/json'
            })
        response = urllib2.urlopen(req)
        try:
            results = json.loads(response.read())
        finally:
            # Release the HTTP connection even if the body is not valid JSON.
            response.close()
        # A page without any actions has no 'actions' key at all, so fall back
        # to an empty list instead of raising KeyError.
        all_actions.extend(results.get('actions', []))
        if 'nextPageToken' not in results:
            break
        page_token = results['nextPageToken']
    return all_actions
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
# Arguments are necessary if running in a non-Kokoro envrionment. |
||||
argp = argparse.ArgumentParser(description='Upload RBE results.') |
||||
argp.add_argument('--api_key', default='', type=str) |
||||
argp.add_argument('--invocation_id', default='', type=str) |
||||
args = argp.parse_args() |
||||
|
||||
api_key = args.api_key or _get_api_key() |
||||
invocation_id = args.invocation_id or _get_invocation_id() |
||||
resultstore_actions = _get_resultstore_data(api_key, invocation_id) |
||||
|
||||
bq_rows = [] |
||||
for action in resultstore_actions: |
||||
# Filter out non-test related data, such as build results. |
||||
if 'testAction' not in action: |
||||
continue |
||||
# Some test results contain the fileProcessingErrors field, which indicates |
||||
# an issue with parsing results individual test cases. |
||||
if 'fileProcessingErrors' in action: |
||||
test_cases = [{ |
||||
'testCase': { |
||||
'caseName': str(action['id']['actionId']), |
||||
'result': str(action['statusAttributes']['status']) |
||||
} |
||||
}] |
||||
else: |
||||
test_cases = action['testAction']['testSuite']['tests'][0][ |
||||
'testSuite']['tests'] |
||||
for test_case in test_cases: |
||||
if 'errors' in test_case['testCase']: |
||||
result = 'FAILED' |
||||
else: |
||||
result = 'PASSED' |
||||
bq_rows.append({ |
||||
'insertId': str(uuid.uuid4()), |
||||
'json': { |
||||
'job_name': |
||||
os.getenv('KOKORO_JOB_NAME'), |
||||
'build_id': |
||||
os.getenv('KOKORO_BUILD_NUMBER'), |
||||
'build_url': |
||||
'https://sponge.corp.google.com/invocation?id=%s' % |
||||
os.getenv('KOKORO_BUILD_ID'), |
||||
'test_target': |
||||
action['id']['targetId'], |
||||
'test_case': |
||||
test_case['testCase']['caseName'], |
||||
'result': |
||||
result, |
||||
'timestamp': |
||||
action['timing']['startTime'], |
||||
} |
||||
}) |
||||
# BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time. |
||||
for i in range((len(bq_rows) / 1000) + 1): |
||||
_upload_results_to_bq(bq_rows[i * 1000:(i + 1) * 1000]) |
Loading…
Reference in new issue