[python] Add observability instrumentation to asyncio stack (#33992)

This is already present in the grpc python sync stack and has been
missing from aio stack.

CC @gnossen           

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
veblush-patch-3
Hima Sajeev 1 year ago committed by GitHub
parent e1cb290ef8
commit 90af0a115d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
  2. 23
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  3. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  4. 12
      src/python/grpcio/grpc/aio/_call.py

@ -19,6 +19,9 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call):
def install_context_from_request_call_event(RequestCallEvent event):
maybe_save_server_trace_context(event)
def install_context_from_request_call_event_aio(GrpcCallWrapper event):
pass
def uninstall_context():
pass
@ -31,5 +34,8 @@ cdef class CensusContext:
def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
pass
def set_instrumentation_context_on_call_aio(GrpcCallWrapper call_state, CensusContext census_ctx):
pass
def get_deadline_from_context():
return None

@ -295,12 +295,14 @@ cdef class _AioCall(GrpcCallWrapper):
async def unary_unary(self,
bytes request,
tuple outbound_initial_metadata):
tuple outbound_initial_metadata,
object context = None):
"""Performs a unary unary RPC.
Args:
request: the serialized requests in bytes.
outbound_initial_metadata: optional outbound metadata.
context: instrumentation context.
"""
cdef tuple ops
@ -313,6 +315,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
if context is not None:
set_instrumentation_context_on_call_aio(self, context)
ops = (initial_metadata_op, send_message_op, send_close_op,
receive_initial_metadata_op, receive_message_op,
receive_status_on_client_op)
@ -390,7 +394,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def initiate_unary_stream(self,
bytes request,
tuple outbound_initial_metadata):
tuple outbound_initial_metadata,
object context = None):
"""Implementation of the start of a unary-stream call."""
# Peer may prematurely end this RPC at any point. We need a corutine
# that watches if the server sends the final status.
@ -406,6 +411,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef Operation send_close_op = SendCloseFromClientOperation(
_EMPTY_FLAGS)
if context is not None:
set_instrumentation_context_on_call_aio(self, context)
outbound_ops = (
initial_metadata_op,
send_message_op,
@ -429,7 +436,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def stream_unary(self,
tuple outbound_initial_metadata,
object metadata_sent_observer):
object metadata_sent_observer,
object context = None):
"""Actual implementation of the complete unary-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
@ -460,6 +468,9 @@ cdef class _AioCall(GrpcCallWrapper):
cdef tuple inbound_ops
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
if context is not None:
set_instrumentation_context_on_call_aio(self, context)
inbound_ops = (receive_message_op, receive_status_on_client_op)
# Executes all operations in one batch.
@ -484,7 +495,8 @@ cdef class _AioCall(GrpcCallWrapper):
async def initiate_stream_stream(self,
tuple outbound_initial_metadata,
object metadata_sent_observer):
object metadata_sent_observer,
object context = None):
"""Actual implementation of the complete stream-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
@ -495,6 +507,9 @@ cdef class _AioCall(GrpcCallWrapper):
# that watches if the server sends the final status.
status_task = self._loop.create_task(self._handle_status_once_received())
if context is not None:
set_instrumentation_context_on_call_aio(self, context)
try:
# Sends out initial_metadata ASAP.
await _send_initial_metadata(self,

@ -401,6 +401,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
# Executes application logic
cdef object response_message
cdef _SyncServicerContext sync_servicer_context
install_context_from_request_call_event_aio(rpc_state)
if _is_async_handler(unary_handler):
# Run async method handlers in this coroutine
@ -453,6 +454,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
rpc_state.metadata_sent = True
rpc_state.status_sent = True
await execute_batch(rpc_state, finish_ops, loop)
uninstall_context()
async def _finish_handler_with_stream_responses(RPCState rpc_state,

@ -561,6 +561,7 @@ class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
loop,
)
self._request = request
self._context = cygrpc.build_census_context()
self._invocation_task = loop.create_task(self._invoke())
self._init_unary_response_mixin(self._invocation_task)
@ -574,7 +575,7 @@ class UnaryUnaryCall(_UnaryResponseMixin, Call, _base_call.UnaryUnaryCall):
# https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
try:
serialized_response = await self._cython_call.unary_unary(
serialized_request, self._metadata
serialized_request, self._metadata, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
@ -624,6 +625,7 @@ class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
loop,
)
self._request = request
self._context = cygrpc.build_census_context()
self._send_unary_request_task = loop.create_task(
self._send_unary_request()
)
@ -635,7 +637,7 @@ class UnaryStreamCall(_StreamResponseMixin, Call, _base_call.UnaryStreamCall):
)
try:
await self._cython_call.initiate_unary_stream(
serialized_request, self._metadata
serialized_request, self._metadata, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
@ -679,13 +681,14 @@ class StreamUnaryCall(
loop,
)
self._context = cygrpc.build_census_context()
self._init_stream_request_mixin(request_iterator)
self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))
async def _conduct_rpc(self) -> ResponseType:
try:
serialized_response = await self._cython_call.stream_unary(
self._metadata, self._metadata_sent_observer
self._metadata, self._metadata_sent_observer, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
@ -731,6 +734,7 @@ class StreamStreamCall(
response_deserializer,
loop,
)
self._context = cygrpc.build_census_context()
self._initializer = self._loop.create_task(self._prepare_rpc())
self._init_stream_request_mixin(request_iterator)
self._init_stream_response_mixin(self._initializer)
@ -743,7 +747,7 @@ class StreamStreamCall(
"""
try:
await self._cython_call.initiate_stream_stream(
self._metadata, self._metadata_sent_observer
self._metadata, self._metadata_sent_observer, self._context
)
except asyncio.CancelledError:
if not self.cancelled():

Loading…
Cancel
Save