[Python O11y] Change public interface (#36094)

Address comments from design review meeting, mainly:
* Use `OpenTelemetryPlugin` as public API.
* Use keyword args to build plugin.
<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #36094

PiperOrigin-RevId: 615807264
pull/36081/head
Xuan Wang 11 months ago committed by Copybara-Service
parent c910004328
commit 24be69b9bd
  1. 17
      examples/python/observability/observability_greeter_client.py
  2. 17
      examples/python/observability/observability_greeter_server.py
  3. 1
      src/python/grpcio_observability/grpc_observability/BUILD.bazel
  4. 16
      src/python/grpcio_observability/grpc_observability/__init__.py
  5. 38
      src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py
  6. 177
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py
  7. 240
      src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py
  8. 7
      src/python/grpcio_tests/tests/observability/_observability_api_test.py
  9. 167
      src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py

@ -16,7 +16,6 @@
from collections import defaultdict
import logging
import time
from typing import Optional
import grpc
import grpc_observability
@ -29,14 +28,6 @@ from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
OTEL_EXPORT_INTERVAL_S = 0.5
class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin):
def __init__(self, provider: MeterProvider):
self.provider = provider
def get_meter_provider(self) -> Optional[MeterProvider]:
return self.provider
def run():
all_metrics = defaultdict(list)
otel_exporter = open_telemetry_exporter.OTelMetricExporter(all_metrics)
@ -45,9 +36,11 @@ def run():
export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
)
provider = MeterProvider(metric_readers=[reader])
otel_plugin = BaseOpenTelemetryPlugin(provider)
grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin])
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=provider
)
otel_plugin.register_global()
with grpc.insecure_channel(target="localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
@ -56,7 +49,7 @@ def run():
print(f"Greeter client received: {response.message}")
except grpc.RpcError as rpc_error:
print("Call failed with code: ", rpc_error.code())
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
# Sleep to make sure all metrics are exported.
time.sleep(5)

@ -17,7 +17,6 @@ from collections import defaultdict
from concurrent import futures
import logging
import time
from typing import Optional
import grpc
import grpc_observability
@ -31,14 +30,6 @@ _OTEL_EXPORT_INTERVAL_S = 0.5
_SERVER_PORT = "50051"
class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin):
def __init__(self, provider: MeterProvider):
self.provider = provider
def get_meter_provider(self) -> Optional[MeterProvider]:
return self.provider
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
message = request.name
@ -55,9 +46,11 @@ def serve():
export_interval_millis=_OTEL_EXPORT_INTERVAL_S * 1000,
)
provider = MeterProvider(metric_readers=[reader])
otel_plugin = BaseOpenTelemetryPlugin(provider)
grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin])
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=provider
)
otel_plugin.register_global()
server = grpc.server(
thread_pool=futures.ThreadPoolExecutor(max_workers=10),
@ -74,7 +67,7 @@ def serve():
print(metric)
server.stop(0)
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
if __name__ == "__main__":

@ -58,7 +58,6 @@ pyx_library(
py_library(
name = "_opentelemetry_observability",
srcs = [
"_open_telemetry_exporter.py",
"_open_telemetry_measures.py",
"_open_telemetry_observability.py",
"_open_telemetry_plugin.py",

@ -12,20 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc_observability._open_telemetry_observability import (
OpenTelemetryObservability,
)
from grpc_observability._open_telemetry_observability import (
end_open_telemetry_observability,
)
from grpc_observability._open_telemetry_observability import (
start_open_telemetry_observability,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
__all__ = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)
__all__ = ("OpenTelemetryPlugin",)

@ -1,38 +0,0 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, List
from grpc_observability import _observability # pytype: disable=pyi-error
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin
class _OpenTelemetryExporterDelegator(_observability.Exporter):
_plugins: Iterable[_OpenTelemetryPlugin]
def __init__(self, plugins: Iterable[_OpenTelemetryPlugin]):
self._plugins = plugins
def export_stats_data(
self, stats_data: List[_observability.StatsData]
) -> None:
# Records stats data to MeterProvider.
for data in stats_data:
for plugin in self._plugins:
plugin.maybe_record_stats_data(data)
def export_tracing_data(
self, tracing_data: List[_observability.TracingData]
) -> None:
pass

@ -15,17 +15,19 @@
import logging
import threading
import time
from typing import Any, Iterable, Optional
from typing import Any, Dict, Iterable, List, Optional, Union
import grpc
# pytype: disable=pyi-error
from grpc_observability import _cyobservability
from grpc_observability._open_telemetry_exporter import (
_OpenTelemetryExporterDelegator,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin
from grpc_observability import _observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._cyobservability import MetricsName
from grpc_observability._observability import StatsData
from opentelemetry.metrics import Counter
from opentelemetry.metrics import Histogram
from opentelemetry.metrics import Meter
_LOGGER = logging.getLogger(__name__)
@ -34,6 +36,13 @@ ServerCallTracerFactoryCapsule = (
Any # it appears only once in the function signature
)
grpc_observability = Any # grpc_observability.py imports this module.
OpenTelemetryPlugin = Any # _open_telemetry_plugin.py imports this module.
GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_OTHER_LABEL_VALUE = "other"
_observability_lock: threading.RLock = threading.RLock()
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None
GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.OK: "OK",
@ -55,13 +64,125 @@ GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
_observability_lock: threading.RLock = threading.RLock()
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None
class _OpenTelemetryPlugin:
_plugin: OpenTelemetryPlugin
_metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]]
def __init__(self, plugin: OpenTelemetryPlugin):
self._plugin = plugin
self._metric_to_recorder = dict()
meter_provider = self._plugin.meter_provider
if meter_provider:
meter = meter_provider.get_meter("grpc-python", grpc.__version__)
enabled_metrics = _open_telemetry_measures.base_metrics()
self._metric_to_recorder = self._register_metrics(
meter, enabled_metrics
)
def _should_record(self, stats_data: StatsData) -> bool:
# Decide if this plugin should record the stats_data.
return stats_data.name in self._metric_to_recorder.keys()
def _record_stats_data(self, stats_data: StatsData) -> None:
recorder = self._metric_to_recorder[stats_data.name]
target = stats_data.labels.get(GRPC_TARGET_LABEL, "")
if not self._plugin.target_attribute_filter(target):
# Filter target name.
stats_data.labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE
method = stats_data.labels.get(GRPC_METHOD_LABEL, "")
if not self._plugin.generic_method_attribute_filter(method):
# Filter method name.
stats_data.labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE
value = 0
if stats_data.measure_double:
value = stats_data.value_float
else:
value = stats_data.value_int
if isinstance(recorder, Counter):
recorder.add(value, attributes=stats_data.labels)
elif isinstance(recorder, Histogram):
recorder.record(value, attributes=stats_data.labels)
# pylint: disable=no-self-use
def maybe_record_stats_data(self, stats_data: List[StatsData]) -> None:
# Records stats data to MeterProvider.
if self._should_record(stats_data):
self._record_stats_data(stats_data)
def _register_metrics(
self, meter: Meter, metrics: List[_open_telemetry_measures.Metric]
) -> Dict[MetricsName, Union[Counter, Histogram]]:
metric_to_recorder_map = {}
recorder = None
for metric in metrics:
if metric == _open_telemetry_measures.CLIENT_ATTEMPT_STARTED:
recorder = meter.create_counter(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_RPC_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_SEND_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif (
metric == _open_telemetry_measures.CLIENT_ATTEMPT_RECEIVED_BYTES
):
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_STARTED_RPCS:
recorder = meter.create_counter(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_SEND_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_RECEIVED_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
metric_to_recorder_map[metric.cyname] = recorder
return metric_to_recorder_map
def start_open_telemetry_observability(
*,
plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
plugins: Iterable[_OpenTelemetryPlugin],
) -> None:
_start_open_telemetry_observability(
OpenTelemetryObservability(plugins=plugins)
@ -72,6 +193,26 @@ def end_open_telemetry_observability() -> None:
_end_open_telemetry_observability()
class _OpenTelemetryExporterDelegator(_observability.Exporter):
_plugins: Iterable[_OpenTelemetryPlugin]
def __init__(self, plugins: Iterable[_OpenTelemetryPlugin]):
self._plugins = plugins
def export_stats_data(
self, stats_data: List[_observability.StatsData]
) -> None:
# Records stats data to MeterProvider.
for data in stats_data:
for plugin in self._plugins:
plugin.maybe_record_stats_data(data)
def export_tracing_data(
self, tracing_data: List[_observability.TracingData]
) -> None:
pass
# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
"""OpenTelemetry based plugin implementation.
@ -79,7 +220,7 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
This is class is part of an EXPERIMENTAL API.
Args:
plugin: OpenTelemetryPlugin to enable.
plugin: _OpenTelemetryPlugin to enable.
"""
exporter: "grpc_observability.Exporter"
@ -87,21 +228,9 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
def __init__(
self,
*,
plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
plugins: Optional[Iterable[_OpenTelemetryPlugin]],
):
_plugins = []
if plugins:
for plugin in plugins:
_plugins.append(_OpenTelemetryPlugin(plugin))
self.exporter = _OpenTelemetryExporterDelegator(_plugins)
def __enter__(self):
_start_open_telemetry_observability(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
_end_open_telemetry_observability()
self.exporter = _OpenTelemetryExporterDelegator(plugins)
def observability_init(self):
try:

@ -13,22 +13,12 @@
# limitations under the License.
import abc
from typing import Dict, Iterable, List, Optional, Union
from typing import Callable, Dict, Iterable, List, Optional
# pytype: disable=pyi-error
import grpc
from grpc_observability import _open_telemetry_measures
from grpc_observability._cyobservability import MetricsName
from grpc_observability._observability import StatsData
from opentelemetry.metrics import Counter
from opentelemetry.metrics import Histogram
from opentelemetry.metrics import Meter
from grpc_observability import _open_telemetry_observability
from opentelemetry.metrics import MeterProvider
GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_OTHER_LABEL_VALUE = "other"
class OpenTelemetryLabelInjector(abc.ABC):
"""
@ -90,184 +80,80 @@ class OpenTelemetryPluginOption(abc.ABC):
# pylint: disable=no-self-use
class OpenTelemetryPlugin:
"""Describes a Plugin for OpenTelemetry observability.
"""Describes a Plugin for OpenTelemetry observability."""
This is class is part of an EXPERIMENTAL API.
"""
plugin_options: Iterable[OpenTelemetryPluginOption]
meter_provider: Optional[MeterProvider]
target_attribute_filter: Callable[[str], bool]
generic_method_attribute_filter: Callable[[str], bool]
_plugin: _open_telemetry_observability._OpenTelemetryPlugin
def get_plugin_options(
def __init__(
self,
) -> Iterable[OpenTelemetryPluginOption]:
"""
This function will be used to get plugin options which are enabled for
this OpenTelemetryPlugin instance.
Returns:
An Iterable of class OpenTelemetryPluginOption which will be enabled for
this OpenTelemetryPlugin.
"""
return []
def get_meter_provider(self) -> Optional[MeterProvider]:
"""
This function will be used to get the MeterProvider for this OpenTelemetryPlugin
instance.
Returns:
A MeterProvider which will be used to collect telemetry data, or None which
means no metrics will be collected.
"""
return None
def target_attribute_filter(
self, target: str # pylint: disable=unused-argument
) -> bool:
*,
plugin_options: Iterable[OpenTelemetryPluginOption] = [],
meter_provider: Optional[MeterProvider] = None,
target_attribute_filter: Optional[Callable[[str], bool]] = None,
generic_method_attribute_filter: Optional[Callable[[str], bool]] = None,
):
"""
Once overridden, this will be called per channel to decide whether to record the
target attribute on client or to replace it with "other".
Args:
plugin_options: An Iterable of OpenTelemetryPluginOption which will be
enabled for this OpenTelemetryPlugin.
meter_provider: A MeterProvider which will be used to collect telemetry data,
or None which means no metrics will be collected.
target_attribute_filter: Once provided, this will be called per channel to decide
whether to record the target attribute on client or to replace it with "other".
This helps reduce the cardinality on metrics in cases where many channels
are created with different targets in the same binary (which might happen
for example, if the channel target string uses IP addresses directly).
Args:
target: The target for the RPC.
Returns:
bool: True means the original target string will be used, False means target string
will be replaced with "other".
Return True means the original target string will be used, False means target string
will be replaced with "other".
generic_method_attribute_filter: Once provided, this will be called with a generic
method type to decide whether to record the method name or to replace it with
"other". Note that pre-registered methods will always be recorded no matter what
this function returns.
Return True means the original method name will be used, False means method name will
be replaced with "other".
"""
return True
self.plugin_options = plugin_options
self.meter_provider = meter_provider
if target_attribute_filter:
self.target_attribute_filter = target_attribute_filter
else:
self.target_attribute_filter = lambda target: True
if generic_method_attribute_filter:
self.generic_method_attribute_filter = (
generic_method_attribute_filter
)
else:
self.generic_method_attribute_filter = lambda method: False
self._plugin = _open_telemetry_observability._OpenTelemetryPlugin(self)
def generic_method_attribute_filter(
self, method: str # pylint: disable=unused-argument
) -> bool:
def register_global(self) -> None:
"""
Once overridden, this will be called with a generic method type to decide whether to
record the method name or to replace it with "other".
Note that pre-registered methods will always be recorded no matter what this
function returns.
Args:
method: The method name for the RPC.
Registers a global plugin that acts on all channels and servers running on the process.
Returns:
bool: True means the original method name will be used, False means method name
will be replaced with "other".
Raises:
RuntimeError: If a global plugin was already registered.
"""
return False
class _OpenTelemetryPlugin:
_plugin: OpenTelemetryPlugin
_metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]]
def __init__(self, plugin: OpenTelemetryPlugin):
self._plugin = plugin
self._metric_to_recorder = dict()
_open_telemetry_observability.start_open_telemetry_observability(
plugins=[self._plugin]
)
meter_provider = self._plugin.get_meter_provider()
if meter_provider:
meter = meter_provider.get_meter("grpc-python", grpc.__version__)
enabled_metrics = _open_telemetry_measures.base_metrics()
self._metric_to_recorder = self._register_metrics(
meter, enabled_metrics
)
def _should_record(self, stats_data: StatsData) -> bool:
# Decide if this plugin should record the stats_data.
return stats_data.name in self._metric_to_recorder.keys()
def _record_stats_data(self, stats_data: StatsData) -> None:
recorder = self._metric_to_recorder[stats_data.name]
target = stats_data.labels.get(GRPC_TARGET_LABEL, "")
if not self._plugin.target_attribute_filter(target):
# Filter target name.
stats_data.labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE
method = stats_data.labels.get(GRPC_METHOD_LABEL, "")
if not self._plugin.generic_method_attribute_filter(method):
# Filter method name.
stats_data.labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE
def deregister_global(self) -> None:
"""
De-register the global plugin that acts on all channels and servers running on the process.
value = 0
if stats_data.measure_double:
value = stats_data.value_float
else:
value = stats_data.value_int
if isinstance(recorder, Counter):
recorder.add(value, attributes=stats_data.labels)
elif isinstance(recorder, Histogram):
recorder.record(value, attributes=stats_data.labels)
Raises:
RuntimeError: If no global plugin was registered.
"""
_open_telemetry_observability.end_open_telemetry_observability()
# pylint: disable=no-self-use
def maybe_record_stats_data(self, stats_data: List[StatsData]) -> None:
# Records stats data to MeterProvider.
if self._should_record(stats_data):
self._record_stats_data(stats_data)
def __enter__(self) -> None:
_open_telemetry_observability.start_open_telemetry_observability(
plugins=[self._plugin]
)
def _register_metrics(
self, meter: Meter, metrics: List[_open_telemetry_measures.Metric]
) -> Dict[MetricsName, Union[Counter, Histogram]]:
metric_to_recorder_map = {}
recorder = None
for metric in metrics:
if metric == _open_telemetry_measures.CLIENT_ATTEMPT_STARTED:
recorder = meter.create_counter(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_RPC_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_SEND_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif (
metric == _open_telemetry_measures.CLIENT_ATTEMPT_RECEIVED_BYTES
):
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_STARTED_RPCS:
recorder = meter.create_counter(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_DURATION:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_SEND_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
elif metric == _open_telemetry_measures.SERVER_RPC_RECEIVED_BYTES:
recorder = meter.create_histogram(
name=metric.name,
unit=metric.unit,
description=metric.description,
)
metric_to_recorder_map[metric.cyname] = recorder
return metric_to_recorder_map
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
_open_telemetry_observability.end_open_telemetry_observability()

@ -21,12 +21,7 @@ from tests.observability import _from_observability_import_star
class AllTest(unittest.TestCase):
def testAll(self):
expected_observability_code_elements = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)
expected_observability_code_elements = ("OpenTelemetryPlugin",)
self.assertCountEqual(
expected_observability_code_elements,

@ -20,14 +20,15 @@ import sys
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest
from unittest.mock import patch
import grpc
import grpc_observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._open_telemetry_plugin import GRPC_METHOD_LABEL
from grpc_observability._open_telemetry_plugin import GRPC_OTHER_LABEL_VALUE
from grpc_observability._open_telemetry_plugin import GRPC_TARGET_LABEL
from grpc_observability._open_telemetry_observability import (
GRPC_OTHER_LABEL_VALUE,
)
from grpc_observability._open_telemetry_observability import GRPC_METHOD_LABEL
from grpc_observability._open_telemetry_observability import GRPC_TARGET_LABEL
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
@ -94,14 +95,6 @@ class OTelMetricExporter(MetricExporter):
)
class BaseTestOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin):
def __init__(self, provider: MeterProvider):
self.provider = provider
def get_meter_provider(self) -> Optional[MeterProvider]:
return self.provider
class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
def intercept_unary_unary(
self, continuation, client_call_details, request_or_iterator
@ -136,9 +129,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._server.stop(0)
def testRecordUnaryUnaryUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -148,36 +140,42 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_all_metrics_names(self.all_metrics)
def testRecordUnaryUnaryUseGlobalInit(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
try:
grpc_observability.start_open_telemetry_observability(plugins=[])
otel_plugin.register_global()
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized", str(exp)
)
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
try:
grpc_observability.start_open_telemetry_observability(
plugins=[]
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized",
@ -185,20 +183,29 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
)
def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
pass
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized", str(exp)
)
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
def testContextManagerThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
pass
except RuntimeError as exp:
self.assertIn(
@ -207,23 +214,32 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
)
def testNoErrorCallGlobalInitThenContextManager(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
grpc_observability.end_open_telemetry_observability()
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
otel_plugin.deregister_global()
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
pass
def testNoErrorCallContextManagerThenGlobalInit(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
pass
grpc_observability.start_open_telemetry_observability(plugins=[])
grpc_observability.end_open_telemetry_observability()
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
otel_plugin.deregister_global()
def testRecordUnaryUnaryWithClientInterceptor(self):
interceptor = _ClientUnaryUnaryInterceptor()
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -236,9 +252,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
def testRecordUnaryUnaryWithServerInterceptor(self):
interceptor = _ServerInterceptor()
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server(interceptors=[interceptor])
self._server = server
@ -247,21 +262,12 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testThrowErrorWhenCallingMultipleInit(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with self.assertRaises(ValueError):
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
) as o11y:
grpc._observability.observability_init(o11y)
def testRecordUnaryUnaryClientOnly(self):
server, port = _test_server.start_server()
self._server = server
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
_test_server.unary_unary_call(port=port)
@ -274,9 +280,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self.assertEqual(len(self.all_metrics), 0)
server.stop(0)
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -286,9 +291,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_all_metrics_names(self.all_metrics)
def testNoRecordAfterExitUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -304,16 +308,16 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_metrics_exist(self.all_metrics)
def testNoRecordAfterExitUseGlobal(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
otel_plugin = grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
)
otel_plugin.register_global()
server, port = _test_server.start_server()
self._server = server
self._port = port
_test_server.unary_unary_call(port=port)
grpc_observability.end_open_telemetry_observability()
otel_plugin.deregister_global()
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
@ -324,10 +328,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_metrics_exist(self.all_metrics)
def testRecordUnaryStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -337,10 +339,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_all_metrics_names(self.all_metrics)
def testRecordStreamUnary(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -350,10 +350,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_all_metrics_names(self.all_metrics)
def testRecordStreamStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider
):
server, port = _test_server.start_server()
self._server = server
@ -374,11 +372,8 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
return False
return True
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
otel_plugin.target_attribute_filter = target_filter
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider, target_attribute_filter=target_filter
):
_test_server.unary_unary_call(port=main_port)
_test_server.unary_unary_call(port=backup_port)
@ -406,11 +401,9 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
return False
return True
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
otel_plugin.generic_method_attribute_filter = method_filter
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
with grpc_observability.OpenTelemetryPlugin(
meter_provider=self._provider,
generic_method_attribute_filter=method_filter,
):
server, port = _test_server.start_server()
self._server = server

Loading…
Cancel
Save