From 27bc6fe7797e43298dc931b96dc57322d0852a9f Mon Sep 17 00:00:00 2001
From: Richard Belleville <rbellevi@google.com>
Date: Thu, 10 Feb 2022 18:25:12 -0800
Subject: [PATCH] Reimplement Gevent Integration (#28276)

* WIP

* Add gevent test suite run under Bazel.

* Fix things up

* Yapf

* Fix up Bazel files

* Make py_grpc_test fancier

* Attempt to fix Windows RBE

* Attempt to kick GitHub

* Fix Python 2 runs

* Yet more fixes

* And the patch file too

* I am an idiot

* Mark gevent tests flaky

* Try to make rules_python more tolerant

* Typo

* Exclude reconnect test from gevent

* Remove unnecessary parts of patch

* Buildifier

* You saw nothing

* isort

* Move py_grpc_test to an internal-only file

* Review comments

* More reviewer comments

* Review

* Add initial changes for gevent

* WIP. Run completion_queue_next in a threadpool

* WIP.

* WIP

* Re-remove skip annotation

* Finally working

* Reactivate tests

* Clean up

* Move C++ threading utilities to grpc.pxi

* Unbreak sync stack

* Refix test flake

* WIP. Trying to get things working properly

* Move test stuff to test runner

* Clean up

* Can't handle exceptions if you don't compile with exceptions

* Remove debug stuff unintentionally left in

* Add greenlet switch logging and fix threading issue

* Only run a greenlet scheduling greenlet when there are open channels

* Format

* Add threadpool modifications to old runner

* And actually import gevent
---
 bazel/_gevent_test_main.py                    |  55 +-
 bazel/gevent_test.bzl                         |   3 +-
 src/python/grpcio/grpc/_channel.py            |   4 +
 .../_cygrpc/aio/completion_queue.pxd.pxi      |  18 -
 .../_cython/_cygrpc/completion_queue.pxd.pxi  |   4 +
 .../_cython/_cygrpc/completion_queue.pyx.pxi  |  36 +-
 .../grpcio/grpc/_cython/_cygrpc/grpc.pxi      |  33 ++
 .../grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi  |  28 +-
 .../grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi  | 506 ++++--------------
 .../grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi | 132 -----
 .../grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi |  63 ---
 src/python/grpcio/grpc/_cython/cygrpc.pxd     |   1 -
 src/python/grpcio/grpc/_cython/cygrpc.pyx     |   2 -
 src/python/grpcio_tests/commands.py           |  11 +
 .../grpcio_tests/tests/unit/BUILD.bazel       |   4 +
 .../tests/unit/_compression_test.py           |   3 -
 .../unit/_contextvars_propagation_test.py     |  11 +-
 .../tests/unit/_local_credentials_test.py     |   4 -
 .../tests/unit/_metadata_code_details_test.py |   2 -
 .../tests/unit/_reconnect_test.py             |   3 -
 .../tests/unit/_rpc_part_1_test.py            |   5 +-
 .../tests/unit/_rpc_part_2_test.py            |   3 -
 .../tests/unit/_rpc_test_helpers.py           |   2 +-
 .../grpcio_tests/tests/unit/_signal_client.py |  12 +
 .../tests/unit/_signal_handling_test.py       |  22 +-
 25 files changed, 276 insertions(+), 691 deletions(-)
 delete mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi
 delete mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi

diff --git a/bazel/_gevent_test_main.py b/bazel/_gevent_test_main.py
index 52629c61d3c..f7936daaf0d 100644
--- a/bazel/_gevent_test_main.py
+++ b/bazel/_gevent_test_main.py
@@ -12,13 +12,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import gevent
+from gevent import monkey
+
+monkey.patch_all()
+threadpool = gevent.hub.get_hub().threadpool
+
+# Currently, each channel corresponds to a single native thread in the
+# gevent threadpool. Thus, when the unit test suite spins up hundreds of
+# channels concurrently, some will be starved out, causing the test to
+# increase in duration. We increase the max size here so this does not
+# happen.
+threadpool.maxsize = 1024
+threadpool.size = 32
+
+import traceback, signal
+from typing import Sequence
+
+
+import grpc.experimental.gevent
+grpc.experimental.gevent.init_gevent()
+
+import gevent
+import greenlet
+import datetime
+
 import grpc
 import unittest
 import sys
 import os
 import pkgutil
 
-from typing import Sequence
+def trace_callback(event, args):
+    if event in ("switch", "throw"):
+        origin, target = args
+        sys.stderr.write("{} Transfer from {} to {} with {}\n".format(datetime.datetime.now(), origin, target, event))
+    else:
+        sys.stderr.write("Unknown event {}.\n".format(event))
+    sys.stderr.flush()
+
+if os.getenv("GREENLET_TRACE") is not None:
+    greenlet.settrace(trace_callback)
+
+def debug(sig, frame):
+    d={'_frame':frame}
+    d.update(frame.f_globals)
+    d.update(frame.f_locals)
+
+    sys.stderr.write("Traceback:\n{}".format("\n".join(traceback.format_stack(frame))))
+    import gevent.util; gevent.util.print_run_info()
+    sys.stderr.flush()
+
+signal.signal(signal.SIGTERM, debug)
+
 
 class SingleLoader(object):
     def __init__(self, pattern: str):
@@ -38,13 +84,6 @@ class SingleLoader(object):
         return self.suite
 
 if __name__ == "__main__":
-    from gevent import monkey
-
-    monkey.patch_all()
-
-    import grpc.experimental.gevent
-    grpc.experimental.gevent.init_gevent()
-    import gevent
 
     if len(sys.argv) != 2:
         print(f"USAGE: {sys.argv[0]} TARGET_MODULE", file=sys.stderr)
diff --git a/bazel/gevent_test.bzl b/bazel/gevent_test.bzl
index 9be3ae003eb..173d7064696 100644
--- a/bazel/gevent_test.bzl
+++ b/bazel/gevent_test.bzl
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 """
 Houses py_grpc_gevent_test.
 """
@@ -81,6 +80,6 @@ def py_grpc_gevent_test(
         srcs = [copied_main_filename],
         main = copied_main_filename,
         python_version = "PY3",
-        flaky = True,
+        flaky = False,
         **kwargs
     )
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 8f9d7e972a2..b36e70f4a99 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -1481,6 +1481,8 @@ class Channel(grpc.Channel):
         self._call_state = _ChannelCallState(self._channel)
         self._connectivity_state = _ChannelConnectivityState(self._channel)
         cygrpc.fork_register_channel(self)
+        if cygrpc.g_gevent_activated:
+            cygrpc.gevent_increment_channel_count()
 
     def _process_python_options(self, python_options):
         """Sets channel attributes according to python-only channel options."""
@@ -1547,6 +1549,8 @@ class Channel(grpc.Channel):
         self._unsubscribe_all()
         self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
         cygrpc.fork_unregister_channel(self)
+        if cygrpc.g_gevent_activated:
+            cygrpc.gevent_decrement_channel_count()
 
     def _close_on_fork(self):
         self._unsubscribe_all()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
index 712a589798e..578131f7eef 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/completion_queue.pxd.pxi
@@ -12,24 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython
-# links it with exception handling. It introduces new dependencies.
-cdef extern from "<queue>" namespace "std" nogil:
-    cdef cppclass queue[T]:
-        queue()
-        bint empty()
-        T& front()
-        void pop()
-        void push(T&)
-        size_t size()
-
-
-cdef extern from "<mutex>" namespace "std" nogil:
-    cdef cppclass mutex:
-        mutex()
-        void lock()
-        void unlock()
-
 
 ctypedef queue[grpc_event] cpp_event_queue
 
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
index 0307f74cbef..983aa6a87b1 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi
@@ -12,12 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+cdef int g_interrupt_check_period_ms = 200
 
 cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *
 
 
 cdef _interpret_event(grpc_event c_event)
 
+cdef class _LatentEventArg:
+  cdef grpc_completion_queue *c_completion_queue
+  cdef object deadline
 
 cdef class CompletionQueue:
 
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 5eb5f087067..f9f5de2d83b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -13,14 +13,12 @@
 # limitations under the License.
 
 
-cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
-
-
 cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) except *:
