From b023ae18e08129f8408d87bc1559f7cd4e265946 Mon Sep 17 00:00:00 2001 From: Xuan Wang Date: Wed, 3 Jan 2024 20:43:35 +0000 Subject: [PATCH] Revert "Automated rollback of commit d2da19e99e95d5df051227ae355efea3d9a898fb." This reverts commit 96b9e8d3e3fd93ed4c6285c2c5b60efc621a6f93. --- requirements.bazel.txt | 2 + src/python/grpcio/grpc/_channel.py | 31 ++ .../grpc/_cython/_cygrpc/channel.pxd.pxi | 3 +- .../grpc/_cython/_cygrpc/channel.pyx.pxi | 11 +- src/python/grpcio/grpc/_observability.py | 12 +- .../_parallel_compile_patch.py | 12 +- .../grpc_observability/BUILD.bazel | 22 +- .../grpc_observability/__init__.py | 7 +- .../grpc_observability/_cyobservability.pxd | 2 + .../grpc_observability/_cyobservability.pyx | 11 +- .../grpc_observability/_gcp_observability.py | 12 +- .../_open_telemetry_exporter.py | 38 +++ .../_open_telemetry_measures.py | 97 ++++++ .../_open_telemetry_observability.py | 155 +++++++++ .../_open_telemetry_plugin.py | 254 +++++++++++++++ .../grpc_observability/client_call_tracer.cc | 21 +- .../grpc_observability/client_call_tracer.h | 5 +- .../grpc_observability/constants.h | 9 +- .../grpc_observability/observability_util.cc | 6 +- .../grpc_observability/observability_util.h | 4 +- .../python_census_context.h | 4 + .../grpc_observability/server_call_tracer.cc | 4 +- .../tests/_sanity/_sanity_test.py | 9 +- .../tests/observability/BUILD.bazel | 22 +- .../observability/_observability_test.py | 305 ++++-------------- .../_open_telemetry_observability_test.py | 299 +++++++++++++++++ .../tests/observability/_test_server.py | 169 ++++++++++ src/python/grpcio_tests/tests/tests.json | 1 + .../_parallel_compile_patch.py.template | 23 ++ tools/distrib/install_all_python_modules.sh | 13 +- 30 files changed, 1269 insertions(+), 294 deletions(-) create mode 100644 src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py create mode 100644 src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py create mode 100644 
src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py create mode 100644 src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py create mode 100644 src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py create mode 100644 src/python/grpcio_tests/tests/observability/_test_server.py create mode 100644 templates/src/python/grpcio_observability/_parallel_compile_patch.py.template diff --git a/requirements.bazel.txt b/requirements.bazel.txt index 0c8ec0cfbd7..e95972505d0 100644 --- a/requirements.bazel.txt +++ b/requirements.bazel.txt @@ -16,3 +16,5 @@ setuptools==44.1.1 xds-protos==0.0.11 absl-py==1.4.0 googleapis-common-protos==1.61.0 +opentelemetry-sdk==1.21.0 +opentelemetry-api==1.21.0 diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index fef906e51d5..33329d506e1 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -132,6 +132,7 @@ class _RPCState(object): rpc_start_time: Optional[float] # In relative seconds rpc_end_time: Optional[float] # In relative seconds method: Optional[str] + target: Optional[str] def __init__( self, @@ -163,6 +164,7 @@ class _RPCState(object): self.rpc_start_time = None self.rpc_end_time = None self.method = None + self.target = None # The semantics of grpc.Future.cancel and grpc.Future.cancelled are # slightly wonky, so they have to be tracked separately from the rest of the @@ -1048,6 +1050,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): _channel: cygrpc.Channel _managed_call: IntegratedCallFactory _method: bytes + _target: bytes _request_serializer: Optional[SerializingFunction] _response_deserializer: Optional[DeserializingFunction] _context: Any @@ -1058,12 +1061,14 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): channel: cygrpc.Channel, managed_call: IntegratedCallFactory, method: bytes, + target: bytes, request_serializer: 
Optional[SerializingFunction], response_deserializer: Optional[DeserializingFunction], ): self._channel = channel self._managed_call = managed_call self._method = method + self._target = target self._request_serializer = request_serializer self._response_deserializer = response_deserializer self._context = cygrpc.build_census_context() @@ -1123,6 +1128,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1194,6 +1200,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): event_handler = _event_handler(state, self._response_deserializer) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1213,6 +1220,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): _channel: cygrpc.Channel _method: bytes + _target: bytes _request_serializer: Optional[SerializingFunction] _response_deserializer: Optional[DeserializingFunction] _context: Any @@ -1222,11 +1230,13 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self, channel: cygrpc.Channel, method: bytes, + target: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, ): self._channel = channel self._method = method + self._target = target self._request_serializer = request_serializer self._response_deserializer = response_deserializer self._context = cygrpc.build_census_context() @@ -1278,6 +1288,7 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): operations_and_tags = tuple((ops, None) for 
ops in operations) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1297,6 +1308,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): _channel: cygrpc.Channel _managed_call: IntegratedCallFactory _method: bytes + _target: bytes _request_serializer: Optional[SerializingFunction] _response_deserializer: Optional[DeserializingFunction] _context: Any @@ -1307,12 +1319,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): channel: cygrpc.Channel, managed_call: IntegratedCallFactory, method: bytes, + target: bytes, request_serializer: SerializingFunction, response_deserializer: DeserializingFunction, ): self._channel = channel self._managed_call = managed_call self._method = method + self._target = target self._request_serializer = request_serializer self._response_deserializer = response_deserializer self._context = cygrpc.build_census_context() @@ -1354,6 +1368,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): ) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1374,6 +1389,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): _channel: cygrpc.Channel _managed_call: IntegratedCallFactory _method: bytes + _target: bytes _request_serializer: Optional[SerializingFunction] _response_deserializer: Optional[DeserializingFunction] _context: Any @@ -1384,12 +1400,14 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): channel: cygrpc.Channel, managed_call: IntegratedCallFactory, method: bytes, + target: bytes, request_serializer: Optional[SerializingFunction], response_deserializer: Optional[DeserializingFunction], ): self._channel 
= channel self._managed_call = managed_call self._method = method + self._target = target self._request_serializer = request_serializer self._response_deserializer = response_deserializer self._context = cygrpc.build_census_context() @@ -1413,6 +1431,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): ) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._channel.segregated_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1501,6 +1520,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): ) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = _common.decode(self._target) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1530,6 +1550,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): _channel: cygrpc.Channel _managed_call: IntegratedCallFactory _method: bytes + _target: bytes _request_serializer: Optional[SerializingFunction] _response_deserializer: Optional[DeserializingFunction] _context: Any @@ -1540,12 +1561,14 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): channel: cygrpc.Channel, managed_call: IntegratedCallFactory, method: bytes, + target: bytes, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ): self._channel = channel self._managed_call = managed_call self._method = method + self._target = target self._request_serializer = request_serializer self._response_deserializer = response_deserializer self._context = cygrpc.build_census_context() @@ -1579,6 +1602,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, self._response_deserializer) state.rpc_start_time = time.perf_counter() state.method = _common.decode(self._method) + state.target = 
_common.decode(self._target) call = self._managed_call( cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, @@ -1947,6 +1971,7 @@ class Channel(grpc.Channel): _channel: cygrpc.Channel _call_state: _ChannelCallState _connectivity_state: _ChannelConnectivityState + _target: str def __init__( self, @@ -1974,6 +1999,7 @@ class Channel(grpc.Channel): _augment_options(core_options, compression), credentials, ) + self._target = target self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) cygrpc.fork_register_channel(self) @@ -2013,6 +2039,7 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), + _common.encode(self._target), request_serializer, response_deserializer, ) @@ -2031,6 +2058,7 @@ class Channel(grpc.Channel): return _SingleThreadedUnaryStreamMultiCallable( self._channel, _common.encode(method), + _common.encode(self._target), request_serializer, response_deserializer, ) @@ -2039,6 +2067,7 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), + _common.encode(self._target), request_serializer, response_deserializer, ) @@ -2053,6 +2082,7 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), + _common.encode(self._target), request_serializer, response_deserializer, ) @@ -2067,6 +2097,7 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), + _common.encode(self._target), request_serializer, response_deserializer, ) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi index c46cdb95e1a..6e5416a9e31 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -28,12 +28,13 @@ 
cdef class _CallState: cdef set due # call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule cdef object call_tracer_capsule - cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except * + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except * cdef void maybe_delete_call_tracer(self) except * cdef class _ChannelState: + cdef bytes target cdef object condition cdef grpc_channel *c_channel # A boolean field indicating that the channel is open (if True) or is being diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 54119fcae69..f6db36ebde1 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -77,7 +77,7 @@ cdef class _CallState: return _observability.delete_call_tracer(self.call_tracer_capsule) - cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *: + cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *: # TODO(xuanwn): use channel args to exclude those metrics. 
for exclude_prefix in _observability._SERVICES_TO_EXCLUDE: if exclude_prefix in method_name: @@ -85,14 +85,15 @@ cdef class _CallState: with _observability.get_plugin() as plugin: if not (plugin and plugin.observability_enabled): return - capsule = plugin.create_client_call_tracer(method_name) + capsule = plugin.create_client_call_tracer(method_name, target) capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER) _set_call_tracer(self.c_call, capsule_ptr) self.call_tracer_capsule = capsule cdef class _ChannelState: - def __cinit__(self): + def __cinit__(self, target): + self.target = target self.condition = threading.Condition() self.open = True self.integrated_call_states = {} @@ -248,7 +249,7 @@ cdef void _call( grpc_slice_unref(method_slice) if host_slice_ptr: grpc_slice_unref(host_slice) - call_state.maybe_set_client_call_tracer_on_call(method) + call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target) if context is not None: set_census_context_on_call(call_state, context) if credentials is not None: @@ -473,7 +474,7 @@ cdef class Channel: ChannelCredentials channel_credentials): arguments = () if arguments is None else tuple(arguments) fork_handlers_and_grpc_init() - self._state = _ChannelState() + self._state = _ChannelState(target) self._state.c_call_completion_queue = ( grpc_completion_queue_create_for_next(NULL)) self._state.c_connectivity_completion_queue = ( diff --git a/src/python/grpcio/grpc/_observability.py b/src/python/grpcio/grpc/_observability.py index d4e8ce6ab7b..8bbbcd2bcea 100644 --- a/src/python/grpcio/grpc/_observability.py +++ b/src/python/grpcio/grpc/_observability.py @@ -62,7 +62,7 @@ class ObservabilityPlugin( @abc.abstractmethod def create_client_call_tracer( - self, method_name: bytes + self, method_name: bytes, target: bytes ) -> ClientCallTracerCapsule: """Creates a ClientCallTracerCapsule. @@ -75,6 +75,7 @@ class ObservabilityPlugin( Args: method_name: The method name of the call in byte format. 
+ target: The channel target of the call in byte format. Returns: A PyCapsule which stores a ClientCallTracer object. @@ -140,7 +141,7 @@ class ObservabilityPlugin( @abc.abstractmethod def record_rpc_latency( - self, method: str, rpc_latency: float, status_code: Any + self, method: str, target: str, rpc_latency: float, status_code: Any ) -> None: """Record the latency of the RPC. @@ -149,7 +150,8 @@ class ObservabilityPlugin( Args: method: The fully-qualified name of the RPC method being invoked. - rpc_latency: The latency for the RPC, equals to the time between + target: The target name of the RPC method being invoked. + rpc_latency: The latency for the RPC in seconds, equals to the time between when the client invokes the RPC and when the client receives the status. status_code: An element of grpc.StatusCode in string format representing the final status for the RPC. @@ -280,4 +282,6 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: return rpc_latency_s = state.rpc_end_time - state.rpc_start_time rpc_latency_ms = rpc_latency_s * 1000 - plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code) + plugin.record_rpc_latency( + state.method, state.target, rpc_latency_ms, state.code + ) diff --git a/src/python/grpcio_observability/_parallel_compile_patch.py b/src/python/grpcio_observability/_parallel_compile_patch.py index 34ba5379216..9c98aa92436 100644 --- a/src/python/grpcio_observability/_parallel_compile_patch.py +++ b/src/python/grpcio_observability/_parallel_compile_patch.py @@ -11,6 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# +# This file has been automatically generated from a template file. +# Please make modifications to +# `$REPO_ROOT/templates/src/python/grpcio/_parallel_compile_patch.py.template` +# instead. 
This file can be regenerated from the template by running +# `tools/buildgen/generate_projects.sh`. + """Patches the compile() to allow enable parallel compilation of C/C++. build_ext has lots of C/C++ files and normally them one by one. @@ -27,10 +34,11 @@ except KeyError: import multiprocessing BUILD_EXT_COMPILER_JOBS = multiprocessing.cpu_count() +except ValueError: + BUILD_EXT_COMPILER_JOBS = 1 # monkey-patch for parallel compilation -# TODO(xuanwn): Use a template for this file. def _parallel_compile( self, sources, @@ -45,7 +53,7 @@ def _parallel_compile( # setup the same way as distutils.ccompiler.CCompiler # https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564 macros, objects, extra_postargs, pp_opts, build = self._setup_compile( - output_dir, macros, include_dirs, sources, depends, extra_postargs + str(output_dir), macros, include_dirs, sources, depends, extra_postargs ) cc_args = self._get_cc_args(pp_opts, debug, extra_preargs) diff --git a/src/python/grpcio_observability/grpc_observability/BUILD.bazel b/src/python/grpcio_observability/grpc_observability/BUILD.bazel index 2fbfb01aa09..afb35e43c3f 100644 --- a/src/python/grpcio_observability/grpc_observability/BUILD.bazel +++ b/src/python/grpcio_observability/grpc_observability/BUILD.bazel @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +load("@grpc_python_dependencies//:requirements.bzl", "requirement") load("//bazel:cython_library.bzl", "pyx_library") package(default_visibility = ["//visibility:private"]) @@ -52,17 +53,27 @@ pyx_library( ], ) -# Since `opencensus` and `opencensus-ext-stackdriver` are non-hermetic, +# Since `opentelemetry-sdk` and `opentelemetry-api` are non-hermetic, # pyobservability is for internal use only. 
+py_library( + name = "_opentelemetry_observability", + srcs = [ + "_open_telemetry_exporter.py", + "_open_telemetry_measures.py", + "_open_telemetry_observability.py", + "_open_telemetry_plugin.py", + ], + deps = [ + requirement("opentelemetry-sdk"), + requirement("opentelemetry-api"), + ], +) + py_library( name = "pyobservability", srcs = [ "__init__.py", - "_gcp_observability.py", - "_measures.py", "_observability.py", - "_observability_config.py", - "_views.py", ], imports = [ ".", @@ -73,6 +84,7 @@ py_library( "//:__subpackages__", ], deps = [ + ":_opentelemetry_observability", ":cyobservability", ], ) diff --git a/src/python/grpcio_observability/grpc_observability/__init__.py b/src/python/grpcio_observability/grpc_observability/__init__.py index 6a10a17fb0b..7dad3d83efe 100644 --- a/src/python/grpcio_observability/grpc_observability/__init__.py +++ b/src/python/grpcio_observability/grpc_observability/__init__.py @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from grpc_observability._gcp_observability import GCPOpenCensusObservability +from grpc_observability._open_telemetry_observability import ( + OpenTelemetryObservability, +) +from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin -__all__ = ("GCPOpenCensusObservability",) +__all__ = ("OpenTelemetryObservability", "OpenTelemetryPlugin") diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd index 9017099e30f..93a077b8679 100644 --- a/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pxd @@ -76,6 +76,7 @@ cdef extern from "observability_util.h" namespace "grpc_observability": cdef cGcpObservabilityConfig ReadAndActivateObservabilityConfig() nogil cdef void NativeObservabilityInit() except + cdef void* CreateClientCallTracer(const char* method, + const char* target, const char* trace_id, const char* parent_span_id) except + cdef void* CreateServerCallTracerFactory() except + @@ -144,6 +145,7 @@ cdef extern from "constants.h" namespace "grpc_observability": kRpcServerStartedRpcsMeasureName string kClientMethod + string kClientTarget string kClientStatus cdef extern from "sampler.h" namespace "grpc_observability": diff --git a/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx index f48674e01e2..236601770ee 100644 --- a/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx +++ b/src/python/grpcio_observability/grpc_observability/_cyobservability.pyx @@ -111,18 +111,22 @@ def activate_config(object py_config) -> None: if (py_config.stats_enabled): EnablePythonCensusStats(True); +def activate_stats() -> None: + EnablePythonCensusStats(True); -def create_client_call_tracer(bytes method_name, bytes trace_id, + +def create_client_call_tracer(bytes 
method_name, bytes target, bytes trace_id, bytes parent_span_id=b'') -> cpython.PyObject: """Create a ClientCallTracer and save to PyCapsule. Returns: A grpc_observability._observability.ClientCallTracerCapsule object. """ cdef char* c_method = cpython.PyBytes_AsString(method_name) + cdef char* c_target = cpython.PyBytes_AsString(target) cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id) cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id) - cdef void* call_tracer = CreateClientCallTracer(c_method, c_trace_id, c_parent_span_id) + cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id) capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL) return capsule @@ -244,7 +248,7 @@ def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels, span_annotations = py_span_annotations) -def _record_rpc_latency(object exporter, str method, float rpc_latency, str status_code) -> None: +def _record_rpc_latency(object exporter, str method, str target, float rpc_latency, str status_code) -> None: exporter: _observability.Exporter measurement = {} @@ -254,6 +258,7 @@ def _record_rpc_latency(object exporter, str method, float rpc_latency, str stat labels = {} labels[_decode(kClientMethod)] = method.strip("/") + labels[_decode(kClientTarget)] = target labels[_decode(kClientStatus)] = status_code metric = _get_stats_data(measurement, labels) exporter.export_stats_data([metric]) diff --git a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py index 58e9a9d47a8..797c4797d01 100644 --- a/src/python/grpcio_observability/grpc_observability/_gcp_observability.py +++ b/src/python/grpcio_observability/grpc_observability/_gcp_observability.py @@ -118,11 +118,11 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): grpc._observability.observability_deinit() def 
create_client_call_tracer( - self, method_name: bytes + self, method_name: bytes, target: bytes ) -> ClientCallTracerCapsule: trace_id = b"TRACE_ID" capsule = _cyobservability.create_client_call_tracer( - method_name, trace_id + method_name, target, trace_id ) return capsule @@ -143,9 +143,13 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin): pass def record_rpc_latency( - self, method: str, rpc_latency: float, status_code: grpc.StatusCode + self, + method: str, + target: str, + rpc_latency: float, + status_code: grpc.StatusCode, ) -> None: status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") _cyobservability._record_rpc_latency( - self.exporter, method, rpc_latency, status_code + self.exporter, method, target, rpc_latency, status_code ) diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py new file mode 100644 index 00000000000..be2738a8595 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py @@ -0,0 +1,38 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Iterable, List + +from grpc_observability import _observability # pytype: disable=pyi-error +from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin + + +class _OpenTelemetryExporterDelegator(_observability.Exporter): + _plugins: Iterable[_OpenTelemetryPlugin] + + def __init__(self, plugins: Iterable[_OpenTelemetryPlugin]): + self._plugins = plugins + + def export_stats_data( + self, stats_data: List[_observability.StatsData] + ) -> None: + # Records stats data to MeterProvider. + for data in stats_data: + for plugin in self._plugins: + plugin.maybe_record_stats_data(data) + + def export_tracing_data( + self, tracing_data: List[_observability.TracingData] + ) -> None: + pass diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py new file mode 100644 index 00000000000..209ecd22c91 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py @@ -0,0 +1,97 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import collections +from typing import List + +from grpc_observability._cyobservability import MetricsName + + +class Metric( + collections.namedtuple( + "Metric", + ["name", "cyname", "unit", "description"], + ) +): + pass + + +CLIENT_ATTEMPT_STARTED = Metric( + "grpc.client.attempt.started", + MetricsName.CLIENT_STARTED_RPCS, + "{attempt}", + "Number of client call attempts started", +) +CLIENT_ATTEMPT_DURATION = Metric( + "grpc.client.attempt.duration", + MetricsName.CLIENT_ROUNDTRIP_LATENCY, + "s", + "End-to-end time taken to complete a client call attempt", +) +CLIENT_RPC_DURATION = Metric( + "grpc.client.call.duration", + MetricsName.CLIENT_API_LATENCY, + "s", + "End-to-end time taken to complete a call from client's perspective", +) +CLIENT_ATTEMPT_SEND_BYTES = Metric( + "grpc.client.attempt.sent_total_compressed_message_size", + MetricsName.CLIENT_SEND_BYTES_PER_RPC, + "By", + "Compressed message bytes sent per client call attempt", +) +CLIENT_ATTEMPT_RECEIVED_BYTES = Metric( + "grpc.client.attempt.rcvd_total_compressed_message_size", + MetricsName.CLIENT_RECEIVED_BYTES_PER_RPC, + "By", + "Compressed message bytes received per call attempt", +) +SERVER_STARTED_RPCS = Metric( + "grpc.server.call.started", + MetricsName.SERVER_STARTED_RPCS, + "{call}", + "Number of server calls started", +) +SERVER_RPC_DURATION = Metric( + "grpc.server.call.duration", + MetricsName.SERVER_SERVER_LATENCY, + "s", + "End-to-end time taken to complete a call from server transport's perspective", +) +SERVER_RPC_SEND_BYTES = Metric( + "grpc.server.call.sent_total_compressed_message_size", + MetricsName.SERVER_SENT_BYTES_PER_RPC, + "By", + "Compressed message bytes sent per server call", +) +SERVER_RPC_RECEIVED_BYTES = Metric( + "grpc.server.call.rcvd_total_compressed_message_size", + MetricsName.SERVER_RECEIVED_BYTES_PER_RPC, + "By", + "Compressed message bytes received per server call", +) + + +def base_metrics() -> List[Metric]: + return [ + CLIENT_ATTEMPT_STARTED, + 
CLIENT_ATTEMPT_DURATION, + CLIENT_RPC_DURATION, + CLIENT_ATTEMPT_SEND_BYTES, + CLIENT_ATTEMPT_RECEIVED_BYTES, + SERVER_STARTED_RPCS, + SERVER_RPC_DURATION, + SERVER_RPC_SEND_BYTES, + SERVER_RPC_RECEIVED_BYTES, + ] diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py new file mode 100644 index 00000000000..8f3a96475df --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py @@ -0,0 +1,155 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Any, Iterable, Optional + +import grpc + +# pytype: disable=pyi-error +from grpc_observability import _cyobservability +from grpc_observability._open_telemetry_exporter import ( + _OpenTelemetryExporterDelegator, +) +from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin +from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin + +_LOGGER = logging.getLogger(__name__) + +ClientCallTracerCapsule = Any # it appears only once in the function signature +ServerCallTracerFactoryCapsule = ( + Any # it appears only once in the function signature +) +grpc_observability = Any # grpc_observability.py imports this module. 
+ +GRPC_STATUS_CODE_TO_STRING = { + grpc.StatusCode.OK: "OK", + grpc.StatusCode.CANCELLED: "CANCELLED", + grpc.StatusCode.UNKNOWN: "UNKNOWN", + grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT", + grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED", + grpc.StatusCode.NOT_FOUND: "NOT_FOUND", + grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS", + grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED", + grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED", + grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED", + grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION", + grpc.StatusCode.ABORTED: "ABORTED", + grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE", + grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED", + grpc.StatusCode.INTERNAL: "INTERNAL", + grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE", + grpc.StatusCode.DATA_LOSS: "DATA_LOSS", +} + + +# pylint: disable=no-self-use +class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin): + """OpenTelemetry based plugin implementation. + + Args: + exporter: Exporter used to export data. + plugin: OpenTelemetryPlugin to enable. 
+ """ + + exporter: "grpc_observability.Exporter" + plugins: Iterable[OpenTelemetryPlugin] + + def __init__( + self, + *, + plugins: Optional[Iterable[OpenTelemetryPlugin]] = None, + exporter: "grpc_observability.Exporter" = None, + ): + _plugins = [] + if plugins: + for plugin in plugins: + _plugins.append(_OpenTelemetryPlugin(plugin)) + + if exporter: + self.exporter = exporter + else: + self.exporter = _OpenTelemetryExporterDelegator(_plugins) + + try: + _cyobservability.activate_stats() + self.set_stats(True) + except Exception as e: # pylint: disable=broad-except + raise ValueError(f"Activate observability metrics failed with: {e}") + + def __enter__(self): + try: + _cyobservability.cyobservability_init(self.exporter) + # TODO(xuanwn): Use specific exceptions + except Exception as e: # pylint: disable=broad-except + _LOGGER.exception("Initiate observability failed with: %s", e) + + grpc._observability.observability_init(self) + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.exit() + + def exit(self) -> None: + # Sleep so we don't lose any data. If we shutdown export thread + # immediately after exit, it's possible that core didn't call RecordEnd + # in callTracer, and all data recorded by calling RecordEnd will be + # lost. + # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals to the time in + # AwaitNextBatchLocked.
+ # TODO(xuanwn): explicit synchronization + # https://github.com/grpc/grpc/issues/33262 + time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS) + self.set_tracing(False) + self.set_stats(False) + _cyobservability.observability_deinit() + grpc._observability.observability_deinit() + + def create_client_call_tracer( + self, method_name: bytes, target: bytes + ) -> ClientCallTracerCapsule: + trace_id = b"TRACE_ID" + capsule = _cyobservability.create_client_call_tracer( + method_name, target, trace_id + ) + return capsule + + def create_server_call_tracer_factory( + self, + ) -> ServerCallTracerFactoryCapsule: + capsule = _cyobservability.create_server_call_tracer_factory_capsule() + return capsule + + def delete_client_call_tracer( + self, client_call_tracer: ClientCallTracerCapsule + ) -> None: + _cyobservability.delete_client_call_tracer(client_call_tracer) + + def save_trace_context( + self, trace_id: str, span_id: str, is_sampled: bool + ) -> None: + pass + + def record_rpc_latency( + self, + method: str, + target: str, + rpc_latency: float, + status_code: grpc.StatusCode, + ) -> None: + status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN") + _cyobservability._record_rpc_latency( + self.exporter, method, target, rpc_latency, status_code + ) diff --git a/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py b/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py new file mode 100644 index 00000000000..823d30fa6b4 --- /dev/null +++ b/src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py @@ -0,0 +1,254 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing import Dict, Iterable, List, Optional, Union + +# pytype: disable=pyi-error +import grpc +from grpc_observability import _open_telemetry_measures +from grpc_observability._cyobservability import MetricsName +from grpc_observability._observability import StatsData +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics import Histogram +from opentelemetry.sdk.metrics import Meter +from opentelemetry.sdk.metrics import MeterProvider + +GRPC_METHOD_LABEL = "grpc.method" +GRPC_TARGET_LABEL = "grpc.target" +GRPC_OTHER_LABEL_VALUE = "other" + + +class OpenTelemetryLabelInjector(abc.ABC): + """ + An interface that allows you to add additional labels on the calls traced. + + Please note that this class is still work in progress and NOT READY to be used. + """ + + _labels: List[Dict[str, str]] + + def __init__(self): + # Calls Python OTel API to detect resource and get labels, save + # those labels to OpenTelemetryLabelInjector.labels. + pass + + @abc.abstractmethod + def get_labels(self): + # Get additional labels for this OpenTelemetryLabelInjector. + raise NotImplementedError() + + +class OpenTelemetryPluginOption(abc.ABC): + """ + An interface that allows you to add additional function to OpenTelemetryPlugin. + + Please note that this class is still work in progress and NOT READY to be used. + """ + + @abc.abstractmethod + def is_active_on_method(self, method: str) -> bool: + """Determines whether this plugin option is active on a given method. + + Args: + method: Required.
The RPC method, for example: `/helloworld.Greeter/SayHello`. + + Returns: + True if this plugin option is active on the given method, false otherwise. + """ + raise NotImplementedError() + + @abc.abstractmethod + def is_active_on_server(self, channel_args: List[str]) -> bool: + """Determines whether this plugin option is active on a given server. + + Args: + channel_args: Required. The channel args used for server. + TODO(xuanwn): detail on what channel_args will contain. + + Returns: + True if this plugin option is active on the server, false otherwise. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_label_injector(self) -> Optional[OpenTelemetryLabelInjector]: + # Returns the LabelsInjector used by this plugin option, or None. + raise NotImplementedError() + + +# pylint: disable=no-self-use +class OpenTelemetryPlugin: + """Describes a Plugin for OpenTelemetry observability.""" + + def get_plugin_options( + self, + ) -> Iterable[OpenTelemetryPluginOption]: + return [] + + def get_meter_provider(self) -> Optional[MeterProvider]: + return None + + def target_attribute_filter( + self, target: str # pylint: disable=unused-argument + ) -> bool: + """ + If set, this will be called per channel to decide whether to record the + target attribute on client or to replace it with "other". + This helps reduce the cardinality on metrics in cases where many channels + are created with different targets in the same binary (which might happen + for example, if the channel target string uses IP addresses directly). + + Args: + target: The target for the RPC. + + Returns: + bool: True means the original target string will be used, False means target string + will be replaced with "other".
+ """ + return True + + def generic_method_attribute_filter( + self, method: str # pylint: disable=unused-argument + ) -> bool: + """ + If set, this will be called with a generic method type to decide whether to + record the method name or to replace it with "other". + + Note that pre-registered methods will always be recorded no matter what this + function returns. + + Args: + method: The method name for the RPC. + + Returns: + bool: True means the original method name will be used, False means method name + will be replaced with "other". + """ + return False + + +class _OpenTelemetryPlugin: + _plugin: OpenTelemetryPlugin + _metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]] + + def __init__(self, plugin: OpenTelemetryPlugin): + self._plugin = plugin + self._metric_to_recorder = dict() + + meter_provider = self._plugin.get_meter_provider() + if meter_provider: + meter = meter_provider.get_meter("grpc-python", grpc.__version__) + enabled_metrics = _open_telemetry_measures.base_metrics() + self._metric_to_recorder = self._register_metrics( + meter, enabled_metrics + ) + + def _should_record(self, stats_data: StatsData) -> bool: + # Decide if this plugin should record the stats_data. + return stats_data.name in self._metric_to_recorder.keys() + + def _record_stats_data(self, stats_data: StatsData) -> None: + recorder = self._metric_to_recorder[stats_data.name] + + target = stats_data.labels.get(GRPC_TARGET_LABEL, "") + if not self._plugin.target_attribute_filter(target): + # Filter target name. + stats_data.labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE + + method = stats_data.labels.get(GRPC_METHOD_LABEL, "") + if not self._plugin.generic_method_attribute_filter(method): + # Filter method name. 
+ stats_data.labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE + + value = 0 + if stats_data.measure_double: + value = stats_data.value_float + else: + value = stats_data.value_int + if isinstance(recorder, Counter): + recorder.add(value, attributes=stats_data.labels) + elif isinstance(recorder, Histogram): + recorder.record(value, attributes=stats_data.labels) + + # pylint: disable=no-self-use + def maybe_record_stats_data(self, stats_data: List[StatsData]) -> None: + # Records stats data to MeterProvider. + if self._should_record(stats_data): + self._record_stats_data(stats_data) + + def _register_metrics( + self, meter: Meter, metrics: List[_open_telemetry_measures.Metric] + ) -> Dict[MetricsName, Union[Counter, Histogram]]: + metric_to_recorder_map = {} + recorder = None + for metric in metrics: + if metric == _open_telemetry_measures.CLIENT_ATTEMPT_STARTED: + recorder = meter.create_counter( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_DURATION: + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.CLIENT_RPC_DURATION: + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.CLIENT_ATTEMPT_SEND_BYTES: + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif ( + metric == _open_telemetry_measures.CLIENT_ATTEMPT_RECEIVED_BYTES + ): + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.SERVER_STARTED_RPCS: + recorder = meter.create_counter( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.SERVER_RPC_DURATION: + recorder = 
meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.SERVER_RPC_SEND_BYTES: + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + elif metric == _open_telemetry_measures.SERVER_RPC_RECEIVED_BYTES: + recorder = meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + metric_to_recorder_map[metric.cyname] = recorder + return metric_to_recorder_map diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc index 46a091a8fa2..d29aae548a4 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.cc @@ -41,9 +41,11 @@ constexpr uint32_t // PythonOpenCensusCallTracer::PythonOpenCensusCallTracer( - const char* method, const char* trace_id, const char* parent_span_id, - bool tracing_enabled) - : method_(GetMethod(method)), tracing_enabled_(tracing_enabled) { + const char* method, const char* target, const char* trace_id, + const char* parent_span_id, bool tracing_enabled) + : method_(GetMethod(method)), + target_(GetTarget(target)), + tracing_enabled_(tracing_enabled) { GenerateClientContext(absl::StrCat("Sent.", method_), absl::string_view(trace_id), absl::string_view(parent_span_id), &context_); @@ -84,7 +86,7 @@ PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() { RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName, transparent_retries_, context_.Labels()); RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName, - ToDoubleMilliseconds(retry_delay_), context_.Labels()); + ToDoubleSeconds(retry_delay_), context_.Labels()); } if (tracing_enabled_) { @@ -146,6 +148,7 @@ PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: return; } 
context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_)); + context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_)); RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels()); } @@ -225,6 +228,7 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: std::string final_status = absl::StatusCodeToString(status_code_); context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_)); + context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_)); context_.Labels().emplace_back(kClientStatus, final_status); RecordDoubleMetric( kRpcClientSentBytesPerRpcMeasureName, @@ -238,12 +242,11 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer:: ? transport_stream_stats->incoming.data_bytes : 0), context_.Labels()); - RecordDoubleMetric( - kRpcClientServerLatencyMeasureName, - absl::ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time)), - context_.Labels()); + RecordDoubleMetric(kRpcClientServerLatencyMeasureName, + absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)), + context_.Labels()); RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName, - absl::ToDoubleMilliseconds(absl::Now() - start_time_), + absl::ToDoubleSeconds(absl::Now() - start_time_), context_.Labels()); RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels()); } diff --git a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h index 47a279865af..f51edc50620 100644 --- a/src/python/grpcio_observability/grpc_observability/client_call_tracer.h +++ b/src/python/grpcio_observability/grpc_observability/client_call_tracer.h @@ -90,7 +90,8 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { absl::StatusCode status_code_; }; - explicit PythonOpenCensusCallTracer(const char* method, const char* trace_id, + explicit PythonOpenCensusCallTracer(const char* method, const 
char* target, + const char* trace_id, const char* parent_span_id, bool tracing_enabled); ~PythonOpenCensusCallTracer() override; @@ -119,6 +120,8 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer { // Client method. absl::string_view method_; + // Client target. + absl::string_view target_; PythonCensusContext context_; bool tracing_enabled_; mutable grpc_core::Mutex mu_; diff --git a/src/python/grpcio_observability/grpc_observability/constants.h b/src/python/grpcio_observability/grpc_observability/constants.h index 87dba68670a..79212599487 100644 --- a/src/python/grpcio_observability/grpc_observability/constants.h +++ b/src/python/grpcio_observability/grpc_observability/constants.h @@ -19,10 +19,11 @@ namespace grpc_observability { -const std::string kClientMethod = "grpc_client_method"; -const std::string kClientStatus = "grpc_client_status"; -const std::string kServerMethod = "grpc_server_method"; -const std::string kServerStatus = "grpc_server_status"; +const std::string kClientMethod = "grpc.method"; +const std::string kClientTarget = "grpc.target"; +const std::string kClientStatus = "grpc.status"; +const std::string kServerMethod = "grpc.method"; +const std::string kServerStatus = "grpc.status"; typedef enum { kMeasurementDouble = 0, kMeasurementInt } MeasurementType; diff --git a/src/python/grpcio_observability/grpc_observability/observability_util.cc b/src/python/grpcio_observability/grpc_observability/observability_util.cc index cbbab6a8c93..6af56d45601 100644 --- a/src/python/grpcio_observability/grpc_observability/observability_util.cc +++ b/src/python/grpcio_observability/grpc_observability/observability_util.cc @@ -91,10 +91,10 @@ void NativeObservabilityInit() { g_census_data_buffer = new std::queue; } -void* CreateClientCallTracer(const char* method, const char* trace_id, - const char* parent_span_id) { +void* CreateClientCallTracer(const char* method, const char* target, + const char* trace_id, const char* parent_span_id) { 
void* client_call_tracer = new PythonOpenCensusCallTracer( - method, trace_id, parent_span_id, PythonCensusTracingEnabled()); + method, target, trace_id, parent_span_id, PythonCensusTracingEnabled()); return client_call_tracer; } diff --git a/src/python/grpcio_observability/grpc_observability/observability_util.h b/src/python/grpcio_observability/grpc_observability/observability_util.h index 6b4c1f41a3f..593abc4062f 100644 --- a/src/python/grpcio_observability/grpc_observability/observability_util.h +++ b/src/python/grpcio_observability/grpc_observability/observability_util.h @@ -50,8 +50,8 @@ extern std::queue* g_census_data_buffer; extern std::mutex g_census_data_buffer_mutex; extern std::condition_variable g_census_data_buffer_cv; -void* CreateClientCallTracer(const char* method, const char* trace_id, - const char* parent_span_id); +void* CreateClientCallTracer(const char* method, const char* target, + const char* trace_id, const char* parent_span_id); void* CreateServerCallTracerFactory(); diff --git a/src/python/grpcio_observability/grpc_observability/python_census_context.h b/src/python/grpcio_observability/grpc_observability/python_census_context.h index 6d41a7988de..bca0ae65da1 100644 --- a/src/python/grpcio_observability/grpc_observability/python_census_context.h +++ b/src/python/grpcio_observability/grpc_observability/python_census_context.h @@ -278,6 +278,10 @@ inline absl::string_view GetMethod(const char* method) { return absl::StripPrefix(absl::string_view(method), "/"); } +inline absl::string_view GetTarget(const char* target) { + return absl::string_view(target); +} + // Fills a pre-allocated buffer with the value for the grpc-trace-bin header. // The buffer must be at least kGrpcTraceBinHeaderLen bytes long. 
void ToGrpcTraceBinHeader(const PythonCensusContext& ctx, uint8_t* out); diff --git a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc index dd7a4826eaf..722211a5c9e 100644 --- a/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc +++ b/src/python/grpcio_observability/grpc_observability/server_call_tracer.cc @@ -226,7 +226,7 @@ void PythonOpenCensusServerCallTracer::RecordEnd( if (PythonCensusStatsEnabled()) { const uint64_t request_size = GetOutgoingDataSize(final_info); const uint64_t response_size = GetIncomingDataSize(final_info); - double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); + double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_); context_.Labels().emplace_back(kServerMethod, std::string(method_)); context_.Labels().emplace_back( kServerStatus, @@ -235,7 +235,7 @@ void PythonOpenCensusServerCallTracer::RecordEnd( static_cast(response_size), context_.Labels()); RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName, static_cast(request_size), context_.Labels()); - RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_ms, + RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s, context_.Labels()); RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels()); RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName, diff --git a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py index 51e23f54e3b..66d920bda56 100644 --- a/src/python/grpcio_tests/tests/_sanity/_sanity_test.py +++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py @@ -42,6 +42,7 @@ class SanityTest(unittest.TestCase): tests_json_string = pkgutil.get_data(self.TEST_PKG_PATH, "tests.json") tests_json = json.loads(tests_json_string.decode()) + final_tests = [] # Observability is not supported in Windows and MacOS and Asyncio. 
if ( @@ -50,10 +51,12 @@ class SanityTest(unittest.TestCase): or self.TEST_PKG_PATH == "tests_aio" ): for test_case in tests_json: - if "_observability_test" in test_case: - tests_json.remove(test_case) + if "observability" not in test_case: + final_tests.append(test_case) + else: + final_tests = tests_json - self.assertSequenceEqual(tests_json, test_suite_names) + self.assertSequenceEqual(final_tests, test_suite_names) self.assertGreater(len(test_suite_names), 0) diff --git a/src/python/grpcio_tests/tests/observability/BUILD.bazel b/src/python/grpcio_tests/tests/observability/BUILD.bazel index a66724dd0d4..29bd6a90e60 100644 --- a/src/python/grpcio_tests/tests/observability/BUILD.bazel +++ b/src/python/grpcio_tests/tests/observability/BUILD.bazel @@ -13,13 +13,33 @@ # limitations under the License. package(default_visibility = ["//visibility:public"]) +py_library( + name = "test_server", + srcs = ["_test_server.py"], +) + py_test( name = "_observability_test", size = "small", - srcs = glob(["*.py"]), + srcs = ["_observability_test.py"], imports = ["../../"], main = "_observability_test.py", deps = [ + ":test_server", + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_observability/grpc_observability:pyobservability", + "//src/python/grpcio_tests/tests/testing", + ], +) + +py_test( + name = "_open_telemetry_observability_test", + size = "small", + srcs = ["_open_telemetry_observability_test.py"], + imports = ["../../"], + main = "_open_telemetry_observability_test.py", + deps = [ + ":test_server", "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_observability/grpc_observability:pyobservability", "//src/python/grpcio_tests/tests/testing", diff --git a/src/python/grpcio_tests/tests/observability/_observability_test.py b/src/python/grpcio_tests/tests/observability/_observability_test.py index 854844f341a..55450b035d9 100644 --- a/src/python/grpcio_tests/tests/observability/_observability_test.py +++ 
b/src/python/grpcio_tests/tests/observability/_observability_test.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent import futures -import json import logging import os -import random import sys -from typing import Any, Dict, List +from typing import List import unittest import grpc @@ -26,41 +23,11 @@ import grpc_observability from grpc_observability import _cyobservability from grpc_observability import _observability -logger = logging.getLogger(__name__) +from tests.observability import _test_server -_REQUEST = b"\x00\x00\x00" -_RESPONSE = b"\x00\x00\x00" +logger = logging.getLogger(__name__) -_UNARY_UNARY = "/test/UnaryUnary" -_UNARY_STREAM = "/test/UnaryStream" -_STREAM_UNARY = "/test/StreamUnary" -_STREAM_STREAM = "/test/StreamStream" STREAM_LENGTH = 5 -TRIGGER_RPC_METADATA = ("control", "trigger_rpc") -TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "") - -CONFIG_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG" -CONFIG_FILE_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE" - -_VALID_CONFIG_TRACING_STATS = { - "project_id": "test-project", - "cloud_trace": {"sampling_rate": 1.00}, - "cloud_monitoring": {}, -} -_VALID_CONFIG_TRACING_ONLY = { - "project_id": "test-project", - "cloud_trace": {"sampling_rate": 1.00}, -} -_VALID_CONFIG_STATS_ONLY = { - "project_id": "test-project", - "cloud_monitoring": {}, -} -_VALID_CONFIG_STATS_ONLY_STR = """ -{ - 'project_id': 'test-project', - 'cloud_monitoring': {} -} -""" class TestExporter(_observability.Exporter): @@ -84,69 +51,17 @@ class TestExporter(_observability.Exporter): self.span_collecter.extend(tracing_data) -def handle_unary_unary(request, servicer_context): - if TRIGGER_RPC_METADATA in servicer_context.invocation_metadata(): - for k, v in servicer_context.invocation_metadata(): - if "port" in k: - unary_unary_call(port=int(v)) - if "to_new_server" in k: - second_server = grpc.server( - 
futures.ThreadPoolExecutor(max_workers=10) - ) - second_server.add_generic_rpc_handlers((_GenericHandler(),)) - second_server_port = second_server.add_insecure_port("[::]:0") - second_server.start() - unary_unary_call(port=second_server_port) - second_server.stop(0) - return _RESPONSE - - -def handle_unary_stream(request, servicer_context): - for _ in range(STREAM_LENGTH): - yield _RESPONSE - - -def handle_stream_unary(request_iterator, servicer_context): - return _RESPONSE - - -def handle_stream_stream(request_iterator, servicer_context): - for request in request_iterator: - yield _RESPONSE - - -class _MethodHandler(grpc.RpcMethodHandler): - def __init__(self, request_streaming, response_streaming): - self.request_streaming = request_streaming - self.response_streaming = response_streaming - self.request_deserializer = None - self.response_serializer = None - self.unary_unary = None - self.unary_stream = None - self.stream_unary = None - self.stream_stream = None - if self.request_streaming and self.response_streaming: - self.stream_stream = handle_stream_stream - elif self.request_streaming: - self.stream_unary = handle_stream_unary - elif self.response_streaming: - self.unary_stream = handle_unary_stream - else: - self.unary_unary = handle_unary_unary - - -class _GenericHandler(grpc.GenericRpcHandler): - def service(self, handler_call_details): - if handler_call_details.method == _UNARY_UNARY: - return _MethodHandler(False, False) - elif handler_call_details.method == _UNARY_STREAM: - return _MethodHandler(False, True) - elif handler_call_details.method == _STREAM_UNARY: - return _MethodHandler(True, False) - elif handler_call_details.method == _STREAM_STREAM: - return _MethodHandler(True, True) - else: - return None +class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor): + def intercept_unary_unary( + self, continuation, client_call_details, request_or_iterator + ): + response = continuation(client_call_details, request_or_iterator) + return 
response + + +class _ServerInterceptor(grpc.ServerInterceptor): + def intercept_service(self, continuation, handler_call_details): + return continuation(handler_call_details) @unittest.skipIf( @@ -162,173 +77,118 @@ class ObservabilityTest(unittest.TestCase): self._port = None def tearDown(self): - os.environ[CONFIG_ENV_VAR_NAME] = "" - os.environ[CONFIG_FILE_ENV_VAR_NAME] = "" if self._server: self._server.stop(0) def testRecordUnaryUnary(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - unary_unary_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + _test_server.unary_unary_call(port=port) self.assertGreater(len(self.all_metric), 0) self._validate_metrics(self.all_metric) - def testThrowErrorWithoutConfig(self): - with self.assertRaises(ValueError): - with grpc_observability.GCPOpenCensusObservability( - exporter=self.test_exporter - ): - pass - - def testThrowErrorWithInvalidConfig(self): - _INVALID_CONFIG = "INVALID" - self._set_config_file(_INVALID_CONFIG) - with self.assertRaises(ValueError): - with grpc_observability.GCPOpenCensusObservability( - exporter=self.test_exporter - ): - pass - - def testNoErrorAndDataWithEmptyConfig(self): - _EMPTY_CONFIG = {} - self._set_config_file(_EMPTY_CONFIG) - # Empty config still require project_id - os.environ["GCP_PROJECT"] = "test-project" - with grpc_observability.GCPOpenCensusObservability( + def testRecordUnaryUnaryWithClientInterceptor(self): + interceptor = _ClientUnaryUnaryInterceptor() + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - unary_unary_call(port=self._port) - - self.assertEqual(len(self.all_metric), 0) + server, port = _test_server.start_server() + self._server = server + _test_server.intercepted_unary_unary_call( + port=port, 
interceptors=interceptor + ) - def testThrowErrorWhenCallingMultipleInit(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with self.assertRaises(ValueError): - with grpc_observability.GCPOpenCensusObservability( - exporter=self.test_exporter - ) as o11y: - grpc._observability.observability_init(o11y) + self.assertGreater(len(self.all_metric), 0) + self._validate_metrics(self.all_metric) - def testRecordUnaryUnaryStatsOnly(self): - self._set_config_file(_VALID_CONFIG_STATS_ONLY) - with grpc_observability.GCPOpenCensusObservability( + def testRecordUnaryUnaryWithServerInterceptor(self): + interceptor = _ServerInterceptor() + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - unary_unary_call(port=self._port) + server, port = _test_server.start_server(interceptors=[interceptor]) + self._server = server + _test_server.unary_unary_call(port=port) self.assertGreater(len(self.all_metric), 0) self._validate_metrics(self.all_metric) - def testRecordUnaryUnaryTracingOnly(self): - self._set_config_file(_VALID_CONFIG_TRACING_ONLY) - with grpc_observability.GCPOpenCensusObservability( - exporter=self.test_exporter - ): - self._start_server() - unary_unary_call(port=self._port) - - self.assertEqual(len(self.all_metric), 0) + def testThrowErrorWhenCallingMultipleInit(self): + with self.assertRaises(ValueError): + with grpc_observability.OpenTelemetryObservability( + exporter=self.test_exporter + ) as o11y: + grpc._observability.observability_init(o11y) def testRecordUnaryStream(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - unary_stream_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + _test_server.unary_stream_call(port=port) self.assertGreater(len(self.all_metric), 0) 
self._validate_metrics(self.all_metric) def testRecordStreamUnary(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - stream_unary_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + _test_server.stream_unary_call(port=port) self.assertTrue(len(self.all_metric) > 0) - self.assertTrue(len(self.all_span) > 0) self._validate_metrics(self.all_metric) def testRecordStreamStream(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - stream_stream_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + _test_server.stream_stream_call(port=port) self.assertGreater(len(self.all_metric), 0) self._validate_metrics(self.all_metric) def testNoRecordBeforeInit(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - self._start_server() - unary_unary_call(port=self._port) + server, port = _test_server.start_server() + _test_server.unary_unary_call(port=port) self.assertEqual(len(self.all_metric), 0) - self._server.stop(0) + server.stop(0) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - self._start_server() - unary_unary_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + _test_server.unary_unary_call(port=port) self.assertGreater(len(self.all_metric), 0) self._validate_metrics(self.all_metric) def testNoRecordAfterExit(self): - self._set_config_file(_VALID_CONFIG_TRACING_STATS) - with grpc_observability.GCPOpenCensusObservability( + with grpc_observability.OpenTelemetryObservability( exporter=self.test_exporter ): - 
self._start_server() - unary_unary_call(port=self._port) + server, port = _test_server.start_server() + self._server = server + self._port = port + _test_server.unary_unary_call(port=port) self.assertGreater(len(self.all_metric), 0) current_metric_len = len(self.all_metric) self._validate_metrics(self.all_metric) - unary_unary_call(port=self._port) + _test_server.unary_unary_call(port=self._port) self.assertEqual(len(self.all_metric), current_metric_len) - def testConfigFileOverEnvVar(self): - # env var have only stats enabled - os.environ[CONFIG_ENV_VAR_NAME] = _VALID_CONFIG_STATS_ONLY_STR - # config_file have only tracing enabled - self._set_config_file(_VALID_CONFIG_TRACING_ONLY) - - with grpc_observability.GCPOpenCensusObservability( - exporter=self.test_exporter - ): - self._start_server() - unary_unary_call(port=self._port) - - self.assertEqual(len(self.all_metric), 0) - - def _set_config_file(self, config: Dict[str, Any]) -> None: - # Using random name here so multiple tests can run with different config files. 
- config_file_path = "/tmp/" + str(random.randint(0, 100000)) - with open(config_file_path, "w", encoding="utf-8") as f: - f.write(json.dumps(config)) - os.environ[CONFIG_FILE_ENV_VAR_NAME] = config_file_path - - def _start_server(self) -> None: - self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - self._server.add_generic_rpc_handlers((_GenericHandler(),)) - self._port = self._server.add_insecure_port("[::]:0") - self._server.start() - return self._port - def _validate_metrics( self, metrics: List[_observability.StatsData] ) -> None: @@ -343,41 +203,6 @@ class ObservabilityTest(unittest.TestCase): self.assertTrue(name in metric_names) -def unary_unary_call(port, metadata=None): - with grpc.insecure_channel(f"localhost:{port}") as channel: - multi_callable = channel.unary_unary(_UNARY_UNARY) - if metadata: - unused_response, call = multi_callable.with_call( - _REQUEST, metadata=metadata - ) - else: - unused_response, call = multi_callable.with_call(_REQUEST) - - -def unary_stream_call(port): - with grpc.insecure_channel(f"localhost:{port}") as channel: - multi_callable = channel.unary_stream(_UNARY_STREAM) - call = multi_callable(_REQUEST) - for _ in call: - pass - - -def stream_unary_call(port): - with grpc.insecure_channel(f"localhost:{port}") as channel: - multi_callable = channel.stream_unary(_STREAM_UNARY) - unused_response, call = multi_callable.with_call( - iter([_REQUEST] * STREAM_LENGTH) - ) - - -def stream_stream_call(port): - with grpc.insecure_channel(f"localhost:{port}") as channel: - multi_callable = channel.stream_stream(_STREAM_STREAM) - call = multi_callable(iter([_REQUEST] * STREAM_LENGTH)) - for _ in call: - pass - - if __name__ == "__main__": logging.basicConfig() unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py b/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py new file mode 100644 index 00000000000..a3115b0108a --- 
/dev/null +++ b/src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py @@ -0,0 +1,299 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import defaultdict +import datetime +import logging +import os +import sys +import time +from typing import Any, Callable, Dict, List, Optional, Set +import unittest + +import grpc_observability +from grpc_observability import _open_telemetry_measures +from grpc_observability._open_telemetry_plugin import GRPC_METHOD_LABEL +from grpc_observability._open_telemetry_plugin import GRPC_OTHER_LABEL_VALUE +from grpc_observability._open_telemetry_plugin import GRPC_TARGET_LABEL +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.metrics.export import MetricExporter +from opentelemetry.sdk.metrics.export import MetricsData +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +from tests.observability import _test_server + +logger = logging.getLogger(__name__) + +STREAM_LENGTH = 5 +OTEL_EXPORT_INTERVAL_S = 0.5 + + +class OTelMetricExporter(MetricExporter): + """Implementation of :class:`MetricExporter` that exports metrics to the + provided all_metrics dict.
+ + all_metrics: A dict whose key is grpc_observability._open_telemetry_measures.Metric.name and whose + value is a list of labels recorded for that metric. + An example item of this dict: + {"grpc.client.attempt.started": + [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'}, + {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]} + """ + + def __init__( + self, + all_metrics: Dict[str, List], + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] = None, + ): + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + self.all_metrics = all_metrics + + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + self.record_metric(metrics_data) + return MetricExportResult.SUCCESS + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + return True + + def record_metric(self, metrics_data: MetricsData) -> None: + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for data_point in metric.data.data_points: + self.all_metrics[metric.name].append( + data_point.attributes + ) + + +class BaseTestOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin): + def __init__(self, provider: MeterProvider): + self.provider = provider + + def get_meter_provider(self) -> Optional[MeterProvider]: + return self.provider + + +@unittest.skipIf( + os.name == "nt" or "darwin" in sys.platform, + "Observability is not supported in Windows and MacOS", +) +class OpenTelemetryObservabilityTest(unittest.TestCase): + def setUp(self): + self.all_metrics = defaultdict(list) + otel_exporter = OTelMetricExporter(self.all_metrics) + reader = PeriodicExportingMetricReader( +
exporter=otel_exporter, + export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000, + ) + self._provider = MeterProvider(metric_readers=[reader]) + self._server = None + self._port = None + + def tearDown(self): + if self._server: + self._server.stop(0) + + def testRecordUnaryUnary(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + server, port = _test_server.start_server() + self._server = server + _test_server.unary_unary_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + + def testRecordUnaryUnaryClientOnly(self): + server, port = _test_server.start_server() + self._server = server + + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + _test_server.unary_unary_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_client_metrics_names(self.all_metrics) + + def testRecordUnaryStream(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + server, port = _test_server.start_server() + self._server = server + _test_server.unary_stream_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + + def testRecordStreamUnary(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + server, port = _test_server.start_server() + self._server = server + _test_server.stream_unary_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + + def testRecordStreamStream(self): + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + + with 
grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + server, port = _test_server.start_server() + self._server = server + _test_server.stream_stream_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + + def testTargetAttributeFilter(self): + main_server, main_port = _test_server.start_server() + backup_server, backup_port = _test_server.start_server() + main_target = f"localhost:{main_port}" + backup_target = f"localhost:{backup_port}" + + # Replace target label with 'other' for main_server. + def target_filter(target: str) -> bool: + if main_target in target: + return False + return True + + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + otel_plugin.target_attribute_filter = target_filter + + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + _test_server.unary_unary_call(port=main_port) + _test_server.unary_unary_call(port=backup_port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_client_metrics_names(self.all_metrics) + + target_values = set() + for label_list in self.all_metrics.values(): + for labels in label_list: + if GRPC_TARGET_LABEL in labels: + target_values.add(labels[GRPC_TARGET_LABEL]) + self.assertTrue(GRPC_OTHER_LABEL_VALUE in target_values) + self.assertTrue(backup_target in target_values) + + main_server.stop(0) + backup_server.stop(0) + + def testMethodAttributeFilter(self): + # If method name is 'test/UnaryUnaryFiltered', it should be replaced with 'other'.
+ FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered" + + def method_filter(method: str) -> bool: + if FILTERED_METHOD_NAME in method: + return False + return True + + otel_plugin = BaseTestOpenTelemetryPlugin(self._provider) + otel_plugin.generic_method_attribute_filter = method_filter + + with grpc_observability.OpenTelemetryObservability( + plugins=[otel_plugin] + ): + server, port = _test_server.start_server() + self._server = server + _test_server.unary_unary_call(port=port) + _test_server.unary_unary_filtered_call(port=port) + + self._validate_metrics_exist(self.all_metrics) + self._validate_all_metrics_names(self.all_metrics) + method_values = set() + for label_list in self.all_metrics.values(): + for labels in label_list: + if GRPC_METHOD_LABEL in labels: + method_values.add(labels[GRPC_METHOD_LABEL]) + self.assertTrue(GRPC_OTHER_LABEL_VALUE in method_values) + self.assertTrue(FILTERED_METHOD_NAME not in method_values) + + def assert_eventually( + self, + predicate: Callable[[], bool], + *, + timeout: Optional[datetime.timedelta] = None, + message: Optional[Callable[[], str]] = None, + ) -> None: + message = message or (lambda: "Proposition did not evaluate to true") + timeout = timeout or datetime.timedelta(seconds=10) + end = datetime.datetime.now() + timeout + while datetime.datetime.now() < end: + if predicate(): + break + time.sleep(0.5) + else: + self.fail(message() + " after " + str(timeout)) + + def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None: + # Sleep here to make sure we have at least one export from OTel MetricExporter. 
+ self.assert_eventually( + lambda: len(all_metrics.keys()) > 1, + message=lambda: f"No metrics was exported", + ) + + def _validate_all_metrics_names(self, metric_names: Set[str]) -> None: + self._validate_server_metrics_names(metric_names) + self._validate_client_metrics_names(metric_names) + + def _validate_server_metrics_names(self, metric_names: Set[str]) -> None: + for base_metric in _open_telemetry_measures.base_metrics(): + if "grpc.server" in base_metric.name: + self.assertTrue( + base_metric.name in metric_names, + msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!", + ) + + def _validate_client_metrics_names(self, metric_names: Set[str]) -> None: + for base_metric in _open_telemetry_measures.base_metrics(): + if "grpc.client" in base_metric.name: + self.assertTrue( + base_metric.name in metric_names, + msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!", + ) + + +if __name__ == "__main__": + logging.basicConfig() + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/observability/_test_server.py b/src/python/grpcio_tests/tests/observability/_test_server.py new file mode 100644 index 00000000000..d8f516be612 --- /dev/null +++ b/src/python/grpcio_tests/tests/observability/_test_server.py @@ -0,0 +1,169 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from concurrent import futures +from typing import Tuple + +import grpc + +_REQUEST = b"\x00\x00\x00" +_RESPONSE = b"\x00\x00\x00" + +_UNARY_UNARY = "/test/UnaryUnary" +_UNARY_UNARY_FILTERED = "/test/UnaryUnaryFiltered" +_UNARY_STREAM = "/test/UnaryStream" +_STREAM_UNARY = "/test/StreamUnary" +_STREAM_STREAM = "/test/StreamStream" +STREAM_LENGTH = 5 +TRIGGER_RPC_METADATA = ("control", "trigger_rpc") +TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "") + + +def handle_unary_unary(request, servicer_context): + if TRIGGER_RPC_METADATA in servicer_context.invocation_metadata(): + for k, v in servicer_context.invocation_metadata(): + if "port" in k: + unary_unary_call(port=int(v)) + if "to_new_server" in k: + second_server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10) + ) + second_server.add_generic_rpc_handlers((_GenericHandler(),)) + second_server_port = second_server.add_insecure_port("[::]:0") + second_server.start() + unary_unary_call(port=second_server_port) + second_server.stop(0) + return _RESPONSE + + +def handle_unary_stream(request, servicer_context): + for _ in range(STREAM_LENGTH): + yield _RESPONSE + + +def handle_stream_unary(request_iterator, servicer_context): + return _RESPONSE + + +def handle_stream_stream(request_iterator, servicer_context): + for request in request_iterator: + yield _RESPONSE + + +class _MethodHandler(grpc.RpcMethodHandler): + def __init__(self, request_streaming, response_streaming): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + self.stream_stream = handle_stream_stream + elif self.request_streaming: + self.stream_unary = handle_stream_unary + elif self.response_streaming: + self.unary_stream = handle_unary_stream + else: + 
self.unary_unary = handle_unary_unary + + +class _GenericHandler(grpc.GenericRpcHandler): + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(False, False) + if handler_call_details.method == _UNARY_UNARY_FILTERED: + return _MethodHandler(False, False) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler(False, True) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler(True, False) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(True, True) + else: + return None + + +def start_server(interceptors=None) -> Tuple[grpc.Server, int]: + if interceptors: + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=10), + interceptors=interceptors, + ) + else: + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server.add_generic_rpc_handlers((_GenericHandler(),)) + port = server.add_insecure_port("[::]:0") + server.start() + return server, port + + +def unary_unary_call(port, metadata=None): + with grpc.insecure_channel(f"localhost:{port}") as channel: + multi_callable = channel.unary_unary(_UNARY_UNARY) + if metadata: + unused_response, call = multi_callable.with_call( + _REQUEST, metadata=metadata + ) + else: + unused_response, call = multi_callable.with_call(_REQUEST) + + +def intercepted_unary_unary_call(port, interceptors, metadata=None): + with grpc.insecure_channel(f"localhost:{port}") as channel: + intercept_channel = grpc.intercept_channel(channel, interceptors) + multi_callable = intercept_channel.unary_unary(_UNARY_UNARY) + if metadata: + unused_response, call = multi_callable.with_call( + _REQUEST, metadata=metadata + ) + else: + unused_response, call = multi_callable.with_call(_REQUEST) + + +def unary_unary_filtered_call(port, metadata=None): + with grpc.insecure_channel(f"localhost:{port}") as channel: + multi_callable = channel.unary_unary(_UNARY_UNARY_FILTERED) + if metadata: + unused_response, 
call = multi_callable.with_call( + _REQUEST, metadata=metadata + ) + else: + unused_response, call = multi_callable.with_call(_REQUEST) + + +def unary_stream_call(port): + with grpc.insecure_channel(f"localhost:{port}") as channel: + multi_callable = channel.unary_stream(_UNARY_STREAM) + call = multi_callable(_REQUEST) + for _ in call: + pass + + +def stream_unary_call(port): + with grpc.insecure_channel(f"localhost:{port}") as channel: + multi_callable = channel.stream_unary(_STREAM_UNARY) + unused_response, call = multi_callable.with_call( + iter([_REQUEST] * STREAM_LENGTH) + ) + + +def stream_stream_call(port): + with grpc.insecure_channel(f"localhost:{port}") as channel: + multi_callable = channel.stream_stream(_STREAM_STREAM) + call = multi_callable(iter([_REQUEST] * STREAM_LENGTH)) + for _ in call: + pass diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 93de9eed5a7..7ba494ac5b6 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -10,6 +10,7 @@ "tests.interop._insecure_intraop_test.InsecureIntraopTest", "tests.interop._secure_intraop_test.SecureIntraopTest", "tests.observability._observability_test.ObservabilityTest", + "tests.observability._open_telemetry_observability_test.OpenTelemetryObservabilityTest", "tests.protoc_plugin._python_plugin_test.ModuleMainTest", "tests.protoc_plugin._python_plugin_test.PythonPluginTest", "tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest", diff --git a/templates/src/python/grpcio_observability/_parallel_compile_patch.py.template b/templates/src/python/grpcio_observability/_parallel_compile_patch.py.template new file mode 100644 index 00000000000..1e1159666b3 --- /dev/null +++ b/templates/src/python/grpcio_observability/_parallel_compile_patch.py.template @@ -0,0 +1,23 @@ +%YAML 1.2 +--- | + # Copyright 2023 The gRPC Authors + # + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may 
not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + # + # This file has been automatically generated from a template file. + # Please make modifications to + # `$REPO_ROOT/templates/src/python/grpcio/_parallel_compile_patch.py.template` + # instead. This file can be regenerated from the template by running + # `tools/buildgen/generate_projects.sh`. + + <%include file="../_parallel_compile_patch.py.include" /> \ No newline at end of file diff --git a/tools/distrib/install_all_python_modules.sh b/tools/distrib/install_all_python_modules.sh index e7ef5b00242..41fc5111197 100755 --- a/tools/distrib/install_all_python_modules.sh +++ b/tools/distrib/install_all_python_modules.sh @@ -28,22 +28,29 @@ set -e BASEDIR=$(dirname "$0") BASEDIR=$(realpath "$BASEDIR")/../.. -PACKAGES="grpcio_channelz grpcio_csds grpcio_admin grpcio_health_checking grpcio_reflection grpcio_status grpcio_testing grpcio_tests" +PACKAGES="grpcio_channelz grpcio_csds grpcio_admin grpcio_health_checking grpcio_reflection grpcio_status grpcio_testing grpcio_tests" (cd "$BASEDIR"; - pip install --upgrade cython; + pip install --upgrade "cython<3.0.0rc1"; python setup.py install; pushd tools/distrib/python/grpcio_tools; ../make_grpcio_tools.py GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . popd; + pushd src/python/grpcio_observability; + ./make_grpcio_observability.py + GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . + popd; + pushd tools/distrib/python/xds_protos; + GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . 
+ popd; pushd src/python; for PACKAGE in ${PACKAGES}; do pushd "${PACKAGE}"; python setup.py clean; maybe_run_command preprocess maybe_run_command build_package_protos - python setup.py install; + python -m pip install .; popd; done popd;