#!/usr/bin/env python3 # Copyright 2022 The gRPC Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # A script to fetch total cpu seconds and memory data from prometheus. # example usage: python3 prometheus.py # --url=http://prometheus.prometheus.svc.cluster.local:9090 # --pod_type=driver --pod_type=clients --container_name=main # --container_name=sidecar """Perform Prometheus range queries to obtain cpu and memory data. This module performs range queries through Prometheus API to obtain total cpu seconds and memory during a test run for given container in given pods. The cpu data obtained is total cpu second used within given period of time. The memory data was instant memory usage at the query time. """ import argparse import json import logging import statistics import time from typing import Any, Dict, List from dateutil import parser import requests class Prometheus: """Objects which holds the start time, end time and query URL.""" def __init__( self, url: str, start: str, end: str, ): self.url = url self.start = start self.end = end def _fetch_by_query(self, query: str) -> Dict[str, Any]: """Fetches the given query with time range. Fetch the given query within a time range. The pulling interval is every 5s, the actual data from the query is a time series. """ resp = requests.get( self.url + "/api/v1/query_range", {"query": query, "start": self.start, "end": self.end, "step": 5}, ) resp.raise_for_status() return resp.json() def _fetch_cpu_for_pod( self, container_matcher: str, pod_name: str ) -> Dict[str, List[float]]: """Fetches the cpu data for each pod. Fetch total cpu seconds during the time range specified in the Prometheus instance for a pod. After obtain the cpu seconds, the data are trimmed from time series to a data list and saved in the Dict that keyed by the container names. Args: container_matcher: A string consist one or more container name separated by |. """ query = ( 'container_cpu_usage_seconds_total{job="kubernetes-cadvisor",pod="' + pod_name + '",container=' + container_matcher + "}" ) logging.debug("running prometheus query for cpu: %s", query) cpu_data = self._fetch_by_query(query) logging.debug("raw cpu data: %s", str(cpu_data)) cpu_container_name_to_data_list = get_data_list_from_timeseries( cpu_data ) return cpu_container_name_to_data_list def _fetch_memory_for_pod( self, container_matcher: str, pod_name: str ) -> Dict[str, List[float]]: """Fetches memory data for each pod. Fetch total memory data during the time range specified in the Prometheus instance for a pod. After obtain the memory data, the data are trimmed from time series to a data list and saved in the Dict that keyed by the container names. Args: container_matcher: A string consist one or more container name separated by |. """ query = ( 'container_memory_usage_bytes{job="kubernetes-cadvisor",pod="' + pod_name + '",container=' + container_matcher + "}" ) logging.debug("running prometheus query for memory: %s", query) memory_data = self._fetch_by_query(query) logging.debug("raw memory data: %s", str(memory_data)) memory_container_name_to_data_list = get_data_list_from_timeseries( memory_data ) return memory_container_name_to_data_list def fetch_cpu_and_memory_data( self, container_list: List[str], pod_dict: Dict[str, List[str]] ) -> Dict[str, Any]: """Fetch total cpu seconds and memory data for multiple pods. Args: container_list: A list of container names to fetch the data for. pod_dict: the pods to fetch data for, the pod_dict is keyed by role of the pod: clients, driver and servers. The values for the pod_dict are the list of pod names that consist the same role specified in the key. """ container_matcher = construct_container_matcher(container_list) processed_data = {} for role, pod_names in pod_dict.items(): pod_data = {} for pod in pod_names: container_data = {} for container, data in self._fetch_cpu_for_pod( container_matcher, pod ).items(): container_data[container] = {} container_data[container][ "cpuSeconds" ] = compute_total_cpu_seconds(data) for container, data in self._fetch_memory_for_pod( container_matcher, pod ).items(): container_data[container][ "memoryMean" ] = compute_average_memory_usage(data) pod_data[pod] = container_data processed_data[role] = pod_data return processed_data def construct_container_matcher(container_list: List[str]) -> str: """Constructs the container matching string used in the prometheus query.""" if len(container_list) == 0: raise Exception("no container name provided") containers_to_fetch = '"' if len(container_list) == 1: containers_to_fetch = container_list[0] else: containers_to_fetch = '~"' + container_list[0] for container in container_list[1:]: containers_to_fetch = containers_to_fetch + "|" + container containers_to_fetch = containers_to_fetch + '"' return containers_to_fetch def get_data_list_from_timeseries(data: Any) -> Dict[str, List[float]]: """Constructs a Dict as keys are the container names and values are a list of data taken from given timeseries data.""" if data["status"] != "success": raise Exception("command failed: " + data["status"] + str(data)) if data["data"]["resultType"] != "matrix": raise Exception( "resultType is not matrix: " + data["data"]["resultType"] ) container_name_to_data_list = {} for res in data["data"]["result"]: container_name = res["metric"]["container"] container_data_timeseries = res["values"] container_data = [] for d in container_data_timeseries: container_data.append(float(d[1])) container_name_to_data_list[container_name] = container_data return container_name_to_data_list def compute_total_cpu_seconds(cpu_data_list: List[float]) -> float: """Computes the total cpu seconds by CPUs[end]-CPUs[start].""" return cpu_data_list[len(cpu_data_list) - 1] - cpu_data_list[0] def compute_average_memory_usage(memory_data_list: List[float]) -> float: """Computes the mean and for a given list of data.""" return statistics.mean(memory_data_list) def construct_pod_dict( node_info_file: str, pod_types: List[str] ) -> Dict[str, List[str]]: """Constructs a dict of pod names to be queried. Args: node_info_file: The file path contains the pod names to query. The pods' names are put into a Dict of list that keyed by the role name: clients, servers and driver. """ with open(node_info_file, "r") as f: pod_names = json.load(f) pod_type_to_name = {"clients": [], "driver": [], "servers": []} for client in pod_names["Clients"]: pod_type_to_name["clients"].append(client["Name"]) for server in pod_names["Servers"]: pod_type_to_name["servers"].append(server["Name"]) pod_type_to_name["driver"].append(pod_names["Driver"]["Name"]) pod_names_to_query = {} for pod_type in pod_types: pod_names_to_query[pod_type] = pod_type_to_name[pod_type] return pod_names_to_query def convert_UTC_to_epoch(utc_timestamp: str) -> str: """Converts a utc timestamp string to epoch time string.""" parsed_time = parser.parse(utc_timestamp) epoch = parsed_time.strftime("%s") return epoch def main() -> None: argp = argparse.ArgumentParser( description="Fetch cpu and memory stats from prometheus" ) argp.add_argument("--url", help="Prometheus base url", required=True) argp.add_argument( "--scenario_result_file", default="scenario_result.json", type=str, help="File contains epoch seconds for start and end time", ) argp.add_argument( "--node_info_file", default="/var/data/qps_workers/node_info.json", help="File contains pod name to query the metrics for", ) argp.add_argument( "--pod_type", action="append", help=( "Pod type to query the metrics for, the options are driver, client" " and server" ), choices=["driver", "clients", "servers"], required=True, ) argp.add_argument( "--container_name", action="append", help="The container names to query the metrics for", required=True, ) argp.add_argument( "--export_file_name", default="prometheus_query_result.json", type=str, help="Name of exported JSON file.", ) argp.add_argument( "--quiet", default=False, help="Suppress informative output", ) argp.add_argument( "--delay_seconds", default=0, type=int, help=( "Configure delay in seconds to perform Prometheus queries, default" " is 0" ), ) args = argp.parse_args() if not args.quiet: logging.getLogger().setLevel(logging.DEBUG) with open(args.scenario_result_file, "r") as q: scenario_result = json.load(q) start_time = convert_UTC_to_epoch( scenario_result["summary"]["startTime"] ) end_time = convert_UTC_to_epoch(scenario_result["summary"]["endTime"]) p = Prometheus( url=args.url, start=start_time, end=end_time, ) time.sleep(args.delay_seconds) pod_dict = construct_pod_dict(args.node_info_file, args.pod_type) processed_data = p.fetch_cpu_and_memory_data( container_list=args.container_name, pod_dict=pod_dict ) processed_data["testDurationSeconds"] = float(end_time) - float(start_time) logging.debug(json.dumps(processed_data, sort_keys=True, indent=4)) with open(args.export_file_name, "w", encoding="utf8") as export_file: json.dump(processed_data, export_file, sort_keys=True, indent=4) if __name__ == "__main__": main()