[Python Otel] Allow start observability without context manager (#35932)

Allow start observability globally with a new API `start_open_telemetry_observability`.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #35932

PiperOrigin-RevId: 612639020
pull/36053/head
Xuan Wang 9 months ago committed by Copybara-Service
parent 427c8a89e9
commit 672d8abdca
  1. 20
      examples/python/observability/observability_greeter_client.py
  2. 34
      examples/python/observability/observability_greeter_server.py
  3. 13
      src/python/grpcio_observability/grpc_observability/__init__.py
  4. 59
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py
  5. 2
      src/python/grpcio_tests/tests/observability/_observability_api_test.py
  6. 99
      src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py

@ -47,16 +47,16 @@ def run():
provider = MeterProvider(metric_readers=[reader])
otel_plugin = BaseOpenTelemetryPlugin(provider)
with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]):
with grpc.insecure_channel(target="localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
try:
response = stub.SayHello(
helloworld_pb2.HelloRequest(name="You")
)
print(f"Greeter client received: {response.message}")
except grpc.RpcError as rpc_error:
print("Call failed with code: ", rpc_error.code())
grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin])
with grpc.insecure_channel(target="localhost:50051") as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
try:
response = stub.SayHello(helloworld_pb2.HelloRequest(name="You"))
print(f"Greeter client received: {response.message}")
except grpc.RpcError as rpc_error:
print("Call failed with code: ", rpc_error.code())
grpc_observability.end_open_telemetry_observability()
# Sleep to make sure all metrics are exported.
time.sleep(5)

@ -57,22 +57,24 @@ def serve():
provider = MeterProvider(metric_readers=[reader])
otel_plugin = BaseOpenTelemetryPlugin(provider)
with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]):
server = grpc.server(
thread_pool=futures.ThreadPoolExecutor(max_workers=10),
)
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:" + _SERVER_PORT)
server.start()
print("Server started, listening on " + _SERVER_PORT)
# Sleep to make sure client made RPC call and all metrics are exported.
time.sleep(10)
print("Metrics exported on Server side:")
for metric in all_metrics:
print(metric)
server.stop(0)
grpc_observability.start_open_telemetry_observability(plugins=[otel_plugin])
server = grpc.server(
thread_pool=futures.ThreadPoolExecutor(max_workers=10),
)
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
server.add_insecure_port("[::]:" + _SERVER_PORT)
server.start()
print("Server started, listening on " + _SERVER_PORT)
# Sleep to make sure client made RPC call and all metrics are exported.
time.sleep(10)
print("Metrics exported on Server side:")
for metric in all_metrics:
print(metric)
server.stop(0)
grpc_observability.end_open_telemetry_observability()
if __name__ == "__main__":

@ -15,6 +15,17 @@
from grpc_observability._open_telemetry_observability import (
OpenTelemetryObservability,
)
from grpc_observability._open_telemetry_observability import (
end_open_telemetry_observability,
)
from grpc_observability._open_telemetry_observability import (
start_open_telemetry_observability,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
__all__ = ("OpenTelemetryObservability", "OpenTelemetryPlugin")
__all__ = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)

