[Aio] Resolve deprecated warnings from asyncio (#27635)

* [Aio] Resolve deprecated warnings from asyncio

* Use 3.6 compatible way of creating an asyncio.Task
pull/27644/head
Lidi Zheng 4 years ago committed by GitHub
parent 7aa43b7a55
commit 60028a82a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi
  2. 14
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  3. 2
      src/python/grpcio/grpc/aio/_call.py
  4. 2
      src/python/grpcio/grpc/aio/_channel.py
  5. 3
      src/python/grpcio/grpc/aio/_interceptor.py
  6. 5
      src/python/grpcio_tests/tests_aio/unit/_constants.py
  7. 2
      src/python/grpcio_tests/tests_aio/unit/compatibility_test.py

@ -56,7 +56,7 @@ class _EOF:
def __bool__(self):
return False
def __len__(self):
return 0
@ -144,7 +144,7 @@ async def generator_to_async_generator(object gen, object loop, object thread_po
TypeError: StopIteration interacts badly with generators and cannot be
raised into a Future
"""
queue = asyncio.Queue(maxsize=1, loop=loop)
queue = asyncio.Queue(maxsize=1)
def yield_to_queue():
try:

@ -854,7 +854,7 @@ cdef class AioServer:
self.add_generic_rpc_handlers(generic_handlers)
self._serving_task = None
self._shutdown_lock = asyncio.Lock(loop=self._loop)
self._shutdown_lock = asyncio.Lock()
self._shutdown_completed = self._loop.create_future()
self._shutdown_callback_wrapper = CallbackWrapper(
self._shutdown_completed,
@ -1008,12 +1008,8 @@ cdef class AioServer:
else:
try:
await asyncio.wait_for(
asyncio.shield(
self._shutdown_completed,
loop=self._loop
),
asyncio.shield(self._shutdown_completed),
grace,
loop=self._loop,
)
except asyncio.TimeoutError:
# Cancels all ongoing calls by the end of grace period.
@ -1033,12 +1029,8 @@ cdef class AioServer:
else:
try:
await asyncio.wait_for(
asyncio.shield(
self._shutdown_completed,
loop=self._loop,
),
asyncio.shield(self._shutdown_completed),
timeout,
loop=self._loop,
)
except asyncio.TimeoutError:
if self._crash_exception is not None:

@ -371,7 +371,7 @@ class _StreamRequestMixin(Call):
def _init_stream_request_mixin(
self, request_iterator: Optional[RequestIterableType]):
self._metadata_sent = asyncio.Event(loop=self._loop)
self._metadata_sent = asyncio.Event()
self._done_writing_flag = False
# If user passes in an async iterator, create a consumer Task.

@ -358,7 +358,7 @@ class Channel(_base_channel.Channel):
# If needed, try to wait for them to finish.
# Call objects are not always awaitables.
if grace and call_tasks:
await asyncio.wait(call_tasks, timeout=grace, loop=self._loop)
await asyncio.wait(call_tasks, timeout=grace)
# Time to cancel existing calls.
for call in calls:

@ -523,7 +523,8 @@ class _InterceptedStreamRequestMixin:
# Write might never end up since the call could abrubtly finish,
# we give up on the first awaitable object that finishes.
_, _ = await asyncio.wait(
(self._write_to_iterator_queue.put(request), call.code()),
(self._loop.create_task(self._write_to_iterator_queue.put(request)),
self._loop.create_task(call.code())),
return_when=asyncio.FIRST_COMPLETED)
if call.done():

@ -12,5 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
UNREACHABLE_TARGET = '0.0.0.1:1111'
# If we use an unreachable IP, depending on the network stack, we might not get
# with an RST fast enough. This used to cause tests to flake under different
# platforms.
UNREACHABLE_TARGET = 'foo/bar'
UNARY_CALL_WITH_SLEEP_VALUE = 0.2

@ -78,7 +78,7 @@ class TestCompatibility(AioTestBase):
await self._async_server.stop(None)
async def _run_in_another_thread(self, func: Callable[[], None]):
work_done = asyncio.Event(loop=self.loop)
work_done = asyncio.Event()
def thread_work():
func()

Loading…
Cancel
Save