|
|
|
@ -328,12 +328,10 @@ def get_client_accumulated_stats(): |
|
|
|
|
request = messages_pb2.LoadBalancerAccumulatedStatsRequest() |
|
|
|
|
logger.debug('Invoking GetClientAccumulatedStats RPC to %s:%d:', |
|
|
|
|
host, args.stats_port) |
|
|
|
|
response = stub.GetClientAccumulatedStats(request, |
|
|
|
|
wait_for_ready=True, |
|
|
|
|
timeout=_CONNECTION_TIMEOUT_SEC) |
|
|
|
|
response = stub.GetClientAccumulatedStats( |
|
|
|
|
request, wait_for_ready=True, timeout=_CONNECTION_TIMEOUT_SEC) |
|
|
|
|
logger.debug('Invoked GetClientAccumulatedStats RPC to %s: %s', |
|
|
|
|
host, |
|
|
|
|
response) |
|
|
|
|
host, response) |
|
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -353,14 +351,14 @@ def configure_client(rpc_types, metadata): |
|
|
|
|
md.type = rpc_type |
|
|
|
|
md.key = md_key |
|
|
|
|
md.value = md_value |
|
|
|
|
logger.debug('Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', |
|
|
|
|
host, |
|
|
|
|
args.stats_port, |
|
|
|
|
request) |
|
|
|
|
logger.debug( |
|
|
|
|
'Invoking XdsUpdateClientConfigureService RPC to %s:%d: %s', |
|
|
|
|
host, args.stats_port, request) |
|
|
|
|
stub.Configure(request, |
|
|
|
|
wait_for_ready=True, |
|
|
|
|
timeout=_CONNECTION_TIMEOUT_SEC) |
|
|
|
|
logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', host) |
|
|
|
|
logger.debug('Invoked XdsUpdateClientConfigureService RPC to %s', |
|
|
|
|
host) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RpcDistributionError(Exception): |
|
|
|
@ -426,12 +424,11 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): |
|
|
|
|
threshold_fraction = threshold / 100.0 |
|
|
|
|
start_time = time.time() |
|
|
|
|
error_msg = None |
|
|
|
|
logger.debug('Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' |
|
|
|
|
% (timeout_sec, num_rpcs, rpc_type, threshold)) |
|
|
|
|
logger.debug( |
|
|
|
|
'Waiting for %d sec until %d %s RPCs (with %d%% tolerance) in-flight' % |
|
|
|
|
(timeout_sec, num_rpcs, rpc_type, threshold)) |
|
|
|
|
while time.time() - start_time <= timeout_sec: |
|
|
|
|
error_msg = _check_rpcs_in_flight(rpc_type, |
|
|
|
|
num_rpcs, |
|
|
|
|
threshold, |
|
|
|
|
error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, |
|
|
|
|
threshold_fraction) |
|
|
|
|
if error_msg: |
|
|
|
|
time.sleep(2) |
|
|
|
@ -440,13 +437,11 @@ def wait_until_rpcs_in_flight(rpc_type, timeout_sec, num_rpcs, threshold): |
|
|
|
|
# Ensure the number of outstanding RPCs is stable. |
|
|
|
|
if not error_msg: |
|
|
|
|
time.sleep(5) |
|
|
|
|
error_msg = _check_rpcs_in_flight(rpc_type, |
|
|
|
|
num_rpcs, |
|
|
|
|
threshold, |
|
|
|
|
error_msg = _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, |
|
|
|
|
threshold_fraction) |
|
|
|
|
if error_msg: |
|
|
|
|
raise Exception("Wrong number of %s RPCs in-flight: %s" |
|
|
|
|
% (rpc_type, error_msg)) |
|
|
|
|
raise Exception("Wrong number of %s RPCs in-flight: %s" % |
|
|
|
|
(rpc_type, error_msg)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_rpcs_in_flight(rpc_type, num_rpcs, threshold, threshold_fraction): |
|
|
|
@ -1082,11 +1077,8 @@ def test_header_matching(gcp, original_backend_service, instance_group, |
|
|
|
|
patch_backend_service(gcp, alternate_backend_service, []) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_circuit_breaking(gcp, |
|
|
|
|
original_backend_service, |
|
|
|
|
instance_group, |
|
|
|
|
alternate_backend_service, |
|
|
|
|
same_zone_instance_group): |
|
|
|
|
def test_circuit_breaking(gcp, original_backend_service, instance_group, |
|
|
|
|
alternate_backend_service, same_zone_instance_group): |
|
|
|
|
logger.info('Running test_circuit_breaking') |
|
|
|
|
# The config validation for proxyless doesn't allow setting |
|
|
|
|
# circuit_breakers. Disable validate validate_for_proxyless |
|
|
|
@ -1096,18 +1088,14 @@ def test_circuit_breaking(gcp, |
|
|
|
|
set_validate_for_proxyless(gcp, False) |
|
|
|
|
original_backend_service_max_requests = 500 |
|
|
|
|
alternate_backend_service_max_requests = 1000 |
|
|
|
|
patch_backend_service(gcp, |
|
|
|
|
original_backend_service, |
|
|
|
|
[instance_group], |
|
|
|
|
circuit_breakers={ |
|
|
|
|
'maxRequests': |
|
|
|
|
original_backend_service_max_requests |
|
|
|
|
}) |
|
|
|
|
patch_backend_service( |
|
|
|
|
gcp, |
|
|
|
|
original_backend_service, [instance_group], |
|
|
|
|
circuit_breakers={'maxRequests': original_backend_service_max_requests}) |
|
|
|
|
logger.info('Waiting for original backends to become healthy') |
|
|
|
|
wait_for_healthy_backends(gcp, original_backend_service, instance_group) |
|
|
|
|
patch_backend_service(gcp, |
|
|
|
|
alternate_backend_service, |
|
|
|
|
[same_zone_instance_group], |
|
|
|
|
alternate_backend_service, [same_zone_instance_group], |
|
|
|
|
circuit_breakers={ |
|
|
|
|
'maxRequests': |
|
|
|
|
alternate_backend_service_max_requests |
|
|
|
@ -1116,7 +1104,8 @@ def test_circuit_breaking(gcp, |
|
|
|
|
wait_for_healthy_backends(gcp, alternate_backend_service, |
|
|
|
|
same_zone_instance_group) |
|
|
|
|
original_backend_instances = get_instance_names(gcp, instance_group) |
|
|
|
|
alternate_backend_instances = get_instance_names(gcp,same_zone_instance_group) |
|
|
|
|
alternate_backend_instances = get_instance_names(gcp, |
|
|
|
|
same_zone_instance_group) |
|
|
|
|
route_rules = [ |
|
|
|
|
{ |
|
|
|
|
'priority': 0, |
|
|
|
@ -1137,10 +1126,10 @@ def test_circuit_breaking(gcp, |
|
|
|
|
] |
|
|
|
|
try: |
|
|
|
|
# Make client send UNARY_CALL and EMPTY_CALL. |
|
|
|
|
configure_client( |
|
|
|
|
[messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], |
|
|
|
|
[]) |
|
|
|
|
configure_client([ |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL |
|
|
|
|
], []) |
|
|
|
|
logger.info('Patching url map with %s', route_rules) |
|
|
|
|
patch_url_map_backend_service(gcp, |
|
|
|
|
original_backend_service, |
|
|
|
@ -1151,10 +1140,10 @@ def test_circuit_breaking(gcp, |
|
|
|
|
_WAIT_FOR_STATS_SEC) |
|
|
|
|
|
|
|
|
|
# Make all calls keep-open. |
|
|
|
|
configure_client( |
|
|
|
|
[messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL], |
|
|
|
|
[(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
configure_client([ |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL |
|
|
|
|
], [(messages_pb2.ClientConfigureRequest.RpcType.UNARY_CALL, |
|
|
|
|
'rpc-behavior', 'keep-open'), |
|
|
|
|
(messages_pb2.ClientConfigureRequest.RpcType.EMPTY_CALL, |
|
|
|
|
'rpc-behavior', 'keep-open')]) |
|
|
|
@ -1162,29 +1151,26 @@ def test_circuit_breaking(gcp, |
|
|
|
|
'UNARY_CALL', |
|
|
|
|
(_WAIT_FOR_BACKEND_SEC + |
|
|
|
|
int(original_backend_service_max_requests / args.qps)), |
|
|
|
|
original_backend_service_max_requests, |
|
|
|
|
1) |
|
|
|
|
original_backend_service_max_requests, 1) |
|
|
|
|
wait_until_rpcs_in_flight( |
|
|
|
|
'EMPTY_CALL', |
|
|
|
|
(_WAIT_FOR_BACKEND_SEC + |
|
|
|
|
int(alternate_backend_service_max_requests / args.qps)), |
|
|
|
|
alternate_backend_service_max_requests, |
|
|
|
|
1) |
|
|
|
|
alternate_backend_service_max_requests, 1) |
|
|
|
|
|
|
|
|
|
# Increment circuit breakers max_requests threshold. |
|
|
|
|
original_backend_service_max_requests = 800 |
|
|
|
|
patch_backend_service(gcp, |
|
|
|
|
original_backend_service, |
|
|
|
|
[instance_group], |
|
|
|
|
original_backend_service, [instance_group], |
|
|
|
|
circuit_breakers={ |
|
|
|
|
'maxRequests': |
|
|
|
|
original_backend_service_max_requests}) |
|
|
|
|
original_backend_service_max_requests |
|
|
|
|
}) |
|
|
|
|
wait_until_rpcs_in_flight( |
|
|
|
|
'UNARY_CALL', |
|
|
|
|
(_WAIT_FOR_BACKEND_SEC + |
|
|
|
|
int(original_backend_service_max_requests / args.qps)), |
|
|
|
|
original_backend_service_max_requests, |
|
|
|
|
1) |
|
|
|
|
original_backend_service_max_requests, 1) |
|
|
|
|
finally: |
|
|
|
|
patch_url_map_backend_service(gcp, original_backend_service) |
|
|
|
|
patch_backend_service(gcp, original_backend_service, [instance_group]) |
|
|
|
@ -2103,8 +2089,7 @@ try: |
|
|
|
|
alternate_backend_service, |
|
|
|
|
same_zone_instance_group) |
|
|
|
|
elif test_case == 'circuit_breaking': |
|
|
|
|
test_circuit_breaking(gcp, backend_service, |
|
|
|
|
instance_group, |
|
|
|
|
test_circuit_breaking(gcp, backend_service, instance_group, |
|
|
|
|
alternate_backend_service, |
|
|
|
|
same_zone_instance_group) |
|
|
|
|
else: |
|
|
|
|