diff --git a/requirements.bazel.txt b/requirements.bazel.txt index c6036839b1f..8756fe8a6e1 100644 --- a/requirements.bazel.txt +++ b/requirements.bazel.txt @@ -26,11 +26,11 @@ pyasn1==0.5.0 rsa==4.9 greenlet==1.1.3.post0 zope.interface==6.1 -opentelemetry-sdk==1.24.0 -opentelemetry-api==1.24.0 +opentelemetry-sdk==1.25.0 +opentelemetry-api==1.25.0 importlib-metadata==6.11.0 opentelemetry-resourcedetector-gcp==1.6.0a0 -opentelemetry-exporter-prometheus==0.45b0 +opentelemetry-exporter-prometheus==0.46b0 prometheus_client==0.20.0 Deprecated==1.2.14 opentelemetry-semantic-conventions==0.42b0 diff --git a/src/python/grpcio_csm_observability/README.rst b/src/python/grpcio_csm_observability/README.rst index 8f2dc6ccd82..f2c549afdb1 100644 --- a/src/python/grpcio_csm_observability/README.rst +++ b/src/python/grpcio_csm_observability/README.rst @@ -1,5 +1,83 @@ gRPC Python CSM Observability -========================= +============================= Package for gRPC Python CSM Observability. -TODO(xuanwn): Add more content. + +Supported Python Versions +------------------------- +Python >= 3.8 + +Installation +------------ + +Currently gRPC Python CSM Observability is **only available for Linux**. + +Installing From PyPI +~~~~~~~~~~~~~~~~~~~~ + +:: + + $ pip install grpcio-csm-observability + + +Installing From Source +~~~~~~~~~~~~~~~~~~~~~~ + +:: + + $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice + $ git clone -b RELEASE_TAG_HERE https://github.com/grpc/grpc $REPO_ROOT + $ cd $REPO_ROOT + $ git submodule update --init + + $ cd src/python/grpcio_csm_observability + + # For the next command do `sudo pip install` if you get permission-denied errors + $ pip install . + + +Dependencies +------------ +gRPC Python CSM Observability Depends on the following packages: + +:: + + grpcio + grpcio-observability + opentelemetry-sdk + + +Usage +----- + +Example usage is similar to `the example here `_, instead of importing from ``grpc_observability``, you should import from ``grpc_csm_observability``: + +.. code-block:: python + + import grpc_csm_observability + + csm_otel_plugin = grpc_csm_observability.CsmOpenTelemetryPlugin( + meter_provider=provider + ) + + +We also provide several environment variables to help you optimize gRPC python observability for your particular use. + +* Note: The term "Census" here is just for historical backwards compatibility reasons and does not imply any dependencies. + +1. GRPC_PYTHON_CENSUS_EXPORT_BATCH_INTERVAL + * This controls how frequently telemetry data collected within gRPC Core is sent to Python layer. + * Default value is 0.5 (Seconds). + +2. GRPC_PYTHON_CENSUS_MAX_EXPORT_BUFFER_SIZE + * This controls the maximum number of telemetry data items that can be held in the buffer within gRPC Core before they are sent to Python. + * Default value is 10,000. + +3. GRPC_PYTHON_CENSUS_EXPORT_THRESHOLD + * This setting acts as a trigger: When the buffer in gRPC Core reaches a certain percentage of its capacity, the telemetry data is sent to Python. + * Default value is 0.7 (Which means buffer will start export when it's 70% full). + +4. GRPC_PYTHON_CENSUS_EXPORT_THREAD_TIMEOUT + * This controls the maximum time allowed for the exporting thread (responsible for sending data to Python) to complete. + * Main thread will terminate the exporting thread after this timeout. + * Default value is 10 (Seconds). diff --git a/src/python/grpcio_csm_observability/setup.py b/src/python/grpcio_csm_observability/setup.py index 582c6d9906c..c9a26db1ed5 100644 --- a/src/python/grpcio_csm_observability/setup.py +++ b/src/python/grpcio_csm_observability/setup.py @@ -36,7 +36,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - "opentelemetry-sdk>=1.24.0", + "opentelemetry-sdk>=1.25.0", "opentelemetry-resourcedetector-gcp>=1.6.0a0", "grpcio=={version}".format(version=grpc_version.VERSION), "protobuf>=5.26.1,<6.0dev", diff --git a/src/python/grpcio_observability/README.rst b/src/python/grpcio_observability/README.rst index 3db863b30e3..11ba92f90f5 100644 --- a/src/python/grpcio_observability/README.rst +++ b/src/python/grpcio_observability/README.rst @@ -19,7 +19,7 @@ time the data is collected and the time it becomes available through Python expo Supported Python Versions ------------------------- -Python >= 3.7 +Python >= 3.8 Installation ------------ @@ -63,7 +63,7 @@ gRPC Python Observability Depends on the following packages: :: grpcio - opentelemetry-api==1.21.0 + opentelemetry-api Usage diff --git a/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel index ca360871209..bf7beef5e16 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel +++ b/src/python/grpcio_tests/tests_py3_only/interop/BUILD.bazel @@ -11,11 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +load("@grpc_python_dependencies//:requirements.bzl", "requirement") + py_binary( name = "xds_interop_client", srcs = ["xds_interop_client.py"], python_version = "PY3", - deps = [ + # csm_observability dependency was added first because we have another opentelemetry module in xds_protos, + # Since we need the actual opentelemetry module, we need include csm_observability before grpc_admin. + deps = ["//src/python/grpcio_csm_observability/grpc_csm_observability:csm_observability"] + [ "//src/proto/grpc/testing:empty_py_pb2", "//src/proto/grpc/testing:py_messages_proto", "//src/proto/grpc/testing:py_test_proto", @@ -23,6 +28,7 @@ py_binary( "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_admin/grpc_admin", "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + requirement("opentelemetry-exporter-prometheus"), ], ) @@ -37,8 +43,10 @@ py_binary( "//src/proto/grpc/testing:test_py_pb2_grpc", "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//src/python/grpcio_csm_observability/grpc_csm_observability:csm_observability", "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health", "//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection", + requirement("opentelemetry-exporter-prometheus"), ], ) diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py index 86565b6126f..a3d9e0407d0 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py @@ -36,6 +36,10 @@ import grpc from grpc import _typing as grpc_typing import grpc_admin from grpc_channelz.v1 import channelz +from grpc_csm_observability import CsmOpenTelemetryPlugin +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.sdk.metrics import MeterProvider +from prometheus_client import start_http_server from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 @@ -65,6 +69,8 @@ _METHOD_STR_TO_ENUM = { _METHOD_ENUM_TO_STR = {v: k for k, v in _METHOD_STR_TO_ENUM.items()} +_PROMETHEUS_PORT = 9464 + PerMethodMetadataType = Mapping[str, Sequence[Tuple[str, str]]] @@ -269,13 +275,28 @@ def _start_rpc( stub: test_pb2_grpc.TestServiceStub, timeout: float, futures: Mapping[int, Tuple[FutureFromCallType, str]], + request_payload_size: int, + response_payload_size: int, ) -> None: logger.debug(f"Sending {method} request to backend: {request_id}") if method == "UnaryCall": + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=response_payload_size, + payload=messages_pb2.Payload(body=b"0" * request_payload_size), + ) future = stub.UnaryCall.future( - messages_pb2.SimpleRequest(), metadata=metadata, timeout=timeout + request, metadata=metadata, timeout=timeout ) elif method == "EmptyCall": + if request_payload_size > 0: + logger.error( + f"request_payload_size should not be set for EMPTY_CALL" + ) + if response_payload_size > 0: + logger.error( + f"response_payload_size should not be set for EMPTY_CALL" + ) future = stub.EmptyCall.future( empty_pb2.Empty(), metadata=metadata, timeout=timeout ) @@ -365,6 +386,8 @@ class _ChannelConfiguration: rpc_timeout_sec: int, print_response: bool, secure_mode: bool, + request_payload_size: int, + response_payload_size: int, ): # condition is signalled when a change is made to the config. self.condition = threading.Condition() @@ -376,6 +399,8 @@ class _ChannelConfiguration: self.rpc_timeout_sec = rpc_timeout_sec self.print_response = print_response self.secure_mode = secure_mode + self.response_payload_size = response_payload_size + self.request_payload_size = request_payload_size def _run_single_channel(config: _ChannelConfiguration) -> None: @@ -415,6 +440,8 @@ def _run_single_channel(config: _ChannelConfiguration) -> None: stub, float(config.rpc_timeout_sec), futures, + config.request_payload_size, + config.response_payload_size, ) print_response = config.print_response _remove_completed_rpcs(futures, config.print_response) @@ -501,6 +528,10 @@ def _run( per_method_metadata: PerMethodMetadataType, ) -> None: logger.info("Starting python xDS Interop Client.") + csm_plugin = None + if args.enable_csm_observability: + csm_plugin = _prepare_csm_observability_plugin() + csm_plugin.register_global() global _global_server # pylint: disable=global-statement method_handles = [] channel_configs = {} @@ -517,6 +548,8 @@ def _run( args.rpc_timeout_sec, args.print_response, args.secure_mode, + args.request_payload_size, + args.response_payload_size, ) channel_configs[method] = channel_config method_handles.append(_MethodHandle(args.num_channels, channel_config)) @@ -534,6 +567,8 @@ def _run( _global_server.wait_for_termination() for method_handle in method_handles: method_handle.stop() + if csm_plugin: + csm_plugin.deregister_global() def parse_metadata_arg(metadata_arg: str) -> PerMethodMetadataType: @@ -569,6 +604,17 @@ def bool_arg(arg: str) -> bool: raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") +def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin: + # Start Prometheus client + start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0") + reader = PrometheusMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + csm_plugin = CsmOpenTelemetryPlugin( + meter_provider=meter_provider, + ) + return csm_plugin + + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Run Python XDS interop client." @@ -621,6 +667,24 @@ if __name__ == "__main__": parser.add_argument( "--log_file", default=None, type=str, help="A file to log to." ) + parser.add_argument( + "--enable_csm_observability", + help="Whether to enable CSM Observability", + default=False, + action="store_true", + ) + parser.add_argument( + "--request_payload_size", + default=0, + type=int, + help="Set the SimpleRequest.payload.body to a string of repeated 0 (zero) ASCII characters of the given size in bytes.", + ) + parser.add_argument( + "--response_payload_size", + default=0, + type=int, + help="Ask the server to respond with SimpleResponse.payload.body of the given length (may not be implemented on the server).", + ) rpc_help = "A comma-delimited list of RPC methods to run. Must be one of " rpc_help += ", ".join(_SUPPORTED_METHODS) rpc_help += "." diff --git a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py index 2e38fc58533..0d7b73ed348 100644 --- a/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py +++ b/src/python/grpcio_tests/tests_py3_only/interop/xds_interop_server.py @@ -26,10 +26,14 @@ from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple import grpc from grpc_channelz.v1 import channelz from grpc_channelz.v1 import channelz_pb2 +from grpc_csm_observability import CsmOpenTelemetryPlugin from grpc_health.v1 import health as grpc_health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from grpc_reflection.v1alpha import reflection +from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.sdk.metrics import MeterProvider +from prometheus_client import start_http_server from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import messages_pb2 @@ -41,6 +45,7 @@ from src.proto.grpc.testing import test_pb2_grpc # tests. _LISTEN_HOST = "0.0.0.0" +_PROMETHEUS_PORT = 9464 _THREAD_POOL_SIZE = 256 @@ -66,7 +71,12 @@ class TestService(test_pb2_grpc.TestServiceServicer): self, request: messages_pb2.SimpleRequest, context: grpc.ServicerContext ) -> messages_pb2.SimpleResponse: context.send_initial_metadata((("hostname", self._hostname),)) - response = messages_pb2.SimpleResponse() + if request.response_size > 0: + response = messages_pb2.SimpleResponse( + payload=messages_pb2.Payload(body=b"0" * request.response_size) + ) + else: + response = messages_pb2.SimpleResponse() response.server_id = self._server_id response.hostname = self._hostname return response @@ -114,8 +124,16 @@ def _configure_test_server( def _run( - port: int, maintenance_port: int, secure_mode: bool, server_id: str + port: int, + maintenance_port: int, + secure_mode: bool, + server_id: str, + enable_csm_observability: bool, ) -> None: + csm_plugin = None + if enable_csm_observability: + csm_plugin = _prepare_csm_observability_plugin() + csm_plugin.register_global() if port == maintenance_port: server = grpc.server( futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE) @@ -142,6 +160,8 @@ def _run( logger.info("Test server listening on port %d", port) test_server.wait_for_termination() maintenance_server.wait_for_termination() + if csm_plugin: + csm_plugin.deregister_global() def bool_arg(arg: str) -> bool: @@ -153,6 +173,17 @@ def bool_arg(arg: str) -> bool: raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") +def _prepare_csm_observability_plugin() -> CsmOpenTelemetryPlugin: + # Start Prometheus client + start_http_server(port=_PROMETHEUS_PORT, addr="0.0.0.0") + reader = PrometheusMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + csm_plugin = CsmOpenTelemetryPlugin( + meter_provider=meter_provider, + ) + return csm_plugin + + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Run Python xDS interop server." @@ -184,6 +215,12 @@ if __name__ == "__main__": default=False, action="store_true", ) + parser.add_argument( + "--enable_csm_observability", + help="Whether to enable CSM Observability", + default=False, + action="store_true", + ) args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) @@ -194,4 +231,10 @@ if __name__ == "__main__": "--port and --maintenance_port must not be the same when" " --secure_mode is set." ) - _run(args.port, args.maintenance_port, args.secure_mode, args.server_id) + _run( + args.port, + args.maintenance_port, + args.secure_mode, + args.server_id, + args.enable_csm_observability, + )