#!/usr/bin/env python3

# Copyright 2022 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A script to fetch total cpu seconds and memory data from prometheus.
# example usage: python3 prometheus.py
#   --url=http://prometheus.prometheus.svc.cluster.local:9090
#   --pod_type=driver --pod_type=clients --container_name=main
#   --container_name=sidecar
"""Perform Prometheus range queries to obtain cpu and memory data.

This module performs range queries through the Prometheus API to obtain
total cpu seconds and memory during a test run for given containers
in given pods. The cpu data obtained is the total cpu seconds used within
the given period of time. The memory data is the instant memory usage
sampled over the same period.
"""
import argparse
import json
import logging
import statistics
from typing import Any, Dict, List

from dateutil import parser
import requests


class Prometheus:
    """Holds the query URL and the start/end of the query time range."""

    def __init__(
        self,
        url: str,
        start: str,
        end: str,
    ):
        self.url = url
        self.start = start
        self.end = end

    def _fetch_by_query(self, query: str) -> Dict[str, Any]:
        """Fetches the given query within the configured time range.

        The pulling interval is every 5s; the data returned by the query
        is a time series (Prometheus "matrix" result).

        Args:
            query: A PromQL query string.

        Returns:
            The decoded JSON response from the Prometheus API.

        Raises:
            requests.HTTPError: if the API responds with an error status.
        """
        resp = requests.get(
            self.url + '/api/v1/query_range',
            {
                'query': query,
                'start': self.start,
                'end': self.end,
                'step': 5
            },
        )
        # Surface HTTP-level failures immediately instead of trying to
        # parse an error body as a result.
        resp.raise_for_status()
        return resp.json()

    def _fetch_cpu_for_pod(self, container_matcher: str,
                           pod_name: str) -> Dict[str, List[float]]:
        """Fetches the cpu data for each container in one pod.

        Fetches total cpu seconds during the time range specified in this
        Prometheus instance for a pod. The time series is then trimmed to a
        plain list of samples, keyed by container name.

        Args:
            container_matcher: A PromQL label matcher value consisting of one
                or more container names (exact or regex form, already quoted).
            pod_name: Name of the pod to query.
        """
        query = (
            'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="'
            + pod_name + '",container=' + container_matcher + '}')
        logging.debug('running prometheus query for cpu: %s', query)
        cpu_data = self._fetch_by_query(query)
        logging.debug('raw cpu data: %s', str(cpu_data))
        return get_data_list_from_timeseries(cpu_data)

    def _fetch_memory_for_pod(self, container_matcher: str,
                              pod_name: str) -> Dict[str, List[float]]:
        """Fetches memory data for each container in one pod.

        Fetches memory usage samples during the time range specified in this
        Prometheus instance for a pod. The time series is then trimmed to a
        plain list of samples, keyed by container name.

        Args:
            container_matcher: A PromQL label matcher value consisting of one
                or more container names (exact or regex form, already quoted).
            pod_name: Name of the pod to query.
        """
        query = ('container_memory_usage_bytes{job="kubernetes-cadvisor",pod="'
                 + pod_name + '",container=' + container_matcher + '}')
        logging.debug('running prometheus query for memory: %s', query)
        memory_data = self._fetch_by_query(query)
        logging.debug('raw memory data: %s', str(memory_data))
        return get_data_list_from_timeseries(memory_data)

    def fetch_cpu_and_memory_data(
            self, container_list: List[str],
            pod_dict: Dict[str, List[str]]) -> Dict[str, Any]:
        """Fetches total cpu seconds and memory data for multiple pods.

        Args:
            container_list: A list of container names to fetch the data for.
            pod_dict: The pods to fetch data for, keyed by the role of the
                pod: clients, driver and servers. Each value is the list of
                pod names having the role given in the key.

        Returns:
            A nested dict: role -> pod name -> container name ->
            {'cpuSeconds': float, 'memoryMean': float}.
        """
        container_matcher = construct_container_matcher(container_list)
        processed_data = {}
        for role, pod_names in pod_dict.items():
            pod_data = {}
            for pod in pod_names:
                container_data = {}
                for container, data in self._fetch_cpu_for_pod(
                        container_matcher, pod).items():
                    container_data[container] = {
                        'cpuSeconds': compute_total_cpu_seconds(data)
                    }
                for container, data in self._fetch_memory_for_pod(
                        container_matcher, pod).items():
                    # setdefault guards against a container that appears in
                    # the memory results but not in the cpu results; the
                    # original indexing would raise KeyError in that case.
                    container_data.setdefault(
                        container,
                        {})['memoryMean'] = compute_average_memory_usage(data)
                pod_data[pod] = container_data
            processed_data[role] = pod_data
        return processed_data


def construct_container_matcher(container_list: List[str]) -> str:
    """Constructs the container label matcher used in the prometheus query.

    A single name produces an exact matcher value ('"name"'); multiple names
    produce a regex matcher value ('~"a|b"').

    Raises:
        ValueError: if no container name is provided.
    """
    if not container_list:
        raise ValueError('no container name provided')
    if len(container_list) == 1:
        # The name must be quoted to form valid PromQL
        # (container="name"); the original code returned it bare.
        return '"' + container_list[0] + '"'
    return '~"' + '|'.join(container_list) + '"'


