[Python Otel] Re-apply Otel (#35439)

This reverts commit 96b9e8d3e3.

[Implement OpenTelemetry PR](https://github.com/grpc/grpc/pull/35292) was [reverted](96b9e8d3e3) because some tests started failing after importing the changes to g3.

After investigation, we found the root cause; it can be fixed both on our side and on the gapic API side. We opened an issue with the [gapic API team](https://github.com/googleapis/python-api-core/issues/579); this PR includes the fixes on our side.

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #35439

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35439 from XuanWang-Amos:reapply_otel 0133564438
PiperOrigin-RevId: 595746222
pull/35459/head
Xuan Wang 1 year ago committed by Copybara-Service
parent 808886375d
commit 48cf940fd1
  1. 2
      requirements.bazel.txt
  2. 80
      src/python/grpcio/grpc/_channel.py
  3. 3
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  4. 11
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  5. 12
      src/python/grpcio/grpc/_observability.py
  6. 12
      src/python/grpcio_observability/_parallel_compile_patch.py
  7. 22
      src/python/grpcio_observability/grpc_observability/BUILD.bazel
  8. 7
      src/python/grpcio_observability/grpc_observability/__init__.py
  9. 2
      src/python/grpcio_observability/grpc_observability/_cyobservability.pxd
  10. 11
      src/python/grpcio_observability/grpc_observability/_cyobservability.pyx
  11. 12
      src/python/grpcio_observability/grpc_observability/_gcp_observability.py
  12. 38
      src/python/grpcio_observability/grpc_observability/_open_telemetry_exporter.py
  13. 97
      src/python/grpcio_observability/grpc_observability/_open_telemetry_measures.py
  14. 155
      src/python/grpcio_observability/grpc_observability/_open_telemetry_observability.py
  15. 254
      src/python/grpcio_observability/grpc_observability/_open_telemetry_plugin.py
  16. 21
      src/python/grpcio_observability/grpc_observability/client_call_tracer.cc
  17. 5
      src/python/grpcio_observability/grpc_observability/client_call_tracer.h
  18. 9
      src/python/grpcio_observability/grpc_observability/constants.h
  19. 6
      src/python/grpcio_observability/grpc_observability/observability_util.cc
  20. 4
      src/python/grpcio_observability/grpc_observability/observability_util.h
  21. 4
      src/python/grpcio_observability/grpc_observability/python_census_context.h
  22. 4
      src/python/grpcio_observability/grpc_observability/server_call_tracer.cc
  23. 9
      src/python/grpcio_tests/tests/_sanity/_sanity_test.py
  24. 22
      src/python/grpcio_tests/tests/observability/BUILD.bazel
  25. 305
      src/python/grpcio_tests/tests/observability/_observability_test.py
  26. 299
      src/python/grpcio_tests/tests/observability/_open_telemetry_observability_test.py
  27. 169
      src/python/grpcio_tests/tests/observability/_test_server.py
  28. 1
      src/python/grpcio_tests/tests/tests.json
  29. 23
      templates/src/python/grpcio_observability/_parallel_compile_patch.py.template
  30. 13
      tools/distrib/install_all_python_modules.sh

@ -16,3 +16,5 @@ setuptools==44.1.1
xds-protos==0.0.11
absl-py==1.4.0
googleapis-common-protos==1.61.0
opentelemetry-sdk==1.21.0
opentelemetry-api==1.21.0

@ -132,6 +132,7 @@ class _RPCState(object):
rpc_start_time: Optional[float] # In relative seconds
rpc_end_time: Optional[float] # In relative seconds
method: Optional[str]
target: Optional[str]
def __init__(
self,
@ -163,6 +164,7 @@ class _RPCState(object):
self.rpc_start_time = None
self.rpc_end_time = None
self.method = None
self.target = None
# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
@ -1048,22 +1050,35 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
@ -1123,6 +1138,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else:
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1194,6 +1210,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1213,20 +1230,32 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
__slots__ = [
"_channel",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
@ -1278,6 +1307,7 @@ class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
operations_and_tags = tuple((ops, None) for ops in operations)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1297,22 +1327,35 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
@ -1354,6 +1397,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1374,22 +1418,35 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction],
response_deserializer: Optional[DeserializingFunction],
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
@ -1413,6 +1470,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._channel.segregated_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1501,6 +1559,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1530,22 +1589,35 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
_channel: cygrpc.Channel
_managed_call: IntegratedCallFactory
_method: bytes
_target: bytes
_request_serializer: Optional[SerializingFunction]
_response_deserializer: Optional[DeserializingFunction]
_context: Any
__slots__ = [
"_channel",
"_managed_call",
"_method",
"_target",
"_request_serializer",
"_response_deserializer",
"_context",
]
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.Channel,
managed_call: IntegratedCallFactory,
method: bytes,
target: bytes,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
):
self._channel = channel
self._managed_call = managed_call
self._method = method
self._target = target
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_census_context()
@ -1579,6 +1651,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
event_handler = _event_handler(state, self._response_deserializer)
state.rpc_start_time = time.perf_counter()
state.method = _common.decode(self._method)
state.target = _common.decode(self._target)
call = self._managed_call(
cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method,
@ -1947,6 +2020,7 @@ class Channel(grpc.Channel):
_channel: cygrpc.Channel
_call_state: _ChannelCallState
_connectivity_state: _ChannelConnectivityState
_target: str
def __init__(
self,
@ -1974,6 +2048,7 @@ class Channel(grpc.Channel):
_augment_options(core_options, compression),
credentials,
)
self._target = target
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
cygrpc.fork_register_channel(self)
@ -2013,6 +2088,7 @@ class Channel(grpc.Channel):
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
@ -2031,6 +2107,7 @@ class Channel(grpc.Channel):
return _SingleThreadedUnaryStreamMultiCallable(
self._channel,
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
@ -2039,6 +2116,7 @@ class Channel(grpc.Channel):
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
@ -2053,6 +2131,7 @@ class Channel(grpc.Channel):
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)
@ -2067,6 +2146,7 @@ class Channel(grpc.Channel):
self._channel,
_channel_managed_call_management(self._call_state),
_common.encode(method),
_common.encode(self._target),
request_serializer,
response_deserializer,
)

@ -28,12 +28,13 @@ cdef class _CallState:
cdef set due
# call_tracer_capsule should have type of grpc._observability.ClientCallTracerCapsule
cdef object call_tracer_capsule
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *
cdef void maybe_delete_call_tracer(self) except *
cdef class _ChannelState:
cdef bytes target
cdef object condition
cdef grpc_channel *c_channel
# A boolean field indicating that the channel is open (if True) or is being

@ -77,7 +77,7 @@ cdef class _CallState:
return
_observability.delete_call_tracer(self.call_tracer_capsule)
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name) except *:
cdef void maybe_set_client_call_tracer_on_call(self, bytes method_name, bytes target) except *:
# TODO(xuanwn): use channel args to exclude those metrics.
for exclude_prefix in _observability._SERVICES_TO_EXCLUDE:
if exclude_prefix in method_name:
@ -85,14 +85,15 @@ cdef class _CallState:
with _observability.get_plugin() as plugin:
if not (plugin and plugin.observability_enabled):
return
capsule = plugin.create_client_call_tracer(method_name)
capsule = plugin.create_client_call_tracer(method_name, target)
capsule_ptr = cpython.PyCapsule_GetPointer(capsule, CLIENT_CALL_TRACER)
_set_call_tracer(self.c_call, capsule_ptr)
self.call_tracer_capsule = capsule
cdef class _ChannelState:
def __cinit__(self):
def __cinit__(self, target):
self.target = target
self.condition = threading.Condition()
self.open = True
self.integrated_call_states = {}
@ -248,7 +249,7 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
call_state.maybe_set_client_call_tracer_on_call(method)
call_state.maybe_set_client_call_tracer_on_call(method, channel_state.target)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
@ -473,7 +474,7 @@ cdef class Channel:
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._state = _ChannelState(target)
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (

@ -62,7 +62,7 @@ class ObservabilityPlugin(
@abc.abstractmethod
def create_client_call_tracer(
self, method_name: bytes
self, method_name: bytes, target: bytes
) -> ClientCallTracerCapsule:
"""Creates a ClientCallTracerCapsule.
@ -75,6 +75,7 @@ class ObservabilityPlugin(
Args:
method_name: The method name of the call in byte format.
target: The channel target of the call in byte format.
Returns:
A PyCapsule which stores a ClientCallTracer object.
@ -140,7 +141,7 @@ class ObservabilityPlugin(
@abc.abstractmethod
def record_rpc_latency(
self, method: str, rpc_latency: float, status_code: Any
self, method: str, target: str, rpc_latency: float, status_code: Any
) -> None:
"""Record the latency of the RPC.
@ -149,7 +150,8 @@ class ObservabilityPlugin(
Args:
method: The fully-qualified name of the RPC method being invoked.
rpc_latency: The latency for the RPC, equals to the time between
target: The target name of the RPC method being invoked.
rpc_latency: The latency for the RPC in seconds, equals to the time between
when the client invokes the RPC and when the client receives the status.
status_code: An element of grpc.StatusCode in string format representing the
final status for the RPC.
@ -280,4 +282,6 @@ def maybe_record_rpc_latency(state: "_channel._RPCState") -> None:
return
rpc_latency_s = state.rpc_end_time - state.rpc_start_time
rpc_latency_ms = rpc_latency_s * 1000
plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)
plugin.record_rpc_latency(
state.method, state.target, rpc_latency_ms, state.code
)

@ -11,6 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file has been automatically generated from a template file.
# Please make modifications to
# `$REPO_ROOT/templates/src/python/grpcio/_parallel_compile_patch.py.template`
# instead. This file can be regenerated from the template by running
# `tools/buildgen/generate_projects.sh`.
"""Patches the compile() to allow enable parallel compilation of C/C++.
build_ext has lots of C/C++ files and normally compiles them one by one.
@ -27,10 +34,11 @@ except KeyError:
import multiprocessing
BUILD_EXT_COMPILER_JOBS = multiprocessing.cpu_count()
except ValueError:
BUILD_EXT_COMPILER_JOBS = 1
# monkey-patch for parallel compilation
# TODO(xuanwn): Use a template for this file.
def _parallel_compile(
self,
sources,
@ -45,7 +53,7 @@ def _parallel_compile(
# setup the same way as distutils.ccompiler.CCompiler
# https://github.com/python/cpython/blob/31368a4f0e531c19affe2a1becd25fc316bc7501/Lib/distutils/ccompiler.py#L564
macros, objects, extra_postargs, pp_opts, build = self._setup_compile(
output_dir, macros, include_dirs, sources, depends, extra_postargs
str(output_dir), macros, include_dirs, sources, depends, extra_postargs
)
cc_args = self._get_cc_args(pp_opts, debug, extra_preargs)

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("//bazel:cython_library.bzl", "pyx_library")
package(default_visibility = ["//visibility:private"])
@ -52,17 +53,27 @@ pyx_library(
],
)
# Since `opencensus` and `opencensus-ext-stackdriver` are non-hermetic,
# Since `opentelemetry-sdk` and `opentelemetry-api` are non-hermetic,
# pyobservability is for internal use only.
py_library(
name = "_opentelemetry_observability",
srcs = [
"_open_telemetry_exporter.py",
"_open_telemetry_measures.py",
"_open_telemetry_observability.py",
"_open_telemetry_plugin.py",
],
deps = [
requirement("opentelemetry-sdk"),
requirement("opentelemetry-api"),
],
)
py_library(
name = "pyobservability",
srcs = [
"__init__.py",
"_gcp_observability.py",
"_measures.py",
"_observability.py",
"_observability_config.py",
"_views.py",
],
imports = [
".",
@ -73,6 +84,7 @@ py_library(
"//:__subpackages__",
],
deps = [
":_opentelemetry_observability",
":cyobservability",
],
)

@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc_observability._gcp_observability import GCPOpenCensusObservability
from grpc_observability._open_telemetry_observability import (
OpenTelemetryObservability,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
__all__ = ("GCPOpenCensusObservability",)
__all__ = ("OpenTelemetryObservability", "OpenTelemetryPlugin")

@ -76,6 +76,7 @@ cdef extern from "observability_util.h" namespace "grpc_observability":
cdef cGcpObservabilityConfig ReadAndActivateObservabilityConfig() nogil
cdef void NativeObservabilityInit() except +
cdef void* CreateClientCallTracer(const char* method,
const char* target,
const char* trace_id,
const char* parent_span_id) except +
cdef void* CreateServerCallTracerFactory() except +
@ -144,6 +145,7 @@ cdef extern from "constants.h" namespace "grpc_observability":
kRpcServerStartedRpcsMeasureName
string kClientMethod
string kClientTarget
string kClientStatus
cdef extern from "sampler.h" namespace "grpc_observability":

@ -111,18 +111,22 @@ def activate_config(object py_config) -> None:
if (py_config.stats_enabled):
EnablePythonCensusStats(True);
def activate_stats() -> None:
EnablePythonCensusStats(True);
def create_client_call_tracer(bytes method_name, bytes trace_id,
def create_client_call_tracer(bytes method_name, bytes target, bytes trace_id,
bytes parent_span_id=b'') -> cpython.PyObject:
"""Create a ClientCallTracer and save to PyCapsule.
Returns: A grpc_observability._observability.ClientCallTracerCapsule object.
"""
cdef char* c_method = cpython.PyBytes_AsString(method_name)
cdef char* c_target = cpython.PyBytes_AsString(target)
cdef char* c_trace_id = cpython.PyBytes_AsString(trace_id)
cdef char* c_parent_span_id = cpython.PyBytes_AsString(parent_span_id)
cdef void* call_tracer = CreateClientCallTracer(c_method, c_trace_id, c_parent_span_id)
cdef void* call_tracer = CreateClientCallTracer(c_method, c_target, c_trace_id, c_parent_span_id)
capsule = cpython.PyCapsule_New(call_tracer, CLIENT_CALL_TRACER, NULL)
return capsule
@ -244,7 +248,7 @@ def _get_tracing_data(SpanCensusData span_data, vector[Label] span_labels,
span_annotations = py_span_annotations)
def _record_rpc_latency(object exporter, str method, float rpc_latency, str status_code) -> None:
def _record_rpc_latency(object exporter, str method, str target, float rpc_latency, str status_code) -> None:
exporter: _observability.Exporter
measurement = {}
@ -254,6 +258,7 @@ def _record_rpc_latency(object exporter, str method, float rpc_latency, str stat
labels = {}
labels[_decode(kClientMethod)] = method.strip("/")
labels[_decode(kClientTarget)] = target
labels[_decode(kClientStatus)] = status_code
metric = _get_stats_data(measurement, labels)
exporter.export_stats_data([metric])

@ -118,11 +118,11 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
grpc._observability.observability_deinit()
def create_client_call_tracer(
self, method_name: bytes
self, method_name: bytes, target: bytes
) -> ClientCallTracerCapsule:
trace_id = b"TRACE_ID"
capsule = _cyobservability.create_client_call_tracer(
method_name, trace_id
method_name, target, trace_id
)
return capsule
@ -143,9 +143,13 @@ class GCPOpenCensusObservability(grpc._observability.ObservabilityPlugin):
pass
def record_rpc_latency(
self, method: str, rpc_latency: float, status_code: grpc.StatusCode
self,
method: str,
target: str,
rpc_latency: float,
status_code: grpc.StatusCode,
) -> None:
status_code = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
_cyobservability._record_rpc_latency(
self.exporter, method, rpc_latency, status_code
self.exporter, method, target, rpc_latency, status_code
)

@ -0,0 +1,38 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, List
from grpc_observability import _observability # pytype: disable=pyi-error
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin
class _OpenTelemetryExporterDelegator(_observability.Exporter):
    """Exporter that fans collected data out to every configured plugin.

    Stats data is delivered to each `_OpenTelemetryPlugin`, which decides
    whether to record it; tracing data is currently discarded.
    """

    _plugins: Iterable[_OpenTelemetryPlugin]

    def __init__(self, plugins: Iterable[_OpenTelemetryPlugin]):
        self._plugins = plugins

    def export_stats_data(
        self, stats_data: List[_observability.StatsData]
    ) -> None:
        """Offer each stats datum to every plugin for recording."""
        # Records stats data to MeterProvider.
        for datum in stats_data:
            for plugin in self._plugins:
                plugin.maybe_record_stats_data(datum)

    def export_tracing_data(
        self, tracing_data: List[_observability.TracingData]
    ) -> None:
        """Tracing export is not implemented yet; data is dropped."""
        pass

@ -0,0 +1,97 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from typing import List
from grpc_observability._cyobservability import MetricsName
# Base tuple type: (name, cyname, unit, description) where `name` is the
# OpenTelemetry instrument name and `cyname` is the matching Cython-layer
# MetricsName value.
_MetricBase = collections.namedtuple(
    "Metric",
    ("name", "cyname", "unit", "description"),
)


class Metric(_MetricBase):
    """Immutable description of a single gRPC observability metric."""
# Client-side metrics, recorded per call attempt. Instrument names follow
# the "grpc.client.attempt.*" / "grpc.client.call.*" naming convention;
# units use UCUM notation ("{attempt}", "s", "By").
CLIENT_ATTEMPT_STARTED = Metric(
    "grpc.client.attempt.started",
    MetricsName.CLIENT_STARTED_RPCS,
    "{attempt}",
    "Number of client call attempts started",
)
CLIENT_ATTEMPT_DURATION = Metric(
    "grpc.client.attempt.duration",
    MetricsName.CLIENT_ROUNDTRIP_LATENCY,
    "s",
    "End-to-end time taken to complete a client call attempt",
)
CLIENT_RPC_DURATION = Metric(
    "grpc.client.call.duration",
    MetricsName.CLIENT_API_LATENCY,
    "s",
    "End-to-end time taken to complete a call from client's perspective",
)
CLIENT_ATTEMPT_SEND_BYTES = Metric(
    "grpc.client.attempt.sent_total_compressed_message_size",
    MetricsName.CLIENT_SEND_BYTES_PER_RPC,
    "By",
    "Compressed message bytes sent per client call attempt",
)
CLIENT_ATTEMPT_RECEIVED_BYTES = Metric(
    "grpc.client.attempt.rcvd_total_compressed_message_size",
    MetricsName.CLIENT_RECEIVED_BYTES_PER_RPC,
    "By",
    "Compressed message bytes received per call attempt",
)
# Server-side metrics, recorded per call.
SERVER_STARTED_RPCS = Metric(
    "grpc.server.call.started",
    MetricsName.SERVER_STARTED_RPCS,
    "{call}",
    "Number of server calls started",
)
SERVER_RPC_DURATION = Metric(
    "grpc.server.call.duration",
    MetricsName.SERVER_SERVER_LATENCY,
    "s",
    "End-to-end time taken to complete a call from server transport's perspective",
)
SERVER_RPC_SEND_BYTES = Metric(
    "grpc.server.call.sent_total_compressed_message_size",
    MetricsName.SERVER_SENT_BYTES_PER_RPC,
    "By",
    "Compressed message bytes sent per server call",
)
SERVER_RPC_RECEIVED_BYTES = Metric(
    "grpc.server.call.rcvd_total_compressed_message_size",
    MetricsName.SERVER_RECEIVED_BYTES_PER_RPC,
    "By",
    "Compressed message bytes received per server call",
)
def base_metrics() -> List[Metric]:
    """Return the default set of metrics every observability setup records.

    Covers client per-attempt/per-call metrics followed by server
    per-call metrics, in a fixed order.
    """
    client_metrics = [
        CLIENT_ATTEMPT_STARTED,
        CLIENT_ATTEMPT_DURATION,
        CLIENT_RPC_DURATION,
        CLIENT_ATTEMPT_SEND_BYTES,
        CLIENT_ATTEMPT_RECEIVED_BYTES,
    ]
    server_metrics = [
        SERVER_STARTED_RPCS,
        SERVER_RPC_DURATION,
        SERVER_RPC_SEND_BYTES,
        SERVER_RPC_RECEIVED_BYTES,
    ]
    return client_metrics + server_metrics

@ -0,0 +1,155 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any, Iterable, Optional
import grpc
# pytype: disable=pyi-error
from grpc_observability import _cyobservability
from grpc_observability._open_telemetry_exporter import (
_OpenTelemetryExporterDelegator,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin
from grpc_observability._open_telemetry_plugin import _OpenTelemetryPlugin
_LOGGER = logging.getLogger(__name__)

# Type aliases: the capsule types are opaque PyCapsule objects produced by
# the Cython layer, so `Any` is used for their annotations.
ClientCallTracerCapsule = Any  # it appears only once in the function signature
ServerCallTracerFactoryCapsule = (
    Any  # it appears only once in the function signature
)
grpc_observability = Any  # grpc_observability.py imports this module.

# Maps grpc.StatusCode members to the canonical status-name strings used
# as metric label values; unseen codes fall back to "UNKNOWN" at call sites.
GRPC_STATUS_CODE_TO_STRING = {
    grpc.StatusCode.OK: "OK",
    grpc.StatusCode.CANCELLED: "CANCELLED",
    grpc.StatusCode.UNKNOWN: "UNKNOWN",
    grpc.StatusCode.INVALID_ARGUMENT: "INVALID_ARGUMENT",
    grpc.StatusCode.DEADLINE_EXCEEDED: "DEADLINE_EXCEEDED",
    grpc.StatusCode.NOT_FOUND: "NOT_FOUND",
    grpc.StatusCode.ALREADY_EXISTS: "ALREADY_EXISTS",
    grpc.StatusCode.PERMISSION_DENIED: "PERMISSION_DENIED",
    grpc.StatusCode.UNAUTHENTICATED: "UNAUTHENTICATED",
    grpc.StatusCode.RESOURCE_EXHAUSTED: "RESOURCE_EXHAUSTED",
    grpc.StatusCode.FAILED_PRECONDITION: "FAILED_PRECONDITION",
    grpc.StatusCode.ABORTED: "ABORTED",
    grpc.StatusCode.OUT_OF_RANGE: "OUT_OF_RANGE",
    grpc.StatusCode.UNIMPLEMENTED: "UNIMPLEMENTED",
    grpc.StatusCode.INTERNAL: "INTERNAL",
    grpc.StatusCode.UNAVAILABLE: "UNAVAILABLE",
    grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}
# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
    """OpenTelemetry based plugin implementation.

    Args:
        plugins: OpenTelemetryPlugins to enable.
        exporter: Exporter used to export data.
    """

    exporter: "grpc_observability.Exporter"
    plugins: Iterable[OpenTelemetryPlugin]

    def __init__(
        self,
        *,
        plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
        exporter: "grpc_observability.Exporter" = None,
    ):
        internal_plugins = (
            [_OpenTelemetryPlugin(plugin) for plugin in plugins]
            if plugins
            else []
        )
        # An explicitly supplied exporter takes precedence over the
        # per-plugin delegator.
        if exporter:
            self.exporter = exporter
        else:
            self.exporter = _OpenTelemetryExporterDelegator(internal_plugins)

        try:
            _cyobservability.activate_stats()
            self.set_stats(True)
        except Exception as e:  # pylint: disable=broad-except
            raise ValueError(f"Activate observability metrics failed with: {e}")

    def __enter__(self):
        try:
            _cyobservability.cyobservability_init(self.exporter)
        # TODO(xuanwn): Use specific exceptions
        except Exception as e:  # pylint: disable=broad-except
            _LOGGER.exception("Initiate observability failed with: %s", e)

        grpc._observability.observability_init(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.exit()

    def exit(self) -> None:
        # Sleep so we don't lose any data. If we shut down the export thread
        # immediately after exit, it's possible that core didn't call
        # RecordEnd in callTracer, and all data recorded by calling RecordEnd
        # will be lost.
        # CENSUS_EXPORT_BATCH_INTERVAL_SECS: The time equals the time in
        # AwaitNextBatchLocked.
        # TODO(xuanwn): explicit synchronization
        # https://github.com/grpc/grpc/issues/33262
        time.sleep(_cyobservability.CENSUS_EXPORT_BATCH_INTERVAL_SECS)
        self.set_tracing(False)
        self.set_stats(False)
        _cyobservability.observability_deinit()
        grpc._observability.observability_deinit()

    def create_client_call_tracer(
        self, method_name: bytes, target: bytes
    ) -> ClientCallTracerCapsule:
        """Build a client call tracer capsule for one call."""
        trace_id = b"TRACE_ID"
        return _cyobservability.create_client_call_tracer(
            method_name, target, trace_id
        )

    def create_server_call_tracer_factory(
        self,
    ) -> ServerCallTracerFactoryCapsule:
        """Build the server-side call tracer factory capsule."""
        return _cyobservability.create_server_call_tracer_factory_capsule()

    def delete_client_call_tracer(
        self, client_call_tracer: ClientCallTracerCapsule
    ) -> None:
        _cyobservability.delete_client_call_tracer(client_call_tracer)

    def save_trace_context(
        self, trace_id: str, span_id: str, is_sampled: bool
    ) -> None:
        # Tracing context propagation is not supported by this plugin.
        pass

    def record_rpc_latency(
        self,
        method: str,
        target: str,
        rpc_latency: float,
        status_code: grpc.StatusCode,
    ) -> None:
        """Record end-to-end RPC latency via the Cython layer."""
        code_string = GRPC_STATUS_CODE_TO_STRING.get(status_code, "UNKNOWN")
        _cyobservability._record_rpc_latency(
            self.exporter, method, target, rpc_latency, code_string
        )

@ -0,0 +1,254 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from typing import Dict, Iterable, List, Optional, Union
# pytype: disable=pyi-error
import grpc
from grpc_observability import _open_telemetry_measures
from grpc_observability._cyobservability import MetricsName
from grpc_observability._observability import StatsData
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics import Histogram
from opentelemetry.sdk.metrics import Meter
from opentelemetry.sdk.metrics import MeterProvider
GRPC_METHOD_LABEL = "grpc.method"
GRPC_TARGET_LABEL = "grpc.target"
GRPC_OTHER_LABEL_VALUE = "other"
class OpenTelemetryLabelInjector(abc.ABC):
    """Interface for attaching additional labels to traced calls.

    Please note that this class is still work in progress and NOT READY
    to be used.
    """

    # Label sets gathered for this injector, one dict of key/value pairs
    # per label set.
    _labels: List[Dict[str, str]]

    def __init__(self):
        # Intended to call the Python OTel API to detect the resource and
        # save the resulting labels on the injector; not implemented yet.
        pass

    @abc.abstractmethod
    def get_labels(self):
        """Return the additional labels supplied by this injector."""
        raise NotImplementedError()
class OpenTelemetryPluginOption(abc.ABC):
    """Extension point for adding behavior to an OpenTelemetryPlugin.

    Please note that this class is still work in progress and NOT READY
    to be used.
    """

    @abc.abstractmethod
    def is_active_on_method(self, method: str) -> bool:
        """Report whether this plugin option applies to a given method.

        Args:
          method: Required. The RPC method, for example:
            `/helloworld.Greeter/SayHello`.

        Returns:
          True if this plugin option is active on the given method,
          False otherwise.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def is_active_on_server(self, channel_args: List[str]) -> bool:
        """Report whether this plugin option applies to a given server.

        Args:
          channel_args: Required. The channel args used for the server.
            TODO(xuanwn): detail on what channel_args will contain.

        Returns:
          True if this plugin option is active on the server,
          False otherwise.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_label_injector(self) -> Optional[OpenTelemetryLabelInjector]:
        """Return the LabelsInjector used by this plugin option, or None."""
        raise NotImplementedError()
# pylint: disable=no-self-use
class OpenTelemetryPlugin:
    """Describes a Plugin for OpenTelemetry observability."""

    def get_plugin_options(
        self,
    ) -> Iterable[OpenTelemetryPluginOption]:
        """Return the plugin options for this plugin (none by default)."""
        return []

    def get_meter_provider(self) -> Optional[MeterProvider]:
        """Return the MeterProvider used to record metrics, or None."""
        return None

    def target_attribute_filter(
        self, target: str  # pylint: disable=unused-argument
    ) -> bool:
        """Decide per channel whether to keep the target attribute.

        If overridden, this is called per channel to decide whether to
        record the target attribute on the client or replace it with
        "other". This helps reduce metric cardinality when many channels
        are created with different targets in the same binary (which may
        happen, for example, if channel target strings use IP addresses
        directly).

        Args:
            target: The target for the RPC.

        Returns:
            bool: True to keep the original target string, False to
            replace it with "other".
        """
        return True

    def generic_method_attribute_filter(
        self, method: str  # pylint: disable=unused-argument
    ) -> bool:
        """Decide whether a generic method name is recorded.

        If overridden, this is called with a generic method type to decide
        whether to record the method name or replace it with "other". Note
        that pre-registered methods are always recorded no matter what this
        function returns.

        Args:
            method: The method name for the RPC.

        Returns:
            bool: True to keep the original method name, False to replace
            it with "other".
        """
        return False
class _OpenTelemetryPlugin:
    """Internal wrapper pairing an OpenTelemetryPlugin with OTel recorders."""

    _plugin: OpenTelemetryPlugin
    _metric_to_recorder: Dict[MetricsName, Union[Counter, Histogram]]

    def __init__(self, plugin: OpenTelemetryPlugin):
        self._plugin = plugin
        self._metric_to_recorder = dict()

        meter_provider = self._plugin.get_meter_provider()
        if meter_provider:
            meter = meter_provider.get_meter("grpc-python", grpc.__version__)
            enabled_metrics = _open_telemetry_measures.base_metrics()
            self._metric_to_recorder = self._register_metrics(
                meter, enabled_metrics
            )

    def _should_record(self, stats_data: StatsData) -> bool:
        # Only record metrics this plugin registered a recorder for.
        return stats_data.name in self._metric_to_recorder

    def _record_stats_data(self, stats_data: StatsData) -> None:
        recorder = self._metric_to_recorder[stats_data.name]

        target = stats_data.labels.get(GRPC_TARGET_LABEL, "")
        if not self._plugin.target_attribute_filter(target):
            # Replace the target label to bound metric cardinality.
            stats_data.labels[GRPC_TARGET_LABEL] = GRPC_OTHER_LABEL_VALUE

        method = stats_data.labels.get(GRPC_METHOD_LABEL, "")
        if not self._plugin.generic_method_attribute_filter(method):
            # Replace the method label to bound metric cardinality.
            stats_data.labels[GRPC_METHOD_LABEL] = GRPC_OTHER_LABEL_VALUE

        value = (
            stats_data.value_float
            if stats_data.measure_double
            else stats_data.value_int
        )
        if isinstance(recorder, Counter):
            recorder.add(value, attributes=stats_data.labels)
        elif isinstance(recorder, Histogram):
            recorder.record(value, attributes=stats_data.labels)

    # pylint: disable=no-self-use
    def maybe_record_stats_data(self, stats_data: StatsData) -> None:
        """Record a single StatsData to the MeterProvider if handled here.

        Note: the annotation was fixed from List[StatsData] — a single
        StatsData is expected (its .name is looked up directly).
        """
        if self._should_record(stats_data):
            self._record_stats_data(stats_data)

    def _register_metrics(
        self, meter: Meter, metrics: List[_open_telemetry_measures.Metric]
    ) -> Dict[MetricsName, Union[Counter, Histogram]]:
        """Create one OTel instrument per metric, keyed by its Cython name.

        Started-RPC metrics become monotonic counters; all other known
        metrics become histograms. As in the original implementation, an
        unknown metric inherits the most recently created recorder (or
        None if it comes first).
        """
        counter_metrics = (
            _open_telemetry_measures.CLIENT_ATTEMPT_STARTED,
            _open_telemetry_measures.SERVER_STARTED_RPCS,
        )
        histogram_metrics = (
            _open_telemetry_measures.CLIENT_ATTEMPT_DURATION,
            _open_telemetry_measures.CLIENT_RPC_DURATION,
            _open_telemetry_measures.CLIENT_ATTEMPT_SEND_BYTES,
            _open_telemetry_measures.CLIENT_ATTEMPT_RECEIVED_BYTES,
            _open_telemetry_measures.SERVER_RPC_DURATION,
            _open_telemetry_measures.SERVER_RPC_SEND_BYTES,
            _open_telemetry_measures.SERVER_RPC_RECEIVED_BYTES,
        )
        metric_to_recorder_map = {}
        recorder = None
        for metric in metrics:
            if metric in counter_metrics:
                recorder = meter.create_counter(
                    name=metric.name,
                    unit=metric.unit,
                    description=metric.description,
                )
            elif metric in histogram_metrics:
                recorder = meter.create_histogram(
                    name=metric.name,
                    unit=metric.unit,
                    description=metric.description,
                )
            metric_to_recorder_map[metric.cyname] = recorder
        return metric_to_recorder_map

@ -41,9 +41,11 @@ constexpr uint32_t
//
PythonOpenCensusCallTracer::PythonOpenCensusCallTracer(
const char* method, const char* trace_id, const char* parent_span_id,
bool tracing_enabled)
: method_(GetMethod(method)), tracing_enabled_(tracing_enabled) {
const char* method, const char* target, const char* trace_id,
const char* parent_span_id, bool tracing_enabled)
: method_(GetMethod(method)),
target_(GetTarget(target)),
tracing_enabled_(tracing_enabled) {
GenerateClientContext(absl::StrCat("Sent.", method_),
absl::string_view(trace_id),
absl::string_view(parent_span_id), &context_);
@ -84,7 +86,7 @@ PythonOpenCensusCallTracer::~PythonOpenCensusCallTracer() {
RecordIntMetric(kRpcClientTransparentRetriesPerCallMeasureName,
transparent_retries_, context_.Labels());
RecordDoubleMetric(kRpcClientRetryDelayPerCallMeasureName,
ToDoubleMilliseconds(retry_delay_), context_.Labels());
ToDoubleSeconds(retry_delay_), context_.Labels());
}
if (tracing_enabled_) {
@ -146,6 +148,7 @@ PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
return;
}
context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
RecordIntMetric(kRpcClientStartedRpcsMeasureName, 1, context_.Labels());
}
@ -225,6 +228,7 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
std::string final_status = absl::StatusCodeToString(status_code_);
context_.Labels().emplace_back(kClientMethod, std::string(parent_->method_));
context_.Labels().emplace_back(kClientTarget, std::string(parent_->target_));
context_.Labels().emplace_back(kClientStatus, final_status);
RecordDoubleMetric(
kRpcClientSentBytesPerRpcMeasureName,
@ -238,12 +242,11 @@ void PythonOpenCensusCallTracer::PythonOpenCensusCallAttemptTracer::
? transport_stream_stats->incoming.data_bytes
: 0),
context_.Labels());
RecordDoubleMetric(
kRpcClientServerLatencyMeasureName,
absl::ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time)),
context_.Labels());
RecordDoubleMetric(kRpcClientServerLatencyMeasureName,
absl::ToDoubleSeconds(absl::Nanoseconds(elapsed_time)),
context_.Labels());
RecordDoubleMetric(kRpcClientRoundtripLatencyMeasureName,
absl::ToDoubleMilliseconds(absl::Now() - start_time_),
absl::ToDoubleSeconds(absl::Now() - start_time_),
context_.Labels());
RecordIntMetric(kRpcClientCompletedRpcMeasureName, 1, context_.Labels());
}

@ -90,7 +90,8 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
absl::StatusCode status_code_;
};
explicit PythonOpenCensusCallTracer(const char* method, const char* trace_id,
explicit PythonOpenCensusCallTracer(const char* method, const char* target,
const char* trace_id,
const char* parent_span_id,
bool tracing_enabled);
~PythonOpenCensusCallTracer() override;
@ -119,6 +120,8 @@ class PythonOpenCensusCallTracer : public grpc_core::ClientCallTracer {
// Client method.
absl::string_view method_;
// Client target.
absl::string_view target_;
PythonCensusContext context_;
bool tracing_enabled_;
mutable grpc_core::Mutex mu_;

@ -19,10 +19,11 @@
namespace grpc_observability {
const std::string kClientMethod = "grpc_client_method";
const std::string kClientStatus = "grpc_client_status";
const std::string kServerMethod = "grpc_server_method";
const std::string kServerStatus = "grpc_server_status";
const std::string kClientMethod = "grpc.method";
const std::string kClientTarget = "grpc.target";
const std::string kClientStatus = "grpc.status";
const std::string kServerMethod = "grpc.method";
const std::string kServerStatus = "grpc.status";
typedef enum { kMeasurementDouble = 0, kMeasurementInt } MeasurementType;

@ -91,10 +91,10 @@ void NativeObservabilityInit() {
g_census_data_buffer = new std::queue<CensusData>;
}
void* CreateClientCallTracer(const char* method, const char* trace_id,
const char* parent_span_id) {
void* CreateClientCallTracer(const char* method, const char* target,
const char* trace_id, const char* parent_span_id) {
void* client_call_tracer = new PythonOpenCensusCallTracer(
method, trace_id, parent_span_id, PythonCensusTracingEnabled());
method, target, trace_id, parent_span_id, PythonCensusTracingEnabled());
return client_call_tracer;
}

@ -50,8 +50,8 @@ extern std::queue<CensusData>* g_census_data_buffer;
extern std::mutex g_census_data_buffer_mutex;
extern std::condition_variable g_census_data_buffer_cv;
void* CreateClientCallTracer(const char* method, const char* trace_id,
const char* parent_span_id);
void* CreateClientCallTracer(const char* method, const char* target,
const char* trace_id, const char* parent_span_id);
void* CreateServerCallTracerFactory();

@ -278,6 +278,10 @@ inline absl::string_view GetMethod(const char* method) {
return absl::StripPrefix(absl::string_view(method), "/");
}
inline absl::string_view GetTarget(const char* target) {
return absl::string_view(target);
}
// Fills a pre-allocated buffer with the value for the grpc-trace-bin header.
// The buffer must be at least kGrpcTraceBinHeaderLen bytes long.
void ToGrpcTraceBinHeader(const PythonCensusContext& ctx, uint8_t* out);

@ -226,7 +226,7 @@ void PythonOpenCensusServerCallTracer::RecordEnd(
if (PythonCensusStatsEnabled()) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
double elapsed_time_s = absl::ToDoubleSeconds(elapsed_time_);
context_.Labels().emplace_back(kServerMethod, std::string(method_));
context_.Labels().emplace_back(
kServerStatus,
@ -235,7 +235,7 @@ void PythonOpenCensusServerCallTracer::RecordEnd(
static_cast<double>(response_size), context_.Labels());
RecordDoubleMetric(kRpcServerReceivedBytesPerRpcMeasureName,
static_cast<double>(request_size), context_.Labels());
RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_ms,
RecordDoubleMetric(kRpcServerServerLatencyMeasureName, elapsed_time_s,
context_.Labels());
RecordIntMetric(kRpcServerCompletedRpcMeasureName, 1, context_.Labels());
RecordIntMetric(kRpcServerSentMessagesPerRpcMeasureName,

@ -42,6 +42,7 @@ class SanityTest(unittest.TestCase):
tests_json_string = pkgutil.get_data(self.TEST_PKG_PATH, "tests.json")
tests_json = json.loads(tests_json_string.decode())
final_tests = []
# Observability is not supported in Windows and MacOS and Asyncio.
if (
@ -50,10 +51,12 @@ class SanityTest(unittest.TestCase):
or self.TEST_PKG_PATH == "tests_aio"
):
for test_case in tests_json:
if "_observability_test" in test_case:
tests_json.remove(test_case)
if "observability" not in test_case:
final_tests.append(test_case)
else:
final_tests = tests_json
self.assertSequenceEqual(tests_json, test_suite_names)
self.assertSequenceEqual(final_tests, test_suite_names)
self.assertGreater(len(test_suite_names), 0)

@ -13,13 +13,33 @@
# limitations under the License.
package(default_visibility = ["//visibility:public"])
py_library(
name = "test_server",
srcs = ["_test_server.py"],
)
py_test(
name = "_observability_test",
size = "small",
srcs = glob(["*.py"]),
srcs = ["_observability_test.py"],
imports = ["../../"],
main = "_observability_test.py",
deps = [
":test_server",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_observability/grpc_observability:pyobservability",
"//src/python/grpcio_tests/tests/testing",
],
)
py_test(
name = "_open_telemetry_observability_test",
size = "small",
srcs = ["_open_telemetry_observability_test.py"],
imports = ["../../"],
main = "_open_telemetry_observability_test.py",
deps = [
":test_server",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_observability/grpc_observability:pyobservability",
"//src/python/grpcio_tests/tests/testing",

@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
import json
import logging
import os
import random
import sys
from typing import Any, Dict, List
from typing import List
import unittest
import grpc
@ -26,41 +23,11 @@ import grpc_observability
from grpc_observability import _cyobservability
from grpc_observability import _observability
logger = logging.getLogger(__name__)
from tests.observability import _test_server
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"
logger = logging.getLogger(__name__)
_UNARY_UNARY = "/test/UnaryUnary"
_UNARY_STREAM = "/test/UnaryStream"
_STREAM_UNARY = "/test/StreamUnary"
_STREAM_STREAM = "/test/StreamStream"
STREAM_LENGTH = 5
TRIGGER_RPC_METADATA = ("control", "trigger_rpc")
TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "")
CONFIG_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG"
CONFIG_FILE_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE"
_VALID_CONFIG_TRACING_STATS = {
"project_id": "test-project",
"cloud_trace": {"sampling_rate": 1.00},
"cloud_monitoring": {},
}
_VALID_CONFIG_TRACING_ONLY = {
"project_id": "test-project",
"cloud_trace": {"sampling_rate": 1.00},
}
_VALID_CONFIG_STATS_ONLY = {
"project_id": "test-project",
"cloud_monitoring": {},
}
_VALID_CONFIG_STATS_ONLY_STR = """
{
'project_id': 'test-project',
'cloud_monitoring': {}
}
"""
class TestExporter(_observability.Exporter):
@ -84,69 +51,17 @@ class TestExporter(_observability.Exporter):
self.span_collecter.extend(tracing_data)
def handle_unary_unary(request, servicer_context):
    """Unary-unary handler; metadata can trigger nested RPCs for testing."""
    metadata = servicer_context.invocation_metadata()
    if TRIGGER_RPC_METADATA in metadata:
        for key, value in metadata:
            if "port" in key:
                # Fire a nested RPC at the port named in the metadata.
                unary_unary_call(port=int(value))
            if "to_new_server" in key:
                # Spin up a second server, call it once, then tear it down.
                extra_server = grpc.server(
                    futures.ThreadPoolExecutor(max_workers=10)
                )
                extra_server.add_generic_rpc_handlers((_GenericHandler(),))
                extra_port = extra_server.add_insecure_port("[::]:0")
                extra_server.start()
                unary_unary_call(port=extra_port)
                extra_server.stop(0)
    return _RESPONSE
def handle_unary_stream(request, servicer_context):
    """Unary-stream handler: emit STREAM_LENGTH copies of the response."""
    yield from (_RESPONSE for _ in range(STREAM_LENGTH))
def handle_stream_unary(request_iterator, servicer_context):
    # Stream-unary handler: ignores the request stream entirely and
    # returns the canned response.
    return _RESPONSE
def handle_stream_stream(request_iterator, servicer_context):
    """Stream-stream handler: one canned response per incoming request."""
    yield from (_RESPONSE for _ in request_iterator)
class _MethodHandler(grpc.RpcMethodHandler):
    """RPC method handler wiring exactly one of the four handler slots."""

    def __init__(self, request_streaming, response_streaming):
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = None
        self.response_serializer = None
        # Clear all four slots, then fill the one matching the cardinality.
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        if request_streaming and response_streaming:
            self.stream_stream = handle_stream_stream
        elif request_streaming:
            self.stream_unary = handle_stream_unary
        elif response_streaming:
            self.unary_stream = handle_unary_stream
        else:
            self.unary_unary = handle_unary_unary
class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the four test method names to matching _MethodHandler configs."""

    def service(self, handler_call_details):
        # (request_streaming, response_streaming) per method name.
        routing = {
            _UNARY_UNARY: (False, False),
            _UNARY_STREAM: (False, True),
            _STREAM_UNARY: (True, False),
            _STREAM_STREAM: (True, True),
        }
        flags = routing.get(handler_call_details.method)
        if flags is None:
            return None
        return _MethodHandler(*flags)
class _ClientUnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Pass-through client interceptor used to exercise interception paths."""

    def intercept_unary_unary(
        self, continuation, client_call_details, request_or_iterator
    ):
        # No modification; simply forward the call.
        return continuation(client_call_details, request_or_iterator)
class _ServerInterceptor(grpc.ServerInterceptor):
    # Pass-through server interceptor used to exercise interception paths.
    def intercept_service(self, continuation, handler_call_details):
        return continuation(handler_call_details)
@unittest.skipIf(
@ -162,173 +77,118 @@ class ObservabilityTest(unittest.TestCase):
self._port = None
def tearDown(self):
os.environ[CONFIG_ENV_VAR_NAME] = ""
os.environ[CONFIG_FILE_ENV_VAR_NAME] = ""
if self._server:
self._server.stop(0)
def testRecordUnaryUnary(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testThrowErrorWithoutConfig(self):
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter
):
pass
def testThrowErrorWithInvalidConfig(self):
_INVALID_CONFIG = "INVALID"
self._set_config_file(_INVALID_CONFIG)
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter
):
pass
def testNoErrorAndDataWithEmptyConfig(self):
_EMPTY_CONFIG = {}
self._set_config_file(_EMPTY_CONFIG)
# Empty config still require project_id
os.environ["GCP_PROJECT"] = "test-project"
with grpc_observability.GCPOpenCensusObservability(
def testRecordUnaryUnaryWithClientInterceptor(self):
interceptor = _ClientUnaryUnaryInterceptor()
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
self.assertEqual(len(self.all_metric), 0)
server, port = _test_server.start_server()
self._server = server
_test_server.intercepted_unary_unary_call(
port=port, interceptors=interceptor
)
def testThrowErrorWhenCallingMultipleInit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with self.assertRaises(ValueError):
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter
) as o11y:
grpc._observability.observability_init(o11y)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testRecordUnaryUnaryStatsOnly(self):
self._set_config_file(_VALID_CONFIG_STATS_ONLY)
with grpc_observability.GCPOpenCensusObservability(
def testRecordUnaryUnaryWithServerInterceptor(self):
interceptor = _ServerInterceptor()
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
server, port = _test_server.start_server(interceptors=[interceptor])
self._server = server
_test_server.unary_unary_call(port=port)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testRecordUnaryUnaryTracingOnly(self):
self._set_config_file(_VALID_CONFIG_TRACING_ONLY)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
self.assertEqual(len(self.all_metric), 0)
def testThrowErrorWhenCallingMultipleInit(self):
with self.assertRaises(ValueError):
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
) as o11y:
grpc._observability.observability_init(o11y)
def testRecordUnaryStream(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_stream_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
_test_server.unary_stream_call(port=port)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testRecordStreamUnary(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
stream_unary_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
_test_server.stream_unary_call(port=port)
self.assertTrue(len(self.all_metric) > 0)
self.assertTrue(len(self.all_span) > 0)
self._validate_metrics(self.all_metric)
def testRecordStreamStream(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
stream_stream_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
_test_server.stream_stream_call(port=port)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testNoRecordBeforeInit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
self._start_server()
unary_unary_call(port=self._port)
server, port = _test_server.start_server()
_test_server.unary_unary_call(port=port)
self.assertEqual(len(self.all_metric), 0)
self._server.stop(0)
server.stop(0)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
self.assertGreater(len(self.all_metric), 0)
self._validate_metrics(self.all_metric)
def testNoRecordAfterExit(self):
self._set_config_file(_VALID_CONFIG_TRACING_STATS)
with grpc_observability.GCPOpenCensusObservability(
with grpc_observability.OpenTelemetryObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
server, port = _test_server.start_server()
self._server = server
self._port = port
_test_server.unary_unary_call(port=port)
self.assertGreater(len(self.all_metric), 0)
current_metric_len = len(self.all_metric)
self._validate_metrics(self.all_metric)
unary_unary_call(port=self._port)
_test_server.unary_unary_call(port=self._port)
self.assertEqual(len(self.all_metric), current_metric_len)
def testConfigFileOverEnvVar(self):
# env var have only stats enabled
os.environ[CONFIG_ENV_VAR_NAME] = _VALID_CONFIG_STATS_ONLY_STR
# config_file have only tracing enabled
self._set_config_file(_VALID_CONFIG_TRACING_ONLY)
with grpc_observability.GCPOpenCensusObservability(
exporter=self.test_exporter
):
self._start_server()
unary_unary_call(port=self._port)
self.assertEqual(len(self.all_metric), 0)
def _set_config_file(self, config: Dict[str, Any]) -> None:
# Using random name here so multiple tests can run with different config files.
config_file_path = "/tmp/" + str(random.randint(0, 100000))
with open(config_file_path, "w", encoding="utf-8") as f:
f.write(json.dumps(config))
os.environ[CONFIG_FILE_ENV_VAR_NAME] = config_file_path
def _start_server(self) -> None:
self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
self._server.add_generic_rpc_handlers((_GenericHandler(),))
self._port = self._server.add_insecure_port("[::]:0")
self._server.start()
return self._port
def _validate_metrics(
self, metrics: List[_observability.StatsData]
) -> None:
@ -343,41 +203,6 @@ class ObservabilityTest(unittest.TestCase):
self.assertTrue(name in metric_names)
def unary_unary_call(port, metadata=None):
    """Issue a single unary-unary RPC against localhost:port."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        rpc = channel.unary_unary(_UNARY_UNARY)
        # Only pass metadata when the caller supplied some.
        extra_kwargs = {"metadata": metadata} if metadata else {}
        unused_response, call = rpc.with_call(_REQUEST, **extra_kwargs)
def unary_stream_call(port):
    """Issue a unary-stream RPC and drain the response stream."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        response_stream = channel.unary_stream(_UNARY_STREAM)(_REQUEST)
        for _ in response_stream:
            pass
def stream_unary_call(port):
    """Issue a stream-unary RPC with STREAM_LENGTH requests."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        requests = iter([_REQUEST] * STREAM_LENGTH)
        rpc = channel.stream_unary(_STREAM_UNARY)
        unused_response, call = rpc.with_call(requests)
def stream_stream_call(port):
    """Issue a stream-stream RPC and drain all responses."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        rpc = channel.stream_stream(_STREAM_STREAM)
        for _ in rpc(iter([_REQUEST] * STREAM_LENGTH)):
            pass
if __name__ == "__main__":
logging.basicConfig()
unittest.main(verbosity=2)

@ -0,0 +1,299 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import datetime
import logging
import os
import sys
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest
import grpc_observability
from grpc_observability import _open_telemetry_measures
from grpc_observability._open_telemetry_plugin import GRPC_METHOD_LABEL
from grpc_observability._open_telemetry_plugin import GRPC_OTHER_LABEL_VALUE
from grpc_observability._open_telemetry_plugin import GRPC_TARGET_LABEL
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import AggregationTemporality
from opentelemetry.sdk.metrics.export import MetricExportResult
from opentelemetry.sdk.metrics.export import MetricExporter
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from tests.observability import _test_server
logger = logging.getLogger(__name__)
STREAM_LENGTH = 5
OTEL_EXPORT_INTERVAL_S = 0.5
class OTelMetricExporter(MetricExporter):
    """MetricExporter that accumulates exported metrics in a shared dict.

    all_metrics: maps a metric name (see
    grpc_observability._opentelemetry_measures.Metric.name) to the list of
    label dicts recorded for that metric. Example entry:
        {"grpc.client.attempt.started":
            [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'},
             {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]}
    """

    def __init__(
        self,
        all_metrics: Dict[str, List],
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.all_metrics = all_metrics

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Record the batch locally; always reports success."""
        self.record_metric(metrics_data)
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """No external resources to release; intentionally a no-op."""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Data is recorded synchronously, so a flush is trivially done."""
        return True

    def record_metric(self, metrics_data: MetricsData) -> None:
        # Walk resource -> scope -> metric -> data point, collecting each
        # point's attribute dict under its metric name.
        for resource_metric in metrics_data.resource_metrics:
            for scope_metric in resource_metric.scope_metrics:
                for metric in scope_metric.metrics:
                    self.all_metrics[metric.name].extend(
                        point.attributes
                        for point in metric.data.data_points
                    )
class BaseTestOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin):
    # Minimal plugin: supplies a MeterProvider and inherits default filters.
    def __init__(self, provider: MeterProvider):
        self.provider = provider
    def get_meter_provider(self) -> Optional[MeterProvider]:
        return self.provider
@unittest.skipIf(
os.name == "nt" or "darwin" in sys.platform,
"Observability is not supported in Windows and MacOS",
)
class OpenTelemetryObservabilityTest(unittest.TestCase):
def setUp(self):
self.all_metrics = defaultdict(list)
otel_exporter = OTelMetricExporter(self.all_metrics)
reader = PeriodicExportingMetricReader(
exporter=otel_exporter,
export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000,
)
self._provider = MeterProvider(metric_readers=[reader])
self._server = None
self._port = None
    def tearDown(self):
        # Stop any server a test left running.
        if self._server:
            self._server.stop(0)
def testRecordUnaryUnary(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testRecordUnaryUnaryClientOnly(self):
server, port = _test_server.start_server()
self._server = server
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
_test_server.unary_unary_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_client_metrics_names(self.all_metrics)
def testRecordUnaryStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
server, port = _test_server.start_server()
self._server = server
_test_server.unary_stream_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testRecordStreamUnary(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
server, port = _test_server.start_server()
self._server = server
_test_server.stream_unary_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testRecordStreamStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
server, port = _test_server.start_server()
self._server = server
_test_server.stream_stream_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
def testTargetAttributeFilter(self):
main_server, main_port = _test_server.start_server()
backup_server, backup_port = _test_server.start_server()
main_target = f"localhost:{main_port}"
backup_target = f"localhost:{backup_port}"
# Replace target label with 'other' for main_server.
def target_filter(target: str) -> bool:
if main_target in target:
return False
return True
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
otel_plugin.target_attribute_filter = target_filter
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
_test_server.unary_unary_call(port=main_port)
_test_server.unary_unary_call(port=backup_port)
self._validate_metrics_exist(self.all_metrics)
self._validate_client_metrics_names(self.all_metrics)
target_values = set()
for label_list in self.all_metrics.values():
for labels in label_list:
if GRPC_TARGET_LABEL in labels:
target_values.add(labels[GRPC_TARGET_LABEL])
self.assertTrue(GRPC_OTHER_LABEL_VALUE in target_values)
self.assertTrue(backup_target in target_values)
main_server.stop(0)
backup_server.stop(0)
def testMethodAttributeFilter(self):
# If method name is 'test/UnaryUnaryFiltered', is should be replaced with 'other'.
FILTERED_METHOD_NAME = "test/UnaryUnaryFiltered"
def method_filter(method: str) -> bool:
if FILTERED_METHOD_NAME in method:
return False
return True
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
otel_plugin.generic_method_attribute_filter = method_filter
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
):
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)
_test_server.unary_unary_filtered_call(port=port)
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
method_values = set()
for label_list in self.all_metrics.values():
for labels in label_list:
if GRPC_METHOD_LABEL in labels:
method_values.add(labels[GRPC_METHOD_LABEL])
self.assertTrue(GRPC_OTHER_LABEL_VALUE in method_values)
self.assertTrue(FILTERED_METHOD_NAME not in method_values)
def assert_eventually(
self,
predicate: Callable[[], bool],
*,
timeout: Optional[datetime.timedelta] = None,
message: Optional[Callable[[], str]] = None,
) -> None:
message = message or (lambda: "Proposition did not evaluate to true")
timeout = timeout or datetime.timedelta(seconds=10)
end = datetime.datetime.now() + timeout
while datetime.datetime.now() < end:
if predicate():
break
time.sleep(0.5)
else:
self.fail(message() + " after " + str(timeout))
def _validate_metrics_exist(self, all_metrics: Dict[str, Any]) -> None:
# Sleep here to make sure we have at least one export from OTel MetricExporter.
self.assert_eventually(
lambda: len(all_metrics.keys()) > 1,
message=lambda: f"No metrics was exported",
)
def _validate_all_metrics_names(self, metric_names: Set[str]) -> None:
self._validate_server_metrics_names(metric_names)
self._validate_client_metrics_names(metric_names)
def _validate_server_metrics_names(self, metric_names: Set[str]) -> None:
for base_metric in _open_telemetry_measures.base_metrics():
if "grpc.server" in base_metric.name:
self.assertTrue(
base_metric.name in metric_names,
msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
)
def _validate_client_metrics_names(self, metric_names: Set[str]) -> None:
for base_metric in _open_telemetry_measures.base_metrics():
if "grpc.client" in base_metric.name:
self.assertTrue(
base_metric.name in metric_names,
msg=f"metric {base_metric.name} not found in exported metrics: {metric_names}!",
)
if __name__ == "__main__":
    # Install default logging handlers so library warnings surface in output,
    # then run with verbose output so individual test names are printed.
    logging.basicConfig()
    unittest.main(verbosity=2)

@ -0,0 +1,169 @@
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent import futures
from typing import Tuple
import grpc
# Fixed 3-byte payloads used for every request and response in the test RPCs.
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"
# Fully-qualified method names the generic handler routes on.
_UNARY_UNARY = "/test/UnaryUnary"
_UNARY_UNARY_FILTERED = "/test/UnaryUnaryFiltered"
_UNARY_STREAM = "/test/UnaryStream"
_STREAM_UNARY = "/test/StreamUnary"
_STREAM_STREAM = "/test/StreamStream"
# Number of messages sent/received on each streaming RPC.
STREAM_LENGTH = 5
# Invocation-metadata entries that make handle_unary_unary issue extra RPCs.
TRIGGER_RPC_METADATA = ("control", "trigger_rpc")
TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "")
def handle_unary_unary(request, servicer_context):
    """Unary-unary handler that can fan out extra RPCs when triggered.

    When TRIGGER_RPC_METADATA is present in the invocation metadata, a key
    containing "port" triggers a client call to that port, and a key
    containing "to_new_server" spins up a short-lived second server and calls
    it once. Always replies with the canned response.
    """
    metadata = servicer_context.invocation_metadata()
    if TRIGGER_RPC_METADATA not in metadata:
        return _RESPONSE
    for key, value in metadata:
        if "port" in key:
            # Re-enter as a client against the port named in metadata.
            unary_unary_call(port=int(value))
        if "to_new_server" in key:
            extra_server = grpc.server(
                futures.ThreadPoolExecutor(max_workers=10)
            )
            extra_server.add_generic_rpc_handlers((_GenericHandler(),))
            extra_port = extra_server.add_insecure_port("[::]:0")
            extra_server.start()
            unary_unary_call(port=extra_port)
            extra_server.stop(0)
    return _RESPONSE
def handle_unary_stream(request, servicer_context):
    """Yield STREAM_LENGTH copies of the canned response."""
    yield from [_RESPONSE] * STREAM_LENGTH
def handle_stream_unary(request_iterator, servicer_context):
    """Reply with the canned response; the request stream is not consumed."""
    return _RESPONSE
def handle_stream_stream(request_iterator, servicer_context):
    """Echo one canned response per incoming request message."""
    yield from (_RESPONSE for _ in request_iterator)
class _MethodHandler(grpc.RpcMethodHandler):
    """RpcMethodHandler that wires exactly one of the four arity callbacks.

    Which callback is populated is determined by the
    (request_streaming, response_streaming) pair; the other three stay None.
    """

    def __init__(self, request_streaming, response_streaming):
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        # Identity (de)serialization: payloads are raw bytes in these tests.
        self.request_deserializer = None
        self.response_serializer = None
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        # (request_streaming, response_streaming) -> (attribute, handler).
        dispatch = {
            (False, False): ("unary_unary", handle_unary_unary),
            (False, True): ("unary_stream", handle_unary_stream),
            (True, False): ("stream_unary", handle_stream_unary),
            (True, True): ("stream_stream", handle_stream_stream),
        }
        arity = (bool(request_streaming), bool(response_streaming))
        attr_name, handler = dispatch[arity]
        setattr(self, attr_name, handler)
class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the known test method names to the matching _MethodHandler."""

    def service(self, handler_call_details):
        # method name -> (request_streaming, response_streaming).
        routing = {
            _UNARY_UNARY: (False, False),
            _UNARY_UNARY_FILTERED: (False, False),
            _UNARY_STREAM: (False, True),
            _STREAM_UNARY: (True, False),
            _STREAM_STREAM: (True, True),
        }
        flags = routing.get(handler_call_details.method)
        if flags is None:
            # Unknown method: let gRPC reply UNIMPLEMENTED.
            return None
        return _MethodHandler(*flags)
def start_server(interceptors=None) -> Tuple[grpc.Server, int]:
    """Start an insecure test server on an ephemeral port.

    Args:
      interceptors: optional server interceptors; omitted entirely when falsy.

    Returns:
      The started server and the port it is listening on.
    """
    server_kwargs = {}
    if interceptors:
        server_kwargs["interceptors"] = interceptors
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10), **server_kwargs
    )
    server.add_generic_rpc_handlers((_GenericHandler(),))
    port = server.add_insecure_port("[::]:0")
    server.start()
    return server, port
def unary_unary_call(port, metadata=None):
    """Issue a single unary-unary RPC to localhost:<port>."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_unary(_UNARY_UNARY)
        call_kwargs = {"metadata": metadata} if metadata else {}
        # Response and call object are not needed by any caller.
        multi_callable.with_call(_REQUEST, **call_kwargs)
def intercepted_unary_unary_call(port, interceptors, metadata=None):
    """Issue a unary-unary RPC through an intercepted channel."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        wrapped_channel = grpc.intercept_channel(channel, interceptors)
        multi_callable = wrapped_channel.unary_unary(_UNARY_UNARY)
        call_kwargs = {"metadata": metadata} if metadata else {}
        multi_callable.with_call(_REQUEST, **call_kwargs)
def unary_unary_filtered_call(port, metadata=None):
    """Issue a unary-unary RPC on the method the tests filter out by name."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_unary(_UNARY_UNARY_FILTERED)
        call_kwargs = {"metadata": metadata} if metadata else {}
        multi_callable.with_call(_REQUEST, **call_kwargs)
def unary_stream_call(port):
    """Issue one unary-stream RPC and drain every response."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_stream(_UNARY_STREAM)
        # Drain the stream so the RPC runs to completion before returning.
        list(multi_callable(_REQUEST))
def stream_unary_call(port):
    """Issue one stream-unary RPC sending STREAM_LENGTH request messages."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.stream_unary(_STREAM_UNARY)
        request_iterator = (_REQUEST for _ in range(STREAM_LENGTH))
        multi_callable.with_call(request_iterator)
def stream_stream_call(port):
    """Issue one stream-stream RPC and drain every response."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.stream_stream(_STREAM_STREAM)
        request_iterator = (_REQUEST for _ in range(STREAM_LENGTH))
        # Drain the response stream so the RPC runs to completion.
        for _ in multi_callable(request_iterator):
            pass

@ -10,6 +10,7 @@
"tests.interop._insecure_intraop_test.InsecureIntraopTest",
"tests.interop._secure_intraop_test.SecureIntraopTest",
"tests.observability._observability_test.ObservabilityTest",
"tests.observability._open_telemetry_observability_test.OpenTelemetryObservabilityTest",
"tests.protoc_plugin._python_plugin_test.ModuleMainTest",
"tests.protoc_plugin._python_plugin_test.PythonPluginTest",
"tests.protoc_plugin._python_plugin_test.SimpleStubsPluginTest",

@ -0,0 +1,23 @@
%YAML 1.2
--- |
# Copyright 2023 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file has been automatically generated from a template file.
# Please make modifications to
# `$REPO_ROOT/templates/src/python/grpcio/_parallel_compile_patch.py.template`
# instead. This file can be regenerated from the template by running
# `tools/buildgen/generate_projects.sh`.
<%include file="../_parallel_compile_patch.py.include" />

@ -28,22 +28,29 @@ set -e
BASEDIR=$(dirname "$0")
BASEDIR=$(realpath "$BASEDIR")/../..
PACKAGES="grpcio_channelz grpcio_csds grpcio_admin grpcio_health_checking grpcio_reflection grpcio_status grpcio_testing grpcio_tests"
PACKAGES="grpcio_channelz grpcio_csds grpcio_admin grpcio_health_checking grpcio_reflection grpcio_status grpcio_testing grpcio_tests"
(cd "$BASEDIR";
pip install --upgrade cython;
pip install --upgrade "cython<3.0.0rc1";
python setup.py install;
pushd tools/distrib/python/grpcio_tools;
../make_grpcio_tools.py
GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install .
popd;
pushd src/python/grpcio_observability;
./make_grpcio_observability.py
GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install .
popd;
pushd tools/distrib/python/xds_protos;
GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install .
popd;
pushd src/python;
for PACKAGE in ${PACKAGES}; do
pushd "${PACKAGE}";
python setup.py clean;
maybe_run_command preprocess
maybe_run_command build_package_protos
python setup.py install;
python -m pip install .;
popd;
done
popd;

Loading…
Cancel
Save