[python O11Y] Initial Implementation (#32974)

Testing Command: `bazel test --cache_test_results=no
--test_output=streamed --runs_per_test=1 --test_timeout=10
"//src/python/grpcio_tests/tests/observability:_observability_test"`

### TODO:
 * Better error handling.
pull/33352/head
Xuan Wang 2 years ago committed by GitHub
parent 0f898fa73f
commit 629b7a14da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      BUILD
  2. 1
      bazel/grpc_build_system.bzl
  3. 1
      requirements.bazel.txt
  4. 1
      setup.cfg
  5. 5
      src/cpp/ext/gcp/BUILD
  6. 6
      src/python/grpcio/grpc/BUILD.bazel
  7. 26
      src/python/grpcio/grpc/_channel.py
  8. 2
      src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
  9. 4
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  10. 36
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  11. 24
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  12. 61
      src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi
  13. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  14. 268
      src/python/grpcio/grpc/_observability.py
  15. 1
      src/python/grpcio/grpc/_server.py
  16. 6
      src/python/grpcio_observability/.gitignore
  17. 2
      src/python/grpcio_observability/README.rst
  18. 80
      src/python/grpcio_observability/grpc_observability/BUILD.bazel
  19. 17
      src/python/grpcio_observability/grpc_observability/__init__.py
  20. 152
      src/python/grpcio_observability/grpc_observability/_cyobservability.pxd
  21. 342
      src/python/grpcio_observability/grpc_observability/_cyobservability.pyx
  22. 186
      src/python/grpcio_observability/grpc_observability/_gcp_observability.py
  23. 101
      src/python/grpcio_observability/grpc_observability/_observability.py
  24. 33
      src/python/grpcio_observability/grpc_observability/_open_census_exporter.py
  25. 281
      src/python/grpcio_observability/grpc_observability/client_call_tracer.cc
  26. 139
      src/python/grpcio_observability/grpc_observability/client_call_tracer.h
  27. 54
      src/python/grpcio_observability/grpc_observability/constants.h
  28. 224
      src/python/grpcio_observability/grpc_observability/observability_util.cc
  29. 114
      src/python/grpcio_observability/grpc_observability/observability_util.h
  30. 278
      src/python/grpcio_observability/grpc_observability/python_census_context.cc
  31. 327
      src/python/grpcio_observability/grpc_observability/python_census_context.h
  32. 69
      src/python/grpcio_observability/grpc_observability/sampler.cc
  33. 43
      src/python/grpcio_observability/grpc_observability/sampler.h
  34. 248
      src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
  35. 46
      src/python/grpcio_observability/grpc_observability/server_call_tracer.h
  36. 30
      src/python/grpcio_tests/tests/observability/BUILD.bazel
  37. 401
      src/python/grpcio_tests/tests/observability/_observability_test.py
  38. 1
      tools/distrib/pylint_code.sh
  39. 2
      tools/dockerfile/grpc_clang_format/clang_format_all_the_things.sh
  40. 1
      tools/dockerfile/grpc_iwyu/iwyu.sh

22
BUILD

@ -2274,6 +2274,28 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_rpc_encoding",
srcs = [
"src/cpp/ext/filters/census/rpc_encoding.cc",
],
hdrs = [
"src/cpp/ext/filters/census/rpc_encoding.h",
],
external_deps = [
"absl/base",
"absl/base:core_headers",
"absl/meta:type_traits",
"absl/status",
"absl/strings",
"absl/time",
],
language = "c++",
tags = ["nofixdeps"],
visibility = ["@grpc:grpc_python_observability"],
deps = ["gpr_platform"],
)
grpc_cc_library(
name = "grpc_opencensus_plugin",
srcs = [

@ -113,6 +113,7 @@ def _update_visibility(visibility):
"tsi": PRIVATE,
"xds": PRIVATE,
"xds_client_core": PRIVATE,
"grpc_python_observability": PRIVATE,
}
final_visibility = []
for rule in visibility:

@ -15,3 +15,4 @@ gevent==22.08.0
zope.event==4.5.0
setuptools==44.1.1
xds-protos==0.0.11
opencensus==0.10.0

@ -27,6 +27,7 @@ inputs =
src/python/grpcio/grpc/experimental
src/python/grpcio/grpc
src/python/grpcio_tests/tests_aio
src/python/grpcio_observability/grpc_observability
examples/python/auth
examples/python/helloworld
exclude =

@ -81,7 +81,10 @@ grpc_cc_library(
"absl/types:optional",
],
language = "c++",
visibility = ["//test:__subpackages__"],
visibility = [
"//test:__subpackages__",
"@grpc:grpc_python_observability",
],
deps = [
"//:gpr",
"//:grpc_base",

@ -94,6 +94,11 @@ py_library(
srcs = ["_typing.py"],
)
py_library(
name = "_observability",
srcs = ["_observability.py"],
)
py_library(
name = "grpcio",
srcs = ["__init__.py"],
@ -102,6 +107,7 @@ py_library(
],
imports = ["../"],
deps = [
":_observability",
":_runtime_protos",
":_simple_stubs",
":_typing",

@ -14,6 +14,7 @@
"""Invocation-side implementation of gRPC Python."""
import copy
from datetime import datetime
import functools
import logging
import os
@ -28,6 +29,7 @@ import grpc # pytype: disable=pyi-error
from grpc import _common # pytype: disable=pyi-error
from grpc import _compression # pytype: disable=pyi-error
from grpc import _grpcio_metadata # pytype: disable=pyi-error
from grpc import _observability # pytype: disable=pyi-error
from grpc._cython import cygrpc
from grpc._typing import ChannelArgumentType
from grpc._typing import DeserializingFunction
@ -114,6 +116,9 @@ class _RPCState(object):
cancelled: bool
callbacks: List[NullaryCallbackType]
fork_epoch: Optional[int]
rpc_start_time: Optional[datetime]
rpc_end_time: Optional[datetime]
method: Optional[str]
def __init__(self, due: Sequence[cygrpc.OperationType],
initial_metadata: Optional[MetadataType],
@ -136,6 +141,11 @@ class _RPCState(object):
self.code = code
self.details = details
self.debug_error_string = None
# The following three fields are used for observability.
# Updates to those fields do not trigger self.condition.
self.rpc_start_time = None
self.rpc_end_time = None
self.method = None
# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
@ -191,6 +201,8 @@ def _handle_event(
state.code = code
state.details = batch_operation.details()
state.debug_error_string = batch_operation.error_string()
state.rpc_end_time = datetime.utcnow()
_observability.maybe_record_rpc_latency(state)
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
@ -1014,6 +1026,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
operations,
None,
),), self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
event = call.next_event()
_handle_event(event, state, self._response_deserializer)
return state, call
@ -1062,6 +1076,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._method, None, deadline, metadata,
None if credentials is None else credentials._credentials,
(operations,), event_handler, self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
return _MultiThreadedRendezvous(state, call,
self._response_deserializer,
deadline)
@ -1120,6 +1136,8 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
None, _determine_deadline(deadline), metadata, call_credentials,
operations_and_tags, self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
return _SingleThreadedRendezvous(state, call,
self._response_deserializer, deadline)
@ -1180,6 +1198,8 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
None if credentials is None else credentials._credentials,
operations, _event_handler(state, self._response_deserializer),
self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
return _MultiThreadedRendezvous(state, call,
self._response_deserializer,
deadline)
@ -1223,6 +1243,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
None if credentials is None else credentials._credentials,
_stream_unary_invocation_operations_and_tags(
augmented_metadata, initial_metadata_flags), self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
_consume_request_iterator(request_iterator, state, call,
self._request_serializer, None)
while True:
@ -1281,6 +1303,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
_stream_unary_invocation_operations(metadata,
initial_metadata_flags),
event_handler, self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
_consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler)
return _MultiThreadedRendezvous(state, call,
@ -1338,6 +1362,8 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
None, _determine_deadline(deadline), augmented_metadata,
None if credentials is None else credentials._credentials,
operations, event_handler, self._context)
state.rpc_start_time = datetime.utcnow()
state.method = _common.decode(self._method)
_consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler)
return _MultiThreadedRendezvous(state, call,

@ -17,7 +17,7 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call):
raise NotImplementedError("No custom hooks are implemented")
def install_context_from_request_call_event(RequestCallEvent event):
pass
maybe_save_server_trace_context(event)
def uninstall_context():
pass

@ -26,6 +26,10 @@ cdef class _CallState:
cdef grpc_call *c_call
cdef set due
# call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule
cdef object call_tracer_capsule
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *
cdef void maybe_delete_call_tracer(self) except *
cdef class _ChannelState:

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc import _observability
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
@ -71,6 +72,19 @@ cdef class _CallState:
def __cinit__(self):
self.due = set()
cdef void maybe_delete_call_tracer(self) except *:
if not self.call_tracer_capsule:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *:
with _observability.get_plugin() as plugin:
if not (plugin and plugin.observability_enabled):
return
capsule = plugin.create_client_call_tracer(method_name)
capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER)
_set_call_tracer(self.c_call, capsule_ptr)
self.call_tracer_capsule = capsule
cdef class _ChannelState:
@ -230,6 +244,7 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
call_state.maybe_set_client_call_tracer_on_call(method)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
@ -238,7 +253,9 @@ cdef void _call(
call_state.c_call, c_call_credentials)
grpc_call_credentials_release(c_call_credentials)
if c_call_error != GRPC_CALL_OK:
grpc_call_unref(call_state.c_call)
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
_raise_call_error_no_metadata(c_call_error)
started_tags = set()
@ -248,7 +265,9 @@ cdef void _call(
started_tags.add(tag)
else:
grpc_call_cancel(call_state.c_call, NULL)
grpc_call_unref(call_state.c_call)
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
_raise_call_error(c_call_error, metadata)
else:
@ -263,9 +282,10 @@ cdef void _process_integrated_call_tag(
cdef _CallState call_state = state.integrated_call_states.pop(tag)
call_state.due.remove(tag)
if not call_state.due:
grpc_call_unref(call_state.c_call)
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
cdef class IntegratedCall:
@ -303,8 +323,11 @@ cdef object _process_segregated_call_tag(
grpc_completion_queue *c_completion_queue, _BatchOperationTag tag):
call_state.due.remove(tag)
if not call_state.due:
grpc_call_unref(call_state.c_call)
#TODO(xuanwn): Expand the scope of nogil
with nogil:
grpc_call_unref(call_state.c_call)
call_state.c_call = NULL
call_state.maybe_delete_call_tracer()
state.segregated_call_states.remove(call_state)
_destroy_c_completion_queue(c_completion_queue)
return True
@ -331,7 +354,8 @@ cdef class SegregatedCall:
self._channel_state, self._call_state, self._c_completion_queue, tag)
def on_failure():
self._call_state.due.clear()
grpc_call_unref(self._call_state.c_call)
with nogil:
grpc_call_unref(self._call_state.c_call)
self._call_state.c_call = NULL
self._channel_state.segregated_call_states.remove(self._call_state)
_destroy_c_completion_queue(self._c_completion_queue)

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from libcpp.string cimport string
cimport libc.time
ctypedef ssize_t intptr_t
@ -57,6 +59,28 @@ cdef extern from "<condition_variable>" namespace "std" nogil:
# gRPC Core Declarations
cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core":
cdef cppclass ClientCallTracer:
pass
cdef cppclass ServerCallTracer:
string TraceId() nogil
string SpanId() nogil
bint IsSampled() nogil
cdef cppclass ServerCallTracerFactory:
@staticmethod
void RegisterGlobal(ServerCallTracerFactory* factory) nogil
cdef extern from "src/core/lib/channel/context.h":
ctypedef enum grpc_context_index:
GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE
cdef extern from "src/core/lib/surface/call.h":
void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
void* value, void (*destroy)(void* value)) nogil
void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size) nogil

@ -0,0 +1,61 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import codecs
from typing import Optional
from libcpp.cast cimport static_cast
from grpc import _observability
cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"
def set_server_call_tracer_factory(object observability_plugin) -> None:
capsule = observability_plugin.create_server_call_tracer_factory()
capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY)
_register_server_call_tracer_factory(capsule_ptr)
def clear_server_call_tracer_factory() -> None:
_register_server_call_tracer_factory(NULL)
def maybe_save_server_trace_context(RequestCallEvent event) -> None:
cdef ServerCallTracer* server_call_tracer
with _observability.get_plugin() as plugin:
if not (plugin and plugin.tracing_enabled):
return
server_call_tracer = static_cast['ServerCallTracer*'](_get_call_tracer(event.call.c_call))
# TraceId and SpanId is hex string, need to convert to str
trace_id = _decode(codecs.decode(server_call_tracer.TraceId(), 'hex_codec'))
span_id = _decode(codecs.decode(server_call_tracer.SpanId(), 'hex_codec'))
is_sampled = server_call_tracer.IsSampled()
plugin.save_trace_context(trace_id, span_id, is_sampled)
cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr):
cdef ClientCallTracer* call_tracer = <ClientCallTracer*>capsule_ptr
grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL)
cdef void* _get_call_tracer(grpc_call* call):
cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)
return call_tracer
cdef void _register_server_call_tracer_factory(void* capsule_ptr):
cdef ServerCallTracerFactory* call_tracer_factory = <ServerCallTracerFactory*>capsule_ptr
ServerCallTracerFactory.RegisterGlobal(call_tracer_factory)