+  global g_interrupt_check_period_ms
   cdef gpr_timespec c_increment
   cdef gpr_timespec c_timeout
   cdef gpr_timespec c_deadline
-  c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN)
+  c_increment = gpr_time_from_millis(g_interrupt_check_period_ms, GPR_TIMESPAN)
   if deadline is None:
     c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
   else:
@@ -33,7 +31,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep
         c_timeout = c_deadline
 
       c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL)
-  
+
       if (c_event.type != GRPC_QUEUE_TIMEOUT or
           gpr_time_cmp(c_timeout, c_deadline) == 0):
         break
@@ -42,7 +40,6 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) excep
     cpython.PyErr_CheckSignals()
   return c_event
 
-
 cdef _interpret_event(grpc_event c_event):
   cdef _Tag tag
   if c_event.type == GRPC_QUEUE_TIMEOUT:
@@ -58,11 +55,25 @@ cdef _interpret_event(grpc_event c_event):
     cpython.Py_DECREF(tag)
     return tag, tag.event(c_event)
 
+cdef _internal_latent_event(_LatentEventArg latent_event_arg):
+  cdef grpc_event c_event = _next(latent_event_arg.c_completion_queue, latent_event_arg.deadline)
+  return _interpret_event(c_event)
 
 cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline):
-  cdef grpc_event c_event = _next(c_completion_queue, deadline)
-  return _interpret_event(c_event)
+    global g_gevent_activated
+
+    latent_event_arg = _LatentEventArg()
+    latent_event_arg.c_completion_queue = c_completion_queue
+    latent_event_arg.deadline = deadline
 
+    if g_gevent_activated:
+      # For gevent, completion_queue_next is run in a native thread pool.
+      global g_gevent_threadpool
+
+      result = g_gevent_threadpool.apply(_internal_latent_event, (latent_event_arg,))
+      return result
+    else:
+      return _internal_latent_event(latent_event_arg)
 
 cdef class CompletionQueue:
 
@@ -87,10 +98,17 @@ cdef class CompletionQueue:
       self.is_shutdown = True
     return event
 
+  def _internal_poll(self, deadline):
+    return self._interpret_event(_next(self.c_completion_queue, deadline))
+
   # We name this 'poll' to avoid problems with CPython's expectations for
   # 'special' methods (like next and __next__).
   def poll(self, deadline=None):
-    return self._interpret_event(_next(self.c_completion_queue, deadline))
+    global g_gevent_activated
+    if g_gevent_activated:
+      return g_gevent_threadpool.apply(CompletionQueue._internal_poll, (self, deadline))
+    else:
+      return self._internal_poll(deadline)
 
   def shutdown(self):
     with nogil:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 7fde61d7dd2..c7f09ede7e8 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -25,6 +25,37 @@ ctypedef unsigned short     uint16_t
 ctypedef unsigned int       uint32_t
 ctypedef unsigned long long uint64_t
 
+# C++ Utilities
+
+# NOTE(lidiz) Unfortunately, we can't use "cimport" here because Cython
+# links it with exception handling. It introduces new dependencies.
+cdef extern from "<queue>" namespace "std" nogil:
+    cdef cppclass queue[T]:
+        queue()
+        bint empty()
+        T& front()
+        T& back()
+        void pop()
+        void push(T&)
+        size_t size()
+
+
+cdef extern from "<mutex>" namespace "std" nogil:
+    cdef cppclass mutex:
+        mutex()
+        void lock()
+        void unlock()
+
+    cdef cppclass unique_lock[Mutex]:
+      unique_lock(Mutex&)
+
+cdef extern from "<condition_variable>" namespace "std" nogil:
+  cdef cppclass condition_variable:
+    condition_variable()
+    void notify_all()
+    void wait(unique_lock[mutex]&)
+
+# gRPC Core Declarations
 
 cdef extern from "grpc/support/alloc.h":
 
@@ -700,3 +731,5 @@ cdef extern from "grpc/grpc_security_constants.h":
 cdef extern from "src/core/lib/iomgr/error.h":
   ctypedef grpc_error* grpc_error_handle
   grpc_error_handle GRPC_ERROR_CANCELLED
+  struct grpc_error:
+    pass
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
index bd1d73145b9..baa9fb54a3e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi
@@ -13,29 +13,9 @@
 # limitations under the License.
 # distutils: language=c++
 
-cdef class TimerWrapper:
+g_gevent_threadpool = None
+g_gevent_activated = False
 
-  cdef grpc_custom_timer *c_timer
-  cdef object timer
-  cdef object event
+cpdef void gevent_increment_channel_count()
 