@ -13,6 +13,7 @@
# limitations under the License.
import logging
import threading
import time
from typing import Any, Iterable, Optional
@ -54,6 +55,22 @@ GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
_observability_lock: threading.RLock = threading.RLock()
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None
def start_open_telemetry_observability(
*,
plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
) -> None:
_start_open_telemetry_observability(
OpenTelemetryObservability(plugins=plugins)
)
def end_open_telemetry_observability() -> None:
_end_open_telemetry_observability()
# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
@ -66,7 +83,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
"""
exporter: "grpc_observability.Exporter"
plugins: Iterable[OpenTelemetryPlugin]
def __init__(
self,
@ -80,13 +96,20 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
self.exporter = _OpenTelemetryExporterDelegator(_plugins)
def __enter__(self):
_start_open_telemetry_observability(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
_end_open_telemetry_observability()
def observability_init(self):
try:
_cyobservability.activate_stats()
self.set_stats(True)
except Exception as e: # pylint: disable=broad-except
raise ValueError(f"Activate observability metrics failed with: {e}")
def __enter__(self):
try:
_cyobservability.cyobservability_init(self.exporter)
# TODO(xuanwn): Use specific exceptons
@ -94,12 +117,8 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
_LOGGER.exception("Initiate observability failed with: %s", e)
grpc._observability.observability_init(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.exit()
def exit(self) -> None:
def observability_deinit(self) -> None:
# Sleep so we don't loss any data. If we shutdown export thread
# immediately after exit, it's possible that core didn't call RecordEnd
# in callTracer, and all data recorded by calling RecordEnd will be
@ -150,3 +169,29 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
_cyobservability._record_rpc_latency(
self.exporter, method, target, rpc_latency, status_code
)
def _start_open_telemetry_observability(
otel_o11y: OpenTelemetryObservability,
) -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if _OPEN_TELEMETRY_OBSERVABILITY is None:
_OPEN_TELEMETRY_OBSERVABILITY = otel_o11y
_OPEN_TELEMETRY_OBSERVABILITY.observability_init()
else:
raise RuntimeError(
"gPRC Python observability was already initialized!"
)
def _end_open_telemetry_observability() -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if not _OPEN_TELEMETRY_OBSERVABILITY:
raise RuntimeError(
"Trying to end gPRC Python observability without initialize first!"
)
else:
_OPEN_TELEMETRY_OBSERVABILITY.observability_deinit()
_OPEN_TELEMETRY_OBSERVABILITY = None

@ -24,6 +24,8 @@ class AllTest(unittest.TestCase):
expected_observability_code_elements = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)
self.assertCountEqual(

@ -20,6 +20,7 @@ import sys
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest
from unittest.mock import patch
import grpc
import grpc_observability
@ -134,7 +135,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
if self._server:
self._server.stop(0)
def testRecordUnaryUnary(self):
def testRecordUnaryUnaryUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
@ -146,6 +147,78 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testRecordUnaryUnaryUseGlobalInit(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
)
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
grpc_observability.end_open_telemetry_observability()
def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
try:
grpc_observability.start_open_telemetry_observability(plugins=[])
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized", str(exp)
)
grpc_observability.end_open_telemetry_observability()
def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
try:
grpc_observability.start_open_telemetry_observability(
plugins=[]
)
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized",
str(exp),
)
def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized", str(exp)
)
grpc_observability.end_open_telemetry_observability()
def testContextManagerThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
except RuntimeError as exp:
self.assertIn(
"gPRC Python observability was already initialized",
str(exp),
)
def testNoErrorCallGlobalInitThenContextManager(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
grpc_observability.end_open_telemetry_observability()
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
def testNoErrorCallContextManagerThenGlobalInit(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
grpc_observability.start_open_telemetry_observability(plugins=[])
grpc_observability.end_open_telemetry_observability()
def testRecordUnaryUnaryWithClientInterceptor(self):
interceptor = _ClientUnaryUnaryInterceptor()
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
@ -212,7 +285,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testNoRecordAfterExit(self):
def testNoRecordAfterExitUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
@ -230,6 +303,26 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
self._validate_metrics_exist(self.all_metrics)
def testNoRecordAfterExitUseGlobal(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
)
server, port = _test_server.start_server()
self._server = server
self._port = port
_test_server.unary_unary_call(port=port)
grpc_observability.end_open_telemetry_observability()
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
self.all_metrics = defaultdict(list)
_test_server.unary_unary_call(port=self._port)
with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
self._validate_metrics_exist(self.all_metrics)
def testRecordUnaryStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
@ -342,7 +435,7 @@ class OpenTelemetryObservabilityTest(unittest.TestCase):
message: Optional[Callable[[], str]] = None,
) -> None:
message = message or (lambda: "Proposition did not evaluate to true")
timeout = timeout or datetime.timedelta(seconds=10)
timeout = timeout or datetime.timedelta(seconds=5)
end = datetime.datetime.now() + timeout
while datetime.datetime.now() < end:
if predicate():

Loading…
Cancel
Save