@ -55,6 +55,7 @@ include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/vtable.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/observability.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"

@ -0,0 +1,268 @@
# Copyright 2023 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
import contextlib
import logging
import threading
from typing import Any, Generator, Generic, Optional, TypeVar
from grpc._cython import cygrpc as _cygrpc
_LOGGER = logging.getLogger(__name__)
_channel = Any # _channel.py imports this module.
ClientCallTracerCapsule = TypeVar('ClientCallTracerCapsule')
ServerCallTracerFactoryCapsule = TypeVar('ServerCallTracerFactoryCapsule')
_plugin_lock: threading.RLock = threading.RLock()
_OBSERVABILITY_PLUGIN: Optional[ObservabilityPlugin] = None # pylint: disable=used-before-assignment
class ObservabilityPlugin(Generic[ClientCallTracerCapsule,
ServerCallTracerFactoryCapsule],
metaclass=abc.ABCMeta):
"""Abstract base class for observability plugin.
*This is a semi-private class that was intended for the exclusive use of
the gRPC team.*
The ClientCallTracerCapsule and ClientCallTracerCapsule created by this
plugin should be inject to gRPC core using observability_init at the
start of a program, before any channels/servers are built.
Any future methods added to this interface cannot have the
@abc.abstractmethod annotation.
Attributes:
_stats_enabled: A bool indicates whether tracing is enabled.
_tracing_enabled: A bool indicates whether stats(metrics) is enabled.
"""
_tracing_enabled: bool = False
_stats_enabled: bool = False
@abc.abstractmethod
def create_client_call_tracer(
self, method_name: bytes) -> ClientCallTracerCapsule:
"""Creates a ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called after a call was created, the ClientCallTracer created
by this method will be saved to call context.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
method_name: The method name of the call in byte format.
Returns:
A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()
@abc.abstractmethod
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as name.
Args:
client_call_tracer: A PyCapsule which stores a ClientCallTracer object.
"""
raise NotImplementedError()
@abc.abstractmethod
def save_trace_context(self, trace_id: str, span_id: str,
is_sampled: bool) -> None:
"""Saves the trace_id and span_id related to the current span.
After register the plugin, if tracing is enabled, this method will be
called after the server finished sending response.
This method can be used to propagate census context.
Args:
trace_id: The identifier for the trace associated with the span as a
32-character hexadecimal encoded string,
e.g. 26ed0036f2eff2b7317bccce3e28d01f
span_id: The identifier for the span as a 16-character hexadecimal encoded
string. e.g. 113ec879e62583bc
is_sampled: A bool indicates whether the span is sampled.
"""
raise NotImplementedError()
@abc.abstractmethod
def create_server_call_tracer_factory(
self) -> ServerCallTracerFactoryCapsule:
"""Creates a ServerCallTracerFactoryCapsule.
After register the plugin, if tracing or stats is enabled, this method
will be called by calling observability_init, the ServerCallTracerFactory
created by this method will be registered to gRPC core.
The ServerCallTracerFactory is an object which implements
`grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule
using `server_call_tracer_factory` as name.
Returns:
A PyCapsule which stores a ServerCallTracerFactory object.
"""
raise NotImplementedError()
@abc.abstractmethod
def record_rpc_latency(self, method: str, rpc_latency: float,
status_code: Any) -> None:
"""Record the latency of the RPC.
After register the plugin, if stats is enabled, this method will be
called at the end of each RPC.
Args:
method: The fully-qualified name of the RPC method being invoked.
rpc_latency: The latency for the RPC, equals to the time between
when the client invokes the RPC and when the client receives the status.
status_code: An element of grpc.StatusCode in string format representing the
final status for the RPC.
"""
raise NotImplementedError()
def set_tracing(self, enable: bool) -> None:
"""Enable or disable tracing.
Args:
enable: A bool indicates whether tracing should be enabled.
"""
self._tracing_enabled = enable
def set_stats(self, enable: bool) -> None:
"""Enable or disable stats(metrics).
Args:
enable: A bool indicates whether stats should be enabled.
"""
self._stats_enabled = enable
@property
def tracing_enabled(self) -> bool:
return self._tracing_enabled
@property
def stats_enabled(self) -> bool:
return self._stats_enabled
@property
def observability_enabled(self) -> bool:
return self.tracing_enabled or self.stats_enabled
@contextlib.contextmanager
def get_plugin() -> Generator[Optional[ObservabilityPlugin], None, None]:
"""Get the ObservabilityPlugin in _observability module.
Returns:
The ObservabilityPlugin currently registered with the _observability
module. Or None if no plugin exists at the time of calling this method.
"""
with _plugin_lock:
yield _OBSERVABILITY_PLUGIN
def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None:
"""Save ObservabilityPlugin to _observability module.
Args:
observability_plugin: The ObservabilityPlugin to save.
Raises:
ValueError: If an ObservabilityPlugin was already registered at the
time of calling this method.
"""
global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement
with _plugin_lock:
if observability_plugin and _OBSERVABILITY_PLUGIN:
raise ValueError("observability_plugin was already set!")
_OBSERVABILITY_PLUGIN = observability_plugin
def observability_init(observability_plugin: ObservabilityPlugin) -> None:
"""Initialize observability with provided ObservabilityPlugin.
This method have to be called at the start of a program, before any
channels/servers are built.
Args:
observability_plugin: The ObservabilityPlugin to use.
Raises:
ValueError: If an ObservabilityPlugin was already registered at the
time of calling this method.
"""
set_plugin(observability_plugin)
try:
_cygrpc.set_server_call_tracer_factory(observability_plugin)
except Exception: # pylint:disable=broad-except
_LOGGER.exception("Failed to set server call tracer factory!")
def observability_deinit() -> None:
"""Clear the observability context, including ObservabilityPlugin and
ServerCallTracerFactory
This method have to be called after exit observability context so that
it's possible to re-initialize again.
"""
set_plugin(None)
_cygrpc.clear_server_call_tracer_factory()
def delete_call_tracer(client_call_tracer_capsule: Any) -> None:
"""Deletes the ClientCallTracer stored in ClientCallTracerCapsule.
This method will be called at the end of the call to destroy the ClientCallTracer.
The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer`
interface and wrapped in a PyCapsule using `client_call_tracer` as the name.
Args:
client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object.
"""
with get_plugin() as plugin:
if not (plugin and plugin.observability_enabled):
return
plugin.delete_client_call_tracer(client_call_tracer_capsule)
def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
"""Record the latency of the RPC, if the plugin is registered and stats is enabled.
This method will be called at the end of each RPC.
Args:
state: a grpc._channel._RPCState object which contains the stats related to the
RPC.
"""
with get_plugin() as plugin:
if not (plugin and plugin.stats_enabled):
return
rpc_latency = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency.total_seconds() * 1000
plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)

@ -619,6 +619,7 @@ def _unary_response_in_pool(
request_deserializer: Optional[SerializingFunction],
response_serializer: Optional[SerializingFunction]) -> None:
cygrpc.install_context_from_request_call_event(rpc_event)
try:
argument = argument_thunk()
if argument is not None:

@ -0,0 +1,6 @@
build/
include/
*.c
*.cpp
*.egg-info
*.so

@ -0,0 +1,2 @@
### TODO(xuanwn)
* Fill in content.

@ -0,0 +1,80 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:cython_library.bzl", "pyx_library")
package(default_visibility = ["//visibility:public"])
# TODO(xuanwn): We also need support Python-native build
cc_library(
name = "observability",
srcs = [
"client_call_tracer.cc",
"observability_util.cc",
"python_census_context.cc",
"sampler.cc",
"server_call_tracer.cc",
],
hdrs = [
"client_call_tracer.h",
"constants.h",
"observability_util.h",
"python_census_context.h",
"sampler.h",
"server_call_tracer.h",
],
includes = ["."],
deps = [
#TODO(xuanwn): Confirm only referenced code is inlcuded in shared object library
"//:grpc",
"//:grpc_rpc_encoding",
"//src/cpp/ext/gcp:observability_config",
],
)
pyx_library(
name = "cyobservability",
srcs = [
"_cyobservability.pxd",
"_cyobservability.pyx",
],
deps = [
":observability",
],
)
py_library(
name = "pyobservability",
srcs = [
"__init__.py",
"_observability.py",
"_gcp_observability.py",
"_open_census_exporter.py",
# The fllowing files are used only for OC stackdriver extension.
# TODO(xuanwn): Uncomment after those files are implemented.
# "_measures.py",
# "_open_census.py",
# "_views.py",
],
imports = [
".",
"../",
],
srcs_version = "PY3ONLY",
deps = [
":cyobservability",
"//src/python/grpcio/grpc:grpcio",
],
)

@ -0,0 +1,17 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc_observability._gcp_observability import GCPOpenCensusObservability
__all__ = ('GCPOpenCensusObservability',)

@ -0,0 +1,152 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from libcpp.string cimport string
from libcpp.vector cimport vector
ctypedef signed long long int64_t
cdef extern from "<queue>" namespace "std" nogil:
cdef cppclass queue[T]:
bint empty()
T& front()
void pop()
cdef extern from "<mutex>" namespace "std" nogil:
cdef cppclass mutex:
mutex()
cdef cppclass unique_lock[Mutex]:
unique_lock(Mutex&)
cdef extern from "<condition_variable>" namespace "std" nogil:
cdef cppclass condition_variable:
void notify_all()
cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core":
cdef cppclass ClientCallTracer:
pass
cdef extern from "python_census_context.h" namespace "grpc_observability":
cdef void EnablePythonCensusStats(bint enable) nogil
cdef void EnablePythonCensusTracing(bint enable) nogil
union MeasurementValue:
double value_double
int64_t value_int
ctypedef struct Label:
string key
string value
ctypedef struct Annotation:
string time_stamp
string description
ctypedef struct Measurement:
cMetricsName name
MeasurementType type
MeasurementValue value
ctypedef struct SpanCensusData:
string name
string start_time
string end_time
string trace_id
string span_id
string parent_span_id
string status
vector[Label] span_labels
vector[Annotation] span_annotations
int64_t child_span_count
bint should_sample
cdef extern from "observability_util.h" namespace "grpc_observability":
cdef cGcpObservabilityConfig ReadAndActivateObservabilityConfig() nogil
cdef void NativeObservabilityInit() except +
cdef void* CreateClientCallTracer(const char* method,
const char* trace_id,
const char* parent_span_id) except +
cdef void* CreateServerCallTracerFactory() except +
cdef queue[NativeCensusData]* g_census_data_buffer
cdef void AwaitNextBatchLocked(unique_lock[mutex]&, int) nogil
cdef bint PythonCensusStatsEnabled() nogil
cdef bint PythonCensusTracingEnabled() nogil
cdef mutex g_census_data_buffer_mutex
cdef condition_variable g_census_data_buffer_cv
cppclass NativeCensusData "::grpc_observability::CensusData":
DataType type
Measurement measurement_data
SpanCensusData span_data
vector[Label] labels
ctypedef struct CloudMonitoring:
pass
ctypedef struct CloudTrace:
float sampling_rate
ctypedef struct CloudLogging:
pass
ctypedef struct cGcpObservabilityConfig "::grpc_observability::GcpObservabilityConfig":
CloudMonitoring cloud_monitoring
CloudTrace cloud_trace
CloudLogging cloud_logging
string project_id
vector[Label] labels
bint is_valid
cdef extern from "constants.h" namespace "grpc_observability":
ctypedef enum DataType:
kSpanData
kMetricData
ctypedef enum MeasurementType:
kMeasurementDouble
kMeasurementInt
ctypedef enum cMetricsName "::grpc_observability::MetricsName":
# Client
kRpcClientApiLatencyMeasureName
kRpcClientSentMessagesPerRpcMeasureName
kRpcClientSentBytesPerRpcMeasureName
kRpcClientReceivedMessagesPerRpcMeasureName
kRpcClientReceivedBytesPerRpcMeasureName
kRpcClientRoundtripLatencyMeasureName
kRpcClientServerLatencyMeasureName
kRpcClientStartedRpcsMeasureName
kRpcClientRetriesPerCallMeasureName
kRpcClientTransparentRetriesPerCallMeasureName
kRpcClientRetryDelayPerCallMeasureName
kRpcClientTransportLatencyMeasureName
# Server
kRpcServerSentMessagesPerRpcMeasureName
kRpcServerSentBytesPerRpcMeasureName
kRpcServerReceivedMessagesPerRpcMeasureName
kRpcServerReceivedBytesPerRpcMeasureName
kRpcServerServerLatencyMeasureName
kRpcServerStartedRpcsMeasureName
string kClientMethod
string kClientStatus
cdef extern from "sampler.h" namespace "grpc_observability":
cdef cppclass ProbabilitySampler:
@staticmethod
ProbabilitySampler& Get()
void SetThreshold(double sampling_rate)

