mirror of https://github.com/grpc/grpc.git
[Python O11y] Implement CSM observability for Python (#36557)
Implement Python CSM observability. Design: [go/grpc-python-opentelemetry](http://goto.google.com/grpc-python-opentelemetry) <!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. --> Closes #36557 PiperOrigin-RevId: 639073741pull/36714/head^2
parent
7ccb51e2ea
commit
f3220d08d2
45 changed files with 2634 additions and 493 deletions
@ -0,0 +1,4 @@ |
||||
graft src/python/grpcio_csm_observability/grpc_csm_observability.egg-info |
||||
graft grpc_csm_observability |
||||
include grpc_version.py |
||||
include README.rst |
@ -0,0 +1,5 @@ |
||||
gRPC Python CSM Observability |
||||
=============================
||||
|
||||
Package for gRPC Python CSM Observability. |
||||
TODO(xuanwn): Add more content. |
@ -0,0 +1,36 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("@grpc_python_dependencies//:requirements.bzl", "requirement") |
||||
|
||||
package(default_visibility = ["//:__subpackages__"]) |
||||
|
||||
# Since packages in requirement() are non-hermetic,
# csm_observability is for internal use only.
py_library(
    name = "csm_observability",
    # Every Python source in this directory belongs to the library.
    srcs = glob(["*.py"]),
    # Import roots: this directory and its parent.
    imports = [
        ".",
        "../",
    ],
    srcs_version = "PY3ONLY",
    deps = [
        # pip-provided (non-hermetic) dependencies.
        requirement("opentelemetry-resourcedetector-gcp"),
        requirement("opentelemetry-sdk"),
        # In-repo dependencies.
        "//src/python/grpcio/grpc:grpcio",
        "//src/python/grpcio_observability/grpc_observability:pyobservability",
        "@com_google_protobuf//:protobuf_python",
    ],
)
@ -0,0 +1,18 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
from grpc_csm_observability._csm_observability_plugin import ( |
||||
CsmOpenTelemetryPlugin, |
||||
) |
||||
|
||||
__all__ = ("CsmOpenTelemetryPlugin",) |
@ -0,0 +1,343 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import json |
||||
import os |
||||
import re |
||||
from typing import AnyStr, Callable, Dict, Iterable, List, Optional, Union |
||||
|
||||
from google.protobuf import struct_pb2 |
||||
from grpc_observability._observability import OptionalLabelType |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption |
||||
|
||||
# pytype: disable=pyi-error |
||||
from opentelemetry.metrics import MeterProvider |
||||
from opentelemetry.resourcedetector.gcp_resource_detector import ( |
||||
GoogleCloudResourceDetector, |
||||
) |
||||
from opentelemetry.sdk.resources import Resource |
||||
from opentelemetry.semconv.resource import ResourceAttributes |
||||
|
||||
# Authority that an "xds://<authority>/..." channel target is checked against
# when deciding whether the CSM plugin option is active on a client channel.
TRAFFIC_DIRECTOR_AUTHORITY = "traffic-director-global.xds.googleapis.com"
# Placeholder recorded whenever a label value cannot be determined.
UNKNOWN_VALUE = "unknown"
# Workload type values carried in the "type" field of the exchanged metadata.
TYPE_GCE = "gcp_compute_engine"
TYPE_GKE = "gcp_kubernetes_engine"
# Prefix of the mesh segment of the bootstrap node ID ("mesh:[Mesh ID]").
MESH_ID_PREFIX = "mesh:"

# The three maps below translate a field name of the peer metadata struct
# (key) into the metric label name it is reported under (value).

# Fields read for every peer, whatever its workload type.
METADATA_EXCHANGE_KEY_FIXED_MAP = {
    "type": "csm.remote_workload_type",
    "canonical_service": "csm.remote_workload_canonical_service",
}

# Additional fields read when the peer's type is TYPE_GKE.
METADATA_EXCHANGE_KEY_GKE_MAP = {
    "workload_name": "csm.remote_workload_name",
    "namespace_name": "csm.remote_workload_namespace_name",
    "cluster_name": "csm.remote_workload_cluster_name",
    "location": "csm.remote_workload_location",
    "project_id": "csm.remote_workload_project_id",
}

# Additional fields read when the peer's type is TYPE_GCE.
METADATA_EXCHANGE_KEY_GCE_MAP = {
    "workload_name": "csm.remote_workload_name",
    "location": "csm.remote_workload_location",
    "project_id": "csm.remote_workload_project_id",
}
||||
|
||||
|
||||
class CSMOpenTelemetryLabelInjector(OpenTelemetryLabelInjector):
    """
    An implementation of OpenTelemetryLabelInjector for CSM.

    This injector fetches labels from the GCP resource detector and the
    environment; it is also responsible for serializing and deserializing
    metadata exchange labels.
    """

    # Labels sent to the peer: a single "XEnvoyPeerMetadata" entry holding a
    # serialized protobuf Struct.
    _exchange_labels: Dict[str, AnyStr]
    # Local labels that are only reported when exchange labels are included.
    _additional_exchange_labels: Dict[str, str]

    def __init__(self):
        """Collect local workload labels and pre-serialize them for exchange."""
        fields = {}
        self._exchange_labels = {}
        self._additional_exchange_labels = {}

        # Labels from environment
        canonical_service_value = os.getenv(
            "CSM_CANONICAL_SERVICE_NAME", UNKNOWN_VALUE
        )
        workload_name_value = os.getenv("CSM_WORKLOAD_NAME", UNKNOWN_VALUE)

        # Labels from the GCP resource detector
        gcp_resource = GoogleCloudResourceDetector().detect()
        resource_type_value = get_resource_type(gcp_resource)
        namespace_value = get_str_value_from_resource(
            ResourceAttributes.K8S_NAMESPACE_NAME, gcp_resource
        )
        cluster_name_value = get_str_value_from_resource(
            ResourceAttributes.K8S_CLUSTER_NAME, gcp_resource
        )
        # ResourceAttributes.CLOUD_AVAILABILITY_ZONE are called
        # "zones" on Google Cloud.
        location_value = get_str_value_from_resource("cloud.zone", gcp_resource)
        if UNKNOWN_VALUE == location_value:
            # No zone available: fall back to the region.
            location_value = get_str_value_from_resource(
                ResourceAttributes.CLOUD_REGION, gcp_resource
            )
        project_id_value = get_str_value_from_resource(
            ResourceAttributes.CLOUD_ACCOUNT_ID, gcp_resource
        )

        # "type" and "canonical_service" are always sent; the remaining
        # fields depend on the detected resource type.
        fields["type"] = struct_pb2.Value(string_value=resource_type_value)
        fields["canonical_service"] = struct_pb2.Value(
            string_value=canonical_service_value
        )
        if resource_type_value == TYPE_GKE:
            fields["workload_name"] = struct_pb2.Value(
                string_value=workload_name_value
            )
            fields["namespace_name"] = struct_pb2.Value(
                string_value=namespace_value
            )
            fields["cluster_name"] = struct_pb2.Value(
                string_value=cluster_name_value
            )
            fields["location"] = struct_pb2.Value(string_value=location_value)
            fields["project_id"] = struct_pb2.Value(
                string_value=project_id_value
            )
        elif resource_type_value == TYPE_GCE:
            fields["workload_name"] = struct_pb2.Value(
                string_value=workload_name_value
            )
            fields["location"] = struct_pb2.Value(string_value=location_value)
            fields["project_id"] = struct_pb2.Value(
                string_value=project_id_value
            )

        serialized_struct = struct_pb2.Struct(fields=fields)
        serialized_str = serialized_struct.SerializeToString()

        self._exchange_labels = {"XEnvoyPeerMetadata": serialized_str}
        self._additional_exchange_labels[
            "csm.workload_canonical_service"
        ] = canonical_service_value
        self._additional_exchange_labels["csm.mesh_id"] = get_mesh_id()

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """Return the labels to send to the peer."""
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """Return the local CSM labels, or an empty dict when not requested.

        Args:
          include_exchange_labels: Whether the exchange-related labels
            should be included.
        """
        if include_exchange_labels:
            return self._additional_exchange_labels
        else:
            return {}

    @staticmethod
    def deserialize_labels(labels: Dict[str, AnyStr]) -> Dict[str, AnyStr]:
        """Expand the peer's "XEnvoyPeerMetadata" label into csm.remote_* labels.

        Every other label is copied through unchanged. The serialized
        "XEnvoyPeerMetadata" entry itself is not part of the result.

        Args:
          labels: Labels to deserialize; the "XEnvoyPeerMetadata" value, when
            present, is a serialized protobuf Struct.

        Returns:
          A new dict with the deserialized labels.
        """
        deserialized_labels = {}
        for key, value in labels.items():
            if "XEnvoyPeerMetadata" == key:
                pb_struct = struct_pb2.Struct()
                pb_struct.ParseFromString(value)

                remote_type = get_value_from_struct("type", pb_struct)

                for (
                    local_key,
                    remote_key,
                ) in METADATA_EXCHANGE_KEY_FIXED_MAP.items():
                    deserialized_labels[remote_key] = get_value_from_struct(
                        local_key, pb_struct
                    )
                if remote_type == TYPE_GKE:
                    for (
                        local_key,
                        remote_key,
                    ) in METADATA_EXCHANGE_KEY_GKE_MAP.items():
                        deserialized_labels[remote_key] = get_value_from_struct(
                            local_key, pb_struct
                        )
                elif remote_type == TYPE_GCE:
                    for (
                        local_key,
                        remote_key,
                    ) in METADATA_EXCHANGE_KEY_GCE_MAP.items():
                        deserialized_labels[remote_key] = get_value_from_struct(
                            local_key, pb_struct
                        )
            # If CSM label injector is enabled on server side but client didn't send
            # XEnvoyPeerMetadata, we'll record remote label as unknown.
            else:
                # setdefault, not assignment: when XEnvoyPeerMetadata was
                # handled earlier in this loop, the values decoded from it
                # must not be reset to "unknown" by a later, unrelated label.
                for remote_key in METADATA_EXCHANGE_KEY_FIXED_MAP.values():
                    deserialized_labels.setdefault(remote_key, UNKNOWN_VALUE)
                deserialized_labels[key] = value

        return deserialized_labels
||||
|
||||
|
||||
class CsmOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """
    An implementation of OpenTelemetryPluginOption for CSM.
    """

    _label_injector: CSMOpenTelemetryLabelInjector

    def __init__(self):
        self._label_injector = CSMOpenTelemetryLabelInjector()

    @staticmethod
    def is_active_on_client_channel(target: str) -> bool:
        """Determines whether this plugin option is active on a channel based on target.

        Args:
          target: Required. The target for the RPC.

        Returns:
          True if this plugin option is active on the channel, false otherwise.
        """
        # CSM channels should have an "xds" scheme
        if not target.startswith("xds:"):
            return False
        # If scheme is correct, the authority should be TD if exist
        authority_pattern = r"^xds:\/\/([^/]+)"
        match = re.search(authority_pattern, target)
        if match:
            # Exact comparison: a substring test would also accept unrelated
            # authorities that merely contain the Traffic Director host name.
            return match.group(1) == TRAFFIC_DIRECTOR_AUTHORITY
        else:
            # Return True if the authority doesn't exist
            return True

    @staticmethod
    def is_active_on_server(
        xds: bool,  # pylint: disable=unused-argument
    ) -> bool:
        """Determines whether this plugin option is active on a given server.

        Since servers don't need to be xds enabled to work as part of a service
        mesh, we're returning True and enable this PluginOption for all servers.

        Note: This always returns true because server can be part of the mesh even
          if it's not xds-enabled. And we want CSM labels for those servers too.

        Args:
          xds: Required. if this server is build for xds.

        Returns:
          True if this plugin option is active on the server, false otherwise.
        """
        return True

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Return the CSM label injector created for this option."""
        return self._label_injector
||||
|
||||
|
||||
# pylint: disable=no-self-use |
||||
class CsmOpenTelemetryPlugin(OpenTelemetryPlugin):
    """Describes a Plugin for CSM OpenTelemetry observability.

    This class is part of an EXPERIMENTAL API.
    """

    plugin_options: Iterable[OpenTelemetryPluginOption]
    meter_provider: Optional[MeterProvider]
    generic_method_attribute_filter: Callable[[str], bool]

    def __init__(
        self,
        *,
        # Immutable default: avoids sharing one list object across calls.
        plugin_options: Iterable[OpenTelemetryPluginOption] = (),
        meter_provider: Optional[MeterProvider] = None,
        generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
    ):
        """Build the plugin with a CSM plugin option appended to the given ones.

        Args:
          plugin_options: Extra plugin options; they are copied, never mutated.
          meter_provider: Forwarded unchanged to OpenTelemetryPlugin.
          generic_method_attribute_filter: Forwarded unchanged to
            OpenTelemetryPlugin.
        """
        new_options = [*plugin_options, CsmOpenTelemetryPluginOption()]
        super().__init__(
            plugin_options=new_options,
            meter_provider=meter_provider,
            generic_method_attribute_filter=generic_method_attribute_filter,
        )

    def _get_enabled_optional_labels(self) -> List[OptionalLabelType]:
        """Return the optional label types this plugin enables."""
        return [OptionalLabelType.XDS_SERVICE_LABELS]
||||
|
||||
|
||||
def get_value_from_struct(key: str, struct: struct_pb2.Struct) -> str:
    """Return the string value stored under `key`, or UNKNOWN_VALUE.

    Args:
      key: Field name to look up in the struct.
      struct: The protobuf Struct to read from.
    """
    field = struct.fields.get(key)
    return field.string_value if field else UNKNOWN_VALUE
||||
|
||||
|
||||
def get_str_value_from_resource(
    attribute: Union[ResourceAttributes, str], resource: Resource
) -> str:
    """Return a resource attribute as a string, or UNKNOWN_VALUE if absent.

    Args:
      attribute: Name of the attribute to read.
      resource: The resource whose attributes are consulted.
    """
    return str(resource.attributes.get(attribute, UNKNOWN_VALUE))
||||
|
||||
|
||||
# pylint: disable=line-too-long |
||||
def get_resource_type(gcp_resource: Resource) -> str:
    """Translate the detector's resource type into the metadata-exchange value.

    "gke_container" and "gce_instance" are mapped to TYPE_GKE and TYPE_GCE;
    any other value (including the "unknown" placeholder) is returned as is.
    """
    # Convert resource type from GoogleCloudResourceDetector to the value we used for
    # metadata exchange.
    # Reference: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/cc61f23a5ff2f16f4aa2c38d07e55153828849cc/opentelemetry-resourcedetector-gcp/src/opentelemetry/resourcedetector/gcp_resource_detector/__init__.py#L96
    detected_type = get_str_value_from_resource(
        "gcp.resource_type", gcp_resource
    )
    exchange_types = {
        "gke_container": TYPE_GKE,
        "gce_instance": TYPE_GCE,
    }
    return exchange_types.get(detected_type, detected_type)
||||
|
||||
|
||||
def get_mesh_id() -> str:
    """Return the mesh ID found in the xDS bootstrap config.

    The expected format of the node ID is
    projects/[GCP Project number]/networks/mesh:[Mesh ID]/nodes/[UUID]

    Returns:
      The mesh ID, or "unknown" if for some reason the mesh ID could not be
      figured out (no config, invalid JSON, or an unexpected node ID).
    """
    config_contents = get_bootstrap_config_contents()

    try:
        config_json = json.loads(config_contents)
    except json.decoder.JSONDecodeError:
        return UNKNOWN_VALUE

    # Valid JSON is not necessarily an object carrying a string node ID
    # (it could be a list, or have "node": null), so check each level
    # instead of letting an AttributeError escape.
    node = config_json.get("node") if isinstance(config_json, dict) else None
    node_id = node.get("id") if isinstance(node, dict) else None
    if not isinstance(node_id, str):
        return UNKNOWN_VALUE

    node_id_parts = node_id.split("/")
    if len(node_id_parts) == 6 and node_id_parts[3].startswith(
        MESH_ID_PREFIX
    ):
        return node_id_parts[3][len(MESH_ID_PREFIX) :]

    return UNKNOWN_VALUE
||||
|
||||
|
||||
def get_bootstrap_config_contents() -> str:
    """Get the contents of the bootstrap config from environment variable or file.

    GRPC_XDS_BOOTSTRAP is consulted first and GRPC_XDS_BOOTSTRAP_CONFIG
    second; the first one that is set to a non-empty value wins. If that
    value is the path of an existing file, the file's contents are returned,
    otherwise the value itself is treated as the raw config.

    Returns:
      The bootstrap config contents. Or empty str if no config was found.
    """
    for source in ("GRPC_XDS_BOOTSTRAP", "GRPC_XDS_BOOTSTRAP_CONFIG"):
        config = os.getenv(source)
        if not config:
            continue
        if os.path.isfile(config):  # Prioritize file over raw config
            with open(config, "r", encoding="utf-8") as f:
                return f.read()
        # Stop at the first source that is set, so a later variable cannot
        # silently override an earlier one.
        return config

    return ""
@ -0,0 +1,17 @@ |
||||
# Copyright 2024 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_csm_observability/grpc_version.py.template`!!! |
||||
|
||||
VERSION = '1.65.0.dev0' |
@ -0,0 +1,63 @@ |
||||
# Copyright 2024 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import os |
||||
|
||||
import setuptools |
||||
|
||||
_PACKAGE_PATH = os.path.realpath(os.path.dirname(__file__))
_README_PATH = os.path.join(_PACKAGE_PATH, "README.rst")

# Ensure we're in the proper directory whether or not we're being used by pip.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Deliberately imported after the chdir above.
import grpc_version

CLASSIFIERS = [
    "Development Status :: 4 - Beta",
    "Programming Language :: Python",
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: Apache Software License",
]

PACKAGE_DIRECTORIES = {
    "": ".",
}

# grpcio is pinned to exactly the version this package is built at.
INSTALL_REQUIRES = (
    "opentelemetry-sdk>=1.24.0",
    "opentelemetry-resourcedetector-gcp>=1.6.0a0",
    "grpcio=={version}".format(version=grpc_version.VERSION),
    "protobuf>=5.26.1,<6.0dev",
)

# Read the README through a context manager so the file handle is closed
# instead of being left to the garbage collector.
with open(_README_PATH, "r", encoding="utf-8") as _readme_file:
    _LONG_DESCRIPTION = _readme_file.read()

setuptools.setup(
    name="grpcio-csm-observability",
    version=grpc_version.VERSION,
    description="gRPC Python CSM observability package",
    long_description=_LONG_DESCRIPTION,
    author="The gRPC Authors",
    author_email="grpc-io@googlegroups.com",
    url="https://grpc.io",
    project_urls={
        "Source Code": "https://github.com/grpc/grpc/tree/master/src/python/grpcio_csm_observability",
        "Bug Tracker": "https://github.com/grpc/grpc/issues",
    },
    license="Apache License 2.0",
    classifiers=CLASSIFIERS,
    package_dir=PACKAGE_DIRECTORIES,
    packages=setuptools.find_packages("."),
    python_requires=">=3.8",
    install_requires=INSTALL_REQUIRES,
)
@ -1,167 +0,0 @@ |
||||
# Copyright 2023 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
from __future__ import annotations |
||||
|
||||
import logging |
||||
import time |
||||
from typing import Any, Set |
||||
|
||||
import grpc |
||||
|
||||
# pytype: disable=pyi-error |
||||
from grpc_observability import _cyobservability |
||||
from grpc_observability import _observability_config |
||||
|
||||
_LOGGER = logging.getLogger(__name__) |
||||
|
||||
ClientCallTracerCapsule = Any # it appears only once in the function signature |
||||
ServerCallTracerFactoryCapsule = ( |
||||
Any # it appears only once in the function signature |
||||
) |
||||
grpc_observability = Any # grpc_observability.py imports this module. |
||||
|
||||
# Maps each grpc.StatusCode to the string handed to
# _cyobservability._record_rpc_latency; codes missing from this table are
# reported as "UNKNOWN" by the lookup in record_rpc_latency.
GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
||||
|
||||
|
||||
# pylint: disable=no-self-use |
||||
class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
    """GCP OpenCensus based plugin implementation.

    If no exporter is passed, the default will be OpenCensus StackDriver
    based exporter.

    For more details, please refer to User Guide:
      * https://cloud.google.com/stackdriver/docs/solutions/grpc

    Attributes:
      config: Configuration for GCP OpenCensus Observability.
      exporter: Exporter used to export data.
    """

    config: _observability_config.GcpObservabilityConfig
    exporter: "grpc_observability.Exporter"
    # Method names recorded through save_registered_method().
    _registered_methods: Set[bytes]

    def __init__(self, exporter: "grpc_observability.Exporter" = None):
        """Read and activate the config, then enable tracing/stats as configured.

        Args:
          exporter: Exporter used to export data. Required in practice.

        Raises:
          ValueError: If the configuration cannot be read or activated, or
            if no exporter is given.
        """
        self.exporter = None
        self.config = None
        # Must exist before any tracer is created or any latency recorded;
        # those paths test membership in this set.
        self._registered_methods = set()
        try:
            self.config = _observability_config.read_config()
            _cyobservability.activate_config(self.config)
        except Exception as e:  # pylint: disable=broad-except
            # Chain the cause so the original traceback is preserved.
            raise ValueError(f"Reading configuration failed with: {e}") from e

        if exporter:
            self.exporter = exporter
        else:
            raise ValueError("Please provide an exporter!")

        if self.config.tracing_enabled:
            self.set_tracing(True)
        if self.config.stats_enabled:
            self.set_stats(True)

    def __enter__(self):
        """Initialize observability and register this plugin with grpc."""
        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            # Best effort: the failure is logged and the plugin is still
            # registered below.
            _LOGGER.exception("GCPOpenCensusObservability failed with: %s", e)

        grpc._observability.observability_init(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.exit()

    def exit(self) -> None:
        """Disable tracing and stats, then tear observability down."""
        # Sleep so we don't lose any data. If we shutdown export thread
        # immediately after exit, it's possible that core didn't call RecordEnd
        # in callTracer, and all data recorded by calling RecordEnd will be
        # lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Create a client call tracer capsule for the given method and target."""
        # Fixed placeholder trace ID.
        trace_id = b"TRACE_ID"
        capsule = _cyobservability.create_client_call_tracer(
            method_name,
            target,
            trace_id,
            method_name in self._registered_methods,
        )
        return capsule

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Create a server call tracer factory capsule."""
        capsule = _cyobservability.create_server_call_tracer_factory_capsule()
        return capsule

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        """Release a capsule obtained from create_client_call_tracer()."""
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        """No-op in this plugin."""
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Record the latency of a finished RPC through the exporter.

        Status codes without a table entry are reported as "UNKNOWN".
        """
        status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
        _cyobservability._record_rpc_latency(
            self.exporter,
            method,
            target,
            rpc_latency,
            status_code,
            method in self._registered_methods,
        )

    def save_registered_method(self, method_name: bytes) -> None:
        """Remember `method_name` as a registered method."""
        self._registered_methods.add(method_name)
@ -0,0 +1,115 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include "metadata_exchange.h" |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <algorithm> |
||||
#include <array> |
||||
#include <cstdint> |
||||
#include <unordered_map> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "constants.h" |
||||
|
||||
#include <grpc/slice.h> |
||||
|
||||
#include "src/core/telemetry/call_tracer.h" |
||||
|
||||
namespace grpc_observability { |
||||
|
||||
// Keeps only those labels whose key is listed in MetadataExchangeKeyNames;
// they become the metadata sent to the peer.
PythonLabelsInjector::PythonLabelsInjector(
    const std::vector<Label>& exchange_labels) {
  for (const auto& candidate : exchange_labels) {
    const bool is_exchange_key =
        MetadataExchangeKeyNames.find(candidate.key) !=
        MetadataExchangeKeyNames.end();
    if (!is_exchange_key) {
      continue;
    }
    metadata_to_exchange_.emplace_back(candidate.key, candidate.value);
  }
}
||||
|
||||
// Takes the peer metadata out of the incoming initial metadata and, when it
// is present, non-empty and valid base64, returns it decoded as a label.
std::vector<Label> PythonLabelsInjector::GetExchangeLabels(
    grpc_metadata_batch* incoming_initial_metadata) const {
  std::vector<Label> exchanged;
  for (const auto& key_name : MetadataExchangeKeyNames) {
    if (!(key_name == kXEnvoyPeerMetadata)) {
      continue;
    }
    auto taken =
        incoming_initial_metadata->Take(grpc_core::XEnvoyPeerMetadata());
    if (!taken.has_value()) {
      continue;
    }
    grpc_core::Slice encoded = *std::move(taken);
    if (encoded.empty()) {
      continue;
    }
    std::string decoded;
    if (absl::Base64Unescape(encoded.as_string_view(), &decoded)) {
      exchanged.emplace_back(kXEnvoyPeerMetadata, decoded);
    }
  }
  return exchanged;
}
||||
|
||||
// Base64-encodes the stored peer metadata and sets it on the outgoing
// initial metadata.
void PythonLabelsInjector::AddExchangeLabelsToMetadata(
    grpc_metadata_batch* outgoing_initial_metadata) const {
  for (const auto& entry : metadata_to_exchange_) {
    if (!(entry.first == kXEnvoyPeerMetadata)) {
      continue;
    }
    grpc_core::Slice encoded = grpc_core::Slice::FromCopiedString(
        absl::Base64Escape(absl::string_view(entry.second)));
    outgoing_initial_metadata->Set(grpc_core::XEnvoyPeerMetadata(),
                                   encoded.Ref());
  }
}
||||
|
||||
void PythonLabelsInjector::AddXdsOptionalLabels( |
||||
bool is_client, |
||||
absl::Span<const grpc_core::RefCountedStringValue> optional_labels_span, |
||||
std::vector<Label>& labels) { |
||||
if (!is_client) { |
||||
// Currently the CSM optional labels are only set on client.
|
||||
return; |
||||
} |
||||
// Performs JSON label name format to CSM Observability Metric spec format
|
||||
// conversion.
|
||||
absl::string_view service_name = |
||||
optional_labels_span[static_cast<size_t>( |
||||
grpc_core::ClientCallTracer::CallAttemptTracer:: |
||||
OptionalLabelKey::kXdsServiceName)] |
||||
.as_string_view(); |
||||
absl::string_view service_namespace = |
||||
optional_labels_span[static_cast<size_t>( |
||||
grpc_core::ClientCallTracer::CallAttemptTracer:: |
||||
OptionalLabelKey::kXdsServiceNamespace)] |
||||
.as_string_view(); |
||||
// According to the CSM Observability Metric spec, if the control plane fails
|
||||
// to provide these labels, the client will set their values to "unknown".
|
||||
if (service_name.empty()) { |
||||
service_name = "unknown"; |
||||
} |
||||
if (service_namespace.empty()) { |
||||
service_namespace = "unknown"; |
||||
} |
||||
labels.emplace_back("csm.service_name", std::string(service_name)); |
||||
labels.emplace_back("csm.service_namespace_name", |
||||
std::string(service_namespace)); |
||||
} |
||||
|
||||
} // namespace grpc_observability
|
@ -0,0 +1,63 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_PYTHON_OBSERVABILITY_METADATA_EXCHANGE_H |
||||
#define GRPC_PYTHON_OBSERVABILITY_METADATA_EXCHANGE_H |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <bitset> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/string_view.h" |
||||
#include "constants.h" |
||||
#include "python_observability_context.h" |
||||
|
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_observability { |
||||
|
||||
// Exchanges labels with the peer through initial metadata and adds
// xDS-derived labels to recorded metrics.
class PythonLabelsInjector {
 public:
  // Stores the entries of `exchange_labels` whose key is a known metadata
  // exchange key; all other entries are ignored.
  explicit PythonLabelsInjector(const std::vector<Label>& exchange_labels);

  // Read the incoming initial metadata to get the set of labels exchanged from
  // peer. The peer metadata entry is taken out of the batch while doing so.
  std::vector<Label> GetExchangeLabels(
      grpc_metadata_batch* incoming_initial_metadata) const;

  // Add metadata_to_exchange_ to the outgoing initial metadata.
  void AddExchangeLabelsToMetadata(
      grpc_metadata_batch* outgoing_initial_metadata) const;

  // Add optional xds labels from optional_labels_span to labels.
  // Nothing is added when `is_client` is false.
  void AddXdsOptionalLabels(
      bool is_client,
      absl::Span<const grpc_core::RefCountedStringValue> optional_labels_span,
      std::vector<Label>& labels);

 private:
  // (key, value) pairs to send to the peer, filled in by the constructor.
  std::vector<std::pair<std::string, std::string>> metadata_to_exchange_;
};
||||
|
||||
} // namespace grpc_observability
|
||||
|
||||
#endif // GRPC_PYTHON_OBSERVABILITY_METADATA_EXCHANGE_H
|
@ -0,0 +1,652 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
from collections import defaultdict |
||||
import datetime |
||||
import json |
||||
import logging |
||||
import os |
||||
import random |
||||
import sys |
||||
import time |
||||
from typing import Any, Callable, Dict, List, Optional, Set |
||||
import unittest |
||||
from unittest import mock |
||||
|
||||
from grpc_csm_observability import CsmOpenTelemetryPlugin |
||||
from grpc_csm_observability._csm_observability_plugin import ( |
||||
CSMOpenTelemetryLabelInjector, |
||||
) |
||||
from grpc_csm_observability._csm_observability_plugin import TYPE_GCE |
||||
from grpc_csm_observability._csm_observability_plugin import TYPE_GKE |
||||
from grpc_csm_observability._csm_observability_plugin import UNKNOWN_VALUE |
||||
import grpc_observability |
||||
from grpc_observability import _open_telemetry_measures |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption |
||||
from opentelemetry.sdk.metrics import MeterProvider |
||||
from opentelemetry.sdk.metrics.export import AggregationTemporality |
||||
from opentelemetry.sdk.metrics.export import MetricExportResult |
||||
from opentelemetry.sdk.metrics.export import MetricExporter |
||||
from opentelemetry.sdk.metrics.export import MetricsData |
||||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader |
||||
from opentelemetry.sdk.resources import Resource |
||||
|
||||
from tests.observability import _test_server |
||||
|
||||
logger = logging.getLogger(__name__)

# Export period, in seconds, used for the PeriodicExportingMetricReader.
OTEL_EXPORT_INTERVAL_S = 0.5

# We only expect basic labels to be exchanged.
CSM_METADATA_EXCHANGE_DEFAULT_LABELS = [
    "csm.remote_workload_" + suffix
    for suffix in ("type", "canonical_service")
]

# Name suffixes shared by the per-attempt (client) and per-call (server)
# metrics listed below.
_LABELED_METRIC_SUFFIXES = (
    "duration",
    "sent_total_compressed_message_size",
    "rcvd_total_compressed_message_size",
)

# The following metrics should have optional labels when optional
# labels is enabled through OpenTelemetryPlugin.
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    f"grpc.client.attempt.{suffix}" for suffix in _LABELED_METRIC_SUFFIXES
]
CSM_OPTIONAL_LABEL_KEYS = [
    "csm.service_name",
    "csm.service_namespace_name",
]

# The following metrics should have metadata exchange labels when metadata
# exchange flow is triggered. Client metrics come first, then server metrics.
METRIC_NAME_WITH_EXCHANGE_LABEL = [
    f"{prefix}.{suffix}"
    for prefix in ("grpc.client.attempt", "grpc.server.call")
    for suffix in _LABELED_METRIC_SUFFIXES
]
||||
|
||||
# Canned resources handed back by the mocked GoogleCloudResourceDetector in
# the tests below; one per "gcp.resource_type" value that the tests exercise.

# Resource reported as a GKE container.
MOCK_GKE_RESOURCE = Resource.create(
    attributes={
        "gcp.resource_type": "gke_container",
        "k8s.pod.name": "pod",
        "k8s.container.name": "container",
        "k8s.namespace.name": "namespace",
        "k8s.cluster.name": "cluster",
        "cloud.region": "region",
        "cloud.account.id": "id",
    }
)

# Resource reported as a GCE instance.
MOCK_GCE_RESOURCE = Resource.create(
    attributes={
        "gcp.resource_type": "gce_instance",
        "cloud.zone": "zone",
        "cloud.account.id": "id",
    }
)

# Resource whose type is neither of the two above.
MOCK_UNKNOWN_RESOURCE = Resource.create(
    attributes={"gcp.resource_type": "random"}
)
||||
|
||||
|
||||
class OTelMetricExporter(MetricExporter):
    """In-memory :class:`MetricExporter` that stores exported labels.

    Nothing is sent anywhere: for every exported data point the point's
    attributes are appended to ``all_metrics`` under the metric's name.

    all_metrics: A dict keyed by
      grpc_observability._opentelemetry_measures.Metric.name whose value is
      the list of labels recorded for that metric. Entries are appended
      without being created first, so the mapping must supply missing keys
      itself (the tests pass a ``defaultdict(list)``).
      An example item of this dict:
        {"grpc.client.attempt.started":
          [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
           {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ):
        # Temporality/aggregation preferences are forwarded untouched.
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Records ``metrics_data`` in memory and always reports success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No-op: this exporter holds no resources."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """No-op flush; always returns True."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Appends the attributes of every data point to ``all_metrics``."""
        exported_metrics = (
            metric
            for resource_metric in metrics_data.resource_metrics
            for scope_metric in resource_metric.scope_metrics
            for metric in scope_metric.metrics
        )
        for metric in exported_metrics:
            # Look the key up per data point so that a metric without data
            # points never creates an entry in all_metrics.
            for point in metric.data.data_points:
                self.all_metrics[metric.name].append(point.attributes)
||||
|
||||
|
||||
class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option whose activation is fixed at construction time.

    The client and server activation answers ignore the target / xds
    argument and simply return the flags given to ``__init__``.
    """

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: Optional[bool] = True,
        active_on_server: Optional[bool] = True,
    ):
        self._active_on_server = active_on_server
        self._active_on_client = active_on_client
        self._label_injector = label_injector

    def is_active_on_client_channel(self, target: str) -> bool:
        """Returns the configured client flag; ``target`` is not inspected."""
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        """Returns the configured server flag; ``xds`` is not inspected."""
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Returns the label injector supplied at construction."""
        return self._label_injector
||||
|
||||
|
||||
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class CSMObservabilityPluginTest(unittest.TestCase):
    """Tests CsmOpenTelemetryPlugin: optional xDS labels, the targets the
    plugin option is active for, and mesh id discovery from the xDS
    bootstrap configuration."""

    def setUp(self):
        # Every exported data point lands in self.all_metrics via the
        # in-memory exporter.
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)

    def testOptionalXdsServiceLabelExist(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )

        csm_plugin.register_global()
        try:
            self._server, port = _test_server.start_server()
            _test_server.unary_unary_call(port=port)
        finally:
            # Never leave the plugin registered globally if the call fails,
            # otherwise it would leak into the following tests.
            csm_plugin.deregister_global()

        validate_metrics_exist(self, self.all_metrics)
        # The periodic reader may still be exporting (and inserting new
        # metric names) while we validate, so iterate over a snapshot.
        for name, label_list in list(self.all_metrics.items()):
            if name in METRIC_NAME_WITH_OPTIONAL_LABEL:
                self._validate_label_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )
            else:
                self._validate_label_not_exist(
                    name, label_list, CSM_OPTIONAL_LABEL_KEYS
                )

    def testPluginOptionOnlyEnabledForXdsTargets(self):
        csm_plugin = CsmOpenTelemetryPlugin(
            meter_provider=self._provider,
        )
        csm_plugin_option = csm_plugin.plugin_options[0]

        inactive_targets = (
            "foo.bar.google.com",
            "dns:///foo.bar.google.com",
            "dns:///foo.bar.google.com:1234",
            "dns://authority/foo.bar.google.com:1234",
            "xds://authority/foo",
        )
        active_targets = (
            "xds:///foo",
            "xds://traffic-director-global.xds.googleapis.com/foo",
            "xds://traffic-director-global.xds.googleapis.com/foo.bar",
        )
        # subTest reports every failing target instead of stopping at the
        # first one.
        for target in inactive_targets:
            with self.subTest(target=target):
                self.assertFalse(
                    csm_plugin_option.is_active_on_client_channel(target)
                )
        for target in active_targets:
            with self.subTest(target=target):
                self.assertTrue(
                    csm_plugin_option.is_active_on_client_channel(target)
                )

    def testGetMeshIdFromConfig(self):
        config_json = {
            "node": {
                "id": "projects/12345/networks/mesh:test_mesh_id/nodes/abcdefg"
            }
        }
        mesh_id = self._get_mesh_id(
            {"GRPC_XDS_BOOTSTRAP_CONFIG": json.dumps(config_json)}
        )
        self.assertEqual(mesh_id, "test_mesh_id")

    def testGetMeshIdFromFile(self):
        # Local import keeps this fix self-contained in the test class.
        import tempfile

        config_json = {
            "node": {
                "id": "projects/12345/networks/mesh:test_mesh_id/nodes/abcdefg"
            }
        }
        # Use a uniquely named temporary file instead of a guessable
        # "/tmp/<random int>" path, and remove it once the test is done.
        with tempfile.NamedTemporaryFile(
            "w", encoding="utf-8", delete=False
        ) as config_file:
            config_file.write(json.dumps(config_json))
        self.addCleanup(os.remove, config_file.name)

        mesh_id = self._get_mesh_id({"GRPC_XDS_BOOTSTRAP": config_file.name})
        self.assertEqual(mesh_id, "test_mesh_id")

    def testGetMeshIdFromInvalidConfig(self):
        config_json = {"node": {"id": "12345"}}
        mesh_id = self._get_mesh_id(
            {"GRPC_XDS_BOOTSTRAP_CONFIG": json.dumps(config_json)}
        )
        self.assertEqual(mesh_id, "unknown")

    def _get_mesh_id(self, env: Dict[str, str]) -> str:
        """Returns the "csm.mesh_id" additional label of a plugin that is
        created while `env` is patched into os.environ."""
        with mock.patch.dict(os.environ, env):
            csm_plugin = CsmOpenTelemetryPlugin(
                meter_provider=self._provider,
            )
            csm_label_injector = csm_plugin.plugin_options[
                0
            ].get_label_injector()
            additional_labels = csm_label_injector.get_additional_labels(
                include_exchange_labels=True
            )
            return additional_labels["csm.mesh_id"]

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_names("grpc.server", metric_names)

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        self._validate_base_metrics_names("grpc.client", metric_names)

    def _validate_base_metrics_names(
        self, name_fragment: str, metric_names: Set[str]
    ) -> None:
        """Asserts that every base metric whose name contains
        `name_fragment` is present in `metric_names`."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[Dict[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Asserts every key in `labels_to_check` is present in each label
        dict recorded for `metric_name`."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[Dict[str, Any]],
        labels_to_check: List[str],
    ) -> None:
        """Asserts no key in `labels_to_check` is present in any label dict
        recorded for `metric_name`."""
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )
||||
|
||||
|
||||
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class MetadataExchangeTest(unittest.TestCase):
    """Tests the CSM metadata exchange labels recorded on client and server
    metrics for the different mocked GCP resource types."""

    # Patched so each test decides which resource the detector reports.
    _DETECT_TARGET = (
        "opentelemetry.resourcedetector.gcp_resource_detector."
        "GoogleCloudResourceDetector.detect"
    )
    # Environment in effect while the CSM label injector is constructed.
    _CSM_ENV = {
        "CSM_CANONICAL_SERVICE_NAME": "canonical_service",
        "CSM_WORKLOAD_NAME": "workload",
    }

    def setUp(self):
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)

    @mock.patch(_DETECT_TARGET)
    def testMetadataExchangeClientDoesNotSendMetadata(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE
        # Manually create csm_plugin so that it's always disabled on client.
        csm_plugin = self._create_plugin(active_on_client=False)
        self._run_unary_calls(csm_plugin)

        for name, label_list in list(self.all_metrics.items()):
            for labels in label_list:
                # Verifies that the server records unknown when the client does not send metadata
                if name == "grpc.server.call.duration":
                    self.assertEqual(
                        labels["csm.workload_canonical_service"],
                        "canonical_service",
                    )
                    self.assertEqual(
                        labels["csm.remote_workload_canonical_service"],
                        "unknown",
                    )
                # Client metric should not have CSM labels.
                elif "grpc.client" in name:
                    self.assertNotIn("csm.workload_canonical_service", labels)
                    self.assertNotIn(
                        "csm.remote_workload_canonical_service", labels
                    )

    @mock.patch(_DETECT_TARGET)
    def testResourceDetectorGCE(self, mock_detector):
        mock_detector.return_value = MOCK_GCE_RESOURCE
        csm_plugin = self._create_plugin()
        # This test has always issued two calls; keep that unchanged.
        self._run_unary_calls(csm_plugin, num_calls=2)
        self._verify_labels_for_resource_type(TYPE_GCE)

    @mock.patch(_DETECT_TARGET)
    def testResourceDetectorGKE(self, mock_detector):
        mock_detector.return_value = MOCK_GKE_RESOURCE
        csm_plugin = self._create_plugin()
        self._run_unary_calls(csm_plugin)
        self._verify_labels_for_resource_type(TYPE_GKE)

    @mock.patch(_DETECT_TARGET)
    def testResourceDetectorUnknown(self, mock_detector):
        mock_detector.return_value = MOCK_UNKNOWN_RESOURCE
        csm_plugin = self._create_plugin()
        self._run_unary_calls(csm_plugin)
        self._verify_labels_for_resource_type(UNKNOWN_VALUE)

    def _create_plugin(self, **plugin_option_kwargs):
        """Creates an OpenTelemetryPlugin with a CSM label injector.

        Have to manually create the plugin (instead of using
        CsmOpenTelemetryPlugin) so that we can choose on which side it is
        enabled; `plugin_option_kwargs` is forwarded to
        TestOpenTelemetryPluginOption, which by default enables it for all
        channels.
        """
        # The CSM environment is only patched while the injector is built.
        with mock.patch.dict(os.environ, self._CSM_ENV):
            plugin_option = TestOpenTelemetryPluginOption(
                label_injector=CSMOpenTelemetryLabelInjector(),
                **plugin_option_kwargs,
            )
        return grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )

    def _run_unary_calls(self, csm_plugin, num_calls: int = 1) -> None:
        """Registers `csm_plugin` globally, performs `num_calls` unary calls
        against a fresh test server and waits for metrics to be exported."""
        csm_plugin.register_global()
        try:
            self._server, port = _test_server.start_server()
            for _ in range(num_calls):
                _test_server.unary_unary_call(port=port)
        finally:
            # Deregister even on failure so the plugin cannot leak into
            # other tests.
            csm_plugin.deregister_global()

        validate_metrics_exist(self, self.all_metrics)

    def _verify_labels_for_resource_type(self, resource_type: str) -> None:
        """Checks CSM labels on the started and duration metrics."""
        # Iterate over a snapshot: the periodic reader may still add keys.
        for name, label_list in list(self.all_metrics.items()):
            # started metrics shouldn't have any csm labels.
            if name in (
                "grpc.client.attempt.started",
                "grpc.server.call.started",
            ):
                self._verify_no_service_mesh_attributes(label_list)
            # duration metrics should have all csm related labels.
            elif name in (
                "grpc.client.attempt.duration",
                "grpc.server.call.duration",
            ):
                self._verify_service_mesh_attributes(label_list, resource_type)

    def _verify_service_mesh_attributes(
        self, label_list: List[Dict[str, str]], resource_type: str
    ):
        """Asserts each label dict carries the local and remote CSM labels
        expected for `resource_type`."""
        for labels in label_list:
            # Assuming attributes is a dictionary
            self.assertEqual(
                labels["csm.workload_canonical_service"], "canonical_service"
            )
            self.assertEqual(
                labels["csm.remote_workload_canonical_service"],
                "canonical_service",
            )

            if resource_type == TYPE_GKE:
                self.assertEqual(
                    labels["csm.remote_workload_type"], "gcp_kubernetes_engine"
                )
                self.assertEqual(labels["csm.remote_workload_name"], "workload")
                self.assertEqual(
                    labels["csm.remote_workload_namespace_name"], "namespace"
                )
                self.assertEqual(
                    labels["csm.remote_workload_cluster_name"], "cluster"
                )
                self.assertEqual(
                    labels["csm.remote_workload_location"], "region"
                )
                self.assertEqual(labels["csm.remote_workload_project_id"], "id")
            elif resource_type == TYPE_GCE:
                self.assertEqual(
                    labels["csm.remote_workload_type"], "gcp_compute_engine"
                )
                self.assertEqual(labels["csm.remote_workload_name"], "workload")
                self.assertEqual(labels["csm.remote_workload_location"], "zone")
                self.assertEqual(labels["csm.remote_workload_project_id"], "id")
            elif resource_type == UNKNOWN_VALUE:
                self.assertEqual(labels["csm.remote_workload_type"], "random")

    def _verify_no_service_mesh_attributes(
        self, label_list: List[Dict[str, str]]
    ):
        """Asserts that no label dict contains any of the CSM label keys."""
        unexpected_keys = (
            "csm.remote_workload_canonical_service",
            "csm.remote_workload_type",
            "csm.workload_canonical_service",
            "csm.workload_type",
            "csm.mesh_id",
        )
        for labels in label_list:
            for key in unexpected_keys:
                self.assertNotIn(key, labels)
||||
|
||||
|
||||
def validate_metrics_exist(
    testCase: unittest.TestCase, all_metrics: Dict[str, Any]
) -> None:
    """Waits until more than one metric name shows up in `all_metrics`.

    Polls through assert_eventually, which fails `testCase` with
    "No metrics was exported" if that does not happen in time.
    """

    def _more_than_one_metric_recorded() -> bool:
        return len(all_metrics) > 1

    # Poll rather than check once: the OTel MetricExporter only fills
    # all_metrics once an export has taken place.
    assert_eventually(
        testCase=testCase,
        predicate=_more_than_one_metric_recorded,
        message=lambda: "No metrics was exported",
    )
||||
|
||||
|
||||
def assert_eventually(
    testCase: unittest.TestCase,
    predicate: Callable[[], bool],
    *,
    timeout: Optional[datetime.timedelta] = None,
    message: Optional[Callable[[], str]] = None,
) -> None:
    """Polls `predicate` until it returns a true value or `timeout` elapses.

    The predicate is always evaluated at least once, and once more after the
    final sleep, so a condition that becomes true right at the deadline is
    still observed.

    Args:
      testCase: Test case whose ``fail`` is called when the wait times out.
      predicate: Zero-argument callable that is polled.
      timeout: Maximum time to wait; 5 seconds when None. An explicit zero
        timeout is honoured (the predicate is checked exactly once).
      message: Zero-argument callable producing the failure message prefix;
        a generic message is used when None.
    """
    if message is None:
        message = lambda: "Proposition did not evaluate to true"
    # `timeout or default` would silently replace timedelta(0), which is
    # falsy, with the 5 second default.
    if timeout is None:
        timeout = datetime.timedelta(seconds=5)

    poll_interval_s = 0.5
    # A monotonic clock is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        if predicate():
            return
        remaining_s = deadline - time.monotonic()
        if remaining_s <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining_s))

    testCase.fail(message() + " after " + str(timeout))
||||
|
||||
|
||||
if __name__ == "__main__":
    # Set up default logging, then hand control to the unittest runner.
    logging.basicConfig()
    unittest.main(verbosity=2)
@ -0,0 +1,25 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# Snapshot the module's global names before the star import ...
_BEFORE_IMPORT = tuple(globals())

from grpc_csm_observability import *  # pylint: disable=wildcard-import,unused-wildcard-import

# ... and again after it.
_AFTER_IMPORT = tuple(globals())

# Names that only exist after the star import. "_BEFORE_IMPORT" is created
# after the first snapshot was taken, so it has to be excluded explicitly.
GRPC_CSM_OBSERVABILITY_ELEMENTS = tuple(
    name
    for name in _AFTER_IMPORT
    if name != "_BEFORE_IMPORT" and name not in _BEFORE_IMPORT
)
@ -0,0 +1,363 @@ |
||||
# Copyright 2024 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
from collections import defaultdict |
||||
import datetime |
||||
import logging |
||||
import os |
||||
import sys |
||||
import time |
||||
from typing import Any, AnyStr, Callable, Dict, List, Optional, Set |
||||
import unittest |
||||
|
||||
from google.protobuf import struct_pb2 |
||||
import grpc_observability |
||||
from grpc_observability import _open_telemetry_measures |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryLabelInjector |
||||
from grpc_observability._open_telemetry_plugin import OpenTelemetryPluginOption |
||||
from opentelemetry.sdk.metrics import MeterProvider |
||||
from opentelemetry.sdk.metrics.export import AggregationTemporality |
||||
from opentelemetry.sdk.metrics.export import MetricExportResult |
||||
from opentelemetry.sdk.metrics.export import MetricExporter |
||||
from opentelemetry.sdk.metrics.export import MetricsData |
||||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader |
||||
|
||||
from tests.observability import _test_server |
||||
|
||||
logger = logging.getLogger(__name__)

STREAM_LENGTH = 5
# Export period, in seconds, used for the PeriodicExportingMetricReader.
OTEL_EXPORT_INTERVAL_S = 0.5
CSM_METADATA_EXCHANGE_LABEL_KEY = "exchange_labels_key"

# The following metrics should have optional labels when optional
# labels is enabled through OpenTelemetryPlugin.
# NOTE: every entry needs its trailing comma; without it Python silently
# concatenates adjacent string literals into one bogus metric name.
METRIC_NAME_WITH_OPTIONAL_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
]
CSM_OPTIONAL_LABEL_KEYS = ["csm.service_name", "csm.service_namespace_name"]

# The following metrics should have metadata exchange labels when metadata
# exchange flow is triggered.
METRIC_NAME_WITH_EXCHANGE_LABEL = [
    "grpc.client.attempt.duration",
    "grpc.client.attempt.sent_total_compressed_message_size",
    "grpc.client.attempt.rcvd_total_compressed_message_size",
    "grpc.server.call.duration",
    "grpc.server.call.sent_total_compressed_message_size",
    "grpc.server.call.rcvd_total_compressed_message_size",
]
||||
|
||||
|
||||
class OTelMetricExporter(MetricExporter):
    """:class:`MetricExporter` that keeps exported labels in memory.

    Each exported data point contributes its attributes to ``all_metrics``
    under the name of the metric it belongs to.

    all_metrics: A dict keyed by
      grpc_observability._opentelemetry_measures.Metric.name whose value is
      the list of labels recorded for that metric. Entries are appended
      without being created first, so the mapping must supply missing keys
      itself (the tests pass a ``defaultdict(list)``).
      An example item of this dict:
        {"grpc.client.attempt.started":
          [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
           {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ):
        # Both preferences are passed straight through to the base class.
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Stores ``metrics_data`` via record_metric; always succeeds."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Nothing to release."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Nothing is buffered, so flushing trivially succeeds."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        """Appends every data point's attributes to ``all_metrics``."""
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    metric_name = metric.name
                    # Index all_metrics per data point: a metric without
                    # data points must not create an entry.
                    for point in metric.data.data_points:
                        self.all_metrics[metric_name].append(point.attributes)
||||
|
||||
|
||||
class TestLabelInjector(OpenTelemetryLabelInjector):
    """Label injector that serves fixed local and exchange labels."""

    _exchange_labels: Dict[str, AnyStr]
    _local_labels: Dict[str, str]

    def __init__(
        self, local_labels: Dict[str, str], exchange_labels: Dict[str, str]
    ):
        self._local_labels = local_labels
        self._exchange_labels = exchange_labels

    def get_labels_for_exchange(self) -> Dict[str, AnyStr]:
        """Returns the exchange labels given at construction."""
        return self._exchange_labels

    def get_additional_labels(
        self, include_exchange_labels: bool
    ) -> Dict[str, str]:
        """Returns the local labels; `include_exchange_labels` is ignored."""
        return self._local_labels

    def deserialize_labels(
        self, labels: Dict[str, AnyStr]
    ) -> Dict[str, AnyStr]:
        """Returns a copy of `labels` with the peer metadata entry decoded.

        The "XEnvoyPeerMetadata" value is parsed as a serialized protobuf
        Struct and replaced by a CSM_METADATA_EXCHANGE_LABEL_KEY entry
        holding that field's string value; all other entries are copied
        unchanged.
        """
        result = {}
        for label_key, raw_value in labels.items():
            if label_key != "XEnvoyPeerMetadata":
                result[label_key] = raw_value
                continue

            peer_metadata = struct_pb2.Struct()
            peer_metadata.ParseFromString(raw_value)
            result[
                CSM_METADATA_EXCHANGE_LABEL_KEY
            ] = self._get_value_from_struct(
                CSM_METADATA_EXCHANGE_LABEL_KEY, peer_metadata
            )

        return result

    def _get_value_from_struct(
        self, key: str, struct: struct_pb2.Struct
    ) -> str:
        """Returns the string value of field `key`, or "unknown" if the
        lookup yields a falsy result."""
        field = struct.fields.get(key)
        return field.string_value if field else "unknown"
||||
|
||||
|
||||
class TestOpenTelemetryPluginOption(OpenTelemetryPluginOption):
    """Plugin option with activation flags fixed by the test.

    Both activation queries return the flag passed to ``__init__`` and do
    not look at their argument.
    """

    _label_injector: OpenTelemetryLabelInjector
    _active_on_client: bool
    _active_on_server: bool

    def __init__(
        self,
        label_injector: OpenTelemetryLabelInjector,
        active_on_client: Optional[bool] = True,
        active_on_server: Optional[bool] = True,
    ):
        self._active_on_server = active_on_server
        self._active_on_client = active_on_client
        self._label_injector = label_injector

    def is_active_on_client_channel(self, target: str) -> bool:
        """Returns the client-side flag regardless of `target`."""
        return self._active_on_client

    def is_active_on_server(self, xds: bool) -> bool:
        """Returns the server-side flag regardless of `xds`."""
        return self._active_on_server

    def get_label_injector(self) -> OpenTelemetryLabelInjector:
        """Returns the injector this option was created with."""
        return self._label_injector
||||
|
||||
|
||||
@unittest.skipIf(
    os.name == "nt" or "darwin" in sys.platform,
    "Observability is not supported in Windows and MacOS",
)
class ObservabilityPluginTest(unittest.TestCase):
    """Checks that plugin-option label injectors add labels to metrics.

    Each test registers an OpenTelemetryPlugin globally, performs one unary
    RPC against a local test server, deregisters the plugin and then inspects
    the labels collected in `self.all_metrics`.
    """

    # Label key/value supplied as local labels by every test in this class.
    _LOCAL_LABEL_KEY = "local_labels_key"
    _LOCAL_LABEL_VALUE = "local_labels_value"

    def setUp(self):
        # Handed to the exporter; the tests later iterate it as
        # metric name -> list of exported label entries.
        self.all_metrics = defaultdict(list)
        otel_exporter = OTelMetricExporter(self.all_metrics)
        reader = PeriodicExportingMetricReader(
            exporter=otel_exporter,
            export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
        )
        self._provider = MeterProvider(metric_readers=[reader])
        self._server = None
        self._port = None

    def tearDown(self):
        if self._server:
            self._server.stop(0)

    def _local_label_injector(self):
        """Build a TestLabelInjector with one local label and no exchange labels."""
        return TestLabelInjector(
            local_labels={self._LOCAL_LABEL_KEY: self._LOCAL_LABEL_VALUE},
            exchange_labels={},
        )

    def _run_unary_call_with_plugin_option(
        self, plugin_option: "OpenTelemetryPluginOption"
    ) -> None:
        """Register a plugin with `plugin_option`, do one unary RPC, deregister.

        The started server is stored in `self._server` so tearDown stops it.
        Deregistration happens in a `finally` block so that a failure while
        starting the server or making the call does not leave the plugin
        registered globally for subsequent tests.
        """
        otel_plugin = grpc_observability.OpenTelemetryPlugin(
            meter_provider=self._provider, plugin_options=[plugin_option]
        )

        otel_plugin.register_global()
        try:
            self._server, port = _test_server.start_server()
            _test_server.unary_unary_call(port=port)
        finally:
            otel_plugin.deregister_global()

    def testLabelInjectorWithLocalLabels(self):
        """Local labels in label injector should be added to all metrics."""
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=self._local_label_injector()
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self.all_metrics.items():
            self._validate_label_exist(
                name, label_list, [self._LOCAL_LABEL_KEY]
            )

    def testClientSidePluginOption(self):
        """An option active only on the client labels only client metrics."""
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=self._local_label_injector(),
            active_on_server=False,
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self.all_metrics.items():
            if "grpc.client" in name:
                self._validate_label_exist(
                    name, label_list, [self._LOCAL_LABEL_KEY]
                )
            if "grpc.server" in name:
                self._validate_label_not_exist(
                    name, label_list, [self._LOCAL_LABEL_KEY]
                )

    def testServerSidePluginOption(self):
        """An option active only on the server labels only server metrics."""
        plugin_option = TestOpenTelemetryPluginOption(
            label_injector=self._local_label_injector(),
            active_on_client=False,
        )
        self._run_unary_call_with_plugin_option(plugin_option)

        self._validate_metrics_exist(self.all_metrics)
        for name, label_list in self.all_metrics.items():
            if "grpc.client" in name:
                self._validate_label_not_exist(
                    name, label_list, [self._LOCAL_LABEL_KEY]
                )
            if "grpc.server" in name:
                self._validate_label_exist(
                    name, label_list, [self._LOCAL_LABEL_KEY]
                )

    def assert_eventually(
        self,
        predicate: Callable[[], bool],
        *,
        timeout: Optional[datetime.timedelta] = None,
        message: Optional[Callable[[], str]] = None,
    ) -> None:
        """Poll `predicate` every 0.5s until it is true or `timeout` elapses.

        Args:
          predicate: Zero-argument callable evaluated on each poll.
          timeout: Maximum time to keep polling; defaults to 5 seconds.
          message: Zero-argument callable producing the failure text;
            evaluated only when the deadline is reached.

        Raises:
          AssertionError: (via `self.fail`) if the deadline passes without
            `predicate` returning a truthy value.
        """
        message = message or (lambda: "Proposition did not evaluate to true")
        timeout = timeout or datetime.timedelta(seconds=5)
        end = datetime.datetime.now() + timeout
        while datetime.datetime.now() < end:
            if predicate():
                break
            time.sleep(0.5)
        else:
            # Loop ended by deadline rather than by `break`.
            self.fail(message() + " after " + str(timeout))

    def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
        """Wait until `all_metrics` holds more than one metric name."""
        # Poll rather than assert immediately: the metric reader exports
        # periodically, so results may not be there yet.
        self.assert_eventually(
            lambda: len(all_metrics) > 1,
            message=lambda: "No metrics was exported",
        )

    def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
        """Assert that every base server and client metric was exported."""
        self._validate_server_metrics_names(metric_names)
        self._validate_client_metrics_names(metric_names)

    def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
        """Assert that every base "grpc.server" metric is in `metric_names`."""
        self._validate_base_metrics_names("grpc.server", metric_names)

    def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
        """Assert that every base "grpc.client" metric is in `metric_names`."""
        self._validate_base_metrics_names("grpc.client", metric_names)

    def _validate_base_metrics_names(
        self, name_fragment: str, metric_names: Set[str]
    ) -> None:
        """Assert base metrics whose name contains `name_fragment` were exported."""
        for base_metric in _open_telemetry_measures.base_metrics():
            if name_fragment in base_metric.name:
                self.assertIn(
                    base_metric.name,
                    metric_names,
                    msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
                )

    def _validate_label_exist(
        self,
        metric_name: str,
        metric_label_list: List[str],
        labels_to_check: List[str],
    ) -> None:
        """Assert every label in `labels_to_check` is in each exported entry.

        Args:
          metric_name: Metric name, used only in the failure message.
          metric_label_list: Exported label entries for the metric; each
            entry is tested with the `in` operator.
          labels_to_check: Label keys that must be present.
        """
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertIn(
                    label,
                    metric_label,
                    msg=f"label with key {label} not found in metric {metric_name}, found label list: {metric_label}",
                )

    def _validate_label_not_exist(
        self,
        metric_name: str,
        metric_label_list: List[str],
        labels_to_check: List[str],
    ) -> None:
        """Assert no label in `labels_to_check` is in any exported entry.

        Args:
          metric_name: Metric name, used only in the failure message.
          metric_label_list: Exported label entries for the metric; each
            entry is tested with the `in` operator.
          labels_to_check: Label keys that must be absent.
        """
        for metric_label in metric_label_list:
            for label in labels_to_check:
                self.assertNotIn(
                    label,
                    metric_label,
                    msg=f"found unexpected label with key {label} in metric {metric_name}, found label list: {metric_label}",
                )
||||
|
||||
|
||||
if __name__ == "__main__":
    # Configure default logging, then hand control to the unittest runner
    # with verbose per-test output.
    logging.basicConfig()
    unittest.main(verbosity=2)
@ -0,0 +1,19 @@ |
||||
%YAML 1.2 |
||||
--- | |
||||
# Copyright 2024 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_csm_observability/grpc_version.py.template`!!! |
||||
|
||||
VERSION = '${settings.python_version.pep440()}' |
Loading…
Reference in new issue