-cdef class SocketWrapper:
-  cdef object sockopts
-  cdef object socket
-  cdef object closed
-  cdef grpc_custom_socket *c_socket
-  cdef char* c_buffer
-  cdef size_t len
-  cdef grpc_custom_socket *accepting_socket
-
-  cdef grpc_custom_connect_callback connect_cb
-  cdef grpc_custom_write_callback write_cb
-  cdef grpc_custom_read_callback read_cb
-  cdef grpc_custom_accept_callback accept_cb
-  cdef grpc_custom_close_callback close_cb
-
-
-cdef class ResolveWrapper:
-  cdef grpc_custom_resolver *c_resolver
-  cdef const char* c_host
-  cdef const char* c_port
+cpdef void gevent_decrement_channel_count()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
index 9aa012de0b1..41d27df5948 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi
@@ -14,418 +14,124 @@
 # distutils: language=c++
 
 from libc cimport string
-import errno
-import sys
-gevent_g = None
-gevent_socket = None
-gevent_hub = None
-gevent_event = None
-g_event = None
-g_pool = None
-
-def _spawn_greenlet(*args):
-  greenlet = g_pool.spawn(*args)
-
-###############################
-### socket implementation ###
-###############################
-
-cdef class SocketWrapper:
-  def __cinit__(self):
-    fork_handlers_and_grpc_init()
-    self.sockopts = []
-    self.socket = None
-    self.c_socket = NULL
-    self.c_buffer = NULL
-    self.len = 0
-
-  def __dealloc__(self):
-    grpc_shutdown()
-
-cdef grpc_error_handle socket_init(grpc_custom_socket* socket, int domain) with gil:
-  sw = SocketWrapper()
-  sw.c_socket = socket
-  sw.sockopts = []
-  cpython.Py_INCREF(sw)
-  # Python doesn't support AF_UNSPEC sockets, so we defer creation until
-  # bind/connect when we know what type of socket we need
-  sw.socket = None
-  sw.closed = False
-  sw.accepting_socket = NULL
-  socket.impl = <void*>sw
-  return grpc_error_none()
-
-cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple):
-  try:
-    socket_wrapper.socket.connect(addr_tuple)
-    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                              grpc_error_none())
-  except IOError as io_error:
-    socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                              socket_error("connect", str(io_error)))
-  g_event.set()
-
-def socket_connect_async(socket_wrapper, addr_tuple):
-  socket_connect_async_cython(socket_wrapper, addr_tuple)
-
-cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr,
-                         size_t addr_len,
-                         grpc_custom_connect_callback cb) with gil:
-  py_socket = None
-  socket_wrapper = <SocketWrapper>socket.impl
-  socket_wrapper.connect_cb = cb
-  addr_tuple = sockaddr_to_tuple(addr, addr_len)
-  if sockaddr_is_ipv4(addr, addr_len):
-      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
-  else:
-      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
-  applysockopts(py_socket)
-  socket_wrapper.socket = py_socket
-  _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple)
-
-cdef void socket_destroy(grpc_custom_socket* socket) with gil:
-  cpython.Py_DECREF(<SocketWrapper>socket.impl)
-
-cdef void socket_shutdown(grpc_custom_socket* socket) with gil:
-  try:
-    (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR)
-  except IOError as io_error:
-    if io_error.errno != errno.ENOTCONN:
-      raise io_error
-
-cdef void socket_close(grpc_custom_socket* socket,
-                       grpc_custom_close_callback cb) with gil:
-  socket_wrapper = (<SocketWrapper>socket.impl)
-  if socket_wrapper.socket is not None:
-    socket_wrapper.socket.close()
-    socket_wrapper.closed = True
-    socket_wrapper.close_cb = cb
-    # Delay the close callback until the accept() call has picked it up
-    if socket_wrapper.accepting_socket != NULL:
-      return
-  socket_wrapper.close_cb(socket)
-
-def socket_sendmsg(socket, write_bytes):
-  try:
-    return socket.sendmsg(write_bytes)
-  except AttributeError:
-    # sendmsg not available on all Pythons/Platforms
-    return socket.send(b''.join(write_bytes))
-
-cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes):
-  try:
-    while write_bytes:
-      sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes)
-      while sent_byte_count > 0:
-        if sent_byte_count < len(write_bytes[0]):
-          write_bytes[0] = write_bytes[0][sent_byte_count:]
-          sent_byte_count = 0
-        else:
-          sent_byte_count -= len(write_bytes[0])
-          write_bytes = write_bytes[1:]
-    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                            grpc_error_none())
-  except IOError as io_error:
-    socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                            socket_error("send", str(io_error)))
-  g_event.set()
-
-def socket_write_async(socket_wrapper, write_bytes):
-  socket_write_async_cython(socket_wrapper, write_bytes)
-
-cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer,
-                       grpc_custom_write_callback cb) with gil:
-  cdef char* start
-  sw = <SocketWrapper>socket.impl
-  sw.write_cb = cb
-  write_bytes = []
-  for i in range(buffer.count):
-    start = grpc_slice_buffer_start(buffer, i)
-    length = grpc_slice_buffer_length(buffer, i)
-    write_bytes.append(<bytes>start[:length])
-  _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes)
-
-cdef socket_read_async_cython(SocketWrapper socket_wrapper):
-  cdef char* buff_char_arr
-  try:
-    buff_str = socket_wrapper.socket.recv(socket_wrapper.len)
-    buff_char_arr = buff_str
-    string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str))
-    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                           len(buff_str), grpc_error_none())
-  except IOError as io_error:
-    socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket,
-                           -1, socket_error("recv", str(io_error)))
-  g_event.set()
-
-def socket_read_async(socket_wrapper):
-  socket_read_async_cython(socket_wrapper)
-
-cdef void socket_read(grpc_custom_socket* socket, char* buffer,
-                      size_t length, grpc_custom_read_callback cb) with gil:
-  sw = <SocketWrapper>socket.impl
-  sw.read_cb = cb
-  sw.c_buffer = buffer
-  sw.len = length
-  _spawn_greenlet(socket_read_async, sw)
-
-cdef grpc_error_handle socket_getpeername(grpc_custom_socket* socket,
-                                    const grpc_sockaddr* addr,
-                                    int* length) with gil:
-  cdef char* src_buf
-  peer = (<SocketWrapper>socket.impl).socket.getpeername()
-
-  cdef grpc_resolved_address c_addr
-  hostname = str_to_bytes(peer[0])
-  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
-  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
-  length[0] = c_addr.len
-  return grpc_error_none()  
-
-cdef grpc_error_handle socket_getsockname(grpc_custom_socket* socket,
-                                    const grpc_sockaddr* addr,
-                                    int* length) with gil:
-  cdef char* src_buf
-  cdef grpc_resolved_address c_addr
-  if (<SocketWrapper>socket.impl).socket is None:
-    peer = ('0.0.0.0', 0)
-  else:
-    peer = (<SocketWrapper>socket.impl).socket.getsockname()
-  hostname = str_to_bytes(peer[0])
-  grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
-  string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
-  length[0] = c_addr.len
-  return grpc_error_none()
-
-def applysockopts(s):
-  s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1)
-  s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True)
-
-cdef grpc_error_handle socket_bind(grpc_custom_socket* socket,
-                             const grpc_sockaddr* addr,
-                             size_t len, int flags) with gil:
-  addr_tuple = sockaddr_to_tuple(addr, len)
-  try:
-    try:
-      py_socket = gevent_socket.socket(gevent_socket.AF_INET)
-      applysockopts(py_socket)
-      py_socket.bind(addr_tuple)
-    except gevent_socket.gaierror as e:
-      py_socket = gevent_socket.socket(gevent_socket.AF_INET6)
-      applysockopts(py_socket)
-      py_socket.bind(addr_tuple)
-    (<SocketWrapper>socket.impl).socket = py_socket
-  except IOError as io_error:
-    return socket_error("bind", str(io_error))
-  else:
-    return grpc_error_none()
-
-cdef grpc_error_handle socket_listen(grpc_custom_socket* socket) with gil:
-  (<SocketWrapper>socket.impl).socket.listen(50)
-  return grpc_error_none()
-
-cdef void accept_callback_cython(SocketWrapper s) except *:
-   try:
-     conn, address = s.socket.accept()
-     sw = SocketWrapper()
-     sw.closed = False
-     sw.c_socket = s.accepting_socket
-     sw.sockopts = []
-     sw.socket = conn
-     sw.c_socket.impl = <void*>sw
-     sw.accepting_socket = NULL
-     cpython.Py_INCREF(sw)
-     s.accepting_socket = NULL
-     s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none())
-   except IOError as io_error:
-      #TODO actual error
-      s.accepting_socket = NULL
-      s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket,
-                  socket_error("accept", str(io_error)))
-      if s.closed:
-        s.close_cb(<grpc_custom_socket*>s.c_socket)
-   g_event.set()
-
-def socket_accept_async(s):
-  accept_callback_cython(s)
+from cython.operator cimport dereference
 
-cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client,
-                        grpc_custom_accept_callback cb) with gil:
-  sw = <SocketWrapper>socket.impl
-  sw.accepting_socket = client
-  sw.accept_cb = cb
-  _spawn_greenlet(socket_accept_async, sw)
+from cpython cimport Py_INCREF, Py_DECREF
 
-#####################################
-######Resolver implementation #######
-#####################################
-
-cdef class ResolveWrapper:
-  def __cinit__(self):
-    fork_handlers_and_grpc_init()
-    self.c_resolver = NULL
-    self.c_host = NULL
-    self.c_port = NULL
-
-  def __dealloc__(self):
-    grpc_shutdown()
-
-cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper):
-  try:
-    res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port)
-    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
-                                 tuples_to_resolvaddr(res), grpc_error_none())
-  except IOError as io_error:
-    grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver,
-                                 <grpc_resolved_addresses*>0,
-                                 socket_error("getaddrinfo", str(io_error)))
-  g_event.set()
-
-def socket_resolve_async_python(resolve_wrapper):
-  socket_resolve_async_cython(resolve_wrapper)
-
-cdef void socket_resolve_async(grpc_custom_resolver* r, const char* host, const char* port) with gil:
-  rw = ResolveWrapper()
-  rw.c_resolver = r
-  rw.c_host = host
-  rw.c_port = port
-  _spawn_greenlet(socket_resolve_async_python, rw)
-
-cdef grpc_error_handle socket_resolve(const char* host, const char* port,
-                                grpc_resolved_addresses** res) with gil:
-    try:
-      result = gevent_socket.getaddrinfo(host, port)
-      res[0] = tuples_to_resolvaddr(result)
-      return grpc_error_none()
-    except IOError as io_error:
-      return socket_error("getaddrinfo", str(io_error))
-
-###############################
-### timer implementation ######
-###############################
-
-cdef class TimerWrapper:
-  def __cinit__(self, deadline):
-    fork_handlers_and_grpc_init()
-    self.timer = gevent_hub.get_hub().loop.timer(deadline)
-    self.event = None
-
-  def start(self):
-    self.event = gevent_event.Event()
-    self.timer.start(self.on_finish)
-
-  def on_finish(self):
-    grpc_custom_timer_callback(self.c_timer, grpc_error_none())
-    self.timer.stop()
-    g_event.set()
-
-  def stop(self):
-    self.event.set()
-    self.timer.stop()
-
-  def __dealloc__(self):
-    grpc_shutdown()
-
-cdef void timer_start(grpc_custom_timer* t) with gil:
-  timer = TimerWrapper(t.timeout_ms / 1000.0)
-  timer.c_timer = t
-  t.timer = <void*>timer
-  timer.start()
-
-cdef void timer_stop(grpc_custom_timer* t) with gil:
-  time_wrapper = <object>t.timer
-  time_wrapper.stop()
-
-###############################
-### pollset implementation ###
-###############################
-
-cdef void init_loop() with gil:
-  pass
-
-cdef void destroy_loop() with gil:
-  g_pool.join()
-
-cdef void kick_loop() with gil:
-  g_event.set()
-
-def _run_loop(timeout_ms):
-  timeout = timeout_ms / 1000.0
-  if timeout_ms > 0:
-    try:
-      g_event.wait(timeout)
-    finally:
-      g_event.clear()
-
-cdef grpc_error_handle run_loop(size_t timeout_ms) with gil:
-  try:
-    _run_loop(timeout_ms)
-    return grpc_error_none()
-  except BaseException:
-    exc_info = sys.exc_info()
-    # Avoid running any Python code after setting the exception
-    cpython.PyErr_SetObject(exc_info[0], exc_info[1])
-    return GRPC_ERROR_CANCELLED
-
-###############################
-### Initializer ###############
-###############################
+import atexit
+import errno
+import sys
 