@ -0,0 +1,342 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cimport cpython
from cython.operator cimport dereference
import enum
import functools
import logging
import os
from threading import Thread
from typing import List, Mapping, Tuple, Union
import _observability
# Time we wait for batch exporting census data
# TODO(xuanwn): change interval to a more appropriate number
CENSUS_EXPORT_BATCH_INTERVAL_SECS = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL_SECS', 0.5))
GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT', 10))
cdef const char* CLIENT_CALL_TRACER = "client_call_tracer"
cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory"
cdef bint GLOBAL_SHUTDOWN_EXPORT_THREAD = False
cdef object GLOBAL_EXPORT_THREAD
_LOGGER = logging.getLogger(__name__)
class _CyMetricsName:
CY_CLIENT_API_LATENCY = kRpcClientApiLatencyMeasureName
CY_CLIENT_SNET_MESSSAGES_PER_RPC = kRpcClientSentMessagesPerRpcMeasureName
CY_CLIENT_SEND_BYTES_PER_RPC = kRpcClientSentBytesPerRpcMeasureName
CY_CLIENT_RECEIVED_MESSAGES_PER_RPC = kRpcClientReceivedMessagesPerRpcMeasureName
CY_CLIENT_RECEIVED_BYTES_PER_RPC = kRpcClientReceivedBytesPerRpcMeasureName
CY_CLIENT_ROUNDTRIP_LATENCY = kRpcClientRoundtripLatencyMeasureName
CY_CLIENT_SERVER_LATENCY = kRpcClientServerLatencyMeasureName
CY_CLIENT_STARTED_RPCS = kRpcClientStartedRpcsMeasureName
CY_CLIENT_RETRIES_PER_CALL = kRpcClientRetriesPerCallMeasureName
CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL = kRpcClientTransparentRetriesPerCallMeasureName
CY_CLIENT_RETRY_DELAY_PER_CALL = kRpcClientRetryDelayPerCallMeasureName
CY_CLIENT_TRANSPORT_LATENCY = kRpcClientTransportLatencyMeasureName
CY_SERVER_SENT_MESSAGES_PER_RPC = kRpcServerSentMessagesPerRpcMeasureName
CY_SERVER_SENT_BYTES_PER_RPC = kRpcServerSentBytesPerRpcMeasureName
CY_SERVER_RECEIVED_MESSAGES_PER_RPC = kRpcServerReceivedMessagesPerRpcMeasureName
CY_SERVER_RECEIVED_BYTES_PER_RPC = kRpcServerReceivedBytesPerRpcMeasureName
CY_SERVER_SERVER_LATENCY = kRpcServerServerLatencyMeasureName
CY_SERVER_STARTED_RPCS = kRpcServerStartedRpcsMeasureName
@enum.unique
class MetricsName(enum.Enum):
CLIENT_STARTED_RPCS = _CyMetricsName.CY_CLIENT_STARTED_RPCS
CLIENT_API_LATENCY = _CyMetricsName.CY_CLIENT_API_LATENCY
CLIENT_SNET_MESSSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_SNET_MESSSAGES_PER_RPC
CLIENT_SEND_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_SEND_BYTES_PER_RPC
CLIENT_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_MESSAGES_PER_RPC
CLIENT_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_BYTES_PER_RPC
CLIENT_ROUNDTRIP_LATENCY = _CyMetricsName.CY_CLIENT_ROUNDTRIP_LATENCY
CLIENT_SERVER_LATENCY = _CyMetricsName.CY_CLIENT_SERVER_LATENCY
CLIENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_RETRIES_PER_CALL
CLIENT_TRANSPARENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL
CLIENT_RETRY_DELAY_PER_CALL = _CyMetricsName.CY_CLIENT_RETRY_DELAY_PER_CALL
CLIENT_TRANSPORT_LATENCY = _CyMetricsName.CY_CLIENT_TRANSPORT_LATENCY
SERVER_SENT_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_MESSAGES_PER_RPC
SERVER_SENT_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_BYTES_PER_RPC
SERVER_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_MESSAGES_PER_RPC
SERVER_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_BYTES_PER_RPC
SERVER_SERVER_LATENCY = _CyMetricsName.CY_SERVER_SERVER_LATENCY
SERVER_STARTED_RPCS = _CyMetricsName.CY_SERVER_STARTED_RPCS
# Delay map creation due to circular dependencies
_CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING = {x.value: x for x in MetricsName}
def cyobservability_init(object exporter) -> None:
exporter: _observability.Exporter
NativeObservabilityInit()
_start_exporting_thread(exporter)
def _start_exporting_thread(object exporter) -> None:
exporter: _observability.Exporter
global GLOBAL_EXPORT_THREAD
global GLOBAL_SHUTDOWN_EXPORT_THREAD
GLOBAL_SHUTDOWN_EXPORT_THREAD = False
# TODO(xuanwn): Change it to daemon thread.
GLOBAL_EXPORT_THREAD = Thread(target=_export_census_data, args=(exporter,))
GLOBAL_EXPORT_THREAD.start()
def set_gcp_observability_config(object py_config) -> bool:
py_config: _gcp_observability.GcpObservabilityPythonConfig
py_labels = {}
sampling_rate = 0.0
cdef cGcpObservabilityConfig c_config = ReadAndActivateObservabilityConfig()
if not c_config.is_valid:
return False
for label in c_config.labels:
py_labels[_decode(label.key)] = _decode(label.value)
if PythonCensusTracingEnabled():
sampling_rate = c_config.cloud_trace.sampling_rate
# Save sampling rate to global sampler.
ProbabilitySampler.Get().SetThreshold(sampling_rate)
py_config.set_configuration(_decode(c_config.project_id), sampling_rate, py_labels,
PythonCensusTracingEnabled(), PythonCensusStatsEnabled())
return True
def create_client_call_tracer(bytes method_name, bytes trace_id,
bytes parent_span_id=b'') -> cpython.PyObject:
"""Create a ClientCallTracer and save to PyCapsule.
Returns: A grpc_observability._observability.ClientCallTracerCapsule object.
"""
cdef char* c_method = cpython.PyBytes_AsString(method_name)
cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id)
cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id)
cdef void* call_tracer = CreateClientCallTracer(c_method, c_trace_id, c_parent_span_id)
capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL)
return capsule
def create_server_call_tracer_factory_capsule() -> cpython.PyObject:
"""Create a ServerCallTracerFactory and save to PyCapsule.
Returns: A grpc_observability._observability.ServerCallTracerFactoryCapsule object.
"""
cdef void* call_tracer_factory = CreateServerCallTracerFactory()
capsule = cpython.PyCapsule_New(call_tracer_factory, SERVER_CALL_TRACER_FACTORY, NULL)
return capsule
def delete_client_call_tracer(object client_call_tracer) -> None:
client_call_tracer: grpc._observability.ClientCallTracerCapsule
if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER):
capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER)
call_tracer_ptr = <ClientCallTracer*>capsule_ptr
del call_tracer_ptr
def _c_label_to_labels(vector[Label] c_labels) -> Mapping[str, str]:
py_labels = {}
for label in c_labels:
py_labels[_decode(label.key)] = _decode(label.value)
return py_labels
def _c_measurement_to_measurement(object measurement
) -> Mapping[str, Union[enum, Mapping[str, Union[float, int]]]]:
"""Convert Cython Measurement to Python measurement.
Args:
measurement: Actual measurement repesented by Cython type Measurement, using object here
since Cython refuse to automatically convert a union with unsafe type combinations.
Returns:
A mapping object with keys and values as following:
name -> cMetricsName
type -> MeasurementType
value -> {value_double: float | value_int: int}
"""
measurement: Measurement
py_measurement = {}
py_measurement['name'] = measurement['name']
py_measurement['type'] = measurement['type']
if measurement['type'] == kMeasurementDouble:
py_measurement['value'] = {'value_double': measurement['value']['value_double']}
else:
py_measurement['value'] = {'value_int': measurement['value']['value_int']}
return py_measurement
def _c_annotation_to_annotations(vector[Annotation] c_annotations) -> List[Tuple[str, str]]:
py_annotations = []
for annotation in c_annotations:
py_annotations.append((_decode(annotation.time_stamp),
_decode(annotation.description)))
return py_annotations
def observability_deinit() -> None:
_shutdown_exporting_thread()
EnablePythonCensusStats(False)
EnablePythonCensusTracing(False)
@functools.lru_cache(maxsize=None)
def _cy_metric_name_to_py_metric_name(cMetricsName metric_name) -> MetricsName:
try:
return _CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING[metric_name]
except KeyError:
raise ValueError('Invalid metric name %s' % metric_name)
def _get_stats_data(object measurement, object labels) -> _observability.StatsData:
"""Convert a Python measurement to StatsData.
Args:
measurement: A dict of type Mapping[str, Union[enum, Mapping[str, Union[float, int]]]]
with keys and values as following:
name -> cMetricsName
type -> MeasurementType
value -> {value_double: float | value_int: int}
labels: Labels assciociated with stats data with type of dict[str, str].
"""
measurement: Measurement
labels: Mapping[str, str]
metric_name = _cy_metric_name_to_py_metric_name(measurement['name'])
if measurement['type'] == kMeasurementDouble:
py_stat = _observability.StatsData(name=metric_name, measure_double=True,
value_float=measurement['value']['value_double'],
labels=labels)
else:
py_stat = _observability.StatsData(name=metric_name, measure_double=False,
value_int=measurement['value']['value_int'],
labels=labels)
return py_stat
def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels,
vector[Annotation] span_annotations) -> _observability.TracingData:
py_span_labels = _c_label_to_labels(span_labels)
py_span_annotations = _c_annotation_to_annotations(span_annotations)
return _observability.TracingData(name=_decode(span_data.name),
start_time = _decode(span_data.start_time),
end_time = _decode(span_data.end_time),
trace_id = _decode(span_data.trace_id),
span_id = _decode(span_data.span_id),
parent_span_id = _decode(span_data.parent_span_id),
status = _decode(span_data.status),
should_sample = span_data.should_sample,
child_span_count = span_data.child_span_count,
span_labels = py_span_labels,
span_annotations = py_span_annotations)
def _record_rpc_latency(object exporter, str method, float rpc_latency, str status_code) -> None:
exporter: _observability.Exporter
measurement = {}
measurement['name'] = kRpcClientApiLatencyMeasureName
measurement['type'] = kMeasurementDouble
measurement['value'] = {'value_double': rpc_latency}
labels = {}
labels[_decode(kClientMethod)] = method.strip("/")
labels[_decode(kClientStatus)] = status_code
metric = _get_stats_data(measurement, labels)
exporter.export_stats_data([metric])
cdef void _export_census_data(object exporter):
"""Main function running in export thread."""
exporter: _observability.Exporter
cdef int export_interval_ms = CENSUS_EXPORT_BATCH_INTERVAL_SECS * 1000
while True:
with nogil:
while not GLOBAL_SHUTDOWN_EXPORT_THREAD:
lk = new unique_lock[mutex](g_census_data_buffer_mutex)
# Wait for next batch of census data OR timeout at fixed interval.
# Batch export census data to minimize the time we acquiring the GIL.
AwaitNextBatchLocked(dereference(lk), export_interval_ms)
# Break only when buffer have data
if not g_census_data_buffer.empty():
del lk
break
else:
del lk
_flush_census_data(exporter)
if GLOBAL_SHUTDOWN_EXPORT_THREAD:
break # Break to shutdown exporting thead
# Flush one last time before shutdown thread
_flush_census_data(exporter)
cdef void _flush_census_data(object exporter):
exporter: _observability.Exporter
lk = new unique_lock[mutex](g_census_data_buffer_mutex)
if g_census_data_buffer.empty():
del lk
return
py_metrics_batch = []
py_spans_batch = []
while not g_census_data_buffer.empty():
c_census_data = g_census_data_buffer.front()
if c_census_data.type == kMetricData:
py_labels = _c_label_to_labels(c_census_data.labels)
py_measurement = _c_measurement_to_measurement(c_census_data.measurement_data)
py_metric = _get_stats_data(py_measurement, py_labels)
py_metrics_batch.append(py_metric)
else:
py_span = _get_tracing_data(c_census_data.span_data, c_census_data.span_data.span_labels,
c_census_data.span_data.span_annotations)
py_spans_batch.append(py_span)
g_census_data_buffer.pop()
del lk
exporter.export_stats_data(py_metrics_batch)
exporter.export_tracing_data(py_spans_batch)
cdef void _shutdown_exporting_thread():
with nogil:
global GLOBAL_SHUTDOWN_EXPORT_THREAD
GLOBAL_SHUTDOWN_EXPORT_THREAD = True
g_census_data_buffer_cv.notify_all()
GLOBAL_EXPORT_THREAD.join(timeout=GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT)
cdef str _decode(bytes bytestring):
if isinstance(bytestring, (str,)):
return <str>bytestring
else:
try:
return bytestring.decode('utf8')
except UnicodeDecodeError:
_LOGGER.exception('Invalid encoding on %s', bytestring)
return bytestring.decode('latin1')

