From 672d8abdca6e7fb4919586bbc4e7fe0212a68abc Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Mon, 4 Mar 2024 17:04:23 -0800 Subject: [PATCH] [Python Otel] Allow start observability without context manager (#35932) Allow starting observability globally, without a context manager, via the new `start_open_telemetry_observability` API; call `end_open_telemetry_observability` to shut it down. Closes #35932 PiperOrigin-RevId: 612639020 --- .../observability_greeter_client.py | 20 ++-- .../observability_greeter_server.py | 34 ++++--- .../grpc_observability/__init__.py | 13 ++- .../_open_telemetry_observability.py | 59 +++++++++-- .../observability/_observability_api_test.py | 2 + .../_open_telemetry_observability_test.py | 99 ++++++++++++++++++- 6 files changed, 190 insertions(+), 37 deletions(-) diff --git a/examples/python/observability/observability_greeter_client.py b/examples/python/observability/observability_greeter_client.py index 25f7d68a4c8..5e0e024f93f 100644 --- a/examples/python/observability/observability_greeter_client.py +++ b/examples/python/observability/observability_greeter_client.py @@ -47,16 +47,16 @@ def run(): provider = MeterProvider(metric_readers=[reader]) otel_plugin = BaseOpenTelemetryPlugin(provider) - with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): - with grpc.insecure_channel(target="localhost:50051") as channel: - stub = helloworld_pb2_grpc.GreeterStub(channel) - try: - response = stub.SayHello( - helloworld_pb2.HelloRequest(name="You") - ) - print(f"Greeter client received: {response.message}") - except grpc.RpcError as rpc_error: - print("Call failed with code: ", rpc_error.code()) + grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin]) + + with grpc.insecure_channel(target="localhost:50051") as channel: + stub = helloworld_pb2_grpc.GreeterStub(channel) + try: + response = stub.SayHello(helloworld_pb2.HelloRequest(name="You")) + print(f"Greeter client received: {response.message}") + except grpc.RpcError as rpc_error: + print("Call failed with code: ", rpc_error.code()) + 
grpc_observability.end_open_telemetry_observability() # Sleep to make sure all metrics are exported. time.sleep(5) diff --git a/examples/python/observability/observability_greeter_server.py b/examples/python/observability/observability_greeter_server.py index 1f5a9689be6..542e68be42d 100644 --- a/examples/python/observability/observability_greeter_server.py +++ b/examples/python/observability/observability_greeter_server.py @@ -57,22 +57,24 @@ def serve(): provider = MeterProvider(metric_readers=[reader]) otel_plugin = BaseOpenTelemetryPlugin(provider) - with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): - server = grpc.server( - thread_pool=futures.ThreadPoolExecutor(max_workers=10), - ) - helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) - server.add_insecure_port("[::]:" + _SERVER_PORT) - server.start() - print("Server started, listening on " + _SERVER_PORT) - - # Sleep to make sure client made RPC call and all metrics are exported. - time.sleep(10) - print("Metrics exported on Server side:") - for metric in all_metrics: - print(metric) - - server.stop(0) + grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin]) + + server = grpc.server( + thread_pool=futures.ThreadPoolExecutor(max_workers=10), + ) + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:" + _SERVER_PORT) + server.start() + print("Server started, listening on " + _SERVER_PORT) + + # Sleep to make sure client made RPC call and all metrics are exported. 
+ time.sleep(10) + print("Metrics exported on Server side:") + for metric in all_metrics: + print(metric) + + server.stop(0) + grpc_observability.end_open_telemetry_observability() if __name__ == "__main__": diff --git a/src/python/grpcio_observability/grpc_observability/__init__.py b/src/python/grpcio_observability/grpc_observability/__init__.py index 7dad3d83efe..f4a7ef453e9 100644 --- a/src/python/grpcio_observability/grpc_observability/__init__.py +++ b/src/python/grpcio_observability/grpc_observability/__init__.py @@ -15,6 +15,17 @@ from grpc_observability._open_telemetry_observability import ( OpenTelemetryObservability, ) +from grpc_observability._open_telemetry_observability import ( + end_open_telemetry_observability, +) +from grpc_observability._open_telemetry_observability import ( + start_open_telemetry_observability, +) from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin -__all__ = ("OpenTelemetryObservability", "OpenTelemetryPlugin") +__all__ = ( + "OpenTelemetryObservability", + "OpenTelemetryPlugin", + "start_open_telemetry_observability", + "end_open_telemetry_observability", +) diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py index a102509e5c8..74f40b66a1f 100644 --- a/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py @@ -13,6 +13,7 @@ # limitations under the License. 
import logging +import threading import time from typing import Any, Iterable, Optional @@ -54,6 +55,22 @@ GRPC_STATUS_CODE_TO_STRING = { grpc.StatusCode.DATA_LOSS: "DATA_LOSS", } +_observability_lock: threading.RLock = threading.RLock() +_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None + + +def start_open_telemetry_observability( + *, + plugins: Optional[Iterable[OpenTelemetryPlugin]] = None, +) -> None: + _start_open_telemetry_observability( + OpenTelemetryObservability(plugins=plugins) + ) + + +def end_open_telemetry_observability() -> None: + _end_open_telemetry_observability() + # pylint: disable=no-self-use class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): @@ -66,7 +83,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): """ exporter: "grpc_observability.Exporter" - plugins: Iterable[OpenTelemetryPlugin] def __init__( self, @@ -80,13 +96,20 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): self.exporter = _OpenTelemetryExporterDelegator(_plugins) + def __enter__(self): + _start_open_telemetry_observability(self) + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + _end_open_telemetry_observability() + + def observability_init(self): try: _cyobservability.activate_stats() self.set_stats(True) except Exception as e: # pylint: disable=broad-except raise ValueError(f"Activate observability metrics failed with: {e}") - def __enter__(self): try: _cyobservability.cyobservability_init(self.exporter) # TODO(xuanwn): Use specific exceptons @@ -94,12 +117,8 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): _LOGGER.exception("Initiate observability failed with: %s", e) grpc._observability.observability_init(self) - return self - def __exit__(self, exc_type, exc_val, exc_tb) -> None: - self.exit() - - def exit(self) -> None: + def observability_deinit(self) -> None: # Sleep so we don't loss any data. 
If we shutdown export thread # immediately after exit, it's possible that core didn't call RecordEnd # in callTracer, and all data recorded by calling RecordEnd will be @@ -150,3 +169,29 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): _cyobservability._record_rpc_latency( self.exporter, method, target, rpc_latency, status_code ) + + +def _start_open_telemetry_observability( + otel_o11y: OpenTelemetryObservability, +) -> None: + global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement + with _observability_lock: + if _OPEN_TELEMETRY_OBSERVABILITY is None: + _OPEN_TELEMETRY_OBSERVABILITY = otel_o11y + _OPEN_TELEMETRY_OBSERVABILITY.observability_init() + else: + raise RuntimeError( + "gPRC Python observability was already initialized!" + ) + + +def _end_open_telemetry_observability() -> None: + global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement + with _observability_lock: + if not _OPEN_TELEMETRY_OBSERVABILITY: + raise RuntimeError( + "Trying to end gPRC Python observability without initialize first!" 
+ ) + else: + _OPEN_TELEMETRY_OBSERVABILITY.observability_deinit() + _OPEN_TELEMETRY_OBSERVABILITY = None diff --git a/src/python/grpcio_tests/tests/observability/_observability_api_test.py b/src/python/grpcio_tests/tests/observability/_observability_api_test.py index efdd63dca0a..e3f92338de5 100644 --- a/src/python/grpcio_tests/tests/observability/_observability_api_test.py +++ b/src/python/grpcio_tests/tests/observability/_observability_api_test.py @@ -24,6 +24,8 @@ class AllTest(unittest.TestCase): expected_observability_code_elements = ( "OpenTelemetryObservability", "OpenTelemetryPlugin", + "start_open_telemetry_observability", + "end_open_telemetry_observability", ) self.assertCountEqual( diff --git a/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py b/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py index 81c9750970f..7031965b49f 100644 --- a/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py +++ b/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py @@ -20,6 +20,7 @@ import sys import time from typing import Any, Callable, Dict, List, Optional, Set import unittest +from unittest.mock import patch import grpc import grpc_observability @@ -134,7 +135,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase): if self._server: self._server.stop(0) - def testRecordUnaryUnary(self): + def testRecordUnaryUnaryUseContextManager(self): otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) with grpc_observability.OpenTelemetryObservability( plugins=[otel_plugin] @@ -146,6 +147,78 @@ class OpenTelemetryObservabilityTest(unittest.TestCase): self._validate_metrics_exist(self.all_metrics) self._validate_all_metrics_names(self.all_metrics) + def testRecordUnaryUnaryUseGlobalInit(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + + grpc_observability.start_open_telemetry_observability( + plugins=[otel_plugin] + ) + server, 
port = _test_server.start_server() + self._server = server + _test_server.unary_unary_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + grpc_observability.end_open_telemetry_observability() + + def testCallGlobalInitThrowErrorWhenGlobalCalled(self): + grpc_observability.start_open_telemetry_observability(plugins=[]) + try: + grpc_observability.start_open_telemetry_observability(plugins=[]) + except RuntimeError as exp: + self.assertIn( + "gPRC Python observability was already initialized", str(exp) + ) + + grpc_observability.end_open_telemetry_observability() + + def testCallGlobalInitThrowErrorWhenContextManagerCalled(self): + with grpc_observability.OpenTelemetryObservability(plugins=[]): + try: + grpc_observability.start_open_telemetry_observability( + plugins=[] + ) + except RuntimeError as exp: + self.assertIn( + "gPRC Python observability was already initialized", + str(exp), + ) + + def testCallContextManagerThrowErrorWhenGlobalInitCalled(self): + grpc_observability.start_open_telemetry_observability(plugins=[]) + try: + with grpc_observability.OpenTelemetryObservability(plugins=[]): + pass + except RuntimeError as exp: + self.assertIn( + "gPRC Python observability was already initialized", str(exp) + ) + grpc_observability.end_open_telemetry_observability() + + def testContextManagerThrowErrorWhenContextManagerCalled(self): + with grpc_observability.OpenTelemetryObservability(plugins=[]): + try: + with grpc_observability.OpenTelemetryObservability(plugins=[]): + pass + except RuntimeError as exp: + self.assertIn( + "gPRC Python observability was already initialized", + str(exp), + ) + + def testNoErrorCallGlobalInitThenContextManager(self): + grpc_observability.start_open_telemetry_observability(plugins=[]) + grpc_observability.end_open_telemetry_observability() + + with grpc_observability.OpenTelemetryObservability(plugins=[]): + pass + + def 
testNoErrorCallContextManagerThenGlobalInit(self): + with grpc_observability.OpenTelemetryObservability(plugins=[]): + pass + grpc_observability.start_open_telemetry_observability(plugins=[]) + grpc_observability.end_open_telemetry_observability() + def testRecordUnaryUnaryWithClientInterceptor(self): interceptor = _ClientUnaryUnaryInterceptor() otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) @@ -212,7 +285,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase): self._validate_metrics_exist(self.all_metrics) self._validate_all_metrics_names(self.all_metrics) - def testNoRecordAfterExit(self): + def testNoRecordAfterExitUseContextManager(self): otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) with grpc_observability.OpenTelemetryObservability( plugins=[otel_plugin] @@ -230,6 +303,26 @@ class OpenTelemetryObservabilityTest(unittest.TestCase): with self.assertRaisesRegex(AssertionError, "No metrics was exported"): self._validate_metrics_exist(self.all_metrics) + def testNoRecordAfterExitUseGlobal(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + + grpc_observability.start_open_telemetry_observability( + plugins=[otel_plugin] + ) + server, port = _test_server.start_server() + self._server = server + self._port = port + _test_server.unary_unary_call(port=port) + grpc_observability.end_open_telemetry_observability() + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + + self.all_metrics = defaultdict(list) + _test_server.unary_unary_call(port=self._port) + with self.assertRaisesRegex(AssertionError, "No metrics was exported"): + self._validate_metrics_exist(self.all_metrics) + def testRecordUnaryStream(self): otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) @@ -342,7 +435,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase): message: Optional[Callable[[], str]] = None, ) -> None: message = message or (lambda: "Proposition did not evaluate to true") - timeout 
= timeout or datetime.timedelta(seconds=10) + timeout = timeout or datetime.timedelta(seconds=5) end = datetime.datetime.now() + timeout while datetime.datetime.now() < end: if predicate():