mirror of https://github.com/grpc/grpc.git
[Python O11Y] Add observability example (#35637)
<!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. -->pull/35658/head^2
parent
76c45b98d1
commit
80f3a90556
9 changed files with 412 additions and 0 deletions
@ -0,0 +1,63 @@ |
|||||||
|
gRPC Observability Example |
||||||
|
===================== |
||||||
|
|
||||||
|
The examples here demonstrate how to setup gRPC Python Observability with Opentelemetry. |
||||||
|
|
||||||
|
More details about how to use gRPC Python Observability APIs can be found in [OpenTelemetry Metrics gRFC](https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#opentelemetry-metrics). |
||||||
|
|
||||||
|
### Install Requirements |
||||||
|
|
||||||
|
1. Navigate to this directory: |
||||||
|
|
||||||
|
```sh |
||||||
|
cd examples/python/observability |
||||||
|
``` |
||||||
|
|
||||||
|
2. Install requirements: |
||||||
|
|
||||||
|
```sh |
||||||
|
python -m pip install -r requirements.txt |
||||||
|
``` |
||||||
|
|
||||||
|
### Run the Server |
||||||
|
|
||||||
|
Start the server: |
||||||
|
|
||||||
|
```sh |
||||||
|
python -m observability_greeter_server |
||||||
|
``` |
||||||
|
|
||||||
|
### Run the Client |
||||||
|
|
||||||
|
Note that client should start within 10 seconds of the server becoming active. |
||||||
|
|
||||||
|
```sh |
||||||
|
python -m observability_greeter_client |
||||||
|
``` |
||||||
|
|
||||||
|
### Verifying Metrics |
||||||
|
|
||||||
|
The example will print a list of metric names collected. |
||||||
|
|
||||||
|
Server Side: |
||||||
|
|
||||||
|
``` |
||||||
|
Server started, listening on 50051 |
||||||
|
Metrics exported on Server side: |
||||||
|
grpc.server.call.started |
||||||
|
grpc.server.call.sent_total_compressed_message_size |
||||||
|
grpc.server.call.rcvd_total_compressed_message_size |
||||||
|
grpc.server.call.duration |
||||||
|
``` |
||||||
|
|
||||||
|
Client Side: |
||||||
|
|
||||||
|
``` |
||||||
|
Greeter client received: Hello You |
||||||
|
Metrics exported on client side: |
||||||
|
grpc.client.call.duration |
||||||
|
grpc.client.attempt.started |
||||||
|
grpc.client.attempt.sent_total_compressed_message_size |
||||||
|
grpc.client.attempt.rcvd_total_compressed_message_size |
||||||
|
grpc.client.attempt.duration |
||||||
|
``` |
@ -0,0 +1,30 @@ |
|||||||
|
# -*- coding: utf-8 -*- |
||||||
|
# Generated by the protocol buffer compiler. DO NOT EDIT! |
||||||
|
# source: helloworld.proto |
||||||
|
"""Generated protocol buffer code.""" |
||||||
|
from google.protobuf.internal import builder as _builder |
||||||
|
from google.protobuf import descriptor as _descriptor |
||||||
|
from google.protobuf import descriptor_pool as _descriptor_pool |
||||||
|
from google.protobuf import symbol_database as _symbol_database |
||||||
|
# @@protoc_insertion_point(imports) |
||||||
|
|
||||||
|
_sym_db = _symbol_database.Default() |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3') |
||||||
|
|
||||||
|
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) |
||||||
|
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', globals()) |
||||||
|
if _descriptor._USE_C_DESCRIPTORS == False: |
||||||
|
|
||||||
|
DESCRIPTOR._options = None |
||||||
|
DESCRIPTOR._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW' |
||||||
|
_HELLOREQUEST._serialized_start=32 |
||||||
|
_HELLOREQUEST._serialized_end=60 |
||||||
|
_HELLOREPLY._serialized_start=62 |
||||||
|
_HELLOREPLY._serialized_end=91 |
||||||
|
_GREETER._serialized_start=93 |
||||||
|
_GREETER._serialized_end=166 |
||||||
|
# @@protoc_insertion_point(module_scope) |
@ -0,0 +1,17 @@ |
|||||||
|
from google.protobuf import descriptor as _descriptor |
||||||
|
from google.protobuf import message as _message |
||||||
|
from typing import ClassVar as _ClassVar, Optional as _Optional |
||||||
|
|
||||||
|
DESCRIPTOR: _descriptor.FileDescriptor |
||||||
|
|
||||||
|
class HelloReply(_message.Message): |
||||||
|
__slots__ = ["message"] |
||||||
|
MESSAGE_FIELD_NUMBER: _ClassVar[int] |
||||||
|
message: str |
||||||
|
def __init__(self, message: _Optional[str] = ...) -> None: ... |
||||||
|
|
||||||
|
class HelloRequest(_message.Message): |
||||||
|
__slots__ = ["name"] |
||||||
|
NAME_FIELD_NUMBER: _ClassVar[int] |
||||||
|
name: str |
||||||
|
def __init__(self, name: _Optional[str] = ...) -> None: ... |
@ -0,0 +1,70 @@ |
|||||||
|
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! |
||||||
|
"""Client and server classes corresponding to protobuf-defined services.""" |
||||||
|
import grpc |
||||||
|
|
||||||
|
import helloworld_pb2 as helloworld__pb2 |
||||||
|
|
||||||
|
|
||||||
|
class GreeterStub(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
def __init__(self, channel): |
||||||
|
"""Constructor. |
||||||
|
|
||||||
|
Args: |
||||||
|
channel: A grpc.Channel. |
||||||
|
""" |
||||||
|
self.SayHello = channel.unary_unary( |
||||||
|
'/helloworld.Greeter/SayHello', |
||||||
|
request_serializer=helloworld__pb2.HelloRequest.SerializeToString, |
||||||
|
response_deserializer=helloworld__pb2.HelloReply.FromString, |
||||||
|
) |
||||||
|
|
||||||
|
|
||||||
|
class GreeterServicer(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
def SayHello(self, request, context): |
||||||
|
"""Sends a greeting |
||||||
|
""" |
||||||
|
context.set_code(grpc.StatusCode.UNIMPLEMENTED) |
||||||
|
context.set_details('Method not implemented!') |
||||||
|
raise NotImplementedError('Method not implemented!') |
||||||
|
|
||||||
|
|
||||||
|
def add_GreeterServicer_to_server(servicer, server): |
||||||
|
rpc_method_handlers = { |
||||||
|
'SayHello': grpc.unary_unary_rpc_method_handler( |
||||||
|
servicer.SayHello, |
||||||
|
request_deserializer=helloworld__pb2.HelloRequest.FromString, |
||||||
|
response_serializer=helloworld__pb2.HelloReply.SerializeToString, |
||||||
|
), |
||||||
|
} |
||||||
|
generic_handler = grpc.method_handlers_generic_handler( |
||||||
|
'helloworld.Greeter', rpc_method_handlers) |
||||||
|
server.add_generic_rpc_handlers((generic_handler,)) |
||||||
|
|
||||||
|
|
||||||
|
# This class is part of an EXPERIMENTAL API. |
||||||
|
class Greeter(object): |
||||||
|
"""The greeting service definition. |
||||||
|
""" |
||||||
|
|
||||||
|
@staticmethod |
||||||
|
def SayHello(request, |
||||||
|
target, |
||||||
|
options=(), |
||||||
|
channel_credentials=None, |
||||||
|
call_credentials=None, |
||||||
|
insecure=False, |
||||||
|
compression=None, |
||||||
|
wait_for_ready=None, |
||||||
|
timeout=None, |
||||||
|
metadata=None): |
||||||
|
return grpc.experimental.unary_unary(request, target, '/helloworld.Greeter/SayHello', |
||||||
|
helloworld__pb2.HelloRequest.SerializeToString, |
||||||
|
helloworld__pb2.HelloReply.FromString, |
||||||
|
options, channel_credentials, |
||||||
|
insecure, call_credentials, compression, wait_for_ready, timeout, metadata) |
@ -0,0 +1,71 @@ |
|||||||
|
# Copyright 2024 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
"""gRPC Python helloworld.Greeter client with observability enabled.""" |
||||||
|
|
||||||
|
from collections import defaultdict |
||||||
|
import logging |
||||||
|
import time |
||||||
|
from typing import Optional |
||||||
|
|
||||||
|
import grpc |
||||||
|
import grpc_observability |
||||||
|
import helloworld_pb2 |
||||||
|
import helloworld_pb2_grpc |
||||||
|
import open_telemetry_exporter |
||||||
|
from opentelemetry.sdk.metrics import MeterProvider |
||||||
|
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader |
||||||
|
|
||||||
|
OTEL_EXPORT_INTERVAL_S = 0.5 |
||||||
|
|
||||||
|
|
||||||
|
class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin): |
||||||
|
def __init__(self, provider: MeterProvider): |
||||||
|
self.provider = provider |
||||||
|
|
||||||
|
def get_meter_provider(self) -> Optional[MeterProvider]: |
||||||
|
return self.provider |
||||||
|
|
||||||
|
|
||||||
|
def run(): |
||||||
|
all_metrics = defaultdict(list) |
||||||
|
otel_exporter = open_telemetry_exporter.OTelMetricExporter(all_metrics) |
||||||
|
reader = PeriodicExportingMetricReader( |
||||||
|
exporter=otel_exporter, |
||||||
|
export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000, |
||||||
|
) |
||||||
|
provider = MeterProvider(metric_readers=[reader]) |
||||||
|
otel_plugin = BaseOpenTelemetryPlugin(provider) |
||||||
|
|
||||||
|
with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): |
||||||
|
with grpc.insecure_channel(target="localhost:50051") as channel: |
||||||
|
stub = helloworld_pb2_grpc.GreeterStub(channel) |
||||||
|
try: |
||||||
|
response = stub.SayHello( |
||||||
|
helloworld_pb2.HelloRequest(name="You") |
||||||
|
) |
||||||
|
print(f"Greeter client received: {response.message}") |
||||||
|
except grpc.RpcError as rpc_error: |
||||||
|
print("Call failed with code: ", rpc_error.code()) |
||||||
|
|
||||||
|
# Sleep to make sure all metrics are exported. |
||||||
|
time.sleep(5) |
||||||
|
|
||||||
|
print("Metrics exported on client side:") |
||||||
|
for metric in all_metrics: |
||||||
|
print(metric) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
logging.basicConfig() |
||||||
|
run() |
@ -0,0 +1,80 @@ |
|||||||
|
# Copyright 2024 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
"""The Python implementation of the GRPC helloworld.Greeter server with observability enabled.""" |
||||||
|
|
||||||
|
from collections import defaultdict |
||||||
|
from concurrent import futures |
||||||
|
import logging |
||||||
|
import time |
||||||
|
from typing import Optional |
||||||
|
|
||||||
|
import grpc |
||||||
|
import grpc_observability |
||||||
|
import helloworld_pb2 |
||||||
|
import helloworld_pb2_grpc |
||||||
|
import open_telemetry_exporter |
||||||
|
from opentelemetry.sdk.metrics import MeterProvider |
||||||
|
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader |
||||||
|
|
||||||
|
_OTEL_EXPORT_INTERVAL_S = 0.5 |
||||||
|
_SERVER_PORT = "50051" |
||||||
|
|
||||||
|
|
||||||
|
class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin): |
||||||
|
def __init__(self, provider: MeterProvider): |
||||||
|
self.provider = provider |
||||||
|
|
||||||
|
def get_meter_provider(self) -> Optional[MeterProvider]: |
||||||
|
return self.provider |
||||||
|
|
||||||
|
|
||||||
|
class Greeter(helloworld_pb2_grpc.GreeterServicer): |
||||||
|
def SayHello(self, request, context): |
||||||
|
message = request.name |
||||||
|
return helloworld_pb2.HelloReply(message=f"Hello {message}") |
||||||
|
|
||||||
|
|
||||||
|
def serve(): |
||||||
|
all_metrics = defaultdict(list) |
||||||
|
otel_exporter = open_telemetry_exporter.OTelMetricExporter( |
||||||
|
all_metrics, print_live=False |
||||||
|
) |
||||||
|
reader = PeriodicExportingMetricReader( |
||||||
|
exporter=otel_exporter, |
||||||
|
export_interval_millis=_OTEL_EXPORT_INTERVAL_S * 1000, |
||||||
|
) |
||||||
|
provider = MeterProvider(metric_readers=[reader]) |
||||||
|
otel_plugin = BaseOpenTelemetryPlugin(provider) |
||||||
|
|
||||||
|
with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): |
||||||
|
server = grpc.server( |
||||||
|
thread_pool=futures.ThreadPoolExecutor(max_workers=10), |
||||||
|
) |
||||||
|
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) |
||||||
|
server.add_insecure_port("[::]:" + _SERVER_PORT) |
||||||
|
server.start() |
||||||
|
print("Server started, listening on " + _SERVER_PORT) |
||||||
|
|
||||||
|
# Sleep to make sure client made RPC call and all metrics are exported. |
||||||
|
time.sleep(10) |
||||||
|
print("Metrics exported on Server side:") |
||||||
|
for metric in all_metrics: |
||||||
|
print(metric) |
||||||
|
|
||||||
|
server.stop(0) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
logging.basicConfig() |
||||||
|
serve() |
@ -0,0 +1,75 @@ |
|||||||
|
# Copyright 2024 gRPC authors. |
||||||
|
# |
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
# you may not use this file except in compliance with the License. |
||||||
|
# You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
|
||||||
|
from typing import Dict, List |
||||||
|
|
||||||
|
from opentelemetry.sdk.metrics.export import AggregationTemporality |
||||||
|
from opentelemetry.sdk.metrics.export import MetricExportResult |
||||||
|
from opentelemetry.sdk.metrics.export import MetricExporter |
||||||
|
from opentelemetry.sdk.metrics.export import MetricsData |
||||||
|
|
||||||
|
|
||||||
|
class OTelMetricExporter(MetricExporter): |
||||||
|
"""Implementation of :class:`MetricExporter` that export metrics to the |
||||||
|
provided metric_list. |
||||||
|
|
||||||
|
all_metrics: A dict whose keys are grpc_observability._opentelemetry_measures.Metric.name, |
||||||
|
value is a list of labels recorded for that metric. |
||||||
|
An example item of this dict: |
||||||
|
{"grpc.client.attempt.started": |
||||||
|
[{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'}, |
||||||
|
{'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]} |
||||||
|
""" |
||||||
|
|
||||||
|
def __init__( |
||||||
|
self, |
||||||
|
all_metrics: Dict[str, List], |
||||||
|
preferred_temporality: Dict[type, AggregationTemporality] = None, |
||||||
|
preferred_aggregation: Dict[ |
||||||
|
type, "opentelemetry.sdk.metrics.view.Aggregation" |
||||||
|
] = None, |
||||||
|
print_live: bool = False, |
||||||
|
): |
||||||
|
super().__init__( |
||||||
|
preferred_temporality=preferred_temporality, |
||||||
|
preferred_aggregation=preferred_aggregation, |
||||||
|
) |
||||||
|
self._all_metrics = all_metrics |
||||||
|
self._print_live = print_live |
||||||
|
|
||||||
|
def export( |
||||||
|
self, |
||||||
|
metrics_data: MetricsData, |
||||||
|
timeout_millis: float = 10_000, |
||||||
|
**kwargs, |
||||||
|
) -> MetricExportResult: |
||||||
|
self.record_metric(metrics_data) |
||||||
|
return MetricExportResult.SUCCESS |
||||||
|
|
||||||
|
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: |
||||||
|
pass |
||||||
|
|
||||||
|
def force_flush(self, timeout_millis: float = 10_000) -> bool: |
||||||
|
return True |
||||||
|
|
||||||
|
def record_metric(self, metrics_data: MetricsData) -> None: |
||||||
|
for resource_metric in metrics_data.resource_metrics: |
||||||
|
for scope_metric in resource_metric.scope_metrics: |
||||||
|
for metric in scope_metric.metrics: |
||||||
|
for data_point in metric.data.data_points: |
||||||
|
self._all_metrics[metric.name].append( |
||||||
|
data_point.attributes |
||||||
|
) |
||||||
|
if self._print_live: |
||||||
|
print(f"Metric exporter received: {metric.name}") |
@ -0,0 +1,2 @@ |
|||||||
|
grpcio>=1.62.0 |
||||||
|
grpcio-observability>=1.62.0 |
Loading…
Reference in new issue