@ -0,0 +1,186 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
import logging
import threading
import time
from typing import Any, Mapping, Optional
import grpc
from grpc_observability import _cyobservability # pytype: disable=pyi-error
from grpc_observability._open_census_exporter import OpenCensusExporter
from opencensus.trace import execution_context
from opencensus.trace import span_context as span_context_module
from opencensus.trace import trace_options as trace_options_module
_LOGGER = logging.getLogger(__name__)
ClientCallTracerCapsule = Any # it appears only once in the function signature
ServerCallTracerFactoryCapsule = Any # it appears only once in the function signature
grpc_observability = Any # grpc_observability.py imports this module.
GRPC_STATUS_CODE_TO_STRING = {
grpc.StatusCode.OK: "OK",
grpc.StatusCode.CANCELLED: "CANCELLED",
grpc.StatusCode.UNKNOWN: "UNKNOWN",
grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
grpc.StatusCode.ABORTED: "ABORTED",
grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
grpc.StatusCode.INTERNAL: "INTERNAL",
grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
@dataclass
class GcpObservabilityPythonConfig:
_singleton = None
_lock: threading.RLock = threading.RLock()
project_id: str = ""
stats_enabled: bool = False
tracing_enabled: bool = False
labels: Optional[Mapping[str, str]] = field(default_factory=dict)
sampling_rate: Optional[float] = 0.0
@staticmethod
def get():
with GcpObservabilityPythonConfig._lock:
if GcpObservabilityPythonConfig._singleton is None:
GcpObservabilityPythonConfig._singleton = GcpObservabilityPythonConfig(
)
return GcpObservabilityPythonConfig._singleton
def set_configuration(self,
project_id: str,
sampling_rate: Optional[float] = 0.0,
labels: Optional[Mapping[str, str]] = None,
tracing_enabled: bool = False,
stats_enabled: bool = False) -> None:
self.project_id = project_id
self.stats_enabled = stats_enabled
self.tracing_enabled = tracing_enabled
self.labels = labels
self.sampling_rate = sampling_rate
# pylint: disable=no-self-use
class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
"""GCP OpenCensus based plugin implementation.
If no exporter is passed, the default will be OpenCensus StackDriver
based exporter.
For more details, please refer to User Guide:
* https://cloud.google.com/stackdriver/docs/solutions/grpc
Attributes:
config: Configuration for GCP OpenCensus Observability.
exporter: Exporter used to export data.
"""
config: GcpObservabilityPythonConfig
exporter: "grpc_observability.Exporter"
def __init__(self, exporter: "grpc_observability.Exporter" = None):
self.exporter = None
self.config = GcpObservabilityPythonConfig.get()
if exporter:
self.exporter = exporter
else:
self.exporter = OpenCensusExporter(self.config.get().labels)
config_valid = _cyobservability.set_gcp_observability_config(
self.config)
if not config_valid:
raise ValueError("Invalid configuration")
if self.config.tracing_enabled:
self.set_tracing(True)
if self.config.stats_enabled:
self.set_stats(True)
def __enter__(self):
try:
_cyobservability.cyobservability_init(self.exporter)
#TODO(xuanwn): Use specific exceptons
except Exception as e: # pylint: disable=broad-except
_LOGGER.exception("GCPOpenCensusObservability failed with: %s", e)
grpc._observability.observability_init(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.exit()
def exit(self) -> None:
# Sleep so we don't loss any data. If we shutdown export thread
# immediately after exit, it's possible that core didn't call RecordEnd
# in callTracer, and all data recorded by calling RecordEnd will be
# lost.
# The time equals to the time in AwaitNextBatchLocked.
# TODO(xuanwn): explicit synchronization
# https://github.com/grpc/grpc/issues/33262
time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
self.set_tracing(False)
self.set_stats(False)
_cyobservability.observability_deinit()
grpc._observability.observability_deinit()
def create_client_call_tracer(
self, method_name: bytes) -> ClientCallTracerCapsule:
current_span = execution_context.get_current_span()
if current_span:
# Propagate existing OC context
trace_id = current_span.context_tracer.trace_id.encode('utf8')
parent_span_id = current_span.span_id.encode('utf8')
capsule = _cyobservability.create_client_call_tracer(
method_name, trace_id, parent_span_id)
else:
trace_id = span_context_module.generate_trace_id().encode('utf8')
capsule = _cyobservability.create_client_call_tracer(
method_name, trace_id)
return capsule
def create_server_call_tracer_factory(
self) -> ServerCallTracerFactoryCapsule:
capsule = _cyobservability.create_server_call_tracer_factory_capsule()
return capsule
def delete_client_call_tracer(
self, client_call_tracer: ClientCallTracerCapsule) -> None:
_cyobservability.delete_client_call_tracer(client_call_tracer)
def save_trace_context(self, trace_id: str, span_id: str,
is_sampled: bool) -> None:
trace_options = trace_options_module.TraceOptions(0)
trace_options.set_enabled(is_sampled)
span_context = span_context_module.SpanContext(
trace_id=trace_id, span_id=span_id, trace_options=trace_options)
current_tracer = execution_context.get_opencensus_tracer()
current_tracer.span_context = span_context
def record_rpc_latency(self, method: str, rpc_latency: float,
status_code: grpc.StatusCode) -> None:
status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
_cyobservability._record_rpc_latency(self.exporter, method, rpc_latency,
status_code)

@ -0,0 +1,101 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
from dataclasses import dataclass
from dataclasses import field
from typing import List, Mapping, Tuple
class Exporter(metaclass=abc.ABCMeta):
"""Abstract base class for census data exporters."""
@abc.abstractmethod
def export_stats_data(self, stats_data: List[TracingData]) -> None:
"""Exports a list of TracingData objects to the exporter's destination.
Args:
stats_data: A list of TracingData objects to export.
"""
raise NotImplementedError()
@abc.abstractmethod
def export_tracing_data(self, tracing_data: List[StatsData]) -> None:
"""Exports a list of StatsData objects to the exporter's destination.
Args:
tracing_data: A list of StatsData objects to export.
"""
raise NotImplementedError()
@dataclass(frozen=True)
class StatsData:
"""A data class representing stats data.
Attributes:
name: An element of grpc_observability._cyobservability.MetricsName, e.g.
MetricsName.CLIENT_STARTED_RPCS.
measure_double: A bool indicate whether the metric is a floating-point
value.
value_int: The actual metric value if measure_double is False.
value_float: The actual metric value if measure_double is True.
labels: A dictionary that maps label tags associated with this metric to
corresponding label value.
"""
name: "grpc_observability._cyobservability.MetricsName"
measure_double: bool
value_int: int = 0
value_float: float = 0.0
labels: Mapping[str, str] = field(default_factory=dict)
@dataclass(frozen=True)
class TracingData:
"""A data class representing tracing data.
Attributes:
name: The name for tracing data, also the name for the Span.
start_time: The start time for the span in RFC3339 UTC "Zulu" format, e.g.
2014-10-02T15:01:23Z
end_time: The end time for the span in RFC3339 UTC "Zulu" format, e.g.
2014-10-02T15:01:23Z
trace_id: The identifier for the trace associated with this span as a
32-character hexadecimal encoded string,
e.g. 26ed0036f2eff2b7317bccce3e28d01f
span_id: The identifier for the span as a 16-character hexadecimal encoded
string. e.g. 113ec879e62583bc
parent_span_id: An option identifier for the span's parent id.
status: An element of grpc.StatusCode in string format representing the
final status for the trace data.
should_sample: A bool indicates whether the span is sampled.
child_span_count: The number of child span associated with this span.
span_labels: A dictionary that maps labels tags associated with this
span to corresponding label value.
span_annotations: A dictionary that maps annotation timeStamp with
description. The timeStamp have a format which can be converted
to Python datetime.datetime, e.g. 2023-05-29 17:07:09.895
"""
name: str
start_time: str
end_time: str
trace_id: str
span_id: str
parent_span_id: str
status: str
should_sample: bool
child_span_count: int
span_labels: Mapping[str, str] = field(default_factory=dict)
span_annotations: List[Tuple[str, str]] = field(default_factory=list)

@ -0,0 +1,33 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import List
from grpc_observability import _observability # pytype: disable=pyi-error
logger = logging.getLogger(__name__)
class OpenCensusExporter(_observability.Exporter):
def export_stats_data(self,
stats_data: List[_observability.StatsData]) -> None:
# TODO(xuanwn): Add implementation
raise NotImplementedError()
def export_tracing_data(
self, tracing_data: List[_observability.TracingData]) -> None:
# TODO(xuanwn): Add implementation
raise NotImplementedError()

@ -0,0 +1,281 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/python/grpcio_observability/grpc_observability/client_call_tracer.h"
#include <constants.h>
#include <observability_util.h>
#include <python_census_context.h>
#include <stddef.h>
#include <algorithm>
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include <grpc/slice.h>
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice.h"
namespace grpc_observability {
constexpr uint32_t PythonOpenCensusCallTracer::
PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
constexpr uint32_t
PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
//
// OpenCensusCallTracer
//
PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
const char* method, const char* trace_id, const char* parent_span_id,
bool tracing_enabled)
: method_(GetMethod(method)), tracing_enabled_(tracing_enabled) {
GenerateClientContext(absl::StrCat("Sent.", method_),
absl::string_view(trace_id),
absl::string_view(parent_span_id), &context_);
}
void PythonOpenCensusCallTracer::GenerateContext() {}
void PythonOpenCensusCallTracer::RecordAnnotation(
absl::string_view annotation) {
// If tracing is disabled, the following will be a no-op.
context_.AddSpanAnnotation(annotation);
}
PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
if (PythonCensusStatsEnabled()) {
context_.Labels().emplace_back(kClientMethod, std::string(method_));
RecordIntMetric(kRpcClientRetriesPerCallMeasureName, retries_ - 1,
context_.Labels()); // exclude first attempt
RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
transparent_retries_, context_.Labels());
RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
ToDoubleMilliseconds(retry_delay_), context_.Labels());
}
if (tracing_enabled_) {
context_.EndSpan();
if (IsSampled()) {
RecordSpan(context_.Span().ToCensusData());
}
}
}
PythonCensusContext
PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
&(context_.Span()), context_.Labels());
return context;
}
PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
uint64_t attempt_num;
{
grpc_core::MutexLock lock(&mu_);
if (transparent_retries_ != 0 || retries_ != 0) {
if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
retry_delay_ += absl::Now() - time_at_last_attempt_end_;
}
}
attempt_num = retries_;
if (is_transparent_retry) {
++transparent_retries_;
} else {
++retries_;
}
++num_active_rpcs_;
}
context_.IncreaseChildSpanCount();
return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
is_transparent_retry);
}
//
// PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
//
PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
uint64_t attempt_num,
bool is_transparent_retry)
: parent_(parent),
context_(parent_->CreateCensusContextForCallAttempt()),
start_time_(absl::Now()) {
if (parent_->tracing_enabled_) {
context_.AddSpanAttribute("previous-rpc-attempts",
absl::StrCat(attempt_num));
context_.AddSpanAttribute("transparent-retry",
absl::StrCat(is_transparent_retry));
}
if (!PythonCensusStatsEnabled()) {
return;
}
context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels());
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (parent_->tracing_enabled_) {
char tracing_buf[kMaxTraceContextLen];
size_t tracing_len =
TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
if (tracing_len > 0) {
send_initial_metadata->Set(
grpc_core::GrpcTraceBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
}
}
if (!PythonCensusStatsEnabled()) {
return;
}
grpc_slice tags = grpc_empty_slice();
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
grpc_core::Slice(tags));
}
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
++sent_message_count_;
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
++recv_message_count_;
}
namespace {
// Returns 0 if no server stats are present in the metadata.
uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
if (!PythonCensusStatsEnabled()) {
return 0;
}
const grpc_core::Slice* grpc_server_stats_bin_ptr =
b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
if (grpc_server_stats_bin_ptr == nullptr) {
return 0;
}
uint64_t elapsed_time = 0;
ServerStatsDeserialize(
reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
grpc_server_stats_bin_ptr->size(), &elapsed_time);
return elapsed_time;
}
} // namespace
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) {
if (!PythonCensusStatsEnabled()) {
return;
}
auto status_code_ = status.code();
uint64_t elapsed_time = 0;
if (recv_trailing_metadata != nullptr) {
elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
}
std::string final_status = absl::StatusCodeToString(status_code_);
context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
context_.Labels().emplace_back(kClientStatus, final_status);
RecordDoubleMetric(
kRpcClientSentBytesPerRpcMeasureName,
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0),
context_.Labels());
RecordDoubleMetric(
kRpcClientReceivedBytesPerRpcMeasureName,
static_cast<double>(transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0),
context_.Labels());
RecordDoubleMetric(
kRpcClientServerLatencyMeasureName,
absl::ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time)),
context_.Labels());
RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
absl::ToDoubleMilliseconds(absl::Now() - start_time_),
context_.Labels());
if (grpc_core::IsTransportSuppliesClientLatencyEnabled()) {
if (transport_stream_stats != nullptr &&
gpr_time_cmp(transport_stream_stats->latency,
gpr_inf_future(GPR_TIMESPAN)) != 0) {
double latency_ms = absl::ToDoubleMilliseconds(absl::Microseconds(
gpr_timespec_to_micros(transport_stream_stats->latency)));
RecordDoubleMetric(kRpcClientTransportLatencyMeasureName, latency_ms,
context_.Labels());
}
}
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordCancel(absl::Status /*cancel_error*/) {}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
const gpr_timespec& /*latency*/) {
if (PythonCensusStatsEnabled()) {
context_.Labels().emplace_back(kClientMethod,
std::string(parent_->method_));
context_.Labels().emplace_back(kClientStatus,
StatusCodeToString(status_code_));
RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
sent_message_count_, context_.Labels());
RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
recv_message_count_, context_.Labels());
grpc_core::MutexLock lock(&parent_->mu_);
if (--parent_->num_active_rpcs_ == 0) {
parent_->time_at_last_attempt_end_ = absl::Now();
}
}
if (parent_->tracing_enabled_) {
if (status_code_ != absl::StatusCode::kOk) {
context_.Span().SetStatus(StatusCodeToString(status_code_));
}
context_.EndSpan();
if (IsSampled()) {
RecordSpan(context_.Span().ToCensusData());
}
}
// After RecordEnd, Core will make no further usage of this CallAttemptTracer,
// so we are free it here.
delete this;
}
void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
RecordAnnotation(absl::string_view annotation) {
// If tracing is disabled, the following will be a no-op.
context_.AddSpanAnnotation(annotation);
}
} // namespace grpc_observability

