From 629b7a14da57c214458c513c72a1b8e98ab48304 Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Mon, 5 Jun 2023 16:11:52 -0700 Subject: [PATCH] [python O11Y] Initial Implementation (#32974) Testing Command: `bazel test --cache_test_results=no --test_output=streamed --runs_per_test=1 --test_timeout=10 "//src/python/grpcio_tests/tests/observability:_observability_test"` ### TODO: * Better error handling. --- BUILD | 22 + bazel/grpc_build_system.bzl | 1 + requirements.bazel.txt | 1 + setup.cfg | 1 + src/cpp/ext/gcp/BUILD | 5 +- src/python/grpcio/grpc/BUILD.bazel | 6 + src/python/grpcio/grpc/_channel.py | 26 ++ .../grpc/_cython/_cygrpc/_hooks.pyx.pxi | 2 +- .../grpc/_cython/_cygrpc/channel.pxd.pxi | 4 + .../grpc/_cython/_cygrpc/channel.pyx.pxi | 36 +- .../grpcio/grpc/_cython/_cygrpc/grpc.pxi | 24 ++ .../_cython/_cygrpc/observability.pyx.pxi | 61 +++ src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + src/python/grpcio/grpc/_observability.py | 268 ++++++++++++ src/python/grpcio/grpc/_server.py | 1 + src/python/grpcio_observability/.gitignore | 6 + src/python/grpcio_observability/README.rst | 2 + .../grpc_observability/BUILD.bazel | 80 ++++ .../grpc_observability/__init__.py | 17 + .../grpc_observability/_cyobservability.pxd | 152 +++++++ .../grpc_observability/_cyobservability.pyx | 342 +++++++++++++++ .../grpc_observability/_gcp_observability.py | 186 ++++++++ .../grpc_observability/_observability.py | 101 +++++ .../_open_census_exporter.py | 33 ++ .../grpc_observability/client_call_tracer.cc | 281 ++++++++++++ .../grpc_observability/client_call_tracer.h | 139 ++++++ .../grpc_observability/constants.h | 54 +++ .../grpc_observability/observability_util.cc | 224 ++++++++++ .../grpc_observability/observability_util.h | 114 +++++ .../python_census_context.cc | 278 ++++++++++++ .../python_census_context.h | 327 ++++++++++++++ .../grpc_observability/sampler.cc | 69 +++ .../grpc_observability/sampler.h | 43 ++ .../grpc_observability/server_call_tracer.cc | 248 +++++++++++ 
.../grpc_observability/server_call_tracer.h | 46 ++ .../tests/observability/BUILD.bazel | 30 ++ .../observability/_observability_test.py | 401 ++++++++++++++++++ tools/distrib/pylint_code.sh | 1 + .../clang_format_all_the_things.sh | 2 +- tools/dockerfile/grpc_iwyu/iwyu.sh | 1 + 40 files changed, 3627 insertions(+), 9 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi create mode 100644 src/python/grpcio/grpc/_observability.py create mode 100644 src/python/grpcio_observability/.gitignore create mode 100644 src/python/grpcio_observability/README.rst create mode 100644 src/python/grpcio_observability/grpc_observability/BUILD.bazel create mode 100644 src/python/grpcio_observability/grpc_observability/__init__.py create mode 100644 src/python/grpcio_observability/grpc_observability/_cyobservability.pxd create mode 100644 src/python/grpcio_observability/grpc_observability/_cyobservability.pyx create mode 100644 src/python/grpcio_observability/grpc_observability/_gcp_observability.py create mode 100644 src/python/grpcio_observability/grpc_observability/_observability.py create mode 100644 src/python/grpcio_observability/grpc_observability/_open_census_exporter.py create mode 100644 src/python/grpcio_observability/grpc_observability/client_call_tracer.cc create mode 100644 src/python/grpcio_observability/grpc_observability/client_call_tracer.h create mode 100644 src/python/grpcio_observability/grpc_observability/constants.h create mode 100644 src/python/grpcio_observability/grpc_observability/observability_util.cc create mode 100644 src/python/grpcio_observability/grpc_observability/observability_util.h create mode 100644 src/python/grpcio_observability/grpc_observability/python_census_context.cc create mode 100644 src/python/grpcio_observability/grpc_observability/python_census_context.h create mode 100644 src/python/grpcio_observability/grpc_observability/sampler.cc create mode 100644 
src/python/grpcio_observability/grpc_observability/sampler.h create mode 100644 src/python/grpcio_observability/grpc_observability/server_call_tracer.cc create mode 100644 src/python/grpcio_observability/grpc_observability/server_call_tracer.h create mode 100644 src/python/grpcio_tests/tests/observability/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests/observability/_observability_test.py diff --git a/BUILD b/BUILD index 63c3342fdad..6396b78fd09 100644 --- a/BUILD +++ b/BUILD @@ -2274,6 +2274,28 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_rpc_encoding", + srcs = [ + "src/cpp/ext/filters/census/rpc_encoding.cc", + ], + hdrs = [ + "src/cpp/ext/filters/census/rpc_encoding.h", + ], + external_deps = [ + "absl/base", + "absl/base:core_headers", + "absl/meta:type_traits", + "absl/status", + "absl/strings", + "absl/time", + ], + language = "c++", + tags = ["nofixdeps"], + visibility = ["@grpc:grpc_python_observability"], + deps = ["gpr_platform"], +) + grpc_cc_library( name = "grpc_opencensus_plugin", srcs = [ diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl index 691e8687830..5051d352d37 100644 --- a/bazel/grpc_build_system.bzl +++ b/bazel/grpc_build_system.bzl @@ -113,6 +113,7 @@ def _update_visibility(visibility): "tsi": PRIVATE, "xds": PRIVATE, "xds_client_core": PRIVATE, + "grpc_python_observability": PRIVATE, } final_visibility = [] for rule in visibility: diff --git a/requirements.bazel.txt b/requirements.bazel.txt index f2dbfd7af85..cffb2eb985d 100644 --- a/requirements.bazel.txt +++ b/requirements.bazel.txt @@ -15,3 +15,4 @@ gevent==22.08.0 zope.event==4.5.0 setuptools==44.1.1 xds-protos==0.0.11 +opencensus==0.10.0 diff --git a/setup.cfg b/setup.cfg index 09aa3860cd2..7bf1b5e4236 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ inputs = src/python/grpcio/grpc/experimental src/python/grpcio/grpc src/python/grpcio_tests/tests_aio + src/python/grpcio_observability/grpc_observability examples/python/auth 
examples/python/helloworld exclude = diff --git a/src/cpp/ext/gcp/BUILD b/src/cpp/ext/gcp/BUILD index 606d7f2a517..cc49fecb1cf 100644 --- a/src/cpp/ext/gcp/BUILD +++ b/src/cpp/ext/gcp/BUILD @@ -81,7 +81,10 @@ grpc_cc_library( "absl/types:optional", ], language = "c++", - visibility = ["//test:__subpackages__"], + visibility = [ + "//test:__subpackages__", + "@grpc:grpc_python_observability", + ], deps = [ "//:gpr", "//:grpc_base", diff --git a/src/python/grpcio/grpc/BUILD.bazel b/src/python/grpcio/grpc/BUILD.bazel index 91d55d762ce..75961b3effc 100644 --- a/src/python/grpcio/grpc/BUILD.bazel +++ b/src/python/grpcio/grpc/BUILD.bazel @@ -94,6 +94,11 @@ py_library( srcs = ["_typing.py"], ) +py_library( + name = "_observability", + srcs = ["_observability.py"], +) + py_library( name = "grpcio", srcs = ["__init__.py"], @@ -102,6 +107,7 @@ py_library( ], imports = ["../"], deps = [ + ":_observability", ":_runtime_protos", ":_simple_stubs", ":_typing", diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d31344fd0e6..074a3f0e785 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -14,6 +14,7 @@ """Invocation-side implementation of gRPC Python.""" import copy +from datetime import datetime import functools import logging import os @@ -28,6 +29,7 @@ import grpc # pytype: disable=pyi-error from grpc import _common # pytype: disable=pyi-error from grpc import _compression # pytype: disable=pyi-error from grpc import _grpcio_metadata # pytype: disable=pyi-error +from grpc import _observability # pytype: disable=pyi-error from grpc._cython import cygrpc from grpc._typing import ChannelArgumentType from grpc._typing import DeserializingFunction @@ -114,6 +116,9 @@ class _RPCState(object): cancelled: bool callbacks: List[NullaryCallbackType] fork_epoch: Optional[int] + rpc_start_time: Optional[datetime] + rpc_end_time: Optional[datetime] + method: Optional[str] def __init__(self, due: 
Sequence[cygrpc.OperationType], initial_metadata: Optional[MetadataType], @@ -136,6 +141,11 @@ class _RPCState(object): self.code = code self.details = details self.debug_error_string = None + # The following three fields are used for observability. + # Updates to those fields do not trigger self.condition. + self.rpc_start_time = None + self.rpc_end_time = None + self.method = None # The semantics of grpc.Future.cancel and grpc.Future.cancelled are # slightly wonky, so they have to be tracked separately from the rest of the @@ -191,6 +201,8 @@ def _handle_event( state.code = code state.details = batch_operation.details() state.debug_error_string = batch_operation.error_string() + state.rpc_end_time = datetime.utcnow() + _observability.maybe_record_rpc_latency(state) callbacks.extend(state.callbacks) state.callbacks = None return callbacks @@ -1014,6 +1026,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): operations, None, ),), self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) event = call.next_event() _handle_event(event, state, self._response_deserializer) return state, call @@ -1062,6 +1076,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, (operations,), event_handler, self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) return _MultiThreadedRendezvous(state, call, self._response_deserializer, deadline) @@ -1120,6 +1136,8 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, None, _determine_deadline(deadline), metadata, call_credentials, operations_and_tags, self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) return _SingleThreadedRendezvous(state, call, self._response_deserializer, 
deadline) @@ -1180,6 +1198,8 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): None if credentials is None else credentials._credentials, operations, _event_handler(state, self._response_deserializer), self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) return _MultiThreadedRendezvous(state, call, self._response_deserializer, deadline) @@ -1223,6 +1243,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): None if credentials is None else credentials._credentials, _stream_unary_invocation_operations_and_tags( augmented_metadata, initial_metadata_flags), self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -1281,6 +1303,8 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): _stream_unary_invocation_operations(metadata, initial_metadata_flags), event_handler, self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _MultiThreadedRendezvous(state, call, @@ -1338,6 +1362,8 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): None, _determine_deadline(deadline), augmented_metadata, None if credentials is None else credentials._credentials, operations, event_handler, self._context) + state.rpc_start_time = datetime.utcnow() + state.method = _common.decode(self._method) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _MultiThreadedRendezvous(state, call, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi index de4d71b8196..eb943e33b2e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi +++ 
b/src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi @@ -17,7 +17,7 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call): raise NotImplementedError("No custom hooks are implemented") def install_context_from_request_call_event(RequestCallEvent event): - pass + maybe_save_server_trace_context(event) def uninstall_context(): pass diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi index eb27f2df7ad..c46cdb95e1a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -26,6 +26,10 @@ cdef class _CallState: cdef grpc_call *c_call cdef set due + # call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule + cdef object call_tracer_capsule + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except * + cdef void maybe_delete_call_tracer(self) except * cdef class _ChannelState: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index d49a4210f7c..bf3c60ce4d4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from grpc import _observability _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( 'Internal gRPC call error %d. 
' + @@ -71,6 +72,19 @@ cdef class _CallState: def __cinit__(self): self.due = set() + cdef void maybe_delete_call_tracer(self) except *: + if not self.call_tracer_capsule: + return + _observability.delete_call_tracer(self.call_tracer_capsule) + + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *: + with _observability.get_plugin() as plugin: + if not (plugin and plugin.observability_enabled): + return + capsule = plugin.create_client_call_tracer(method_name) + capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER) + _set_call_tracer(self.c_call, capsule_ptr) + self.call_tracer_capsule = capsule cdef class _ChannelState: @@ -230,6 +244,7 @@ cdef void _call( grpc_slice_unref(method_slice) if host_slice_ptr: grpc_slice_unref(host_slice) + call_state.maybe_set_client_call_tracer_on_call(method) if context is not None: set_census_context_on_call(call_state, context) if credentials is not None: @@ -238,7 +253,9 @@ cdef void _call( call_state.c_call, c_call_credentials) grpc_call_credentials_release(c_call_credentials) if c_call_error != GRPC_CALL_OK: - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL _raise_call_error_no_metadata(c_call_error) started_tags = set() @@ -248,7 +265,9 @@ cdef void _call( started_tags.add(tag) else: grpc_call_cancel(call_state.c_call, NULL) - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL _raise_call_error(c_call_error, metadata) else: @@ -263,9 +282,10 @@ cdef void _process_integrated_call_tag( cdef _CallState call_state = state.integrated_call_states.pop(tag) call_state.due.remove(tag) if not call_state.due: - grpc_call_unref(call_state.c_call) + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL - + call_state.maybe_delete_call_tracer() cdef class 
IntegratedCall: @@ -303,8 +323,11 @@ cdef object _process_segregated_call_tag( grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): call_state.due.remove(tag) if not call_state.due: - grpc_call_unref(call_state.c_call) + #TODO(xuanwn): Expand the scope of nogil + with nogil: + grpc_call_unref(call_state.c_call) call_state.c_call = NULL + call_state.maybe_delete_call_tracer() state.segregated_call_states.remove(call_state) _destroy_c_completion_queue(c_completion_queue) return True @@ -331,7 +354,8 @@ cdef class SegregatedCall: self._channel_state, self._call_state, self._c_completion_queue, tag) def on_failure(): self._call_state.due.clear() - grpc_call_unref(self._call_state.c_call) + with nogil: + grpc_call_unref(self._call_state.c_call) self._call_state.c_call = NULL self._channel_state.segregated_call_states.remove(self._call_state) _destroy_c_completion_queue(self._c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 37ee7154a24..e1bc87d4abd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from libcpp.string cimport string + cimport libc.time ctypedef ssize_t intptr_t @@ -57,6 +59,28 @@ cdef extern from "" namespace "std" nogil: # gRPC Core Declarations +cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core": + cdef cppclass ClientCallTracer: + pass + + cdef cppclass ServerCallTracer: + string TraceId() nogil + string SpanId() nogil + bint IsSampled() nogil + + cdef cppclass ServerCallTracerFactory: + @staticmethod + void RegisterGlobal(ServerCallTracerFactory* factory) nogil + +cdef extern from "src/core/lib/channel/context.h": + ctypedef enum grpc_context_index: + GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE + +cdef extern from "src/core/lib/surface/call.h": + void grpc_call_context_set(grpc_call* call, grpc_context_index elem, + void* value, void (*destroy)(void* value)) nogil + void *grpc_call_context_get(grpc_call* call, grpc_context_index elem) nogil + cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi new file mode 100644 index 00000000000..aa7dce5e8ac --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/observability.pyx.pxi @@ -0,0 +1,61 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import codecs +from typing import Optional + +from libcpp.cast cimport static_cast + +from grpc import _observability + + +cdef const char* CLIENT_CALL_TRACER = "client_call_tracer" +cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory" + + +def set_server_call_tracer_factory(object observability_plugin) -> None: + capsule = observability_plugin.create_server_call_tracer_factory() + capsule_ptr = cpython.PyCapsule_GetPointer(capsule, SERVER_CALL_TRACER_FACTORY) + _register_server_call_tracer_factory(capsule_ptr) + + +def clear_server_call_tracer_factory() -> None: + _register_server_call_tracer_factory(NULL) + + +def maybe_save_server_trace_context(RequestCallEvent event) -> None: + cdef ServerCallTracer* server_call_tracer + with _observability.get_plugin() as plugin: + if not (plugin and plugin.tracing_enabled): + return + server_call_tracer = static_cast['ServerCallTracer*'](_get_call_tracer(event.call.c_call)) + # TraceId and SpanId is hex string, need to convert to str + trace_id = _decode(codecs.decode(server_call_tracer.TraceId(), 'hex_codec')) + span_id = _decode(codecs.decode(server_call_tracer.SpanId(), 'hex_codec')) + is_sampled = server_call_tracer.IsSampled() + plugin.save_trace_context(trace_id, span_id, is_sampled) + + +cdef void _set_call_tracer(grpc_call* call, void* capsule_ptr): + cdef ClientCallTracer* call_tracer = capsule_ptr + grpc_call_context_set(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, call_tracer, NULL) + + +cdef void* _get_call_tracer(grpc_call* call): + cdef void* call_tracer = grpc_call_context_get(call, GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE) + return call_tracer + + +cdef void _register_server_call_tracer_factory(void* capsule_ptr): + cdef ServerCallTracerFactory* call_tracer_factory = capsule_ptr + ServerCallTracerFactory.RegisterGlobal(call_tracer_factory) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index ca1b4c89f9c..2fd2347352a 100644 
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -55,6 +55,7 @@ include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" include "_cygrpc/vtable.pyx.pxi" include "_cygrpc/_hooks.pyx.pxi" +include "_cygrpc/observability.pyx.pxi" include "_cygrpc/grpc_gevent.pyx.pxi" diff --git a/src/python/grpcio/grpc/_observability.py b/src/python/grpcio/grpc/_observability.py new file mode 100644 index 00000000000..8e36baf7509 --- /dev/null +++ b/src/python/grpcio/grpc/_observability.py @@ -0,0 +1,268 @@ +# Copyright 2023 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import abc +import contextlib +import logging +import threading +from typing import Any, Generator, Generic, Optional, TypeVar + +from grpc._cython import cygrpc as _cygrpc + +_LOGGER = logging.getLogger(__name__) + +_channel = Any # _channel.py imports this module. +ClientCallTracerCapsule = TypeVar('ClientCallTracerCapsule') +ServerCallTracerFactoryCapsule = TypeVar('ServerCallTracerFactoryCapsule') + +_plugin_lock: threading.RLock = threading.RLock() +_OBSERVABILITY_PLUGIN: Optional[ObservabilityPlugin] = None # pylint: disable=used-before-assignment + + +class ObservabilityPlugin(Generic[ClientCallTracerCapsule, + ServerCallTracerFactoryCapsule], + metaclass=abc.ABCMeta): + """Abstract base class for observability plugin. 
+ + *This is a semi-private class that was intended for the exclusive use of + the gRPC team.* + + The ClientCallTracerCapsule and ServerCallTracerFactoryCapsule created by this + plugin should be injected into gRPC core using observability_init at the + start of a program, before any channels/servers are built. + + Any future methods added to this interface cannot have the + @abc.abstractmethod annotation. + + Attributes: + _stats_enabled: A bool indicates whether stats(metrics) is enabled. + _tracing_enabled: A bool indicates whether tracing is enabled. + """ + _tracing_enabled: bool = False + _stats_enabled: bool = False + + @abc.abstractmethod + def create_client_call_tracer( + self, method_name: bytes) -> ClientCallTracerCapsule: + """Creates a ClientCallTracerCapsule. + + After register the plugin, if tracing or stats is enabled, this method + will be called after a call was created, the ClientCallTracer created + by this method will be saved to call context. + + The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` + interface and wrapped in a PyCapsule using `client_call_tracer` as name. + + Args: + method_name: The method name of the call in byte format. + + Returns: + A PyCapsule which stores a ClientCallTracer object. + """ + raise NotImplementedError() + + @abc.abstractmethod + def delete_client_call_tracer( + self, client_call_tracer: ClientCallTracerCapsule) -> None: + """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. + + After register the plugin, if tracing or stats is enabled, this method + will be called at the end of the call to destroy the ClientCallTracer. + + The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` + interface and wrapped in a PyCapsule using `client_call_tracer` as name. + + Args: + client_call_tracer: A PyCapsule which stores a ClientCallTracer object. 
+ """ + raise NotImplementedError() + + @abc.abstractmethod + def save_trace_context(self, trace_id: str, span_id: str, + is_sampled: bool) -> None: + """Saves the trace_id and span_id related to the current span. + + After register the plugin, if tracing is enabled, this method will be + called after the server finished sending response. + + This method can be used to propagate census context. + + Args: + trace_id: The identifier for the trace associated with the span as a + 32-character hexadecimal encoded string, + e.g. 26ed0036f2eff2b7317bccce3e28d01f + span_id: The identifier for the span as a 16-character hexadecimal encoded + string. e.g. 113ec879e62583bc + is_sampled: A bool indicates whether the span is sampled. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_server_call_tracer_factory( + self) -> ServerCallTracerFactoryCapsule: + """Creates a ServerCallTracerFactoryCapsule. + + After register the plugin, if tracing or stats is enabled, this method + will be called by calling observability_init, the ServerCallTracerFactory + created by this method will be registered to gRPC core. + + The ServerCallTracerFactory is an object which implements + `grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule + using `server_call_tracer_factory` as name. + + Returns: + A PyCapsule which stores a ServerCallTracerFactory object. + """ + raise NotImplementedError() + + @abc.abstractmethod + def record_rpc_latency(self, method: str, rpc_latency: float, + status_code: Any) -> None: + """Record the latency of the RPC. + + After register the plugin, if stats is enabled, this method will be + called at the end of each RPC. + + Args: + method: The fully-qualified name of the RPC method being invoked. + rpc_latency: The latency for the RPC, equals to the time between + when the client invokes the RPC and when the client receives the status. 
+ status_code: An element of grpc.StatusCode in string format representing the + final status for the RPC. + """ + raise NotImplementedError() + + def set_tracing(self, enable: bool) -> None: + """Enable or disable tracing. + + Args: + enable: A bool indicates whether tracing should be enabled. + """ + self._tracing_enabled = enable + + def set_stats(self, enable: bool) -> None: + """Enable or disable stats(metrics). + + Args: + enable: A bool indicates whether stats should be enabled. + """ + self._stats_enabled = enable + + @property + def tracing_enabled(self) -> bool: + return self._tracing_enabled + + @property + def stats_enabled(self) -> bool: + return self._stats_enabled + + @property + def observability_enabled(self) -> bool: + return self.tracing_enabled or self.stats_enabled + + +@contextlib.contextmanager +def get_plugin() -> Generator[Optional[ObservabilityPlugin], None, None]: + """Get the ObservabilityPlugin in _observability module. + + Returns: + The ObservabilityPlugin currently registered with the _observability + module. Or None if no plugin exists at the time of calling this method. + """ + with _plugin_lock: + yield _OBSERVABILITY_PLUGIN + + +def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None: + """Save ObservabilityPlugin to _observability module. + + Args: + observability_plugin: The ObservabilityPlugin to save. + + Raises: + ValueError: If an ObservabilityPlugin was already registered at the + time of calling this method. + """ + global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement + with _plugin_lock: + if observability_plugin and _OBSERVABILITY_PLUGIN: + raise ValueError("observability_plugin was already set!") + _OBSERVABILITY_PLUGIN = observability_plugin + + +def observability_init(observability_plugin: ObservabilityPlugin) -> None: + """Initialize observability with provided ObservabilityPlugin. + + This method have to be called at the start of a program, before any + channels/servers are built. 
+ + Args: + observability_plugin: The ObservabilityPlugin to use. + + Raises: + ValueError: If an ObservabilityPlugin was already registered at the + time of calling this method. + """ + set_plugin(observability_plugin) + try: + _cygrpc.set_server_call_tracer_factory(observability_plugin) + except Exception: # pylint:disable=broad-except + _LOGGER.exception("Failed to set server call tracer factory!") + + +def observability_deinit() -> None: + """Clear the observability context, including ObservabilityPlugin and + ServerCallTracerFactory + + This method have to be called after exit observability context so that + it's possible to re-initialize again. + """ + set_plugin(None) + _cygrpc.clear_server_call_tracer_factory() + + +def delete_call_tracer(client_call_tracer_capsule: Any) -> None: + """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. + + This method will be called at the end of the call to destroy the ClientCallTracer. + + The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` + interface and wrapped in a PyCapsule using `client_call_tracer` as the name. + + Args: + client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object. + """ + with get_plugin() as plugin: + if not (plugin and plugin.observability_enabled): + return + plugin.delete_client_call_tracer(client_call_tracer_capsule) + + +def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: + """Record the latency of the RPC, if the plugin is registered and stats is enabled. + + This method will be called at the end of each RPC. + + Args: + state: a grpc._channel._RPCState object which contains the stats related to the + RPC. 
+ """ + with get_plugin() as plugin: + if not (plugin and plugin.stats_enabled): + return + rpc_latency = state.rpc_end_time - state.rpc_start_time + rpc_latency_ms = rpc_latency.total_seconds() * 1000 + plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 49ad0d2f5e5..d6802bfeadc 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -619,6 +619,7 @@ def _unary_response_in_pool( request_deserializer: Optional[SerializingFunction], response_serializer: Optional[SerializingFunction]) -> None: cygrpc.install_context_from_request_call_event(rpc_event) + try: argument = argument_thunk() if argument is not None: diff --git a/src/python/grpcio_observability/.gitignore b/src/python/grpcio_observability/.gitignore new file mode 100644 index 00000000000..1516c11f4fb --- /dev/null +++ b/src/python/grpcio_observability/.gitignore @@ -0,0 +1,6 @@ +build/ +include/ +*.c +*.cpp +*.egg-info +*.so diff --git a/src/python/grpcio_observability/README.rst b/src/python/grpcio_observability/README.rst new file mode 100644 index 00000000000..b46aa89147f --- /dev/null +++ b/src/python/grpcio_observability/README.rst @@ -0,0 +1,2 @@ +### TODO(xuanwn) +* Fill in content. \ No newline at end of file diff --git a/src/python/grpcio_observability/grpc_observability/BUILD.bazel b/src/python/grpcio_observability/grpc_observability/BUILD.bazel new file mode 100644 index 00000000000..7a099d69e99 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/BUILD.bazel @@ -0,0 +1,80 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:cython_library.bzl", "pyx_library") + +package(default_visibility = ["//visibility:public"]) + +# TODO(xuanwn): We also need to support Python-native build + +cc_library( + name = "observability", + srcs = [ + "client_call_tracer.cc", + "observability_util.cc", + "python_census_context.cc", + "sampler.cc", + "server_call_tracer.cc", + ], + hdrs = [ + "client_call_tracer.h", + "constants.h", + "observability_util.h", + "python_census_context.h", + "sampler.h", + "server_call_tracer.h", + ], + includes = ["."], + deps = [ + #TODO(xuanwn): Confirm only referenced code is included in shared object library + "//:grpc", + "//:grpc_rpc_encoding", + "//src/cpp/ext/gcp:observability_config", + ], +) + +pyx_library( + name = "cyobservability", + srcs = [ + "_cyobservability.pxd", + "_cyobservability.pyx", + ], + deps = [ + ":observability", + ], +) + +py_library( + name = "pyobservability", + srcs = [ + "__init__.py", + "_observability.py", + "_gcp_observability.py", + "_open_census_exporter.py", + # The following files are used only for OC stackdriver extension. + # TODO(xuanwn): Uncomment after those files are implemented. 
+ # "_measures.py", + # "_open_census.py", + # "_views.py", + ], + imports = [ + ".", + "../", + ], + srcs_version = "PY3ONLY", + deps = [ + ":cyobservability", + "//src/python/grpcio/grpc:grpcio", + ], +) diff --git a/src/python/grpcio_observability/grpc_observability/__init__.py b/src/python/grpcio_observability/grpc_observability/__init__.py new file mode 100644 index 00000000000..909317be364 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc_observability._gcp_observability import GCPOpenCensusObservability + +__all__ = ('GCPOpenCensusObservability',) diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd new file mode 100644 index 00000000000..c68d70205ea --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd @@ -0,0 +1,152 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from libcpp.string cimport string +from libcpp.vector cimport vector + +ctypedef signed long long int64_t + +cdef extern from "" namespace "std" nogil: + cdef cppclass queue[T]: + bint empty() + T& front() + void pop() + +cdef extern from "" namespace "std" nogil: + cdef cppclass mutex: + mutex() + + cdef cppclass unique_lock[Mutex]: + unique_lock(Mutex&) + +cdef extern from "" namespace "std" nogil: + cdef cppclass condition_variable: + void notify_all() + +cdef extern from "src/core/lib/channel/call_tracer.h" namespace "grpc_core": + cdef cppclass ClientCallTracer: + pass + +cdef extern from "python_census_context.h" namespace "grpc_observability": + cdef void EnablePythonCensusStats(bint enable) nogil + cdef void EnablePythonCensusTracing(bint enable) nogil + + union MeasurementValue: + double value_double + int64_t value_int + + ctypedef struct Label: + string key + string value + + ctypedef struct Annotation: + string time_stamp + string description + + ctypedef struct Measurement: + cMetricsName name + MeasurementType type + MeasurementValue value + + ctypedef struct SpanCensusData: + string name + string start_time + string end_time + string trace_id + string span_id + string parent_span_id + string status + vector[Label] span_labels + vector[Annotation] span_annotations + int64_t child_span_count + bint should_sample + +cdef extern from "observability_util.h" namespace "grpc_observability": + cdef cGcpObservabilityConfig ReadAndActivateObservabilityConfig() nogil + cdef void NativeObservabilityInit() except + + cdef void* 
CreateClientCallTracer(const char* method, + const char* trace_id, + const char* parent_span_id) except + + cdef void* CreateServerCallTracerFactory() except + + cdef queue[NativeCensusData]* g_census_data_buffer + cdef void AwaitNextBatchLocked(unique_lock[mutex]&, int) nogil + cdef bint PythonCensusStatsEnabled() nogil + cdef bint PythonCensusTracingEnabled() nogil + cdef mutex g_census_data_buffer_mutex + cdef condition_variable g_census_data_buffer_cv + + cppclass NativeCensusData "::grpc_observability::CensusData": + DataType type + Measurement measurement_data + SpanCensusData span_data + vector[Label] labels + + ctypedef struct CloudMonitoring: + pass + + ctypedef struct CloudTrace: + float sampling_rate + + ctypedef struct CloudLogging: + pass + + ctypedef struct cGcpObservabilityConfig "::grpc_observability::GcpObservabilityConfig": + CloudMonitoring cloud_monitoring + CloudTrace cloud_trace + CloudLogging cloud_logging + string project_id + vector[Label] labels + bint is_valid + +cdef extern from "constants.h" namespace "grpc_observability": + ctypedef enum DataType: + kSpanData + kMetricData + + ctypedef enum MeasurementType: + kMeasurementDouble + kMeasurementInt + + ctypedef enum cMetricsName "::grpc_observability::MetricsName": + # Client + kRpcClientApiLatencyMeasureName + kRpcClientSentMessagesPerRpcMeasureName + kRpcClientSentBytesPerRpcMeasureName + kRpcClientReceivedMessagesPerRpcMeasureName + kRpcClientReceivedBytesPerRpcMeasureName + kRpcClientRoundtripLatencyMeasureName + kRpcClientServerLatencyMeasureName + kRpcClientStartedRpcsMeasureName + kRpcClientRetriesPerCallMeasureName + kRpcClientTransparentRetriesPerCallMeasureName + kRpcClientRetryDelayPerCallMeasureName + kRpcClientTransportLatencyMeasureName + + # Server + kRpcServerSentMessagesPerRpcMeasureName + kRpcServerSentBytesPerRpcMeasureName + kRpcServerReceivedMessagesPerRpcMeasureName + kRpcServerReceivedBytesPerRpcMeasureName + kRpcServerServerLatencyMeasureName + 
kRpcServerStartedRpcsMeasureName + + string kClientMethod + string kClientStatus + +cdef extern from "sampler.h" namespace "grpc_observability": + cdef cppclass ProbabilitySampler: + @staticmethod + ProbabilitySampler& Get() + + void SetThreshold(double sampling_rate) diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx new file mode 100644 index 00000000000..a0d9d5db947 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx @@ -0,0 +1,342 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +cimport cpython +from cython.operator cimport dereference + +import enum +import functools +import logging +import os +from threading import Thread +from typing import List, Mapping, Tuple, Union + +import _observability + +# Time we wait for batch exporting census data +# TODO(xuanwn): change interval to a more appropriate number +CENSUS_EXPORT_BATCH_INTERVAL_SECS = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL_SECS', 0.5)) +GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT = float(os.environ.get('GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT', 10)) +cdef const char* CLIENT_CALL_TRACER = "client_call_tracer" +cdef const char* SERVER_CALL_TRACER_FACTORY = "server_call_tracer_factory" +cdef bint GLOBAL_SHUTDOWN_EXPORT_THREAD = False +cdef object GLOBAL_EXPORT_THREAD + +_LOGGER = logging.getLogger(__name__) + +class _CyMetricsName: + CY_CLIENT_API_LATENCY = kRpcClientApiLatencyMeasureName + CY_CLIENT_SNET_MESSSAGES_PER_RPC = kRpcClientSentMessagesPerRpcMeasureName + CY_CLIENT_SEND_BYTES_PER_RPC = kRpcClientSentBytesPerRpcMeasureName + CY_CLIENT_RECEIVED_MESSAGES_PER_RPC = kRpcClientReceivedMessagesPerRpcMeasureName + CY_CLIENT_RECEIVED_BYTES_PER_RPC = kRpcClientReceivedBytesPerRpcMeasureName + CY_CLIENT_ROUNDTRIP_LATENCY = kRpcClientRoundtripLatencyMeasureName + CY_CLIENT_SERVER_LATENCY = kRpcClientServerLatencyMeasureName + CY_CLIENT_STARTED_RPCS = kRpcClientStartedRpcsMeasureName + CY_CLIENT_RETRIES_PER_CALL = kRpcClientRetriesPerCallMeasureName + CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL = kRpcClientTransparentRetriesPerCallMeasureName + CY_CLIENT_RETRY_DELAY_PER_CALL = kRpcClientRetryDelayPerCallMeasureName + CY_CLIENT_TRANSPORT_LATENCY = kRpcClientTransportLatencyMeasureName + CY_SERVER_SENT_MESSAGES_PER_RPC = kRpcServerSentMessagesPerRpcMeasureName + CY_SERVER_SENT_BYTES_PER_RPC = kRpcServerSentBytesPerRpcMeasureName + CY_SERVER_RECEIVED_MESSAGES_PER_RPC = kRpcServerReceivedMessagesPerRpcMeasureName + CY_SERVER_RECEIVED_BYTES_PER_RPC = 
kRpcServerReceivedBytesPerRpcMeasureName + CY_SERVER_SERVER_LATENCY = kRpcServerServerLatencyMeasureName + CY_SERVER_STARTED_RPCS = kRpcServerStartedRpcsMeasureName + +@enum.unique +class MetricsName(enum.Enum): + CLIENT_STARTED_RPCS = _CyMetricsName.CY_CLIENT_STARTED_RPCS + CLIENT_API_LATENCY = _CyMetricsName.CY_CLIENT_API_LATENCY + CLIENT_SNET_MESSSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_SNET_MESSSAGES_PER_RPC + CLIENT_SEND_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_SEND_BYTES_PER_RPC + CLIENT_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_MESSAGES_PER_RPC + CLIENT_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_CLIENT_RECEIVED_BYTES_PER_RPC + CLIENT_ROUNDTRIP_LATENCY = _CyMetricsName.CY_CLIENT_ROUNDTRIP_LATENCY + CLIENT_SERVER_LATENCY = _CyMetricsName.CY_CLIENT_SERVER_LATENCY + CLIENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_RETRIES_PER_CALL + CLIENT_TRANSPARENT_RETRIES_PER_CALL = _CyMetricsName.CY_CLIENT_TRANSPARENT_RETRIES_PER_CALL + CLIENT_RETRY_DELAY_PER_CALL = _CyMetricsName.CY_CLIENT_RETRY_DELAY_PER_CALL + CLIENT_TRANSPORT_LATENCY = _CyMetricsName.CY_CLIENT_TRANSPORT_LATENCY + SERVER_SENT_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_MESSAGES_PER_RPC + SERVER_SENT_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_SENT_BYTES_PER_RPC + SERVER_RECEIVED_MESSAGES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_MESSAGES_PER_RPC + SERVER_RECEIVED_BYTES_PER_RPC = _CyMetricsName.CY_SERVER_RECEIVED_BYTES_PER_RPC + SERVER_SERVER_LATENCY = _CyMetricsName.CY_SERVER_SERVER_LATENCY + SERVER_STARTED_RPCS = _CyMetricsName.CY_SERVER_STARTED_RPCS + +# Delay map creation due to circular dependencies +_CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING = {x.value: x for x in MetricsName} + +def cyobservability_init(object exporter) -> None: + exporter: _observability.Exporter + + NativeObservabilityInit() + _start_exporting_thread(exporter) + + +def _start_exporting_thread(object exporter) -> None: + exporter: _observability.Exporter + + global GLOBAL_EXPORT_THREAD + global 
GLOBAL_SHUTDOWN_EXPORT_THREAD + GLOBAL_SHUTDOWN_EXPORT_THREAD = False + # TODO(xuanwn): Change it to daemon thread. + GLOBAL_EXPORT_THREAD = Thread(target=_export_census_data, args=(exporter,)) + GLOBAL_EXPORT_THREAD.start() + + +def set_gcp_observability_config(object py_config) -> bool: + py_config: _gcp_observability.GcpObservabilityPythonConfig + + py_labels = {} + sampling_rate = 0.0 + + cdef cGcpObservabilityConfig c_config = ReadAndActivateObservabilityConfig() + if not c_config.is_valid: + return False + + for label in c_config.labels: + py_labels[_decode(label.key)] = _decode(label.value) + + if PythonCensusTracingEnabled(): + sampling_rate = c_config.cloud_trace.sampling_rate + # Save sampling rate to global sampler. + ProbabilitySampler.Get().SetThreshold(sampling_rate) + + py_config.set_configuration(_decode(c_config.project_id), sampling_rate, py_labels, + PythonCensusTracingEnabled(), PythonCensusStatsEnabled()) + return True + + +def create_client_call_tracer(bytes method_name, bytes trace_id, + bytes parent_span_id=b'') -> cpython.PyObject: + """Create a ClientCallTracer and save to PyCapsule. + + Returns: A grpc_observability._observability.ClientCallTracerCapsule object. + """ + cdef char* c_method = cpython.PyBytes_AsString(method_name) + cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id) + cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id) + + cdef void* call_tracer = CreateClientCallTracer(c_method, c_trace_id, c_parent_span_id) + capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL) + return capsule + + +def create_server_call_tracer_factory_capsule() -> cpython.PyObject: + """Create a ServerCallTracerFactory and save to PyCapsule. + + Returns: A grpc_observability._observability.ServerCallTracerFactoryCapsule object. 
+ """ + cdef void* call_tracer_factory = CreateServerCallTracerFactory() + capsule = cpython.PyCapsule_New(call_tracer_factory, SERVER_CALL_TRACER_FACTORY, NULL) + return capsule + + +def delete_client_call_tracer(object client_call_tracer) -> None: + client_call_tracer: grpc._observability.ClientCallTracerCapsule + + if cpython.PyCapsule_IsValid(client_call_tracer, CLIENT_CALL_TRACER): + capsule_ptr = cpython.PyCapsule_GetPointer(client_call_tracer, CLIENT_CALL_TRACER) + call_tracer_ptr = capsule_ptr + del call_tracer_ptr + + +def _c_label_to_labels(vector[Label] c_labels) -> Mapping[str, str]: + py_labels = {} + for label in c_labels: + py_labels[_decode(label.key)] = _decode(label.value) + return py_labels + + +def _c_measurement_to_measurement(object measurement + ) -> Mapping[str, Union[enum, Mapping[str, Union[float, int]]]]: + """Convert Cython Measurement to Python measurement. + + Args: + measurement: Actual measurement repesented by Cython type Measurement, using object here + since Cython refuse to automatically convert a union with unsafe type combinations. 
+ + Returns: + A mapping object with keys and values as follows: + name -> cMetricsName + type -> MeasurementType + value -> {value_double: float | value_int: int} + """ + measurement: Measurement + + py_measurement = {} + py_measurement['name'] = measurement['name'] + py_measurement['type'] = measurement['type'] + if measurement['type'] == kMeasurementDouble: + py_measurement['value'] = {'value_double': measurement['value']['value_double']} + else: + py_measurement['value'] = {'value_int': measurement['value']['value_int']} + return py_measurement + + +def _c_annotation_to_annotations(vector[Annotation] c_annotations) -> List[Tuple[str, str]]: + py_annotations = [] + for annotation in c_annotations: + py_annotations.append((_decode(annotation.time_stamp), + _decode(annotation.description))) + return py_annotations + + +def observability_deinit() -> None: + _shutdown_exporting_thread() + EnablePythonCensusStats(False) + EnablePythonCensusTracing(False) + + +@functools.lru_cache(maxsize=None) +def _cy_metric_name_to_py_metric_name(cMetricsName metric_name) -> MetricsName: + try: + return _CY_METRICS_NAME_TO_PY_METRICS_NAME_MAPPING[metric_name] + except KeyError: + raise ValueError('Invalid metric name %s' % metric_name) + + +def _get_stats_data(object measurement, object labels) -> _observability.StatsData: + """Convert a Python measurement to StatsData. + + Args: + measurement: A dict of type Mapping[str, Union[enum, Mapping[str, Union[float, int]]]] + with keys and values as follows: + name -> cMetricsName + type -> MeasurementType + value -> {value_double: float | value_int: int} + labels: Labels associated with stats data with type of dict[str, str].
+ """ + measurement: Measurement + labels: Mapping[str, str] + + metric_name = _cy_metric_name_to_py_metric_name(measurement['name']) + if measurement['type'] == kMeasurementDouble: + py_stat = _observability.StatsData(name=metric_name, measure_double=True, + value_float=measurement['value']['value_double'], + labels=labels) + else: + py_stat = _observability.StatsData(name=metric_name, measure_double=False, + value_int=measurement['value']['value_int'], + labels=labels) + return py_stat + + +def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels, + vector[Annotation] span_annotations) -> _observability.TracingData: + py_span_labels = _c_label_to_labels(span_labels) + py_span_annotations = _c_annotation_to_annotations(span_annotations) + return _observability.TracingData(name=_decode(span_data.name), + start_time = _decode(span_data.start_time), + end_time = _decode(span_data.end_time), + trace_id = _decode(span_data.trace_id), + span_id = _decode(span_data.span_id), + parent_span_id = _decode(span_data.parent_span_id), + status = _decode(span_data.status), + should_sample = span_data.should_sample, + child_span_count = span_data.child_span_count, + span_labels = py_span_labels, + span_annotations = py_span_annotations) + + +def _record_rpc_latency(object exporter, str method, float rpc_latency, str status_code) -> None: + exporter: _observability.Exporter + + measurement = {} + measurement['name'] = kRpcClientApiLatencyMeasureName + measurement['type'] = kMeasurementDouble + measurement['value'] = {'value_double': rpc_latency} + + labels = {} + labels[_decode(kClientMethod)] = method.strip("/") + labels[_decode(kClientStatus)] = status_code + metric = _get_stats_data(measurement, labels) + exporter.export_stats_data([metric]) + + +cdef void _export_census_data(object exporter): + """Main function running in export thread.""" + exporter: _observability.Exporter + + cdef int export_interval_ms = CENSUS_EXPORT_BATCH_INTERVAL_SECS * 1000 + while 
True: + with nogil: + while not GLOBAL_SHUTDOWN_EXPORT_THREAD: + lk = new unique_lock[mutex](g_census_data_buffer_mutex) + # Wait for next batch of census data OR timeout at fixed interval. + # Batch export census data to minimize the time we spend acquiring the GIL. + AwaitNextBatchLocked(dereference(lk), export_interval_ms) + + # Break only when the buffer has data + if not g_census_data_buffer.empty(): + del lk + break + else: + del lk + + _flush_census_data(exporter) + + if GLOBAL_SHUTDOWN_EXPORT_THREAD: + break # Break to shut down the exporting thread + + # Flush one last time before shutting down the thread + _flush_census_data(exporter) + + +cdef void _flush_census_data(object exporter): + exporter: _observability.Exporter + + lk = new unique_lock[mutex](g_census_data_buffer_mutex) + if g_census_data_buffer.empty(): + del lk + return + py_metrics_batch = [] + py_spans_batch = [] + while not g_census_data_buffer.empty(): + c_census_data = g_census_data_buffer.front() + if c_census_data.type == kMetricData: + py_labels = _c_label_to_labels(c_census_data.labels) + py_measurement = _c_measurement_to_measurement(c_census_data.measurement_data) + py_metric = _get_stats_data(py_measurement, py_labels) + py_metrics_batch.append(py_metric) + else: + py_span = _get_tracing_data(c_census_data.span_data, c_census_data.span_data.span_labels, + c_census_data.span_data.span_annotations) + py_spans_batch.append(py_span) + g_census_data_buffer.pop() + + del lk + exporter.export_stats_data(py_metrics_batch) + exporter.export_tracing_data(py_spans_batch) + + +cdef void _shutdown_exporting_thread(): + with nogil: + global GLOBAL_SHUTDOWN_EXPORT_THREAD + GLOBAL_SHUTDOWN_EXPORT_THREAD = True + g_census_data_buffer_cv.notify_all() + GLOBAL_EXPORT_THREAD.join(timeout=GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT) + + +cdef str _decode(bytes bytestring): + if isinstance(bytestring, (str,)): + return bytestring + else: + try: + return bytestring.decode('utf8') + except UnicodeDecodeError: +
_LOGGER.exception('Invalid encoding on %s', bytestring) + return bytestring.decode('latin1') diff --git a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py new file mode 100644 index 00000000000..674676bf3b2 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py @@ -0,0 +1,186 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +import logging +import threading +import time +from typing import Any, Mapping, Optional + +import grpc +from grpc_observability import _cyobservability # pytype: disable=pyi-error +from grpc_observability._open_census_exporter import OpenCensusExporter +from opencensus.trace import execution_context +from opencensus.trace import span_context as span_context_module +from opencensus.trace import trace_options as trace_options_module + +_LOGGER = logging.getLogger(__name__) + +ClientCallTracerCapsule = Any # it appears only once in the function signature +ServerCallTracerFactoryCapsule = Any # it appears only once in the function signature +grpc_observability = Any # grpc_observability.py imports this module. 
+ +GRPC_STATUS_CODE_TO_STRING = { + grpc.StatusCode.OK: "OK", + grpc.StatusCode.CANCELLED: "CANCELLED", + grpc.StatusCode.UNKNOWN: "UNKNOWN", + grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT", + grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED", + grpc.StatusCode.NOT_FOUND: "NOT_FOUND", + grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS", + grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED", + grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED", + grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED", + grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION", + grpc.StatusCode.ABORTED: "ABORTED", + grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE", + grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED", + grpc.StatusCode.INTERNAL: "INTERNAL", + grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE", + grpc.StatusCode.DATA_LOSS: "DATA_LOSS", +} + + +@dataclass +class GcpObservabilityPythonConfig: + _singleton = None + _lock: threading.RLock = threading.RLock() + project_id: str = "" + stats_enabled: bool = False + tracing_enabled: bool = False + labels: Optional[Mapping[str, str]] = field(default_factory=dict) + sampling_rate: Optional[float] = 0.0 + + @staticmethod + def get(): + with GcpObservabilityPythonConfig._lock: + if GcpObservabilityPythonConfig._singleton is None: + GcpObservabilityPythonConfig._singleton = GcpObservabilityPythonConfig( + ) + return GcpObservabilityPythonConfig._singleton + + def set_configuration(self, + project_id: str, + sampling_rate: Optional[float] = 0.0, + labels: Optional[Mapping[str, str]] = None, + tracing_enabled: bool = False, + stats_enabled: bool = False) -> None: + self.project_id = project_id + self.stats_enabled = stats_enabled + self.tracing_enabled = tracing_enabled + self.labels = labels + self.sampling_rate = sampling_rate + + +# pylint: disable=no-self-use +class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): + """GCP OpenCensus based plugin implementation. 
+ + If no exporter is passed, the default will be OpenCensus StackDriver + based exporter. + + For more details, please refer to User Guide: + * https://cloud.google.com/stackdriver/docs/solutions/grpc + + Attributes: + config: Configuration for GCP OpenCensus Observability. + exporter: Exporter used to export data. + """ + config: GcpObservabilityPythonConfig + exporter: "grpc_observability.Exporter" + + def __init__(self, exporter: "grpc_observability.Exporter" = None): + self.exporter = None + self.config = GcpObservabilityPythonConfig.get() + if exporter: + self.exporter = exporter + else: + self.exporter = OpenCensusExporter(self.config.get().labels) + config_valid = _cyobservability.set_gcp_observability_config( + self.config) + if not config_valid: + raise ValueError("Invalid configuration") + + if self.config.tracing_enabled: + self.set_tracing(True) + if self.config.stats_enabled: + self.set_stats(True) + + def __enter__(self): + try: + _cyobservability.cyobservability_init(self.exporter) + #TODO(xuanwn): Use specific exceptions + except Exception as e: # pylint: disable=broad-except + _LOGGER.exception("GCPOpenCensusObservability failed with: %s", e) + + grpc._observability.observability_init(self) + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.exit() + + def exit(self) -> None: + # Sleep so we don't lose any data. If we shut down the export thread + # immediately after exit, it's possible that core didn't call RecordEnd + # in callTracer, and all data recorded by calling RecordEnd will be + # lost. + # The time is equal to the time in AwaitNextBatchLocked.
+ # TODO(xuanwn): explicit synchronization + # https://github.com/grpc/grpc/issues/33262 + time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS) + self.set_tracing(False) + self.set_stats(False) + _cyobservability.observability_deinit() + grpc._observability.observability_deinit() + + def create_client_call_tracer( + self, method_name: bytes) -> ClientCallTracerCapsule: + current_span = execution_context.get_current_span() + if current_span: + # Propagate existing OC context + trace_id = current_span.context_tracer.trace_id.encode('utf8') + parent_span_id = current_span.span_id.encode('utf8') + capsule = _cyobservability.create_client_call_tracer( + method_name, trace_id, parent_span_id) + else: + trace_id = span_context_module.generate_trace_id().encode('utf8') + capsule = _cyobservability.create_client_call_tracer( + method_name, trace_id) + return capsule + + def create_server_call_tracer_factory( + self) -> ServerCallTracerFactoryCapsule: + capsule = _cyobservability.create_server_call_tracer_factory_capsule() + return capsule + + def delete_client_call_tracer( + self, client_call_tracer: ClientCallTracerCapsule) -> None: + _cyobservability.delete_client_call_tracer(client_call_tracer) + + def save_trace_context(self, trace_id: str, span_id: str, + is_sampled: bool) -> None: + trace_options = trace_options_module.TraceOptions(0) + trace_options.set_enabled(is_sampled) + span_context = span_context_module.SpanContext( + trace_id=trace_id, span_id=span_id, trace_options=trace_options) + current_tracer = execution_context.get_opencensus_tracer() + current_tracer.span_context = span_context + + def record_rpc_latency(self, method: str, rpc_latency: float, + status_code: grpc.StatusCode) -> None: + status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") + _cyobservability._record_rpc_latency(self.exporter, method, rpc_latency, + status_code) diff --git a/src/python/grpcio_observability/grpc_observability/_observability.py 
b/src/python/grpcio_observability/grpc_observability/_observability.py new file mode 100644 index 00000000000..178d71f2d72 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_observability.py @@ -0,0 +1,101 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import abc +from dataclasses import dataclass +from dataclasses import field +from typing import List, Mapping, Tuple + + +class Exporter(metaclass=abc.ABCMeta): + """Abstract base class for census data exporters.""" + + @abc.abstractmethod + def export_stats_data(self, stats_data: List[StatsData]) -> None: + """Exports a list of StatsData objects to the exporter's destination. + + Args: + stats_data: A list of StatsData objects to export. + """ + raise NotImplementedError() + + @abc.abstractmethod + def export_tracing_data(self, tracing_data: List[TracingData]) -> None: + """Exports a list of TracingData objects to the exporter's destination. + + Args: + tracing_data: A list of TracingData objects to export. + """ + raise NotImplementedError() + + +@dataclass(frozen=True) +class StatsData: + """A data class representing stats data. + + Attributes: + name: An element of grpc_observability._cyobservability.MetricsName, e.g. + MetricsName.CLIENT_STARTED_RPCS. + measure_double: A bool indicating whether the metric is a floating-point + value. + value_int: The actual metric value if measure_double is False.
+ value_float: The actual metric value if measure_double is True. + labels: A dictionary that maps label tags associated with this metric to + corresponding label value. + """ + name: "grpc_observability._cyobservability.MetricsName" + measure_double: bool + value_int: int = 0 + value_float: float = 0.0 + labels: Mapping[str, str] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TracingData: + """A data class representing tracing data. + + Attributes: + name: The name for tracing data, also the name for the Span. + start_time: The start time for the span in RFC3339 UTC "Zulu" format, e.g. + 2014-10-02T15:01:23Z + end_time: The end time for the span in RFC3339 UTC "Zulu" format, e.g. + 2014-10-02T15:01:23Z + trace_id: The identifier for the trace associated with this span as a + 32-character hexadecimal encoded string, + e.g. 26ed0036f2eff2b7317bccce3e28d01f + span_id: The identifier for the span as a 16-character hexadecimal encoded + string. e.g. 113ec879e62583bc + parent_span_id: An optional identifier for the span's parent id. + status: An element of grpc.StatusCode in string format representing the + final status for the trace data. + should_sample: A bool indicating whether the span is sampled. + child_span_count: The number of child spans associated with this span. + span_labels: A dictionary that maps label tags associated with this + span to corresponding label value. + span_annotations: A list of (timeStamp, description) tuples, one per + annotation. The timeStamp has a format which can be converted + to Python datetime.datetime, e.g.
2023-05-29 17:07:09.895 + """ + name: str + start_time: str + end_time: str + trace_id: str + span_id: str + parent_span_id: str + status: str + should_sample: bool + child_span_count: int + span_labels: Mapping[str, str] = field(default_factory=dict) + span_annotations: List[Tuple[str, str]] = field(default_factory=list) diff --git a/src/python/grpcio_observability/grpc_observability/_open_census_exporter.py b/src/python/grpcio_observability/grpc_observability/_open_census_exporter.py new file mode 100644 index 00000000000..a78d7077057 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_open_census_exporter.py @@ -0,0 +1,33 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from typing import List + +from grpc_observability import _observability # pytype: disable=pyi-error + +logger = logging.getLogger(__name__) + + +class OpenCensusExporter(_observability.Exporter): + + def export_stats_data(self, + stats_data: List[_observability.StatsData]) -> None: + # TODO(xuanwn): Add implementation + raise NotImplementedError() + + def export_tracing_data( + self, tracing_data: List[_observability.TracingData]) -> None: + # TODO(xuanwn): Add implementation + raise NotImplementedError() diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc new file mode 100644 index 00000000000..fc6df51f80c --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -0,0 +1,281 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "src/python/grpcio_observability/grpc_observability/client_call_tracer.h"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+
+#include
+
+#include "src/core/lib/experiments/experiments.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/slice/slice.h"
+
+namespace grpc_observability {
+
+// Out-of-class definitions for the static constexpr members declared in the
+// attempt tracer.
+constexpr uint32_t PythonOpenCensusCallTracer::
+    PythonOpenCensusCallAttemptTracer::kMaxTraceContextLen;
+constexpr uint32_t
+    PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::kMaxTagsLen;
+
+//
+// PythonOpenCensusCallTracer
+//
+
+// Builds the call-level census context; the span is named "Sent.<method>" and
+// is seeded with the supplied trace id and parent span id.
+PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
+    const char* method, const char* trace_id, const char* parent_span_id,
+    bool tracing_enabled)
+    : method_(GetMethod(method)), tracing_enabled_(tracing_enabled) {
+  GenerateClientContext(absl::StrCat("Sent.", method_),
+                        absl::string_view(trace_id),
+                        absl::string_view(parent_span_id), &context_);
+}
+
+// Intentionally empty: the context is generated in the constructor.
+void PythonOpenCensusCallTracer::GenerateContext() {}
+
+void PythonOpenCensusCallTracer::RecordAnnotation(
+    absl::string_view annotation) {
+  // If tracing is disabled, the following will be a no-op.
+  context_.AddSpanAnnotation(annotation);
+}
+
+// Records the per-call retry metrics (when stats are enabled) and ends and
+// exports the call-level span (when tracing is enabled and sampled).
+PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
+  if (PythonCensusStatsEnabled()) {
+    context_.Labels().emplace_back(kClientMethod, std::string(method_));
+    // retries_ counts every non-transparent attempt, so the first attempt is
+    // excluded. retries_ is unsigned: guard the subtraction so that a call
+    // with no non-transparent attempt reports 0 instead of wrapping around.
+    const int64_t retries_excluding_first =
+        retries_ > 0 ? static_cast<int64_t>(retries_ - 1) : 0;
+    RecordIntMetric(kRpcClientRetriesPerCallMeasureName,
+                    retries_excluding_first, context_.Labels());
+    RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
+                    transparent_retries_, context_.Labels());
+    RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
+                       ToDoubleMilliseconds(retry_delay_), context_.Labels());
+  }
+
+  if (tracing_enabled_) {
+    context_.EndSpan();
+    if (IsSampled()) {
+      RecordSpan(context_.Span().ToCensusData());
+    }
+  }
+}
+
+// Creates the context for one attempt: a span named "Attempt.<method>" whose
+// parent is the call-level span, carrying a copy of the call-level labels.
+PythonCensusContext
+PythonOpenCensusCallTracer::CreateCensusContextForCallAttempt() {
+  auto context = PythonCensusContext(absl::StrCat("Attempt.", method_),
+                                     &(context_.Span()), context_.Labels());
+  return context;
+}
+
+// Registers a new attempt and returns its tracer. The returned object deletes
+// itself in RecordEnd().
+PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer*
+PythonOpenCensusCallTracer::StartNewAttempt(bool is_transparent_retry) {
+  uint64_t attempt_num;
+  {
+    grpc_core::MutexLock lock(&mu_);
+    // Only a retry (i.e. not the very first attempt) that starts while no
+    // other attempt is active contributes to the retry delay.
+    if (transparent_retries_ != 0 || retries_ != 0) {
+      if (PythonCensusStatsEnabled() && num_active_rpcs_ == 0) {
+        retry_delay_ += absl::Now() - time_at_last_attempt_end_;
+      }
+    }
+    attempt_num = retries_;
+    if (is_transparent_retry) {
+      ++transparent_retries_;
+    } else {
+      ++retries_;
+    }
+    ++num_active_rpcs_;
+  }
+  context_.IncreaseChildSpanCount();
+  return new PythonOpenCensusCallAttemptTracer(this, attempt_num,
+                                               is_transparent_retry);
+}
+
+//
+// PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer
+//
+
+// Starts the attempt: stamps the start time, annotates the attempt span (when
+// tracing is enabled) and counts the started RPC (when stats are enabled).
+PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
+                                      uint64_t attempt_num,
+                                      bool is_transparent_retry)
+    : parent_(parent),
+      context_(parent_->CreateCensusContextForCallAttempt()),
+      start_time_(absl::Now()) {
+  if (parent_->tracing_enabled_) {
+    context_.AddSpanAttribute("previous-rpc-attempts",
+                              absl::StrCat(attempt_num));
+    context_.AddSpanAttribute("transparent-retry",
+                              absl::StrCat(is_transparent_retry));
+  }
+  if (!PythonCensusStatsEnabled()) {
+    return;
+  }
+  context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
+  RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels());
+}
+
+// Attaches the serialized trace context (tracing enabled) and the serialized
+// stats tags (stats enabled) to the outgoing initial metadata.
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
+  if (parent_->tracing_enabled_) {
+    char tracing_buf[kMaxTraceContextLen];
+    size_t tracing_len =
+        TraceContextSerialize(context_, tracing_buf, kMaxTraceContextLen);
+    if (tracing_len > 0) {
+      send_initial_metadata->Set(
+          grpc_core::GrpcTraceBinMetadata(),
+          grpc_core::Slice::FromCopiedBuffer(tracing_buf, tracing_len));
+    }
+  }
+  if (!PythonCensusStatsEnabled()) {
+    return;
+  }
+  grpc_slice tags = grpc_empty_slice();
+  size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
+  if (encoded_tags_len > 0) {
+    send_initial_metadata->Set(grpc_core::GrpcTagsBinMetadata(),
+                               grpc_core::Slice(tags));
+  }
+}
+
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordSendMessage(const grpc_core::SliceBuffer& /*send_message*/) {
+  ++sent_message_count_;
+}
+
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordReceivedMessage(const grpc_core::SliceBuffer& /*recv_message*/) {
+  ++recv_message_count_;
+}
+
+namespace {
+
+// Returns 0 if stats are disabled or if no server stats are present in the
+// metadata.
+uint64_t GetElapsedTimeFromTrailingMetadata(const grpc_metadata_batch* b) {
+  if (!PythonCensusStatsEnabled()) {
+    return 0;
+  }
+
+  const grpc_core::Slice* grpc_server_stats_bin_ptr =
+      b->get_pointer(grpc_core::GrpcServerStatsBinMetadata());
+  if (grpc_server_stats_bin_ptr == nullptr) {
+    return 0;
+  }
+
+  uint64_t elapsed_time = 0;
+  // NOTE(review): the cast target is assumed to be const char* (the buffer
+  // type ServerStatsDeserialize is expected to take) -- confirm against
+  // python_census_context.h.
+  ServerStatsDeserialize(
+      reinterpret_cast<const char*>(grpc_server_stats_bin_ptr->data()),
+      grpc_server_stats_bin_ptr->size(), &elapsed_time);
+  return elapsed_time;
+}
+
+}  // namespace
+
+// Stores the final status of the attempt and, when stats are enabled, records
+// the byte-count and latency metrics for it.
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordReceivedTrailingMetadata(
+        absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
+        const grpc_transport_stream_stats* transport_stream_stats) {
+  // Assign the member (not a shadowing local) so that RecordEnd() sees the
+  // real status. This happens before the stats check because the tracing path
+  // in RecordEnd() reads status_code_ even when stats are disabled.
+  status_code_ = status.code();
+  if (!PythonCensusStatsEnabled()) {
+    return;
+  }
+  uint64_t elapsed_time = 0;
+  if (recv_trailing_metadata != nullptr) {
+    elapsed_time = GetElapsedTimeFromTrailingMetadata(recv_trailing_metadata);
+  }
+
+  std::string final_status = absl::StatusCodeToString(status_code_);
+  context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
+  context_.Labels().emplace_back(kClientStatus, final_status);
+  RecordDoubleMetric(
+      kRpcClientSentBytesPerRpcMeasureName,
+      static_cast<double>(transport_stream_stats != nullptr
+                              ? transport_stream_stats->outgoing.data_bytes
+                              : 0),
+      context_.Labels());
+  RecordDoubleMetric(
+      kRpcClientReceivedBytesPerRpcMeasureName,
+      static_cast<double>(transport_stream_stats != nullptr
+                              ? transport_stream_stats->incoming.data_bytes
+                              : 0),
+      context_.Labels());
+  RecordDoubleMetric(
+      kRpcClientServerLatencyMeasureName,
+      absl::ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time)),
+      context_.Labels());
+  RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
+                     absl::ToDoubleMilliseconds(absl::Now() - start_time_),
+                     context_.Labels());
+  if (grpc_core::IsTransportSuppliesClientLatencyEnabled()) {
+    // An infinite-future latency means the transport supplied no value.
+    if (transport_stream_stats != nullptr &&
+        gpr_time_cmp(transport_stream_stats->latency,
+                     gpr_inf_future(GPR_TIMESPAN)) != 0) {
+      double latency_ms = absl::ToDoubleMilliseconds(absl::Microseconds(
+          gpr_timespec_to_micros(transport_stream_stats->latency)));
+      RecordDoubleMetric(kRpcClientTransportLatencyMeasureName, latency_ms,
+                         context_.Labels());
+    }
+  }
+}
+
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordCancel(absl::Status /*cancel_error*/) {}
+
+// Records the per-attempt message counts, updates the parent's bookkeeping,
+// ends and exports the attempt span, then destroys this tracer.
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::RecordEnd(
+    const gpr_timespec& /*latency*/) {
+  if (PythonCensusStatsEnabled()) {
+    // NOTE(review): the method/status labels are appended here in addition to
+    // the ones appended earlier on the same context, so the label list may
+    // hold repeated keys -- confirm the consumer tolerates that.
+    context_.Labels().emplace_back(kClientMethod,
+                                   std::string(parent_->method_));
+    context_.Labels().emplace_back(kClientStatus,
+                                   StatusCodeToString(status_code_));
+    RecordIntMetric(kRpcClientSentMessagesPerRpcMeasureName,
+                    sent_message_count_, context_.Labels());
+    RecordIntMetric(kRpcClientReceivedMessagesPerRpcMeasureName,
+                    recv_message_count_, context_.Labels());
+
+    grpc_core::MutexLock lock(&parent_->mu_);
+    // Remember when the last active attempt finished so that the next retry
+    // can compute its delay.
+    if (--parent_->num_active_rpcs_ == 0) {
+      parent_->time_at_last_attempt_end_ = absl::Now();
+    }
+  }
+
+  if (parent_->tracing_enabled_) {
+    if (status_code_ != absl::StatusCode::kOk) {
+      context_.Span().SetStatus(StatusCodeToString(status_code_));
+    }
+    context_.EndSpan();
+    if (IsSampled()) {
+      RecordSpan(context_.Span().ToCensusData());
+    }
+  }
+
+  // After RecordEnd, Core will make no further usage of this CallAttemptTracer,
+  // so we are free to delete it here.
+  delete this;
+}
+
+void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
+    RecordAnnotation(absl::string_view annotation) {
+  // If tracing is disabled, the following will be a no-op.
+  context_.AddSpanAnnotation(annotation);
+}
+
+}  // namespace grpc_observability
diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h
new file mode 100644
index 00000000000..db35da35733
--- /dev/null
+++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h
@@ -0,0 +1,139 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
+#define GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
+
+#include
+
+#include
+
+#include "absl/base/thread_annotations.h"
+#include "absl/status/status.h"
+#include "absl/strings/escaping.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+
+#include
+
+#include "src/core/lib/channel/call_tracer.h"
+#include "src/core/lib/gprpp/sync.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/slice/slice_buffer.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport.h"
+#include "src/python/grpcio_observability/grpc_observability/python_census_context.h"
+
+namespace grpc_observability {
+
+// Client-side call tracer. It owns the call-level census context and hands
+// out one PythonOpenCensusCallAttemptTracer per attempt via StartNewAttempt().
+class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
+ public:
+  // Tracer for a single attempt. Instances are heap-allocated by
+  // StartNewAttempt() and delete themselves in RecordEnd().
+  class PythonOpenCensusCallAttemptTracer : public CallAttemptTracer {
+   public:
+    PythonOpenCensusCallAttemptTracer(PythonOpenCensusCallTracer* parent,
+                                      uint64_t attempt_num,
+                                      bool is_transparent_retry);
+    // Hex-encoded trace id of the attempt span.
+    std::string TraceId() override {
+      return absl::BytesToHexString(
+          absl::string_view(context_.SpanContext().TraceId()));
+    }
+
+    // Hex-encoded span id of the attempt span.
+    std::string SpanId() override {
+      return absl::BytesToHexString(
+          absl::string_view(context_.SpanContext().SpanId()));
+    }
+
+    bool IsSampled() override { return context_.SpanContext().IsSampled(); }
+
+    void RecordSendInitialMetadata(
+        grpc_metadata_batch* send_initial_metadata) override;
+    void RecordSendTrailingMetadata(
+        grpc_metadata_batch* /*send_trailing_metadata*/) override {}
+    void RecordSendMessage(
+        const grpc_core::SliceBuffer& /*send_message*/) override;
+    void RecordSendCompressedMessage(
+        const grpc_core::SliceBuffer& /*send_compressed_message*/) override {}
+    void RecordReceivedInitialMetadata(
+        grpc_metadata_batch* /*recv_initial_metadata*/) override {}
+    void RecordReceivedMessage(
+        const grpc_core::SliceBuffer& /*recv_message*/) override;
+    void RecordReceivedDecompressedMessage(
+        const grpc_core::SliceBuffer& /*recv_decompressed_message*/) override {}
+    void RecordReceivedTrailingMetadata(
+        absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
+        const grpc_transport_stream_stats* transport_stream_stats) override;
+    void RecordCancel(grpc_error_handle cancel_error) override;
+    void RecordEnd(const gpr_timespec& /*latency*/) override;
+    void RecordAnnotation(absl::string_view annotation) override;
+
+   private:
+    // Maximum size of the trace context that is sent on the wire.
+    static constexpr uint32_t kMaxTraceContextLen = 64;
+    // Maximum size of tags that are sent on the wire.
+    static constexpr uint32_t kMaxTagsLen = 2048;
+    PythonOpenCensusCallTracer* parent_;
+    PythonCensusContext context_;
+    // Start time (for measuring latency).
+    absl::Time start_time_;
+    // Number of messages in this RPC.
+    uint64_t recv_message_count_ = 0;
+    uint64_t sent_message_count_ = 0;
+    // End status code. Given a defined default because RecordEnd() reads it,
+    // so it must never hold an indeterminate value.
+    absl::StatusCode status_code_ = absl::StatusCode::kOk;
+  };
+
+  explicit PythonOpenCensusCallTracer(const char* method, const char* trace_id,
+                                      const char* parent_span_id,
+                                      bool tracing_enabled);
+  ~PythonOpenCensusCallTracer() override;
+
+  // Hex-encoded trace id of the call-level span.
+  std::string TraceId() override {
+    return absl::BytesToHexString(
+        absl::string_view(context_.SpanContext().TraceId()));
+  }
+
+  // Hex-encoded span id of the call-level span.
+  std::string SpanId() override {
+    return absl::BytesToHexString(
+        absl::string_view(context_.SpanContext().SpanId()));
+  }
+
+  bool IsSampled() override { return context_.SpanContext().IsSampled(); }
+
+  void GenerateContext();
+  PythonOpenCensusCallAttemptTracer* StartNewAttempt(
+      bool is_transparent_retry) override;
+
+  void RecordAnnotation(absl::string_view annotation) override;
+
+ private:
+  PythonCensusContext CreateCensusContextForCallAttempt();
+
+  // Client method.
+  absl::string_view method_;
+  PythonCensusContext context_;
+  bool tracing_enabled_;
+  mutable grpc_core::Mutex mu_;
+  // Non-transparent attempts per call
+  uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
+  // Transparent retries per call
+  uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
+  // Retry delay
+  absl::Duration retry_delay_ ABSL_GUARDED_BY(&mu_);
+  // Time at which the number of active attempts last dropped to zero.
+  absl::Time time_at_last_attempt_end_ ABSL_GUARDED_BY(&mu_);
+  // Number of attempts that were started and have not ended yet.
+  uint64_t num_active_rpcs_ ABSL_GUARDED_BY(&mu_) = 0;
+};
+
+}  // namespace grpc_observability
+
+#endif  // GRPC_PYTHON_OPENCENSUS_CLIENT_CALL_TRACER_H
diff --git a/src/python/grpcio_observability/grpc_observability/constants.h b/src/python/grpcio_observability/grpc_observability/constants.h
new file mode 100644
index 00000000000..924c5c65cad
--- /dev/null
+++ b/src/python/grpcio_observability/grpc_observability/constants.h
@@ -0,0 +1,54 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H
+#define GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H
+
+#include
+
+namespace grpc_observability {
+
+// Label keys attached to recorded client-side and server-side data.
+const std::string kClientMethod = "grpc_client_method";
+const std::string kClientStatus = "grpc_client_status";
+const std::string kServerMethod = "grpc_server_method";
+const std::string kServerStatus = "grpc_server_status";
+
+// Value type carried by a measurement.
+enum MeasurementType {
+  kMeasurementDouble = 0,
+  kMeasurementInt
+};
+
+// Kind of record: a span or a metric.
+enum DataType {
+  kSpanData = 0,
+  kMetricData
+};
+
+// Identifiers of the measures that can be recorded. Values are assigned
+// sequentially starting at 0, so the order of the enumerators is significant.
+enum MetricsName {
+  // Client-side measures.
+  kRpcClientApiLatencyMeasureName = 0,
+  kRpcClientSentMessagesPerRpcMeasureName,
+  kRpcClientSentBytesPerRpcMeasureName,
+  kRpcClientReceivedMessagesPerRpcMeasureName,
+  kRpcClientReceivedBytesPerRpcMeasureName,
+  kRpcClientRoundtripLatencyMeasureName,
+  kRpcClientServerLatencyMeasureName,
+  kRpcClientStartedRpcsMeasureName,
+  kRpcClientRetriesPerCallMeasureName,
+  kRpcClientTransparentRetriesPerCallMeasureName,
+  kRpcClientRetryDelayPerCallMeasureName,
+  kRpcClientTransportLatencyMeasureName,
+  // Server-side measures.
+  kRpcServerSentMessagesPerRpcMeasureName,
+  kRpcServerSentBytesPerRpcMeasureName,
+  kRpcServerReceivedMessagesPerRpcMeasureName,
+  kRpcServerReceivedBytesPerRpcMeasureName,
+  kRpcServerServerLatencyMeasureName,
+  kRpcServerStartedRpcsMeasureName
+};
+
+}  // namespace grpc_observability
+
+#endif  // GRPC_PYTHON_OBSERVABILITY_CONSTANTS_H
diff --git a/src/python/grpcio_observability/grpc_observability/observability_util.cc b/src/python/grpcio_observability/grpc_observability/observability_util.cc
new file mode 100644
index 00000000000..921a31e3d7e
--- /dev/null
+++ b/src/python/grpcio_observability/grpc_observability/observability_util.cc
@@ -0,0 +1,224 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/python/grpcio_observability/grpc_observability/observability_util.h"
+
+#include
+#include
+
+#include <cerrno>
+#include <climits>
+#include <cstdlib>
+#include
+#include
+#include
+#include
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+
+#include
+
+#include "src/cpp/ext/gcp/observability_config.h"
+#include "src/python/grpcio_observability/grpc_observability/client_call_tracer.h"
+#include "src/python/grpcio_observability/grpc_observability/server_call_tracer.h"
+
+namespace grpc_observability {
+
+// Buffer of census data awaiting export, with the mutex and condition variable
+// declared alongside it.
+// NOTE(review): the element type is taken from the comment below ("store 100
+// CensusData") -- confirm against observability_util.h.
+std::queue<CensusData>* g_census_data_buffer;
+std::mutex g_census_data_buffer_mutex;
+std::condition_variable g_census_data_buffer_cv;
+// TODO(xuanwn): Change below to a more appropriate number.
+// Assume buffer will store 100 CensusData and start export when buffer is 70%
+// full.
+constexpr float kExportThreshold = 0.7;
+constexpr int kMaxExportBufferSize = 100;
+
+namespace {
+
+// Returns the export threshold: the value of the
+// GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD environment variable when it is set and
+// parses as a float, otherwise kExportThreshold.
+//
+// Parsing uses strtof rather than std::stof so that a malformed or
+// out-of-range value falls back to the default instead of throwing.
+// (The "ThreadHold" spelling is kept because callers use this name.)
+float GetExportThreadHold() {
+  const char* value = std::getenv("GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD");
+  if (value == nullptr) {
+    return kExportThreshold;
+  }
+  char* end = nullptr;
+  errno = 0;
+  const float parsed = std::strtof(value, &end);
+  // end == value: no number could be parsed; ERANGE: value not representable.
+  if (end == value || errno == ERANGE) {
+    return kExportThreshold;
+  }
+  return parsed;
+}
+
+// Returns the maximum export buffer size: the value of the
+// GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE environment variable when it is
+// set and parses as an int, otherwise kMaxExportBufferSize.
+//
+// Parsing uses strtol rather than std::stoi so that a malformed or
+// out-of-range value falls back to the default instead of throwing.
+int GetMaxExportBufferSize() {
+  const char* value = std::getenv("GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE");
+  if (value == nullptr) {
+    return kMaxExportBufferSize;
+  }
+  char* end = nullptr;
+  errno = 0;
+  const long parsed = std::strtol(value, &end, 10);
+  if (end == value || errno == ERANGE || parsed < INT_MIN ||
+      parsed > INT_MAX) {
+    return kMaxExportBufferSize;
+  }
+  return static_cast<int>(parsed);
+}
+
+}  // namespace
+
+void RecordIntMetric(MetricsName name, int64_t value,
+                     const std::vector