Pull out context manager

pull/25365/head
Richard Belleville 4 years ago
parent d1efd2af6e
commit 5e23b2dcb7
  1. 78
      src/python/grpcio_tests/tests/unit/_xds_credentials_test.py

@ -17,6 +17,7 @@ import unittest
import logging import logging
from concurrent import futures from concurrent import futures
import contextlib
import grpc import grpc
import grpc.experimental import grpc.experimental
@ -31,54 +32,59 @@ class _GenericHandler(grpc.GenericRpcHandler):
lambda request, unused_context: request) lambda request, unused_context: request)
@contextlib.contextmanager
def xds_channel_server_without_xds(server_fallback_creds):
server = grpc.server(futures.ThreadPoolExecutor())
server.add_generic_rpc_handlers((_GenericHandler(),))
server_server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
server_creds = grpc.xds_server_credentials(server_fallback_creds)
port = server.add_secure_port("localhost:0", server_creds)
server.start()
try:
yield "localhost:{}".format(port)
finally:
server.stop(None)
class XdsCredentialsTest(unittest.TestCase): class XdsCredentialsTest(unittest.TestCase):
def test_xds_creds_fallback_ssl(self): def test_xds_creds_fallback_ssl(self):
# Since there is no xDS server, the fallback credentials will be used. # Since there is no xDS server, the fallback credentials will be used.
# In this case, SSL credentials. # In this case, SSL credentials.
server = grpc.server(futures.ThreadPoolExecutor())
server.add_generic_rpc_handlers((_GenericHandler(),))
server_fallback_creds = grpc.ssl_server_credentials( server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),)) ((resources.private_key(), resources.certificate_chain()),))
server_creds = grpc.xds_server_credentials(server_fallback_creds) with xds_channel_server_without_xds(
port = server.add_secure_port("localhost:0", server_creds) server_fallback_creds) as server_address:
server.start() override_options = (("grpc.ssl_target_name_override",
channel_fallback_creds = grpc.ssl_channel_credentials( "foo.test.google.fr"),)
root_certificates=resources.test_root_certificates(), channel_fallback_creds = grpc.ssl_channel_credentials(
private_key=resources.private_key(), root_certificates=resources.test_root_certificates(),
certificate_chain=resources.certificate_chain()) private_key=resources.private_key(),
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) certificate_chain=resources.certificate_chain())
server_address = "localhost:{}".format(port) channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
override_options = (("grpc.ssl_target_name_override", with grpc.secure_channel(server_address,
"foo.test.google.fr"),) channel_creds,
with grpc.secure_channel(server_address, options=override_options) as channel:
channel_creds, request = b"abc"
options=override_options) as channel: response = channel.unary_unary("/test/method")(
request = b"abc" request, wait_for_ready=True)
response = channel.unary_unary("/test/method")(request, self.assertEqual(response, request)
wait_for_ready=True)
self.assertEqual(response, request)
server.stop(None)
def test_xds_creds_fallback_insecure(self): def test_xds_creds_fallback_insecure(self):
# Since there is no xDS server, the fallback credentials will be used. # Since there is no xDS server, the fallback credentials will be used.
# In this case, insecure. # In this case, insecure.
server = grpc.server(futures.ThreadPoolExecutor())
server.add_generic_rpc_handlers((_GenericHandler(),))
server_fallback_creds = grpc.insecure_server_credentials() server_fallback_creds = grpc.insecure_server_credentials()
server_creds = grpc.xds_server_credentials(server_fallback_creds) with xds_channel_server_without_xds(
port = server.add_secure_port("localhost:0", server_creds) server_fallback_creds) as server_address:
server.start() channel_fallback_creds = grpc.experimental.insecure_channel_credentials(
channel_fallback_creds = grpc.experimental.insecure_channel_credentials( )
) channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) with grpc.secure_channel(server_address, channel_creds) as channel:
server_address = "localhost:{}".format(port) request = b"abc"
with grpc.secure_channel(server_address, channel_creds) as channel: response = channel.unary_unary("/test/method")(
request = b"abc" request, wait_for_ready=True)
response = channel.unary_unary("/test/method")(request, self.assertEqual(response, request)
wait_for_ready=True)
self.assertEqual(response, request)
server.stop(None)
def test_start_xds_server(self): def test_start_xds_server(self):
server = grpc.server(futures.ThreadPoolExecutor(), xds=True) server = grpc.server(futures.ThreadPoolExecutor(), xds=True)

Loading…
Cancel
Save