@ -0,0 +1,139 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PYRHON_OPENCENSUS_CLIENT_CALL_TRACER_H
#define GRPC_PYRHON_OPENCENSUS_CLIENT_CALL_TRACER_H
#include <stdint.h>
#include <string>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/escaping.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include <grpc/support/time.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/python/grpcio_observability/grpc_observability/python_census_context.h"
namespace grpc_observability {
class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
public:
class PythonOpenCensusCallAttemptTracer : public CallAttemptTracer {
public:
PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
uint64_t attempt_num,
bool is_transparent_retry);
std::string TraceId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().TraceId()));
}
std::string SpanId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().SpanId()));
}
bool IsSampled() override { return context_.SpanContext().IsSampled(); }
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(
const grpc_core::SliceBuffer& /*send_message*/) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(
const grpc_core::SliceBuffer& /*recv_message*/) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /*latency*/) override;
void RecordAnnotation(absl::string_view annotation) override;
private:
// Maximum size of trace context is sent on the wire.
static constexpr uint32_t kMaxTraceContextLen = 64;
// Maximum size of tags that are sent on the wire.
static constexpr uint32_t kMaxTagsLen = 2048;
PythonOpenCensusCallTracer* parent_;
PythonCensusContext context_;
// Start time (for measuring latency).
absl::Time start_time_;
// Number of messages in this RPC.
uint64_t recv_message_count_ = 0;
uint64_t sent_message_count_ = 0;
// End status code
absl::StatusCode status_code_;
};
explicit PythonOpenCensusCallTracer(const char* method, const char* trace_id,
const char* parent_span_id,
bool tracing_enabled);
~PythonOpenCensusCallTracer() override;
std::string TraceId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().TraceId()));
}
std::string SpanId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().SpanId()));
}
bool IsSampled() override { return context_.SpanContext().IsSampled(); }
void GenerateContext();
PythonOpenCensusCallAttemptTracer* StartNewAttempt(
bool is_transparent_retry) override;
void RecordAnnotation(absl::string_view annotation) override;
private:
PythonCensusContext CreateCensusContextForCallAttempt();
// Client method.
absl::string_view method_;
PythonCensusContext context_;
bool tracing_enabled_;
mutable grpc_core::Mutex mu_;
// Non-transparent attempts per call
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Transparent retries per call
uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Retry delay
absl::Duration retry_delay_ ABSL_GUARDED_BY(&mu_);
absl::Time time_at_last_attempt_end_ ABSL_GUARDED_BY(&mu_);
uint64_t num_active_rpcs_ ABSL_GUARDED_BY(&mu_) = 0;
};
} // namespace grpc_observability
#endif // GRPC_PYRHON_OPENCENSUS_CLIENT_CALL_TRACER_H

@ -0,0 +1,54 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H
#define GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H
#include <string>
namespace grpc_observability {
const std::string kClientMethod = "grpc_client_method";
const std::string kClientStatus = "grpc_client_status";
const std::string kServerMethod = "grpc_server_method";
const std::string kServerStatus = "grpc_server_status";
typedef enum { kMeasurementDouble = 0, kMeasurementInt } MeasurementType;
typedef enum { kSpanData = 0, kMetricData } DataType;
typedef enum {
kRpcClientApiLatencyMeasureName = 0,
kRpcClientSentMessagesPerRpcMeasureName,
kRpcClientSentBytesPerRpcMeasureName,
kRpcClientReceivedMessagesPerRpcMeasureName,
kRpcClientReceivedBytesPerRpcMeasureName,
kRpcClientRoundtripLatencyMeasureName,
kRpcClientServerLatencyMeasureName,
kRpcClientStartedRpcsMeasureName,
kRpcClientRetriesPerCallMeasureName,
kRpcClientTransparentRetriesPerCallMeasureName,
kRpcClientRetryDelayPerCallMeasureName,
kRpcClientTransportLatencyMeasureName,
kRpcServerSentMessagesPerRpcMeasureName,
kRpcServerSentBytesPerRpcMeasureName,
kRpcServerReceivedMessagesPerRpcMeasureName,
kRpcServerReceivedBytesPerRpcMeasureName,
kRpcServerServerLatencyMeasureName,
kRpcServerStartedRpcsMeasureName
} MetricsName;
} // namespace grpc_observability
#endif // GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H

@ -0,0 +1,224 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/python/grpcio_observability/grpc_observability/observability_util.h"
#include <constants.h>
#include <python_census_context.h>
#include <chrono>
#include <cstdlib>
#include <map>
#include <string>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/cpp/ext/gcp/observability_config.h"
#include "src/python/grpcio_observability/grpc_observability/client_call_tracer.h"
#include "src/python/grpcio_observability/grpc_observability/server_call_tracer.h"
namespace grpc_observability {
std::queue<CensusData>* g_census_data_buffer;
std::mutex g_census_data_buffer_mutex;
std::condition_variable g_census_data_buffer_cv;
// TODO(xuanwn): Change below to a more appropriate number.
// Assume buffer will store 100 CensusData and start export when buffer is 70%
// full.
constexpr float kExportThreshold = 0.7;
constexpr int kMaxExportBufferSize = 100;
namespace {
float GetExportThreadHold() {
const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
if (value != nullptr) {
return std::stof(value);
}
return kExportThreshold;
}
int GetMaxExportBufferSize() {
const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
if (value != nullptr) {
return std::stoi(value);
}
return kMaxExportBufferSize;
}
} // namespace
void RecordIntMetric(MetricsName name, int64_t value,
const std::vector<Label>& labels) {
Measurement measurement_data;
measurement_data.type = kMeasurementInt;
measurement_data.name = name;
measurement_data.value.value_int = value;
CensusData data = CensusData(measurement_data, labels);
AddCensusDataToBuffer(data);
}
void RecordDoubleMetric(MetricsName name, double value,
const std::vector<Label>& labels) {
Measurement measurement_data;
measurement_data.type = kMeasurementDouble;
measurement_data.name = name;
measurement_data.value.value_double = value;
CensusData data = CensusData(measurement_data, labels);
AddCensusDataToBuffer(data);
}
void RecordSpan(const SpanCensusData& span_census_data) {
CensusData data = CensusData(span_census_data);
AddCensusDataToBuffer(data);
}
void NativeObservabilityInit() {
g_census_data_buffer = new std::queue<CensusData>;
}
void* CreateClientCallTracer(const char* method, const char* trace_id,
const char* parent_span_id) {
void* client_call_tracer = new PythonOpenCensusCallTracer(
method, trace_id, parent_span_id, PythonCensusTracingEnabled());
return client_call_tracer;
}
void* CreateServerCallTracerFactory() {
void* server_call_tracer_factory =
new PythonOpenCensusServerCallTracerFactory();
return server_call_tracer_factory;
}
void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms) {
auto now = std::chrono::system_clock::now();
g_census_data_buffer_cv.wait_until(
lock, now + std::chrono::milliseconds(timeout_ms));
}
void AddCensusDataToBuffer(const CensusData& data) {
std::unique_lock<std::mutex> lk(g_census_data_buffer_mutex);
if (g_census_data_buffer->size() >= GetMaxExportBufferSize()) {
gpr_log(GPR_DEBUG,
"Reached maximum census data buffer size, discarding this "
"CensusData entry");
} else {
g_census_data_buffer->push(data);
}
if (g_census_data_buffer->size() >=
(GetExportThreadHold() * GetMaxExportBufferSize())) {
g_census_data_buffer_cv.notify_all();
}
}
GcpObservabilityConfig ReadAndActivateObservabilityConfig() {
auto config = grpc::internal::GcpObservabilityConfig::ReadFromEnv();
if (!config.ok()) {
return GcpObservabilityConfig();
}
if (!config->cloud_trace.has_value() &&
!config->cloud_monitoring.has_value() &&
!config->cloud_logging.has_value()) {
return GcpObservabilityConfig(true);
}
if (config->cloud_trace.has_value()) {
EnablePythonCensusTracing(true);
}
if (config->cloud_monitoring.has_value()) {
EnablePythonCensusStats(true);
}
std::vector<Label> labels;
std::string project_id = config->project_id;
CloudMonitoring cloud_monitoring_config = CloudMonitoring();
CloudTrace cloud_trace_config = CloudTrace();
CloudLogging cloud_logging_config = CloudLogging();
if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
labels.reserve(config->labels.size());
// Insert in user defined labels from the GCP Observability config.
for (const auto& label : config->labels) {
labels.emplace_back(label.first, label.second);
}
if (config->cloud_trace.has_value()) {
double sampleRate = config->cloud_trace->sampling_rate;
cloud_trace_config = CloudTrace(sampleRate);
}
if (config->cloud_monitoring.has_value()) {
cloud_monitoring_config = CloudMonitoring();
}
}
// Clound logging
if (config->cloud_logging.has_value()) {
// TODO(xuanwn): Read cloud logging config
}
return GcpObservabilityConfig(cloud_monitoring_config, cloud_trace_config,
cloud_logging_config, project_id, labels);
}
absl::string_view StatusCodeToString(grpc_status_code code) {
switch (code) {
case GRPC_STATUS_OK:
return "OK";
case GRPC_STATUS_CANCELLED:
return "CANCELLED";
case GRPC_STATUS_UNKNOWN:
return "UNKNOWN";
case GRPC_STATUS_INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
case GRPC_STATUS_DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
case GRPC_STATUS_NOT_FOUND:
return "NOT_FOUND";
case GRPC_STATUS_ALREADY_EXISTS:
return "ALREADY_EXISTS";
case GRPC_STATUS_PERMISSION_DENIED:
return "PERMISSION_DENIED";
case GRPC_STATUS_UNAUTHENTICATED:
return "UNAUTHENTICATED";
case GRPC_STATUS_RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
case GRPC_STATUS_FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
case GRPC_STATUS_ABORTED:
return "ABORTED";
case GRPC_STATUS_OUT_OF_RANGE:
return "OUT_OF_RANGE";
case GRPC_STATUS_UNIMPLEMENTED:
return "UNIMPLEMENTED";
case GRPC_STATUS_INTERNAL:
return "INTERNAL";
case GRPC_STATUS_UNAVAILABLE:
return "UNAVAILABLE";
case GRPC_STATUS_DATA_LOSS:
return "DATA_LOSS";
default:
// gRPC wants users of this enum to include a default branch so that
// adding values is not a breaking change.
return "UNKNOWN_STATUS";
}
}
} // namespace grpc_observability

