Change channel tests to use public API

This allows for testing other implementations of grpc.Server.
pull/8414/head
Ken Payson 8 years ago
parent bb4c5f3d3f
commit 667301a25f
  1. 12
      src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
  2. 4
      src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py

@ -34,8 +34,6 @@ import time
import unittest
import grpc
from grpc import _channel
from grpc import _server
from tests.unit.framework.common import test_constants
from tests.unit import _thread_pool
@ -78,7 +76,7 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
callback = _Callback()
channel = _channel.Channel('localhost:12345', (), None)
channel = grpc.insecure_channel('localhost:12345')
channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
channel.subscribe(callback.update, try_to_connect=True)
@ -105,13 +103,13 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
first_callback = _Callback()
second_callback = _Callback()
channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel = grpc.insecure_channel('localhost:{}'.format(port))
channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@ -146,12 +144,12 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_reachable_then_unreachable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
callback = _Callback()
channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel = grpc.insecure_channel('localhost:{}'.format(port))
channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(_ready_in_connectivities)
# Now take down the server and confirm that channel readiness is repudiated.

@ -33,8 +33,6 @@ import threading
import unittest
import grpc
from grpc import _channel
from grpc import _server
from tests.unit.framework.common import test_constants
from tests.unit import _thread_pool
@ -79,7 +77,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
server = _server.Server(thread_pool, (), ())
server = grpc.server(thread_pool)
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))

Loading…
Cancel
Save