@ -30,7 +30,7 @@ import json
import os
import string
import sys
from typing import Any , Dict , Iterable , Mapping , Optional , Type
from typing import Any , Callable , Dict , Iterable , List , Mapping , Optional , Type
import yaml
@ -118,17 +118,79 @@ def gen_run_indices(runs_per_test: int) -> Iterable[str]:
yield index_fmt . format ( i )
def scenario_name ( base_name : str , client_channels : Optional [ int ] ,
server_threads : Optional [ int ] , offered_load : Optional [ int ] ) :
""" Constructs scenario name from base name and modifiers. """
elements = [ base_name ]
if client_channels :
elements . append ( ' {:d} channels ' . format ( client_channels ) )
if server_threads :
elements . append ( ' {:d} threads ' . format ( server_threads ) )
if offered_load :
elements . append ( ' {:d} offered_load ' . format ( offered_load ) )
return ' _ ' . join ( elements )
def scenario_transform_function (
client_channels : Optional [ int ] , server_threads : Optional [ int ] ,
offered_loads : Optional [ Iterable [ int ] ]
) - > Optional [ Callable [ [ Iterable [ Mapping [ str , Any ] ] ] , Iterable [ Mapping [ str ,
Any ] ] ] ] :
""" Returns a transform to be applied to a list of scenarios. """
if not any ( ( client_channels , server_threads , len ( offered_loads ) ) ) :
return lambda s : s
def _transform (
scenarios : Iterable [ Mapping [ str ,
Any ] ] ) - > Iterable [ Mapping [ str , Any ] ] :
""" Transforms scenarios by inserting num of client channels, number of async_server_threads and offered_load. """
for base_scenario in scenarios :
base_name = base_scenario [ ' name ' ]
if client_channels :
base_scenario [ ' client_config ' ] [
' client_channels ' ] = client_channels
if server_threads :
base_scenario [ ' server_config ' ] [
' async_server_threads ' ] = server_threads
if not offered_loads :
base_scenario [ ' name ' ] = scenario_name ( base_name ,
client_channels ,
server_threads , 0 )
yield base_scenario
return
for offered_load in offered_loads :
scenario = copy . deepcopy ( base_scenario )
scenario [ ' client_config ' ] [ ' load_params ' ] = {
' poisson ' : {
' offered_load ' : offered_load
}
}
scenario [ ' name ' ] = scenario_name ( base_name , client_channels ,
server_threads , offered_load )
yield scenario
return _transform
def gen_loadtest_configs (
base_config : Mapping [ str , Any ] ,
base_config_clients : Iterable [ Mapping [ str , Any ] ] ,
base_config_servers : Iterable [ Mapping [ str , Any ] ] ,
scenario_name_regex : str ,
language_config : scenario_config_exporter . LanguageConfig ,
loadtest_name_prefix : str ,
uniquifier_elements : Iterable [ str ] ,
annotations : Mapping [ str , str ] ,
instances_per_client : int = 1 ,
runs_per_test : int = 1 ) - > Iterable [ Dict [ str , Any ] ] :
base_config : Mapping [ str , Any ] ,
base_config_clients : Iterable [ Mapping [ str , Any ] ] ,
base_config_servers : Iterable [ Mapping [ str , Any ] ] ,
scenario_name_regex : str ,
language_config : scenario_config_exporter . LanguageConfig ,
loadtest_name_prefix : str ,
uniquifier_elements : Iterable [ str ] ,
annotations : Mapping [ str , str ] ,
instances_per_client : int = 1 ,
runs_per_test : int = 1 ,
scenario_transform : Callable [ [ Iterable [ Mapping [ str , Any ] ] ] ,
List [ Dict [ str , Any ] ] ] = lambda s : s
) - > Iterable [ Dict [ str , Any ] ] :
""" Generates LoadTest configurations for a given language config.
The LoadTest configurations are generated as YAML objects .
@ -142,8 +204,10 @@ def gen_loadtest_configs(
category = language_config . category ,
client_language = language_config . client_language ,
server_language = language_config . server_language )
scenarios = scenario_config_exporter . gen_scenarios ( language_config . language ,
scenario_filter )
scenarios = scenario_transform (
scenario_config_exporter . gen_scenarios ( language_config . language ,
scenario_filter ) )
for scenario in scenarios :
for run_index in gen_run_indices ( runs_per_test ) :
@ -349,7 +413,7 @@ def main() -> None:
help = ' Regex to select scenarios to run. ' )
argp . add_argument (
' --category ' ,
choices = [ ' all ' , ' inproc ' , ' scalable ' , ' smoketest ' , ' sweep ' ] ,
choices = [ ' all ' , ' inproc ' , ' scalable ' , ' smoketest ' , ' sweep ' , ' psm ' ] ,
default = ' all ' ,
help = ' Select a category of tests to run. ' )
argp . add_argument (
@ -378,6 +442,19 @@ def main() -> None:
' --output ' ,
type = str ,
help = ' Output file name. Output to stdout if not set. ' )
argp . add_argument ( ' --client_channels ' ,
type = int ,
help = ' Number of client channels. ' )
argp . add_argument ( ' --server_threads ' ,
type = int ,
help = ' Number of async server threads. ' )
argp . add_argument (
' --offered_loads ' ,
nargs = " * " ,
type = int ,
default = [ ] ,
help = ' A list of QPS values at which each load test scenario will be run. '
)
args = argp . parse_args ( )
if args . instances_per_client < 1 :
@ -404,6 +481,10 @@ def main() -> None:
annotations = parse_key_value_args ( args . annotations )
transform = scenario_transform_function ( args . client_channels ,
args . server_threads ,
args . offered_loads )
with open ( args . template ) as f :
base_config = yaml . safe_load (
string . Template ( f . read ( ) ) . substitute ( substitutions ) )
@ -436,7 +517,8 @@ def main() -> None:
uniquifier_elements = uniquifier_elements ,
annotations = annotations ,
instances_per_client = args . instances_per_client ,
runs_per_test = args . runs_per_test ) )
runs_per_test = args . runs_per_test ,
scenario_transform = transform ) )
configs = ( config for config in itertools . chain ( * config_generators ) )
with open ( args . output , ' w ' ) if args . output else sys . stdout as f :