def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]:
    """Converts a Prometheus matrix response into per-container sample lists.

    Returns a dict keyed by container name whose values are the float
    samples extracted from the corresponding time series.

    Raises:
        Exception: if the query failed or did not return a matrix result.
    """
    if data['status'] != 'success':
        raise Exception('command failed: ' + data['status'] + str(data))
    if data['data']['resultType'] != 'matrix':
        raise Exception('resultType is not matrix: ' +
                        data['data']['resultType'])

    container_name_to_data_list = {}
    for res in data['data']['result']:
        container_name = res['metric']['container']
        # Each entry of 'values' is a [timestamp, value-string] pair; only
        # the value is kept.
        container_name_to_data_list[container_name] = [
            float(sample[1]) for sample in res['values']
        ]
    return container_name_to_data_list


def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float:
    """Computes the total cpu seconds as CPUs[end] - CPUs[start].

    The metric is a monotonically increasing counter, so the difference of
    the last and first samples is the usage within the query window.
    """
    return cpu_data_list[-1] - cpu_data_list[0]


def compute_average_memory_usage(memory_data_list: List[float]) -> float:
    """Computes the mean of the given list of memory samples."""
    return statistics.mean(memory_data_list)


def construct_pod_dict(node_info_file: str,
                       pod_types: List[str]) -> Dict[str, List[str]]:
    """Constructs a dict of pod names to be queried.

    Args:
        node_info_file: Path of a JSON file containing the pod names to
            query. The pods' names are put into a dict of lists keyed by
            role name: clients, servers and driver.
        pod_types: The roles (subset of clients/driver/servers) to include
            in the returned dict.
    """
    with open(node_info_file, 'r') as f:
        pod_names = json.load(f)
        pod_type_to_name = {'clients': [], 'driver': [], 'servers': []}

        for client in pod_names['Clients']:
            pod_type_to_name['clients'].append(client['Name'])
        for server in pod_names['Servers']:
            pod_type_to_name['servers'].append(server['Name'])

        pod_type_to_name['driver'].append(pod_names['Driver']['Name'])

    pod_names_to_query = {}
    for pod_type in pod_types:
        pod_names_to_query[pod_type] = pod_type_to_name[pod_type]
    return pod_names_to_query


def convert_UTC_to_epoch(utc_timestamp: str) -> str:
    """Converts a utc timestamp string to an epoch-seconds string."""
    parsed_time = parser.parse(utc_timestamp)
    # datetime.timestamp() is portable and honors tz-aware datetimes;
    # the original strftime('%s') is a glibc-only extension that is
    # unavailable on e.g. Windows and ignores the parsed timezone.
    return str(int(parsed_time.timestamp()))


def main() -> None:
    argp = argparse.ArgumentParser(
        description='Fetch cpu and memory stats from prometheus')
    argp.add_argument('--url', help='Prometheus base url', required=True)
    argp.add_argument(
        '--scenario_result_file',
        default='scenario_result.json',
        type=str,
        help='File contains epoch seconds for start and end time',
    )
    argp.add_argument(
        '--node_info_file',
        default='/var/data/qps_workers/node_info.json',
        help='File contains pod name to query the metrics for',
    )
    argp.add_argument(
        '--pod_type',
        action='append',
        help=
        'Pod type to query the metrics for, the options are driver, client and server',
        choices=['driver', 'clients', 'servers'],
        required=True,
    )
    argp.add_argument(
        '--container_name',
        action='append',
        help='The container names to query the metrics for',
        required=True,
    )
    argp.add_argument(
        '--export_file_name',
        default='prometheus_query_result.json',
        type=str,
        help='Name of exported JSON file.',
    )
    argp.add_argument(
        '--quiet',
        default=False,
        # store_true makes --quiet a proper flag; the original definition
        # required a value and treated any string (even "False") as truthy.
        action='store_true',
        help='Suppress informative output',
    )
    args = argp.parse_args()

    if not args.quiet:
        logging.getLogger().setLevel(logging.DEBUG)

    with open(args.scenario_result_file, 'r') as q:
        scenario_result = json.load(q)
        start_time = convert_UTC_to_epoch(
            scenario_result['summary']['startTime'])
        end_time = convert_UTC_to_epoch(scenario_result['summary']['endTime'])
        p = Prometheus(
            url=args.url,
            start=start_time,
            end=end_time,
        )

    pod_dict = construct_pod_dict(args.node_info_file, args.pod_type)
    processed_data = p.fetch_cpu_and_memory_data(
        container_list=args.container_name, pod_dict=pod_dict)
    processed_data['testDurationSeconds'] = float(end_time) - float(start_time)

    logging.debug(json.dumps(processed_data, sort_keys=True, indent=4))

    with open(args.export_file_name, 'w', encoding='utf8') as export_file:
        json.dump(processed_data, export_file, sort_keys=True, indent=4)


if __name__ == '__main__':
    main()