-cdef grpc_socket_vtable gevent_socket_vtable
-cdef grpc_custom_resolver_vtable gevent_resolver_vtable
-cdef grpc_custom_timer_vtable gevent_timer_vtable
-cdef grpc_custom_poller_vtable gevent_pollset_vtable
+gevent_hub = None
+g_gevent_pool = None
+g_gevent_threadpool = None
+g_gevent_activated = False
+
+
+cdef queue[void*] g_greenlets_to_run
+cdef condition_variable g_greenlets_cv
+cdef mutex g_greenlets_mu
+cdef bint g_shutdown_greenlets_to_run_queue = False
+cdef int g_channel_count = 0
+
+
+cdef _submit_to_greenlet_queue(object cb, tuple args):
+  cdef tuple to_call = (cb,) + args
+  cdef unique_lock[mutex]* lk
+  Py_INCREF(to_call)
+  with nogil:
+    lk = new unique_lock[mutex](g_greenlets_mu)
+    g_greenlets_to_run.push(<void*>(to_call))
+    del lk
+    g_greenlets_cv.notify_all()
+
+
+cpdef void gevent_increment_channel_count():
+  global g_channel_count
+  cdef int old_channel_count
+  with nogil:
+    lk = new unique_lock[mutex](g_greenlets_mu)
+    old_channel_count = g_channel_count
+    g_channel_count += 1
+    del lk
+  if old_channel_count == 0:
+    run_spawn_greenlets()
+
+
+cpdef void gevent_decrement_channel_count():
+  global g_channel_count
+  with nogil:
+    lk = new unique_lock[mutex](g_greenlets_mu)
+    g_channel_count -= 1
+    if g_channel_count == 0:
+      g_greenlets_cv.notify_all()
+    del lk
+
+
+cdef object await_next_greenlet():
+  cdef unique_lock[mutex]* lk
+  with nogil:
+    # Cython doesn't allow us to do proper stack allocations, so we can't take
+    # advantage of RAII.
+    lk = new unique_lock[mutex](g_greenlets_mu)
+    while not g_shutdown_greenlets_to_run_queue and g_channel_count != 0:
+      if not g_greenlets_to_run.empty():
+        break
+      g_greenlets_cv.wait(dereference(lk))
+  if g_channel_count == 0:
+    del lk
+    return None
+  if g_shutdown_greenlets_to_run_queue:
+    del lk
+    return None
+  cdef object to_call = <object>g_greenlets_to_run.front()
+  Py_DECREF(to_call)
+  g_greenlets_to_run.pop()
+  del lk
+  return to_call
+
+def spawn_greenlets():
+  while True:
+    to_call = g_gevent_threadpool.apply(await_next_greenlet, ())
+    if to_call is None:
+      break
+    fn = to_call[0]
+    args = to_call[1:]
+    fn(*args)
+
+def run_spawn_greenlets():
+  g_gevent_pool.spawn(spawn_greenlets)
+
+def shutdown_await_next_greenlet():
+  global g_shutdown_greenlets_to_run_queue
+  cdef unique_lock[mutex]* lk
+  with nogil:
+    lk = new unique_lock[mutex](g_greenlets_mu)
+    g_shutdown_greenlets_to_run_queue = True
+  del lk
+  g_greenlets_cv.notify_all()
 
 def init_grpc_gevent():
   # Lazily import gevent
-  global gevent_socket
-  global gevent_g
   global gevent_hub
-  global gevent_event
-  global g_event
-  global g_pool
+  global g_gevent_threadpool
+  global g_gevent_activated
+  global g_interrupt_check_period_ms
+  global g_gevent_pool
+
   import gevent
-  gevent_g = gevent
-  import gevent.socket
-  gevent_socket = gevent.socket
-  import gevent.hub
-  gevent_hub = gevent.hub
-  import gevent.event
-  gevent_event = gevent.event
   import gevent.pool
 
-  g_event = gevent.event.Event()
-  g_pool = gevent.pool.Group()
-
-  def cb_func(cb, args):
-    _spawn_greenlet(cb, *args)
-  set_async_callback_func(cb_func)
+  gevent_hub = gevent.hub
+  g_gevent_threadpool = gevent_hub.get_hub().threadpool
 
-  gevent_resolver_vtable.resolve = socket_resolve
-  gevent_resolver_vtable.resolve_async = socket_resolve_async
+  g_gevent_activated = True
+  g_interrupt_check_period_ms = 2000
 
-  gevent_socket_vtable.init = socket_init
-  gevent_socket_vtable.connect = socket_connect
-  gevent_socket_vtable.destroy = socket_destroy
-  gevent_socket_vtable.shutdown = socket_shutdown
-  gevent_socket_vtable.close = socket_close
-  gevent_socket_vtable.write = socket_write
-  gevent_socket_vtable.read = socket_read
-  gevent_socket_vtable.getpeername = socket_getpeername
-  gevent_socket_vtable.getsockname = socket_getsockname
-  gevent_socket_vtable.bind = socket_bind
-  gevent_socket_vtable.listen = socket_listen
-  gevent_socket_vtable.accept = socket_accept
+  g_gevent_pool = gevent.pool.Group()
 
-  gevent_timer_vtable.start = timer_start
-  gevent_timer_vtable.stop = timer_stop
 
-  gevent_pollset_vtable.init = init_loop
-  gevent_pollset_vtable.poll = run_loop
-  gevent_pollset_vtable.kick = kick_loop
-  gevent_pollset_vtable.shutdown = destroy_loop
+  set_async_callback_func(_submit_to_greenlet_queue)
 
