From e3b7f577e048b6a3565f066829bb6cf1fe501324 Mon Sep 17 00:00:00 2001 From: "Michael P. Nitowski" Date: Tue, 7 Nov 2023 15:42:20 -0500 Subject: [PATCH] [observability] Switch from datetime.utcnow to time.perf_counter (#34829) datetime.utcnow() raises a deprecation warning on Python 3.12, so this change prevents that deprecation warning while also switching to a monotonic clock for calculating RPC latency cc @XuanWang-Amos --- src/python/grpcio/grpc/_channel.py | 21 ++++++++++----------- src/python/grpcio/grpc/_observability.py | 4 ++-- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index e4b34123409..fef906e51d5 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -14,7 +14,6 @@ """Invocation-side implementation of gRPC Python.""" import copy -from datetime import datetime import functools import logging import os @@ -130,8 +129,8 @@ class _RPCState(object): cancelled: bool callbacks: List[NullaryCallbackType] fork_epoch: Optional[int] - rpc_start_time: Optional[datetime] - rpc_end_time: Optional[datetime] + rpc_start_time: Optional[float] # In relative seconds + rpc_end_time: Optional[float] # In relative seconds method: Optional[str] def __init__( @@ -223,7 +222,7 @@ def _handle_event( state.code = code state.details = batch_operation.details() state.debug_error_string = batch_operation.error_string() - state.rpc_end_time = datetime.utcnow() + state.rpc_end_time = time.perf_counter() _observability.maybe_record_rpc_latency(state) callbacks.extend(state.callbacks) state.callbacks = None @@ -1122,7 +1121,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): if state is None: raise rendezvous # pylint: disable-msg=raising-bad-type else: - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1193,7 +1192,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): raise rendezvous # pylint: disable-msg=raising-bad-type else: event_handler = _event_handler(state, self._response_deserializer) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1277,7 +1276,7 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), ) operations_and_tags = tuple((ops, None) for ops in operations) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1353,7 +1352,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): ), (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), ) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1412,7 +1411,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): augmented_metadata = _compression.augment_metadata( metadata, compression ) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1500,7 +1499,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): augmented_metadata = _compression.augment_metadata( metadata, compression ) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, @@ -1578,7 +1577,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), ) event_handler = _event_handler(state, self._response_deserializer) - state.rpc_start_time = datetime.utcnow() + state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, diff --git a/src/python/grpcio/grpc/_observability.py b/src/python/grpcio/grpc/_observability.py index 1435b1be98c..d4e8ce6ab7b 100644 --- a/src/python/grpcio/grpc/_observability.py +++ b/src/python/grpcio/grpc/_observability.py @@ -278,6 +278,6 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: with get_plugin() as plugin: if not (plugin and plugin.stats_enabled): return - rpc_latency = state.rpc_end_time - state.rpc_start_time - rpc_latency_ms = rpc_latency.total_seconds() * 1000 + rpc_latency_s = state.rpc_end_time - state.rpc_start_time + rpc_latency_ms = rpc_latency_s * 1000 plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)