[observability] Switch from datetime.utcnow to time.perf_counter (#34829)

datetime.utcnow() raises a deprecation warning on Python 3.12, so this
change prevents that deprecation warning while also switching to a
monotonic clock for calculating RPC latency

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->

cc @XuanWang-Amos
pull/34892/head
Michael P. Nitowski 1 year ago committed by GitHub
parent 434a5c8e72
commit e3b7f577e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      src/python/grpcio/grpc/_channel.py
  2. 4
      src/python/grpcio/grpc/_observability.py

@ -14,7 +14,6 @@
"""Invocation-side implementation of gRPC Python."""
import copy
from datetime import datetime
import functools
import logging
import os
@ -130,8 +129,8 @@ class _RPCState(object):
cancelled: bool
callbacks: List[NullaryCallbackType]
fork_epoch: Optional[int]
rpc_start_time: Optional[datetime]
rpc_end_time: Optional[datetime]
rpc_start_time: Optional[float] # In relative seconds
rpc_end_time: Optional[float] # In relative seconds
method: Optional[str]
def __init__(
@ -223,7 +222,7 @@ def _handle_event(
state.code = code
state.details = batch_operation.details()
state.debug_error_string = batch_operation.error_string()
state.rpc_end_time = datetime.utcnow()
state.rpc_end_time = time.perf_counter()
_observability.maybe_record_rpc_latency(state)
callbacks.extend(state.callbacks)
state.callbacks = None
@ -1122,7 +1121,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
if state is None:
raise rendezvous # pylint: disable-msg=raising-bad-type
else:
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1193,7 +1192,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
raise rendezvous # pylint: disable-msg=raising-bad-type
else:
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1277,7 +1276,7 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
)
operations_and_tags = tuple((ops, None) for ops in operations)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1353,7 +1352,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1412,7 +1411,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
augmented_metadata = _compression.augment_metadata(
metadata, compression
)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1500,7 +1499,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
augmented_metadata = _compression.augment_metadata(
metadata, compression
)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
@ -1578,7 +1577,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
)
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = datetime.utcnow()
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,

@ -278,6 +278,6 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
with get_plugin() as plugin:
if not (plugin and plugin.stats_enabled):
return
rpc_latency = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency.total_seconds() * 1000
rpc_latency_s = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency_s * 1000
plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)

Loading…
Cancel
Save