-  grpc_custom_iomgr_init(&gevent_socket_vtable,
-                         &gevent_resolver_vtable,
-                         &gevent_timer_vtable,
-                         &gevent_pollset_vtable)
+  # TODO: Document how this all works.
+  atexit.register(shutdown_await_next_greenlet)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi
deleted file mode 100644
index 54174b3c13b..00000000000
--- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pxd.pxi
+++ /dev/null
@@ -1,132 +0,0 @@
-# Copyright 2019 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# distutils: language=c++
-
-from libcpp cimport bool as bool_t
-from libcpp.string cimport string as cppstring
-
-cdef extern from "grpc/impl/codegen/slice.h":
-  struct grpc_slice_buffer:
-    int count
-
-cdef extern from "src/core/lib/iomgr/error.h":
-  struct grpc_error:
-    pass
-  ctypedef grpc_error* grpc_error_handle
-
-# TODO(https://github.com/grpc/grpc/issues/20135) Change the filename
-# for something more meaningful.
-cdef extern from "src/core/lib/iomgr/python_util.h":
-  grpc_error_handle grpc_socket_error(char* error) 
-  char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i)
-  int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i)
-
-cdef extern from "src/core/lib/iomgr/sockaddr.h":
-  ctypedef struct grpc_sockaddr:
-    pass
-
-cdef extern from "src/core/lib/iomgr/resolve_address.h":
-  ctypedef struct grpc_resolved_addresses:
-    size_t naddrs
-    grpc_resolved_address* addrs
-
-  ctypedef struct grpc_resolved_address:
-    char[128] addr
-    size_t len
-
-cdef extern from "src/core/lib/iomgr/resolve_address_custom.h":
-  struct grpc_custom_resolver:
-    pass
-
-  struct grpc_custom_resolver_vtable:
-    grpc_error_handle (*resolve)(const char* host, const char* port, grpc_resolved_addresses** res);
-    void (*resolve_async)(grpc_custom_resolver* resolver, const char* host, const char* port);
-
-  void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
-                                    grpc_resolved_addresses* result,
-                                    grpc_error_handle error);
-
-cdef extern from "src/core/lib/iomgr/tcp_custom.h":
-  cdef int GRPC_CUSTOM_SOCKET_OPT_SO_REUSEPORT
-
-  struct grpc_custom_socket:
-    void* impl
-    # We don't care about the rest of the fields
-  ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
-                                             grpc_error_handle error)
-  ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
-                                           grpc_error_handle error)
-  ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
-                                          size_t nread, grpc_error_handle error)
-  ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
-                                            grpc_custom_socket* client,
-                                            grpc_error_handle error)
-  ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket)
-
-  struct grpc_socket_vtable:
-      grpc_error_handle (*init)(grpc_custom_socket* socket, int domain);
-      void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
-                      size_t len, grpc_custom_connect_callback cb);
-      void (*destroy)(grpc_custom_socket* socket);
-      void (*shutdown)(grpc_custom_socket* socket);
-      void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
-      void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
-                    grpc_custom_write_callback cb);
-      void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
-                   grpc_custom_read_callback cb);
-      grpc_error_handle (*getpeername)(grpc_custom_socket* socket,
-                                 const grpc_sockaddr* addr, int* len);
-      grpc_error_handle (*getsockname)(grpc_custom_socket* socket,
-                             const grpc_sockaddr* addr, int* len);
-      grpc_error_handle (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
-                          size_t len, int flags);
-      grpc_error_handle (*listen)(grpc_custom_socket* socket);
-      void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
-                     grpc_custom_accept_callback cb);
-
-cdef extern from "src/core/lib/iomgr/timer_custom.h":
-  struct grpc_custom_timer:
-    void* timer
-    int timeout_ms
-     # We don't care about the rest of the fields
-
-  struct grpc_custom_timer_vtable:
-    void (*start)(grpc_custom_timer* t);
-    void (*stop)(grpc_custom_timer* t);
-
-  void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error_handle error);
-
-cdef extern from "src/core/lib/iomgr/pollset_custom.h":
-  struct grpc_custom_poller_vtable:
-    void (*init)()
-    grpc_error_handle (*poll)(size_t timeout_ms)
-    void (*kick)()
-    void (*shutdown)()
-
-cdef extern from "src/core/lib/iomgr/iomgr_custom.h":
-  void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
-                            grpc_custom_resolver_vtable* resolver,
-                            grpc_custom_timer_vtable* timer,
-                            grpc_custom_poller_vtable* poller);
-
-cdef extern from "src/core/lib/address_utils/sockaddr_utils.h":
-  int grpc_sockaddr_get_port(const grpc_resolved_address *addr);
-  cppstring grpc_sockaddr_to_string(const grpc_resolved_address *addr,
-                                    bool_t normalize);
-  int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr,
-                             int port)
-  const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr)
-
-cdef extern from "src/core/lib/address_utils/parse_address.h":
-  grpc_error_handle grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port);
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi
deleted file mode 100644
index 292e0473af6..00000000000
--- a/src/python/grpcio/grpc/_cython/_cygrpc/iomgr.pyx.pxi
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2019 gRPC authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# distutils: language=c++
-
-from libc cimport string
-from libc.stdlib cimport malloc
-from libcpp.string cimport string as cppstring
-
-cdef grpc_error_handle grpc_error_none():
-  return <grpc_error_handle>0
-
-cdef grpc_error_handle socket_error(str syscall, str err):
-  error_str = "{} failed: {}".format(syscall, err)
-  error_bytes = str_to_bytes(error_str)
-  return grpc_socket_error(error_bytes)
-
-cdef resolved_addr_to_tuple(grpc_resolved_address* address):
-  cdef cppstring res_str
-  port = grpc_sockaddr_get_port(address)
-  res_str = grpc_sockaddr_to_string(address, False)
-  byte_str = _decode(res_str)
-  if byte_str.endswith(':' + str(port)):
-    byte_str = byte_str[:(0 - len(str(port)) - 1)]
-  byte_str = byte_str.lstrip('[')
-  byte_str = byte_str.rstrip(']')
-  byte_str = '{}'.format(byte_str)
-  return byte_str, port
-
-cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length):
-  cdef grpc_resolved_address c_addr
-  string.memcpy(<void*>c_addr.addr, <void*> address, length)
-  c_addr.len = length
-  return resolved_addr_to_tuple(&c_addr)
-
-cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length):
-  cdef grpc_resolved_address c_addr
-  string.memcpy(<void*>c_addr.addr, <void*> address, length)
-  c_addr.len = length
-  return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4'
-
-cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups):
-  cdef grpc_resolved_addresses* addresses
-  tups_set = set((tup[4][0], tup[4][1]) for tup in tups)
-  addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses))
-  addresses.naddrs = len(tups_set)
-  addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set))
-  i = 0
-  for tup in set(tups_set):
-    hostname = str_to_bytes(tup[0])
-    grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1])
-    i += 1
-  return addresses
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 70c5b5d31d0..ed04119143a 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -34,7 +34,6 @@ include "_cygrpc/time.pxd.pxi"
 include "_cygrpc/vtable.pxd.pxi"
 include "_cygrpc/_hooks.pxd.pxi"
 