@ -0,0 +1,114 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef OBSERVABILITY_MAIN_H
#define OBSERVABILITY_MAIN_H
#include <stdint.h>
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include <grpc/status.h>
#include "src/python/grpcio_observability/grpc_observability/constants.h"
#include "src/python/grpcio_observability/grpc_observability/python_census_context.h"
namespace grpc_observability {
struct CensusData {
DataType type;
std::vector<Label> labels;
// TODO(xuanwn): We can use union here
SpanCensusData span_data;
Measurement measurement_data;
CensusData() {}
CensusData(const Measurement& mm, const std::vector<Label>& labels)
: type(kMetricData), labels(std::move(labels)), measurement_data(mm) {}
CensusData(const SpanCensusData& sd) : type(kSpanData), span_data(sd) {}
};
struct CloudMonitoring {
CloudMonitoring() {}
};
struct CloudTrace {
float sampling_rate = 0.0;
CloudTrace() {}
CloudTrace(double sr) : sampling_rate(sr) {}
};
struct CloudLogging {
CloudLogging() {}
};
struct GcpObservabilityConfig {
CloudMonitoring cloud_monitoring;
CloudTrace cloud_trace;
CloudLogging cloud_logging;
std::string project_id;
std::vector<Label> labels;
bool is_valid;
GcpObservabilityConfig() : is_valid(false) {}
GcpObservabilityConfig(bool valid) : is_valid(true) {}
GcpObservabilityConfig(CloudMonitoring cloud_monitoring,
CloudTrace cloud_trace, CloudLogging cloud_logging,
const std::string& project_id,
const std::vector<Label>& labels)
: cloud_monitoring(cloud_monitoring),
cloud_trace(cloud_trace),
cloud_logging(cloud_logging),
project_id(project_id),
labels(labels),
is_valid(true) {}
};
// extern is required for Cython
extern std::queue<CensusData>* g_census_data_buffer;
extern std::mutex g_census_data_buffer_mutex;
extern std::condition_variable g_census_data_buffer_cv;
void* CreateClientCallTracer(const char* method, const char* trace_id,
const char* parent_span_id);
void* CreateServerCallTracerFactory();
void NativeObservabilityInit();
void AwaitNextBatchLocked(std::unique_lock<std::mutex>& lock, int timeout_ms);
void AddCensusDataToBuffer(const CensusData& buffer);
void RecordIntMetric(MetricsName name, int64_t value,
const std::vector<Label>& labels);
void RecordDoubleMetric(MetricsName name, double value,
const std::vector<Label>& labels);
void RecordSpan(const SpanCensusData& span_census_data);
GcpObservabilityConfig ReadAndActivateObservabilityConfig();
absl::string_view StatusCodeToString(grpc_status_code code);
} // namespace grpc_observability
#endif // OBSERVABILITY_MAIN_H

@ -0,0 +1,278 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/python/grpcio_observability/grpc_observability/python_census_context.h"
#include <string.h>
#include <iomanip>
#include <iostream>
#include <new>
#include "absl/numeric/int128.h"
#include "absl/random/random.h"
#include "absl/strings/escaping.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/filters/census/rpc_encoding.h"
namespace grpc_observability {
void EnablePythonCensusStats(bool enable) {
g_python_census_stats_enabled = enable;
}
void EnablePythonCensusTracing(bool enable) {
g_python_census_tracing_enabled = enable;
}
bool PythonCensusStatsEnabled() {
return g_python_census_stats_enabled.load(std::memory_order_relaxed);
}
bool PythonCensusTracingEnabled() {
return g_python_census_tracing_enabled.load(std::memory_order_relaxed);
}
void GenerateClientContext(absl::string_view method, absl::string_view trace_id,
absl::string_view parent_span_id,
PythonCensusContext* context) {
// Destruct the current CensusContext to free the Span memory before
// overwriting it below.
context->~PythonCensusContext();
if (method.empty()) {
new (context) PythonCensusContext();
return;
}
if (!parent_span_id.empty()) {
// Note that parent_span_id exist also means it was marked as sampled at
// Python OC, we'll respect that decision.
SpanContext parent_context =
SpanContext(std::string(trace_id), std::string(parent_span_id), true);
new (context) PythonCensusContext(method, parent_context);
return;
}
// Create span without parent.
new (context) PythonCensusContext(method, trace_id);
}
void GenerateServerContext(absl::string_view header, absl::string_view method,
PythonCensusContext* context) {
// Destruct the current CensusContext to free the Span memory before
// overwriting it below.
context->~PythonCensusContext();
if (method.empty()) {
new (context) PythonCensusContext();
return;
}
SpanContext parent_ctx = FromGrpcTraceBinHeader(header);
if (parent_ctx.IsValid()) {
new (context) PythonCensusContext(method, parent_ctx);
} else {
new (context) PythonCensusContext(method);
}
}
void ToGrpcTraceBinHeader(const PythonCensusContext& ctx, uint8_t* out) {
out[kVersionOfs] = kVersionId;
out[kTraceIdOfs] = kTraceIdField;
uint8_t trace_options_rep_[kSizeTraceOptions];
std::string trace_id =
absl::HexStringToBytes(absl::string_view(ctx.SpanContext().TraceId()));
std::string span_id =
absl::HexStringToBytes(absl::string_view(ctx.SpanContext().SpanId()));
trace_options_rep_[0] = ctx.SpanContext().IsSampled() ? 1 : 0;
memcpy(reinterpret_cast<uint8_t*>(&out[kTraceIdOfs + 1]), trace_id.c_str(),
kSizeTraceID);
out[kSpanIdOfs] = kSpanIdField;
memcpy(reinterpret_cast<uint8_t*>(&out[kSpanIdOfs + 1]), span_id.c_str(),
kSizeSpanID);
out[kTraceOptionsOfs] = kTraceOptionsField;
memcpy(reinterpret_cast<uint8_t*>(&out[kTraceOptionsOfs + 1]),
trace_options_rep_, kSizeTraceOptions);
}
SpanContext FromGrpcTraceBinHeader(absl::string_view header) {
if (header.size() < kGrpcTraceBinHeaderLen ||
header[kVersionOfs] != kVersionId ||
header[kTraceIdOfs] != kTraceIdField ||
header[kSpanIdOfs] != kSpanIdField ||
header[kTraceOptionsOfs] != kTraceOptionsField) {
return SpanContext(); // Invalid.
}
uint8_t options = header[kTraceOptionsOfs + 1] & 1;
constexpr uint8_t kIsSampled = 1;
uint8_t trace_id_rep_[kTraceIdSize];
memcpy(trace_id_rep_,
reinterpret_cast<const uint8_t*>(&header[kTraceIdOfs + 1]),
kTraceIdSize);
uint8_t span_id_rep_[kSpanIdSize];
memcpy(span_id_rep_,
reinterpret_cast<const uint8_t*>(&header[kSpanIdOfs + 1]),
kSpanIdSize);
uint8_t trace_option_rep_[kTraceOptionsLen];
memcpy(trace_option_rep_, &options, kTraceOptionsLen);
SpanContext context(
absl::BytesToHexString(absl::string_view(
reinterpret_cast<const char*>(trace_id_rep_), kTraceIdSize)),
absl::BytesToHexString(absl::string_view(
reinterpret_cast<const char*>(span_id_rep_), kSpanIdSize)),
trace_option_rep_[0] & kIsSampled);
return context;
}
size_t TraceContextSerialize(const PythonCensusContext& context,
char* tracing_buf, size_t tracing_buf_size) {
if (tracing_buf_size < kGrpcTraceBinHeaderLen) {
return 0;
}
ToGrpcTraceBinHeader(context, reinterpret_cast<uint8_t*>(tracing_buf));
return kGrpcTraceBinHeaderLen;
}
size_t StatsContextSerialize(size_t /*max_tags_len*/, grpc_slice* /*tags*/) {
return 0;
}
size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
uint64_t* server_elapsed_time) {
return grpc::internal::RpcServerStatsEncoding::Decode(
absl::string_view(buf, buf_size), server_elapsed_time);
}
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
size_t buf_size) {
return grpc::internal::RpcServerStatsEncoding::Encode(server_elapsed_time,
buf, buf_size);
}
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) {
return final_info->stats.transport_stream_stats.incoming.data_bytes;
}
uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) {
return final_info->stats.transport_stream_stats.outgoing.data_bytes;
}
namespace {
// span_id is a 16-character hexadecimal encoded string.
std::string GenerateSpanId() {
uint64_t span_id = absl::Uniform<uint64_t>(absl::BitGen());
std::stringstream hex_string;
hex_string << std::setfill('0') << std::setw(16) << std::hex << span_id;
return std::string(hex_string.str());
}
// trace_id is a 32-character hexadecimal encoded string
std::string GenerateTraceId() {
absl::uint128 trace_id = absl::Uniform<absl::uint128>(absl::BitGen());
std::stringstream hex_string;
hex_string << std::setfill('0') << std::setw(32) << std::hex << trace_id;
return std::string(hex_string.str());
}
} // namespace
//
// Span
//
Span Span::StartSpan(absl::string_view name, const Span* parent) {
std::string span_id = GenerateSpanId();
std::string trace_id;
std::string parent_span_id;
bool should_sample;
auto start_time = absl::Now();
if (parent != nullptr) {
parent_span_id = parent->Context().SpanId();
trace_id = parent->Context().TraceId();
should_sample = parent->Context().IsSampled();
} else {
trace_id = GenerateTraceId();
should_sample = ShouldSample(trace_id);
}
SpanContext context = SpanContext(trace_id, span_id, should_sample);
return Span(std::string(name), parent_span_id, start_time, context);
}
Span Span::StartSpan(absl::string_view name,
const SpanContext& parent_context) {
std::string trace_id = parent_context.TraceId();
std::string parent_span_id = parent_context.SpanId();
std::string span_id = GenerateSpanId();
bool should_sample = parent_context.IsSampled();
auto start_time = absl::Now();
SpanContext context(trace_id, span_id, should_sample);
return Span(std::string(name), parent_span_id, start_time, context);
}
Span Span::StartSpan(absl::string_view name, absl::string_view trace_id) {
std::string span_id = GenerateSpanId();
auto start_time = absl::Now();
bool should_sample = ShouldSample(std::string(trace_id));
SpanContext context(std::string(trace_id), span_id, should_sample);
return Span(std::string(name), "", start_time, context);
}
void Span::SetStatus(absl::string_view status) {
status_ = std::string(status);
}
void Span::AddAttribute(absl::string_view key, absl::string_view value) {
span_labels_.emplace_back(std::string(key), std::string(value));
}
void Span::AddAnnotation(absl::string_view description) {
// Need a string format which can be converted to Python datetime.datetime
// class directly.
std::string time_stamp =
absl::FormatTime("%Y-%m-%d %H:%M:%E3S", absl::Now(), absl::UTCTimeZone());
span_annotations_.emplace_back(
Annotation{time_stamp, std::string(description)});
}
SpanCensusData Span::ToCensusData() const {
SpanCensusData census_data;
absl::TimeZone utc = absl::UTCTimeZone();
census_data.name = name_;
// Need a string format which can be exported to StackDriver directly.
// See format details:
// https://cloud.google.com/trace/docs/reference/v2/rest/v2/projects.traces/batchWrite
census_data.start_time =
absl::FormatTime("%Y-%m-%dT%H:%M:%E6SZ", start_time_, utc);
census_data.end_time =
absl::FormatTime("%Y-%m-%dT%H:%M:%E6SZ", end_time_, utc);
census_data.trace_id = Context().TraceId();
census_data.span_id = Context().SpanId();
census_data.should_sample = Context().IsSampled();
census_data.parent_span_id = parent_span_id_;
census_data.status = status_;
census_data.span_labels = span_labels_;
census_data.span_annotations = span_annotations_;
census_data.child_span_count = child_span_count_;
return census_data;
}
} // namespace grpc_observability

