Simplify the creation and destruction of grpc_call

pull/21232/head
Lidi Zheng 5 years ago
parent 6f9b77103a
commit eff9f936b9
  1. 45
      src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
  2. 12
      src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi

@ -24,11 +24,15 @@ _UNKNOWN_CANCELLATION_DETAILS = 'RPC cancelled for unknown reason.'
cdef class _AioCall: cdef class _AioCall:
def __cinit__(self, AioChannel channel): def __cinit__(self,
AioChannel channel,
object deadline,
bytes method):
self._channel = channel self._channel = channel
self._references = [] self._references = []
self._grpc_call_wrapper = GrpcCallWrapper() self._grpc_call_wrapper = GrpcCallWrapper()
self._loop = asyncio.get_event_loop() self._loop = asyncio.get_event_loop()
self._create_grpc_call(deadline, method)
self._status_received = asyncio.Event(loop=self._loop) self._status_received = asyncio.Event(loop=self._loop)
@ -71,13 +75,8 @@ cdef class _AioCall:
grpc_slice_unref(method_slice) grpc_slice_unref(method_slice)
cdef void _destroy_grpc_call(self): cdef void _destroy_grpc_call(self):
"""Destroys the corresponding Core object for this RPC. """Destroys the corresponding Core object for this RPC."""
grpc_call_unref(self._grpc_call_wrapper.call)
This method is idempotent. Multiple calls should not result in crashes.
"""
if self._grpc_call_wrapper.call != NULL:
grpc_call_unref(self._grpc_call_wrapper.call)
self._grpc_call_wrapper.call = NULL
cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future): cdef AioRpcStatus _cancel_and_create_status(self, object cancellation_future):
"""Cancels the RPC in Core, and return the final RPC status.""" """Cancels the RPC in Core, and return the final RPC status."""
@ -116,9 +115,7 @@ cdef class _AioCall:
return status return status
async def unary_unary(self, async def unary_unary(self,
bytes method,
bytes request, bytes request,
object deadline,
object cancellation_future, object cancellation_future,
object initial_metadata_observer, object initial_metadata_observer,
object status_observer): object status_observer):
@ -149,22 +146,18 @@ cdef class _AioCall:
receive_status_on_client_op) receive_status_on_client_op)
try: try:
self._create_grpc_call(deadline, method) await async_start_batch(self._grpc_call_wrapper,
try: ops,
await async_start_batch(self._grpc_call_wrapper, self._loop)
ops, except asyncio.CancelledError:
self._loop) status = self._cancel_and_create_status(cancellation_future)
except asyncio.CancelledError: initial_metadata_observer(None)
status = self._cancel_and_create_status(cancellation_future) status_observer(status)
status_observer(status) raise
raise else:
finally:
# If the RPC failed, receive_initial_metadata_op.initial_metadata
# will return None instead of crash.
initial_metadata_observer( initial_metadata_observer(
receive_initial_metadata_op.initial_metadata() receive_initial_metadata_op.initial_metadata()
) )
self._destroy_grpc_call()
status = AioRpcStatus( status = AioRpcStatus(
receive_status_on_client_op.code(), receive_status_on_client_op.code(),
@ -229,9 +222,7 @@ cdef class _AioCall:
yield received_message yield received_message
async def unary_stream(self, async def unary_stream(self,
bytes method,
bytes request, bytes request,
object deadline,
object cancellation_future, object cancellation_future,
object initial_metadata_observer, object initial_metadata_observer,
object status_observer): object status_observer):
@ -257,10 +248,6 @@ cdef class _AioCall:
send_close_op, send_close_op,
) )
# Creates the grpc_call Core object, it needs to be deleted explicitly
# through _destroy_grpc_call call in other methods.
self._create_grpc_call(deadline, method)
# Actually sends out the request message. # Actually sends out the request message.
await async_start_batch(self._grpc_call_wrapper, await async_start_batch(self._grpc_call_wrapper,
outbound_ops, outbound_ops,

@ -38,10 +38,8 @@ cdef class AioChannel:
Returns: Returns:
The response message in bytes. The response message in bytes.
""" """
cdef _AioCall call = _AioCall(self) cdef _AioCall call = _AioCall(self, deadline, method)
return await call.unary_unary(method, return await call.unary_unary(request,
request,
deadline,
cancellation_future, cancellation_future,
initial_metadata_observer, initial_metadata_observer,
status_observer) status_observer)
@ -58,10 +56,8 @@ cdef class AioChannel:
Returns: Returns:
An async generator that yields raw responses. An async generator that yields raw responses.
""" """
cdef _AioCall call = _AioCall(self) cdef _AioCall call = _AioCall(self, deadline, method)
return call.unary_stream(method, return call.unary_stream(request,
request,
deadline,
cancellation_future, cancellation_future,
initial_metadata_observer, initial_metadata_observer,
status_observer) status_observer)

Loading…
Cancel
Save