-include "_cygrpc/iomgr.pxd.pxi"
 
 include "_cygrpc/grpc_gevent.pxd.pxi"
 
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 4c0452d700f..ca1b4c89f9c 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -56,8 +56,6 @@ include "_cygrpc/time.pyx.pxi"
 include "_cygrpc/vtable.pyx.pxi"
 include "_cygrpc/_hooks.pyx.pxi"
 
-include "_cygrpc/iomgr.pyx.pxi"
-
 include "_cygrpc/grpc_gevent.pyx.pxi"
 
 include "_cygrpc/thread.pyx.pxi"
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index 84331aed63a..b9336d899db 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -245,9 +245,20 @@ class TestGevent(setuptools.Command):
         pass
 
     def run(self):
+        import gevent
         from gevent import monkey
         monkey.patch_all()
 
+        threadpool = gevent.hub.get_hub().threadpool
+
+        # Currently, each channel corresponds to a single native thread in the
+        # gevent threadpool. Thus, when the unit test suite spins up hundreds of
+        # channels concurrently, some will be starved out, making the tests
+        # take longer. We raise both the pool's maximum size and its current
+        # size here so this does not happen.
+        threadpool.maxsize = 1024
+        threadpool.size = 32
+
         import grpc.experimental.gevent
 
         import tests
diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel
index bf8bdef0e38..2d2a246ed47 100644
--- a/src/python/grpcio_tests/tests/unit/BUILD.bazel
+++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 load("//bazel:internal_python_rules.bzl", "internal_py_grpc_test")
+load("@grpc_python_dependencies//:requirements.bzl", "requirement")
 
 package(default_visibility = ["//visibility:public"])
 
@@ -64,6 +65,9 @@ py_library(
 py_library(
     name = "_signal_client",
     srcs = ["_signal_client.py"],
+    deps = [
+        requirement("gevent"),
+    ],
 )
 
 py_library(
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index cd55db58baf..d58f4dc06ba 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -25,7 +25,6 @@ import grpc
 from grpc import _grpcio_metadata
 
 from tests.unit import _tcp_proxy
-from tests.unit import test_common
 from tests.unit.framework.common import test_constants
 
 _UNARY_UNARY = '/test/UnaryUnary'
@@ -259,8 +258,6 @@ def _stream_stream_client(channel, multicallable_kwargs, message):
                 i, response))
 
 
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "This test is nondeterministic under gevent.")
 class CompressionTest(unittest.TestCase):
 
     def assertCompressed(self, compression_ratio):
diff --git a/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py b/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py
index 128ec514d06..a90f07ed089 100644
--- a/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py
+++ b/src/python/grpcio_tests/tests/unit/_contextvars_propagation_test.py
@@ -75,7 +75,11 @@ if contextvars_supported():
     class TestCallCredentials(grpc.AuthMetadataPlugin):
 
         def __call__(self, context, callback):
-            if test_var.get() != _EXPECTED_VALUE:
+            if test_var.get(
+            ) != _EXPECTED_VALUE and not test_common.running_under_gevent():
+                # contextvars do not work under gevent, but the rest of this
+                # test is still valuable as a test of concurrent runs of the
+                # metadata credentials code path.
                 raise AssertionError("{} != {}".format(test_var.get(),
                                                        _EXPECTED_VALUE))
             callback((), None)
@@ -97,8 +101,6 @@ else:
 
 # TODO(https://github.com/grpc/grpc/issues/22257)
 @unittest.skipIf(os.name == "nt", "LocalCredentials not supported on Windows.")
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "ThreadLocals do not work under gevent.")
 class ContextVarsPropagationTest(unittest.TestCase):
 
     def test_propagation_to_auth_plugin(self):
@@ -145,7 +147,8 @@ class ContextVarsPropagationTest(unittest.TestCase):
                     exception_queue.put(e)
 
             threads = []
-            for _ in range(_RPC_COUNT):
+
+            for _ in range(_THREAD_COUNT):
                 q = queue.Queue()
                 thread = threading.Thread(target=_run_on_thread, args=(q,))
                 thread.setDaemon(True)
diff --git a/src/python/grpcio_tests/tests/unit/_local_credentials_test.py b/src/python/grpcio_tests/tests/unit/_local_credentials_test.py
index 5351a2b4cc1..ce92feed4b3 100644
--- a/src/python/grpcio_tests/tests/unit/_local_credentials_test.py
+++ b/src/python/grpcio_tests/tests/unit/_local_credentials_test.py
@@ -19,8 +19,6 @@ import unittest
 
 import grpc
 
-from tests.unit import test_common
-
 
 class _GenericHandler(grpc.GenericRpcHandler):
 
@@ -58,8 +56,6 @@ class LocalCredentialsTest(unittest.TestCase):
 
     @unittest.skipIf(os.name == 'nt',
                      'Unix Domain Socket is not supported on Windows')
-    @unittest.skipIf(test_common.running_under_gevent(),
-                     'UDS not supported under gevent.')
     def test_uds(self):
         server_addr = 'unix:/tmp/grpc_fullstack_test'
         channel_creds = grpc.local_channel_credentials(
diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
index 87441833c17..89c028b307b 100644
--- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
+++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
@@ -188,8 +188,6 @@ def _generic_handler(servicer):
     return grpc.method_handlers_generic_handler(_SERVICE, method_handlers)
 
 
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "Causes deadlock in gevent.")
 class MetadataCodeDetailsTest(unittest.TestCase):
 
     def setUp(self):
diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
index 62ab5a58fc6..90d010b9360 100644
--- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py
+++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
@@ -21,7 +21,6 @@ import unittest
 import grpc
 from grpc.framework.foundation import logging_pool
 
-from tests.unit import test_common
 from tests.unit.framework.common import bound_socket
 from tests.unit.framework.common import test_constants
 
@@ -35,8 +34,6 @@ def _handle_unary_unary(unused_request, unused_servicer_context):
     return _RESPONSE
 
 
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "Test is nondeterministic under gevent.")
 class ReconnectTest(unittest.TestCase):
 
     def test_reconnect(self):
diff --git a/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py b/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py
index 0ffa9eff94f..1c0886cc069 100644
--- a/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py
+++ b/src/python/grpcio_tests/tests/unit/_rpc_part_1_test.py
@@ -22,7 +22,6 @@ import unittest
 import grpc
 from grpc.framework.foundation import logging_pool
 
