[Python AIO] Handle DeprecationWarnings when get current loop (#35583)

Fix: https://github.com/grpc/grpc/issues/35086

<!--

If you know who should review your pull request, please assign it to that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the appropriate
lang label.

-->

Closes #35583

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35583 from XuanWang-Amos:fix_aio_eventloop aa0d207e99
PiperOrigin-RevId: 603188288
pull/35774/head
Xuan Wang 1 year ago committed by Copybara-Service
parent 2288b63601
commit 03375e064f
  1. 12
      src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]"
@ -181,7 +183,15 @@ if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7:
try:
return asyncio.get_running_loop()
except RuntimeError:
return asyncio.get_event_loop_policy().get_event_loop()
with warnings.catch_warnings():
# Convert DeprecationWarning to errors so we can capture them with except
warnings.simplefilter("error", DeprecationWarning)
try:
return asyncio.get_event_loop_policy().get_event_loop()
# Since version 3.12, DeprecationWarning is emitted if there is no
# current event loop.
except DeprecationWarning:
return asyncio.get_event_loop_policy().new_event_loop()
else:
def get_working_loop():
"""Returns a running event loop."""

Loading…
Cancel
Save