@ -0,0 +1,327 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PYRHON_OPENCENSUS_H
#define GRPC_PYRHON_OPENCENSUS_H
#include <stddef.h>
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/slice.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/python/grpcio_observability/grpc_observability/constants.h"
#include "src/python/grpcio_observability/grpc_observability/sampler.h"
namespace grpc_observability {
namespace {
std::atomic<bool> g_python_census_stats_enabled(false);
std::atomic<bool> g_python_census_tracing_enabled(false);
} // namespace
// Enables/Disables Python census stats/tracing. It's only safe to do at the
// start of a program, before any channels/servers are built.
void EnablePythonCensusStats(bool enable);
void EnablePythonCensusTracing(bool enable);
// Gets the current status of Python OpenCensus stats/tracing
bool PythonCensusStatsEnabled();
bool PythonCensusTracingEnabled();
static constexpr size_t kTraceIdSize = 16;
static constexpr size_t kSpanIdSize = 8;
constexpr uint8_t kVersionId = 0;
constexpr uint8_t kTraceIdField = 0;
constexpr uint8_t kSpanIdField = 1;
constexpr uint8_t kTraceOptionsField = 2;
constexpr int kVersionLen = 1;
constexpr int kTraceIdLen = 16;
constexpr int kSpanIdLen = 8;
constexpr int kTraceOptionsLen = 1;
constexpr int kVersionOfs = 0;
constexpr int kTraceIdOfs = 1;
constexpr int kSpanIdOfs = kTraceIdOfs + 1 + kTraceIdLen;
constexpr int kTraceOptionsOfs = kSpanIdOfs + 1 + kSpanIdLen;
static constexpr size_t kSizeTraceID = 16;
static constexpr size_t kSizeSpanID = 8;
static constexpr size_t kSizeTraceOptions = 1;
// The length of the grpc-trace-bin value:
// 1 (version)
// + 1 (trace_id field)
// + 16 (length of trace_id)
// + 1 (span_id field)
// + 8 (span_id length)
// + 1 (trace_options field)
// + 1 (trace_options length)
// ----
// 29
constexpr int kGrpcTraceBinHeaderLen =
kVersionLen + 1 + kTraceIdLen + 1 + kSpanIdLen + 1 + kTraceOptionsLen;
struct Tag {
std::string key;
std::string value;
};
struct Label {
Label() {}
Label(std::string k, std::string v) : key(k), value(v) {}
std::string key;
std::string value;
};
union MeasurementValue {
double value_double;
int64_t value_int;
};
struct Measurement {
MetricsName name;
MeasurementType type;
MeasurementValue value;
};
struct Annotation {
std::string time_stamp;
std::string description;
};
struct SpanCensusData {
std::string name;
std::string start_time;
std::string end_time;
std::string trace_id;
std::string span_id;
std::string parent_span_id;
std::string status;
std::vector<Label> span_labels;
std::vector<Annotation> span_annotations;
int64_t child_span_count;
bool should_sample;
};
// SpanContext is associated with span to help manage the current context of a
// span. It's created when creating a new Span and will be destroyed together
// with associated Span.
class SpanContext final {
public:
SpanContext() : is_valid_(false) {}
SpanContext(const std::string& trace_id, const std::string& span_id,
bool should_sample)
: trace_id_(trace_id),
span_id_(span_id),
should_sample_(should_sample),
is_valid_(true) {}
// Returns the TraceId associated with this SpanContext.
std::string TraceId() const { return trace_id_; }
// Returns the SpanId associated with this SpanContext.
std::string SpanId() const { return span_id_; }
bool IsSampled() const { return should_sample_; }
bool IsValid() const { return is_valid_; }
private:
std::string trace_id_;
std::string span_id_;
bool should_sample_;
bool is_valid_;
};
// Span is associated with PythonCensusContext to help manage tracing related
// data. It's created by calling StartSpan and will be destroyed together with
// associated PythonCensusContext.
class Span final {
public:
explicit Span(const std::string& name, const std::string& parent_span_id,
absl::Time start_time, const SpanContext& context)
: name_(name),
parent_span_id_(parent_span_id),
start_time_(start_time),
context_(context) {}
void End() { end_time_ = absl::Now(); }
void IncreaseChildSpanCount() { ++child_span_count_; }
static Span StartSpan(absl::string_view name, const Span* parent);
static Span StartSpan(absl::string_view name,
const SpanContext& parent_context);
static Span StartSpan(absl::string_view name, absl::string_view trace_id);
static Span BlankSpan() { return StartSpan("", ""); }
const SpanContext& Context() const { return context_; }
void SetStatus(absl::string_view status);
void AddAttribute(absl::string_view key, absl::string_view value);
void AddAnnotation(absl::string_view description);
SpanCensusData ToCensusData() const;
private:
static bool ShouldSample(const std::string& trace_id) {
return ProbabilitySampler::Get().ShouldSample(trace_id);
}
std::string name_;
std::string parent_span_id_;
absl::Time start_time_;
absl::Time end_time_;
std::string status_;
std::vector<Label> span_labels_;
std::vector<Annotation> span_annotations_;
SpanContext context_;
uint64_t child_span_count_ = 0;
};
// PythonCensusContext is associated with each clientCallTrcer,
// clientCallAttemptTracer and ServerCallTracer to help manage the span,
// spanContext and labels for each tracer. Craete a new PythonCensusContext will
// always reasult in creating a new span (and a new SpanContext for that span).
// It's created during callTraceer initialization and will be destroyed after
// the destruction of each callTracer.
class PythonCensusContext {
public:
PythonCensusContext() : span_(Span::BlankSpan()), labels_({}) {}
explicit PythonCensusContext(absl::string_view name)
: span_(Span::StartSpan(name, nullptr)), labels_({}) {}
PythonCensusContext(absl::string_view name, absl::string_view trace_id)
: span_(Span::StartSpan(name, trace_id)), labels_({}) {}
PythonCensusContext(absl::string_view name, const SpanContext& parent_context)
: span_(Span::StartSpan(name, parent_context)), labels_({}) {}
PythonCensusContext(absl::string_view name, const Span* parent,
const std::vector<Label>& labels)
: span_(Span::StartSpan(name, parent)), labels_(labels) {}
// For attempt Spans only
PythonCensusContext(absl::string_view name, const Span* parent)
: span_(Span::StartSpan(name, parent)), labels_({}) {}
Span& Span() { return span_; }
std::vector<Label>& Labels() { return labels_; } // Only used for metrics
const SpanContext& SpanContext() const { return span_.Context(); }
void AddSpanAttribute(absl::string_view key, absl::string_view attribute) {
span_.AddAttribute(key, attribute);
}
void AddSpanAnnotation(absl::string_view description) {
span_.AddAnnotation(description);
}
void IncreaseChildSpanCount() { span_.IncreaseChildSpanCount(); }
void EndSpan() { Span().End(); }
private:
grpc_observability::Span span_;
std::vector<Label> labels_;
};
// Creates a new client context that is by default a new root context.
// If the current context is the default context then the newly created
// span automatically becomes a root span. This should only be called with a
// blank CensusContext as it overwrites it.
void GenerateClientContext(absl::string_view method, absl::string_view trace_id,
absl::string_view parent_span_id,
PythonCensusContext* context);
// Deserialize the incoming SpanContext and generate a new server context based
// on that. This new span will never be a root span. This should only be called
// with a blank CensusContext as it overwrites it.
void GenerateServerContext(absl::string_view header, absl::string_view method,
PythonCensusContext* context);
inline absl::string_view GetMethod(const char* method) {
if (std::string(method).empty()) {
return "";
}
// Check for leading '/' and trim it if present.
return absl::StripPrefix(absl::string_view(method), "/");
}
// Fills a pre-allocated buffer with the value for the grpc-trace-bin header.
// The buffer must be at least kGrpcTraceBinHeaderLen bytes long.
void ToGrpcTraceBinHeader(const PythonCensusContext& ctx, uint8_t* out);
// Parses the value of the binary grpc-trace-bin header, returning a
// SpanContext. If parsing fails, IsValid will be false.
//
// Example value, hex encoded:
// 00 (version)
// 00 (trace_id field)
// 12345678901234567890123456789012 (trace_id)
// 01 (span_id field)
// 0000000000003039 (span_id)
// 02 (trace_options field)
// 01 (options: enabled)
//
// See also:
// https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding.md
SpanContext FromGrpcTraceBinHeader(absl::string_view header);
// Serializes the outgoing trace context. tracing_buf must be
// opencensus::trace::propagation::kGrpcTraceBinHeaderLen bytes long.
size_t TraceContextSerialize(const PythonCensusContext& context,
char* tracing_buf, size_t tracing_buf_size);
// Serializes the outgoing stats context. Field IDs are 1 byte followed by
// field data. A 1 byte version ID is always encoded first. Tags are directly
// serialized into the given grpc_slice.
size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags);
// Deserialize incoming server stats. Returns the number of bytes deserialized.
size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
uint64_t* server_elapsed_time);
// Serialize outgoing server stats. Returns the number of bytes serialized.
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
size_t buf_size);
// Returns the incoming data size from the grpc call final info.
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info);
// Returns the outgoing data size from the grpc call final info.
uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info);
} // namespace grpc_observability
#endif // GRPC_PYRHON_OPENCENSUS_H

@ -0,0 +1,69 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/python/grpcio_observability/grpc_observability/sampler.h"
#include <cmath>
#include <cstdint>
#include "absl/strings/escaping.h"
namespace grpc_observability {
namespace {
// Converts a probability in [0, 1] to a threshold in [0, UINT64_MAX].
uint64_t CalculateThreshold(double probability) {
if (probability <= 0.0) return 0;
if (probability >= 1.0) return UINT64_MAX;
// We can't directly return probability * UINT64_MAX.
//
// UINT64_MAX is (2^64)-1, but as a double rounds up to 2^64.
// For probabilities >= 1-(2^-54), the product wraps to zero!
// Instead, calculate the high and low 32 bits separately.
const double product = UINT32_MAX * probability;
double hi_bits, lo_bits = ldexp(modf(product, &hi_bits), 32) + product;
return (static_cast<uint64_t>(hi_bits) << 32) +
static_cast<uint64_t>(lo_bits);
}
uint64_t CalculateThresholdFromBuffer(const std::string& trace_id) {
const std::string trace_id_bytes = absl::HexStringToBytes(trace_id);
const uint8_t* buf = reinterpret_cast<const uint8_t*>(trace_id_bytes.c_str());
uint64_t res = 0;
for (int i = 0; i < 8; ++i) {
res |= (static_cast<uint64_t>(buf[i]) << (i * 8));
}
return res;
}
} // namespace
ProbabilitySampler& ProbabilitySampler::Get() {
static ProbabilitySampler* sampler = new ProbabilitySampler;
return *sampler;
}
void ProbabilitySampler::SetThreshold(double probability) {
uint64_t threshold = CalculateThreshold(probability);
threshold_ = threshold;
}
bool ProbabilitySampler::ShouldSample(const std::string& trace_id) {
if (threshold_ == 0 || trace_id.length() < 32) return false;
// All Spans within the same Trace will get the same sampling decision, so
// full trees of Spans will be sampled.
return CalculateThresholdFromBuffer(trace_id) <= threshold_;
}
} // namespace grpc_observability

@ -0,0 +1,43 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef SAMPLER_MAIN_H
#define SAMPLER_MAIN_H
#include <cstdint>
#include <string>
namespace grpc_observability {
// Returns true or false for sampling based on the given probability. Objects of
// this class should be cached between uses because there is a cost to
// constructing them.
class ProbabilitySampler final {
public:
static ProbabilitySampler& Get();
bool ShouldSample(const std::string& trace_id);
void SetThreshold(double probability);
private:
ProbabilitySampler() = default;
// Probability is converted to a value between [0, UINT64_MAX].
uint64_t threshold_;
};
} // namespace grpc_observability
#endif // SAMPLER_MAIN_H

@ -0,0 +1,248 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/python/grpcio_observability/grpc_observability/server_call_tracer.h"
// TODO(xuanwn): clean up includes
#include <grpc/support/port_platform.h>
#include <constants.h>
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <initializer_list>
#include <string>
#include <utility>
#include <vector>
#include "absl/strings/escaping.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/python/grpcio_observability/grpc_observability/observability_util.h"
#include "src/python/grpcio_observability/grpc_observability/python_census_context.h"
namespace grpc_observability {
namespace {
// server metadata elements
struct ServerO11yMetadata {
grpc_core::Slice path;
grpc_core::Slice tracing_slice;
grpc_core::Slice census_proto;
};
void GetO11yMetadata(const grpc_metadata_batch* b, ServerO11yMetadata* som) {
const auto* path = b->get_pointer(grpc_core::HttpPathMetadata());
if (path != nullptr) {
som->path = path->Ref();
}
if (PythonCensusTracingEnabled()) {
const auto* grpc_trace_bin =
b->get_pointer(grpc_core::GrpcTraceBinMetadata());
if (grpc_trace_bin != nullptr) {
som->tracing_slice = grpc_trace_bin->Ref();
}
}
if (PythonCensusStatsEnabled()) {
const auto* grpc_tags_bin =
b->get_pointer(grpc_core::GrpcTagsBinMetadata());
if (grpc_tags_bin != nullptr) {
som->census_proto = grpc_tags_bin->Ref();
}
}
}
} // namespace
//
// PythonOpenCensusServerCallTracer
//
class PythonOpenCensusServerCallTracer : public grpc_core::ServerCallTracer {
public:
// Maximum size of server stats that are sent on the wire.
static constexpr uint32_t kMaxServerStatsLen = 16;
PythonOpenCensusServerCallTracer()
: start_time_(absl::Now()),
recv_message_count_(0),
sent_message_count_(0) {}
std::string TraceId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().TraceId()));
}
std::string SpanId() override {
return absl::BytesToHexString(
absl::string_view(context_.SpanContext().SpanId()));
}
bool IsSampled() override { return context_.SpanContext().IsSampled(); }
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
// It's not a requirement to have this metric thus left unimplemented.
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) override;
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
++sent_message_count_;
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;
void RecordReceivedMessage(
const grpc_core::SliceBuffer& recv_message) override {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
++recv_message_count_;
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {
elapsed_time_ = absl::Now() - start_time_;
}
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordAnnotation(absl::string_view annotation) override {
context_.AddSpanAnnotation(annotation);
}
private:
PythonCensusContext context_;
// server method
grpc_core::Slice path_;
absl::string_view method_;
absl::Time start_time_;
absl::Duration elapsed_time_;
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference it when adding metadata to
// response.
char stats_buf_[kMaxServerStatsLen];
};
void PythonOpenCensusServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
ServerO11yMetadata som;
GetO11yMetadata(recv_initial_metadata, &som);
path_ = std::move(som.path);
method_ = GetMethod(path_);
auto tracing_enabled = PythonCensusTracingEnabled();
GenerateServerContext(
tracing_enabled ? som.tracing_slice.as_string_view() : "",
absl::StrCat("Recv.", method_), &context_);
if (PythonCensusStatsEnabled()) {
context_.Labels().emplace_back(kServerMethod, std::string(method_));
RecordIntMetric(kRpcServerStartedRpcsMeasureName, 1, context_.Labels());
}
}
void PythonOpenCensusServerCallTracer::RecordSendTrailingMetadata(
grpc_metadata_batch* send_trailing_metadata) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
if (PythonCensusStatsEnabled() && send_trailing_metadata != nullptr) {
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
send_trailing_metadata->Set(
grpc_core::GrpcServerStatsBinMetadata(),
grpc_core::Slice::FromCopiedBuffer(stats_buf_, len));
}
}
}
void PythonOpenCensusServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
if (PythonCensusStatsEnabled()) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
context_.Labels().emplace_back(kServerMethod, std::string(method_));
context_.Labels().emplace_back(
kServerStatus,
std::string(StatusCodeToString(final_info->final_status)));
RecordDoubleMetric(kRpcServerSentBytesPerRpcMeasureName,
static_cast<double>(response_size), context_.Labels());
RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
static_cast<double>(request_size), context_.Labels());
RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_ms,
context_.Labels());
RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,
sent_message_count_, context_.Labels());
RecordIntMetric(kRpcServerReceivedMessagesPerRpcMeasureName,
recv_message_count_, context_.Labels());
}
if (PythonCensusTracingEnabled()) {
context_.EndSpan();
if (IsSampled()) {
RecordSpan(context_.Span().ToCensusData());
}
}
// After RecordEnd, Core will make no further usage of this ServerCallTracer,
// so we are free it here.
delete this;
}
//
// PythonOpenCensusServerCallTracerFactory
//
grpc_core::ServerCallTracer*
PythonOpenCensusServerCallTracerFactory::CreateNewServerCallTracer(
grpc_core::Arena* arena) {
// We don't use arena here to to ensure that memory is allocated and freed in
// the same DLL in Windows.
(void)arena;
return new PythonOpenCensusServerCallTracer();
}
} // namespace grpc_observability