-from tests.unit import test_common
 from tests.unit._rpc_test_helpers import BaseRPCTest
 from tests.unit._rpc_test_helpers import Callback
 from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
@@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable
 from tests.unit.framework.common import test_constants
 
 
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "This test is nondeterministic under gevent.")
 class RPCPart1Test(BaseRPCTest, unittest.TestCase):
 
     def testExpiredStreamRequestBlockingUnaryResponse(self):
@@ -238,4 +235,4 @@ class RPCPart1Test(BaseRPCTest, unittest.TestCase):
 
 if __name__ == '__main__':
     logging.basicConfig()
-    unittest.main(verbosity=2)
+    unittest.main(verbosity=3)
diff --git a/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py b/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py
index 6a82a9588f6..a8e6ddeb534 100644
--- a/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py
+++ b/src/python/grpcio_tests/tests/unit/_rpc_part_2_test.py
@@ -22,7 +22,6 @@ import unittest
 import grpc
 from grpc.framework.foundation import logging_pool
 
-from tests.unit import test_common
 from tests.unit._rpc_test_helpers import BaseRPCTest
 from tests.unit._rpc_test_helpers import Callback
 from tests.unit._rpc_test_helpers import TIMEOUT_SHORT
@@ -37,8 +36,6 @@ from tests.unit._rpc_test_helpers import unary_unary_multi_callable
 from tests.unit.framework.common import test_constants
 
 
-@unittest.skipIf(test_common.running_under_gevent(),
-                 "Causes deadlock under gevent.")
 class RPCPart2Test(BaseRPCTest, unittest.TestCase):
 
     def testDefaultThreadPoolIsUsed(self):
diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py b/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py
index a3f18a9a490..49c08e009a7 100644
--- a/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py
+++ b/src/python/grpcio_tests/tests/unit/_rpc_test_helpers.py
@@ -36,7 +36,7 @@ _STREAM_UNARY = '/test/StreamUnary'
 _STREAM_STREAM = '/test/StreamStream'
 _STREAM_STREAM_NON_BLOCKING = '/test/StreamStreamNonBlocking'
 
-TIMEOUT_SHORT = datetime.timedelta(seconds=1).total_seconds()
+TIMEOUT_SHORT = datetime.timedelta(seconds=4).total_seconds()
 
 
 class Callback(object):
diff --git a/src/python/grpcio_tests/tests/unit/_signal_client.py b/src/python/grpcio_tests/tests/unit/_signal_client.py
index 5cc68831f7e..eac83b1844a 100644
--- a/src/python/grpcio_tests/tests/unit/_signal_client.py
+++ b/src/python/grpcio_tests/tests/unit/_signal_client.py
@@ -108,7 +108,19 @@ if __name__ == '__main__':
     parser.add_argument('--exception',
                         help='Whether the signal throws an exception',
                         action='store_true')
+    parser.add_argument('--gevent',
+                        help='Whether to run under gevent.',
+                        action='store_true')
     args = parser.parse_args()
+    if args.gevent:
+        # Monkey-patch first, then initialize gRPC's gevent integration.
+        from gevent import monkey
+
+        monkey.patch_all()
+
+        import grpc.experimental.gevent
+        grpc.experimental.gevent.init_gevent()
+
     if args.arity == 'unary' and not args.exception:
         main_unary(args.server)
     elif args.arity == 'streaming' and not args.exception:
diff --git a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
index 600bd8fce64..df38159d546 100644
--- a/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
+++ b/src/python/grpcio_tests/tests/unit/_signal_handling_test.py
@@ -41,6 +41,10 @@ else:
 
 _HOST = 'localhost'
 
+# The gevent test harness cannot run the monkeypatch code for the child process,
+# so we need to instrument it manually.
+_GEVENT_ARG = ("--gevent",) if test_common.running_under_gevent() else ()
+
 
 class _GenericHandler(grpc.GenericRpcHandler):
 
@@ -142,8 +146,8 @@ class SignalHandlingTest(unittest.TestCase):
         server_target = '{}:{}'.format(_HOST, self._port)
         with tempfile.TemporaryFile(mode='r') as client_stdout:
             with tempfile.TemporaryFile(mode='r') as client_stderr:
-                client = _start_client((server_target, 'unary'), client_stdout,
-                                       client_stderr)
+                client = _start_client((server_target, 'unary') + _GEVENT_ARG,
+                                       client_stdout, client_stderr)
                 self._handler.await_connected_client()
                 client.send_signal(signal.SIGINT)
                 self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
@@ -157,8 +161,9 @@ class SignalHandlingTest(unittest.TestCase):
         server_target = '{}:{}'.format(_HOST, self._port)
         with tempfile.TemporaryFile(mode='r') as client_stdout:
             with tempfile.TemporaryFile(mode='r') as client_stderr:
-                client = _start_client((server_target, 'streaming'),
-                                       client_stdout, client_stderr)
+                client = _start_client(
+                    (server_target, 'streaming') + _GEVENT_ARG, client_stdout,
+                    client_stderr)
                 self._handler.await_connected_client()
                 client.send_signal(signal.SIGINT)
                 self.assertFalse(client.wait(), msg=_read_stream(client_stderr))
@@ -171,8 +176,9 @@ class SignalHandlingTest(unittest.TestCase):
         server_target = '{}:{}'.format(_HOST, self._port)
         with tempfile.TemporaryFile(mode='r') as client_stdout:
             with tempfile.TemporaryFile(mode='r') as client_stderr:
-                client = _start_client(('--exception', server_target, 'unary'),
-                                       client_stdout, client_stderr)
+                client = _start_client(
+                    ('--exception', server_target, 'unary') + _GEVENT_ARG,
+                    client_stdout, client_stderr)
                 self._handler.await_connected_client()
                 client.send_signal(signal.SIGINT)
                 client.wait()
@@ -184,8 +190,8 @@ class SignalHandlingTest(unittest.TestCase):
         with tempfile.TemporaryFile(mode='r') as client_stdout:
             with tempfile.TemporaryFile(mode='r') as client_stderr:
                 client = _start_client(
-                    ('--exception', server_target, 'streaming'), client_stdout,
-                    client_stderr)
+                    ('--exception', server_target, 'streaming') + _GEVENT_ARG,
+                    client_stdout, client_stderr)
                 self._handler.await_connected_client()
                 client.send_signal(signal.SIGINT)
                 client.wait()