[Python CSM] Changes to enable Python CSM interop tests (#36790)

Passed test run:
- [x] [grpc/core/master/linux/psm-csm-python](https://source.cloud.google.com/results/invocations/bf34eda3-aa41-47b1-87e8-95b66afedb6e)

Related Interop framework change:
* https://github.com/grpc/psm-interop/pull/91
<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #36790

PiperOrigin-RevId: 639953335
pull/36807/head
Xuan Wang 6 months ago committed by Copybara-Service
parent d623f31d97
commit 82ca82ca81
  1. 6
      requirements.bazel.txt
  2. 82
      src/python/grpcio_csm_observability/README.rst
  3. 2
      src/python/grpcio_csm_observability/setup.py
  4. 4
      src/python/grpcio_observability/README.rst
  5. 10
      src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel
  6. 66
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py
  7. 49
      src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py

@ -26,11 +26,11 @@ pyasn1==0.5.0
rsa==4.9
greenlet==1.1.3.post0
zope.interface==6.1
opentelemetry-sdk==1.24.0
opentelemetry-api==1.24.0
opentelemetry-sdk==1.25.0
opentelemetry-api==1.25.0
importlib-metadata==6.11.0
opentelemetry-resourcedetector-gcp==1.6.0a0
opentelemetry-exporter-prometheus==0.45b0
opentelemetry-exporter-prometheus==0.46b0
prometheus_client==0.20.0
Deprecated==1.2.14
opentelemetry-semantic-conventions==0.42b0

@ -1,5 +1,83 @@
gRPC Python CSM Observability
=========================
=============================
Package for gRPC Python CSM Observability.
TODO(xuanwn): Add more content.
Supported Python Versions
-------------------------
Python >= 3.8
Installation
------------
Currently gRPC Python CSM Observability is **only available for Linux**.
Installing From PyPI
~~~~~~~~~~~~~~~~~~~~
::
$ pip install grpcio-csm-observability
Installing From Source
~~~~~~~~~~~~~~~~~~~~~~
::
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
$ git clone -b RELEASE_TAG_HERE https://github.com/grpc/grpc $REPO_ROOT
$ cd $REPO_ROOT
$ git submodule update --init
$ cd src/python/grpcio_csm_observability
# For the next command do `sudo pip install` if you get permission-denied errors
$ pip install .
Dependencies
------------
gRPC Python CSM Observability Depends on the following packages:
::
grpcio
grpcio-observability
opentelemetry-sdk
Usage
-----
Example usage is similar to `the example here <https://github.com/grpc/grpc/tree/master/examples/python/observability>`_, instead of importing from ``grpc_observability``, you should import from ``grpc_csm_observability``:
.. code-block:: python
import grpc_csm_observability
csm_otel_plugin = grpc_csm_observability.CsmOpenTelemetryPlugin(
meter_provider=provider
)
We also provide several environment variables to help you optimize gRPC python observability for your particular use.
* Note: The term "Census" here is just for historical backwards compatibility reasons and does not imply any dependencies.
1. GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL
* This controls how frequently telemetry data collected within gRPC Core is sent to Python layer.
* Default value is 0.5 (Seconds).
2. GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE
* This controls the maximum number of telemetry data items that can be held in the buffer within gRPC Core before they are sent to Python.
* Default value is 10,000.
3. GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD
* This setting acts as a trigger: When the buffer in gRPC Core reaches a certain percentage of its capacity, the telemetry data is sent to Python.
* Default value is 0.7 (Which means buffer will start export when it's 70% full).
4. GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT
* This controls the maximum time allowed for the exporting thread (responsible for sending data to Python) to complete.
* Main thread will terminate the exporting thread after this timeout.
* Default value is 10 (Seconds).

@ -36,7 +36,7 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
"opentelemetry-sdk>=1.24.0",
"opentelemetry-sdk>=1.25.0",
"opentelemetry-resourcedetector-gcp>=1.6.0a0",
"grpcio=={version}".format(version=grpc_version.VERSION),
"protobuf>=5.26.1,<6.0dev",

@ -19,7 +19,7 @@ time the data is collected and the time it becomes available through Python expo
Supported Python Versions
-------------------------
Python >= 3.7
Python >= 3.8
Installation
------------
@ -63,7 +63,7 @@ gRPC Python Observability Depends on the following packages:
::
grpcio
opentelemetry-api==1.21.0
opentelemetry-api
Usage

@ -11,11 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
py_binary(
name = "xds_interop_client",
srcs = ["xds_interop_client.py"],
python_version = "PY3",
deps = [
# csm_observability dependency was added first because we have another opentelemetry module in xds_protos,
# Since we need the actual opentelemetry module, we need include csm_observability before grpc_admin.
deps = ["//src/python/grpcio_csm_observability/grpc_csm_observability:csm_observability"] + [
"//src/proto/grpc/testing:empty_py_pb2",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:py_test_proto",
@ -23,6 +28,7 @@ py_binary(
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_admin/grpc_admin",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
requirement("opentelemetry-exporter-prometheus"),
],
)
@ -37,8 +43,10 @@ py_binary(
"//src/proto/grpc/testing:test_py_pb2_grpc",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
"//src/python/grpcio_csm_observability/grpc_csm_observability:csm_observability",
"//src/python/grpcio_health_checking/grpc_health/v1:grpc_health",
"//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection",
requirement("opentelemetry-exporter-prometheus"),
],
)

@ -36,6 +36,10 @@ import grpc
from grpc import _typing as grpc_typing
import grpc_admin
from grpc_channelz.v1 import channelz
from grpc_csm_observability import CsmOpenTelemetryPlugin
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
@ -65,6 +69,8 @@ _METHOD_STR_TO_ENUM = {
_METHOD_ENUM_TO_STR = {v: k for k, v in _METHOD_STR_TO_ENUM.items()}
_PROMETHEUS_PORT = 9464
PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]]
@ -269,13 +275,28 @@ def _start_rpc(
stub: test_pb2_grpc.TestServiceStub,
timeout: float,
futures: Mapping[int, Tuple[FutureFromCallType, str]],
request_payload_size: int,
response_payload_size: int,
) -> None:
logger.debug(f"Sending {method} request to backend: {request_id}")
if method == "UnaryCall":
request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
response_size=response_payload_size,
payload=messages_pb2.Payload(body=b"0" * request_payload_size),
)
future = stub.UnaryCall.future(
messages_pb2.SimpleRequest(), metadata=metadata, timeout=timeout
request, metadata=metadata, timeout=timeout
)
elif method == "EmptyCall":
if request_payload_size > 0:
logger.error(
f"request_payload_size should not be set for EMPTY_CALL"
)
if response_payload_size > 0:
logger.error(
f"response_payload_size should not be set for EMPTY_CALL"
)
future = stub.EmptyCall.future(
empty_pb2.Empty(), metadata=metadata, timeout=timeout
)
@ -365,6 +386,8 @@ class _ChannelConfiguration:
rpc_timeout_sec: int,
print_response: bool,
secure_mode: bool,
request_payload_size: int,
response_payload_size: int,
):
# condition is signalled when a change is made to the config.
self.condition = threading.Condition()
@ -376,6 +399,8 @@ class _ChannelConfiguration:
self.rpc_timeout_sec = rpc_timeout_sec
self.print_response = print_response
self.secure_mode = secure_mode
self.response_payload_size = response_payload_size
self.request_payload_size = request_payload_size
def _run_single_channel(config: _ChannelConfiguration) -> None:
@ -415,6 +440,8 @@ def _run_single_channel(config: _ChannelConfiguration) -> None:
stub,
float(config.rpc_timeout_sec),
futures,
config.request_payload_size,
config.response_payload_size,
)
print_response = config.print_response
_remove_completed_rpcs(futures, config.print_response)
@ -501,6 +528,10 @@ def _run(
per_method_metadata: PerMethodMetadataType,
) -> None:
logger.info("Starting python xDS Interop Client.")
csm_plugin = None
if args.enable_csm_observability:
csm_plugin = _prepare_csm_observability_plugin()
csm_plugin.register_global()
global _global_server # pylint: disable=global-statement
method_handles = []
channel_configs = {}
@ -517,6 +548,8 @@ def _run(
args.rpc_timeout_sec,
args.print_response,
args.secure_mode,
args.request_payload_size,
args.response_payload_size,
)
channel_configs[method] = channel_config
method_handles.append(_MethodHandle(args.num_channels, channel_config))
@ -534,6 +567,8 @@ def _run(
_global_server.wait_for_termination()
for method_handle in method_handles:
method_handle.stop()
if csm_plugin:
csm_plugin.deregister_global()
def parse_metadata_arg(metadata_arg: str) -> PerMethodMetadataType:
@ -569,6 +604,17 @@ def bool_arg(arg: str) -> bool:
raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin:
# Start Prometheus client
start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0")
reader = PrometheusMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
csm_plugin = CsmOpenTelemetryPlugin(
meter_provider=meter_provider,
)
return csm_plugin
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run Python XDS interop client."
@ -621,6 +667,24 @@ if __name__ == "__main__":
parser.add_argument(
"--log_file", default=None, type=str, help="A file to log to."
)
parser.add_argument(
"--enable_csm_observability",
help="Whether to enable CSM Observability",
default=False,
action="store_true",
)
parser.add_argument(
"--request_payload_size",
default=0,
type=int,
help="Set the SimpleRequest.payload.body to a string of repeated 0 (zero) ASCII characters of the given size in bytes.",
)
parser.add_argument(
"--response_payload_size",
default=0,
type=int,
help="Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).",
)
rpc_help = "A comma-delimited list of RPC methods to run. Must be one of "
rpc_help += ", ".join(_SUPPORTED_METHODS)
rpc_help += "."

@ -26,10 +26,14 @@ from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple
import grpc
from grpc_channelz.v1 import channelz
from grpc_channelz.v1 import channelz_pb2
from grpc_csm_observability import CsmOpenTelemetryPlugin
from grpc_health.v1 import health as grpc_health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from grpc_reflection.v1alpha import reflection
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
@ -41,6 +45,7 @@ from src.proto.grpc.testing import test_pb2_grpc
# tests.
_LISTEN_HOST = "0.0.0.0"
_PROMETHEUS_PORT = 9464
_THREAD_POOL_SIZE = 256
@ -66,7 +71,12 @@ class TestService(test_pb2_grpc.TestServiceServicer):
self, request: messages_pb2.SimpleRequest, context: grpc.ServicerContext
) -> messages_pb2.SimpleResponse:
context.send_initial_metadata((("hostname", self._hostname),))
response = messages_pb2.SimpleResponse()
if request.response_size > 0:
response = messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(body=b"0" * request.response_size)
)
else:
response = messages_pb2.SimpleResponse()
response.server_id = self._server_id
response.hostname = self._hostname
return response
@ -114,8 +124,16 @@ def _configure_test_server(
def _run(
port: int, maintenance_port: int, secure_mode: bool, server_id: str
port: int,
maintenance_port: int,
secure_mode: bool,
server_id: str,
enable_csm_observability: bool,
) -> None:
csm_plugin = None
if enable_csm_observability:
csm_plugin = _prepare_csm_observability_plugin()
csm_plugin.register_global()
if port == maintenance_port:
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)
@ -142,6 +160,8 @@ def _run(
logger.info("Test server listening on port %d", port)
test_server.wait_for_termination()
maintenance_server.wait_for_termination()
if csm_plugin:
csm_plugin.deregister_global()
def bool_arg(arg: str) -> bool:
@ -153,6 +173,17 @@ def bool_arg(arg: str) -> bool:
raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin:
# Start Prometheus client
start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0")
reader = PrometheusMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
csm_plugin = CsmOpenTelemetryPlugin(
meter_provider=meter_provider,
)
return csm_plugin
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Run Python xDS interop server."
@ -184,6 +215,12 @@ if __name__ == "__main__":
default=False,
action="store_true",
)
parser.add_argument(
"--enable_csm_observability",
help="Whether to enable CSM Observability",
default=False,
action="store_true",
)
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
@ -194,4 +231,10 @@ if __name__ == "__main__":
"--port and --maintenance_port must not be the same when"
" --secure_mode is set."
)
_run(args.port, args.maintenance_port, args.secure_mode, args.server_id)
_run(
args.port,
args.maintenance_port,
args.secure_mode,
args.server_id,
args.enable_csm_observability,
)

Loading…
Cancel
Save