@ -0,0 +1,46 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_PYRHON_OPENCENSUS_SERVER_CALL_TRACER_H
#define GRPC_PYRHON_OPENCENSUS_SERVER_CALL_TRACER_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
namespace grpc_observability {
class PythonOpenCensusServerCallTracerFactory
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
grpc_core::Arena* arena) override;
};
inline absl::string_view GetMethod(const grpc_core::Slice& path) {
if (path.empty()) {
return "";
}
// Check for leading '/' and trim it if present.
return absl::StripPrefix(path.as_string_view(), "/");
}
} // namespace grpc_observability
#endif // GRPC_PYRHON_OPENCENSUS_SERVER_CALL_TRACER_H

@ -0,0 +1,30 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
package(default_visibility = ["//visibility:public"])
py_test(
name = "_observability_test",
size = "small",
srcs = glob(["*.py"]),
imports = ["../../"],
main = "_observability_test.py",
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_observability/grpc_observability:pyobservability",
"//src/python/grpcio_tests/tests/testing",
requirement("opencensus"),
],
)

@ -0,0 +1,401 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
import json
import logging
import os
import random
from typing import Any, Dict, List
import unittest
import grpc
import grpc_observability
from grpc_observability import _cyobservability
from grpc_observability import _observability
logger = logging.getLogger(__name__)
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'
_UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
STREAM_LENGTH = 5
CONFIG_ENV_VAR_NAME = 'GRPC_GCP_OBSERVABILITY_CONFIG'
CONFIG_FILE_ENV_VAR_NAME = 'GRPC_GCP_OBSERVABILITY_CONFIG_FILE'
_VALID_CONFIG_TRACING_STATS = {
'project_id': 'test-project',
'cloud_trace': {
'sampling_rate': 1.00
},
'cloud_monitoring': {}
}
_VALID_CONFIG_TRACING_ONLY = {
'project_id': 'test-project',
'cloud_trace': {
'sampling_rate': 1.00
},
}
_VALID_CONFIG_STATS_ONLY = {
'project_id': 'test-project',
'cloud_monitoring': {}
}
_VALID_CONFIG_STATS_ONLY_STR = """
{
'project_id': 'test-project',
'cloud_monitoring': {}
}
"""
# Depends on grpc_core::IsTransportSuppliesClientLatencyEnabled,
# the following metrcis might not exist.
_SKIP_VEFIRY = [_cyobservability.MetricsName.CLIENT_TRANSPORT_LATENCY]
_SPAN_PREFIXS = ['Recv', 'Sent', 'Attempt']
class TestExporter(_observability.Exporter):
def __init__(self, metrics: List[_observability.StatsData],
spans: List[_observability.TracingData]):
self.span_collecter = spans
self.metric_collecter = metrics
self._server = None
def export_stats_data(self,
stats_data: List[_observability.StatsData]) -> None:
self.metric_collecter.extend(stats_data)
def export_tracing_data(
self, tracing_data: List[_observability.TracingData]) -> None:
self.span_collecter.extend(tracing_data)
def handle_unary_unary(request, servicer_context):
return _RESPONSE
def handle_unary_stream(request, servicer_context):
for _ in range(STREAM_LENGTH):
yield _RESPONSE
def handle_stream_unary(request_iterator, servicer_context):
return _RESPONSE
def handle_stream_stream(request_iterator, servicer_context):
for request in request_iterator:
yield _RESPONSE
class _MethodHandler(grpc.RpcMethodHandler):
def __init__(self, request_streaming, response_streaming):
self.request_streaming = request_streaming
self.response_streaming = response_streaming
self.request_deserializer = None
self.response_serializer = None
self.unary_unary = None
self.unary_stream = None
self.stream_unary = None
self.stream_stream = None
if self.request_streaming and self.response_streaming:
self.stream_stream = lambda x, y: handle_stream_stream(x, y)
elif self.request_streaming:
self.stream_unary = lambda x, y: handle_stream_unary(x, y)
elif self.response_streaming:
self.unary_stream = lambda x, y: handle_unary_stream(x, y)
else:
self.unary_unary = lambda x, y: handle_unary_unary(x, y)
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
if handler_call_details.method == _UNARY_UNARY:
return _MethodHandler(False, False)
elif handler_call_details.method == _UNARY_STREAM:
return _MethodHandler(False, True)
elif handler_call_details.method == _STREAM_UNARY:
return _MethodHandler(True, False)
elif handler_call_details.method == _STREAM_STREAM:
return _MethodHandler(True, True)
else:
return None
class ObservabilityTest(unittest.TestCase):
def setUp(self):
self.all_metric = []
self.all_span = []
self.test_exporter = TestExporter(self.all_metric, self.all_span)
self._server = None
self._port = None
def tearDown(self):
os.environ[CONFIG_ENV_VAR_NAME] = ''
os.environ[CONFIG_FILE_ENV_VAR_NAME] = ''
if self._server:
self._server.stop(0)
def testRecordUnaryUnary(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertGreater(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
def testThrowErrorWithoutConfig(self):
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
pass
def testThrowErrorWithInvalidConfig(self):
_INVALID_CONFIG = 'INVALID'
self._set_config_file(_INVALID_CONFIG)
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
pass
def testNoErrorAndDataWithEmptyConfig(self):
_EMPTY_CONFIG = {}
self._set_config_file(_EMPTY_CONFIG)
# Empty config still require project_id
os.environ['GCP_PROJECT'] = 'test-project'
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertEqual(len(self.all_metric), 0)
self.assertEqual(len(self.all_span), 0)
def testThrowErrorWhenCallingMultipleInit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter) as o11y:
grpc._observability.observability_init(o11y)
def testRecordUnaryUnaryStatsOnly(self):
self._set_config_file(_VALID_CONFIG_STATS_ONLY)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertEqual(len(self.all_span), 0)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testRecordUnaryUnaryTracingOnly(self):
self._set_config_file(_VALID_CONFIG_TRACING_ONLY)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertEqual(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_spans(self.all_span)
def testRecordUnaryStream(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_stream_call()
self.assertGreater(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
def testRecordStreamUnary(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.stream_unary_call()
self.assertTrue(len(self.all_metric) > 0)
self.assertTrue(len(self.all_span) > 0)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
def testRecordStreamStream(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.stream_stream_call()
self.assertGreater(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
def testNoRecordBeforeInit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
self._start_server()
self.unary_unary_call()
self.assertEqual(len(self.all_metric), 0)
self.assertEqual(len(self.all_span), 0)
self._server.stop(0)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertGreater(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
def testNoRecordAfterExit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertGreater(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
current_metric_len = len(self.all_metric)
current_spans_len = len(self.all_span)
self._validate_metrics(self.all_metric)
self._validate_spans(self.all_span)
self.unary_unary_call()
self.assertEqual(len(self.all_metric), current_metric_len)
self.assertEqual(len(self.all_span), current_spans_len)
def testTraceSamplingRate(self):
# Make 40 UnaryCall's
# With 50% sampling rate, we should get 10-30 traces with >99.93% probability
# Each trace will have three span (Send, Recv, Attempt)
_CALLS = 40
_LOWER_BOUND = 10 * 3
_HIGHER_BOUND = 30 * 3
_VALID_CONFIG_TRACING_ONLY_SAMPLE_HALF = {
'project_id': 'test-project',
'cloud_trace': {
'sampling_rate': 0.5
},
}
self._set_config_file(_VALID_CONFIG_TRACING_ONLY_SAMPLE_HALF)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
for _ in range(_CALLS):
self.unary_unary_call()
self.assertEqual(len(self.all_metric), 0)
self.assertGreaterEqual(len(self.all_span), _LOWER_BOUND)
self.assertLessEqual(len(self.all_span), _HIGHER_BOUND)
self._validate_spans(self.all_span)
def testConfigFileOverEnvVar(self):
# env var have only stats enabled
os.environ[CONFIG_ENV_VAR_NAME] = _VALID_CONFIG_STATS_ONLY_STR
# config_file have only tracing enabled
self._set_config_file(_VALID_CONFIG_TRACING_ONLY)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter):
self._start_server()
self.unary_unary_call()
self.assertEqual(len(self.all_metric), 0)
self.assertGreater(len(self.all_span), 0)
self._validate_spans(self.all_span)
def _set_config_file(self, config: Dict[str, Any]) -> None:
# Using random name here so multiple tests can run with different config files.
config_file_path = '/tmp/' + str(random.randint(0, 100000))
with open(config_file_path, 'w', encoding='utf-8') as f:
f.write(json.dumps(config))
os.environ[CONFIG_FILE_ENV_VAR_NAME] = config_file_path
def unary_unary_call(self):
with grpc.insecure_channel(f'localhost:{self._port}') as channel:
multi_callable = channel.unary_unary(_UNARY_UNARY)
unused_response, call = multi_callable.with_call(_REQUEST)
def unary_stream_call(self):
with grpc.insecure_channel(f'localhost:{self._port}') as channel:
multi_callable = channel.unary_stream(_UNARY_STREAM)
call = multi_callable(_REQUEST)
for _ in call:
pass
def stream_unary_call(self):
with grpc.insecure_channel(f'localhost:{self._port}') as channel:
multi_callable = channel.stream_unary(_STREAM_UNARY)
unused_response, call = multi_callable.with_call(
iter([_REQUEST] * STREAM_LENGTH))
def stream_stream_call(self):
with grpc.insecure_channel(f'localhost:{self._port}') as channel:
multi_callable = channel.stream_stream(_STREAM_STREAM)
call = multi_callable(iter([_REQUEST] * STREAM_LENGTH))
for _ in call:
pass
def _start_server(self) -> None:
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
self._server.add_generic_rpc_handlers((_GenericHandler(),))
self._port = self._server.add_insecure_port('[::]:0')
self._server.start()
def _validate_metrics(self,
metrics: List[_observability.StatsData]) -> None:
metric_names = set(metric.name for metric in metrics)
for name in _cyobservability.MetricsName:
if name in _SKIP_VEFIRY:
continue
if name not in metric_names:
logger.error('metric %s not found in exported metrics: %s!',
name, metric_names)
self.assertTrue(name in metric_names)
def _validate_spans(self,
tracing_data: List[_observability.TracingData]) -> None:
span_names = set(data.name for data in tracing_data)
for prefix in _SPAN_PREFIXS:
prefix_exist = any(prefix in name for name in span_names)
if not prefix_exist:
logger.error(
'missing span with prefix %s in exported spans: %s!',
prefix, span_names)
self.assertTrue(prefix_exist)
if __name__ == "__main__":
logging.basicConfig()
unittest.main(verbosity=2)

@ -28,6 +28,7 @@ DIRS=(
'src/python/grpcio_reflection/grpc_reflection'
'src/python/grpcio_testing/grpc_testing'
'src/python/grpcio_status/grpc_status'
'src/python/grpcio_observability/grpc_observability'
'tools/run_tests/xds_k8s_test_driver/bin'
'tools/run_tests/xds_k8s_test_driver/framework'
)

@ -16,7 +16,7 @@
set -e
# directories to run against
DIRS="examples/cpp examples/android/binder src/core/lib src/core/tsi src/core/ext src/cpp test/core test/cpp include src/compiler src/ruby src/objective-c tools/distrib/python"
DIRS="examples/cpp examples/android/binder src/core/lib src/core/tsi src/core/ext src/cpp test/core test/cpp include src/compiler src/ruby src/objective-c tools/distrib/python src/python/grpcio_observability"
# file matching patterns to check
GLOB="*.h *.c *.cc *.m *.mm"

@ -74,6 +74,7 @@ export ENABLED_MODULES='
src/core/ext
src/core/lib
src/cpp
src/python/grpcio_observability
test/core
fuzztest
'

Loading…
Cancel
Save