diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 8df670d04e6..e1c1e36a392 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1609,6 +1609,21 @@ def ssl_channel_credentials(root_certificates=None, certificate_chain)) +def xds_channel_credentials(fallback_credentials=None): + """Creates a ChannelCredentials for use with xDS. This is an EXPERIMENTAL + API. + + Args: + fallback_credentials: Credentials to use in case it is not possible to + establish a secure connection via xDS. If no fallback_credentials + argument is supplied, a default SSLChannelCredentials is used. + """ + fallback_credentials = ssl_channel_credentials( + ) if fallback_credentials is None else fallback_credentials + return ChannelCredentials( + _cygrpc.XDSChannelCredentials(fallback_credentials._credentials)) + + def metadata_call_credentials(metadata_plugin, name=None): """Construct CallCredentials from an AuthMetadataPlugin. @@ -1708,6 +1723,29 @@ def ssl_server_credentials(private_key_certificate_chain_pairs, ], require_client_auth)) +def xds_server_credentials(fallback_credentials): + """Creates a ServerCredentials for use with xDS. This is an EXPERIMENTAL + API. + + Args: + fallback_credentials: Credentials to use in case it is not possible to + establish a secure connection via xDS. No default value is provided. + """ + return ServerCredentials( + _cygrpc.xds_server_credentials(fallback_credentials._credentials)) + + +def insecure_server_credentials(): + """Creates a credentials object directing the server to use no credentials. + This is an EXPERIMENTAL API. + + This object cannot be used directly in a call to `add_secure_port`. + Instead, it should be used to construct other credentials objects, e.g. + with xds_server_credentials. + """ + return ServerCredentials(_cygrpc.insecure_server_credentials()) + + def ssl_server_certificate_configuration(private_key_certificate_chain_pairs, root_certificates=None): """Creates a ServerCertificateConfiguration for use with a Server. @@ -1983,7 +2021,8 @@ def server(thread_pool, interceptors=None, options=None, maximum_concurrent_rpcs=None, - compression=None): + compression=None, + xds=False): """Creates a Server with which RPCs can be serviced. Args: @@ -2004,6 +2043,8 @@ def server(thread_pool, compression: An element of grpc.compression, e.g. grpc.compression.Gzip. This compression algorithm will be used for the lifetime of the server unless overridden. This is an EXPERIMENTAL option. + xds: If set to true, retrieves server configuration via xDS. This is an + EXPERIMENTAL option. Returns: A Server object. @@ -2013,7 +2054,7 @@ def server(thread_pool, () if handlers is None else handlers, () if interceptors is None else interceptors, () if options is None else options, - maximum_concurrent_rpcs, compression) + maximum_concurrent_rpcs, compression, xds) @contextlib.contextmanager diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 73d9fb4ea97..c1b998cc887 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -822,7 +822,8 @@ cdef class AioServer: init_grpc_aio() # NOTE(lidiz) Core objects won't be deallocated automatically. # If AioServer.shutdown is not called, those objects will leak. - self._server = Server(options) + # TODO(rbellevi): Support xDS in aio server. + self._server = Server(options, False) grpc_server_register_completion_queue( self._server.c_server, global_completion_queue(), diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index ddaedb30bd4..827f6f17ca3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -76,6 +76,13 @@ cdef class CompositeChannelCredentials(ChannelCredentials): cdef grpc_channel_credentials *c(self) except * +cdef class XDSChannelCredentials(ChannelCredentials): + + cdef readonly ChannelCredentials _fallback_credentials + + cdef grpc_channel_credentials *c(self) except * + + cdef class ServerCertificateConfig: cdef grpc_ssl_server_certificate_config *c_cert_config diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index b1d0a8b2f4a..e7e59261b45 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -178,6 +178,18 @@ cdef class CompositeChannelCredentials(ChannelCredentials): return c_composition +cdef class XDSChannelCredentials(ChannelCredentials): + + def __cinit__(self, fallback_credentials): + self._fallback_credentials = fallback_credentials + + cdef grpc_channel_credentials *c(self) except *: + cdef grpc_channel_credentials *c_fallback_creds = self._fallback_credentials.c() + cdef grpc_channel_credentials *xds_creds = grpc_xds_credentials_create(c_fallback_creds) + grpc_channel_credentials_release(c_fallback_creds) + return xds_creds + + cdef class ServerCertificateConfig: def __cinit__(self): @@ -347,11 +359,31 @@ cdef class LocalChannelCredentials(ChannelCredentials): def channel_credentials_local(grpc_local_connect_type local_connect_type): return LocalChannelCredentials(local_connect_type) +cdef class InsecureChannelCredentials(ChannelCredentials): + + cdef grpc_channel_credentials *c(self) except *: + return grpc_insecure_credentials_create() + +def channel_credentials_insecure(): + return InsecureChannelCredentials() + def server_credentials_local(grpc_local_connect_type local_connect_type): cdef ServerCredentials credentials = ServerCredentials() credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type) return credentials +def xds_server_credentials(ServerCredentials fallback_credentials): + cdef ServerCredentials credentials = ServerCredentials() + credentials.c_credentials = grpc_xds_server_credentials_create(fallback_credentials.c_credentials) + # NOTE: We do not need to call grpc_server_credentials_release on the + # fallback credentials here because this will be done by the __dealloc__ + # method of its Cython wrapper. + return credentials + +def insecure_server_credentials(): + cdef ServerCredentials credentials = ServerCredentials() + credentials.c_credentials = grpc_insecure_server_credentials_create() + return credentials cdef class ALTSChannelCredentials(ChannelCredentials): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index b873bb0676f..b7d5e2e4c87 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -397,6 +397,16 @@ cdef extern from "grpc/grpc.h": void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil + + ctypedef struct grpc_server_config_fetcher: + pass + + void grpc_server_set_config_fetcher( + grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil + + grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() nogil + + int grpc_server_add_insecure_http2_port( grpc_server *server, const char *addr) nogil void grpc_server_start(grpc_server *server) nogil @@ -514,6 +524,16 @@ cdef extern from "grpc/grpc_security.h": void *reserved) nogil void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil + grpc_channel_credentials *grpc_xds_credentials_create( + grpc_channel_credentials *fallback_creds) nogil + + grpc_channel_credentials *grpc_insecure_credentials_create() nogil + + grpc_server_credentials *grpc_xds_server_credentials_create( + grpc_server_credentials *fallback_creds) nogil + + grpc_server_credentials *grpc_insecure_server_credentials_create() nogil + grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *creds1, grpc_call_credentials *creds2, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 48014b68804..ffd06ca4edc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -15,7 +15,7 @@ cdef class Server: - def __cinit__(self, object arguments): + def __cinit__(self, object arguments, bint xds): fork_handlers_and_grpc_init() self.references = [] self.registered_completion_queues = [] @@ -25,6 +25,8 @@ cdef class Server: self.c_server = NULL cdef _ChannelArgs channel_args = _ChannelArgs(arguments) self.c_server = grpc_server_create(channel_args.c_args(), NULL) + if xds: + grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create()) self.references.append(arguments) def request_call( diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 48ff7439951..069ffa79822 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -945,9 +945,9 @@ class _Server(grpc.Server): # pylint: disable=too-many-arguments def __init__(self, thread_pool, generic_handlers, interceptors, options, - maximum_concurrent_rpcs, compression): + maximum_concurrent_rpcs, compression, xds): completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(_augment_options(options, compression)) + server = cygrpc.Server(_augment_options(options, compression), xds) server.register_completion_queue(completion_queue) self._state = _ServerState(completion_queue, server, generic_handlers, _interceptor.service_pipeline(interceptors), @@ -989,7 +989,7 @@ class _Server(grpc.Server): def create_server(thread_pool, generic_rpc_handlers, interceptors, options, - maximum_concurrent_rpcs, compression): + maximum_concurrent_rpcs, compression, xds): _validate_generic_rpc_handlers(generic_rpc_handlers) return _Server(thread_pool, generic_rpc_handlers, interceptors, options, - maximum_concurrent_rpcs, compression) + maximum_concurrent_rpcs, compression, xds) diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index f9ce5ec50f5..3dd44c705ed 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -60,20 +60,13 @@ else: def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], compression: Optional[grpc.Compression]) -> grpc.Channel: - if channel_credentials is grpc.experimental.insecure_channel_credentials(): - _LOGGER.debug(f"Creating insecure channel with options '{options}' " + - f"and compression '{compression}'") - return grpc.insecure_channel(target, - options=options, - compression=compression) - else: - _LOGGER.debug( - f"Creating secure channel with credentials '{channel_credentials}', " - + f"options '{options}' and compression '{compression}'") - return grpc.secure_channel(target, - credentials=channel_credentials, - options=options, - compression=compression) + _LOGGER.debug( + f"Creating secure channel with credentials '{channel_credentials}', " + + f"options '{options}' and compression '{compression}'") + return grpc.secure_channel(target, + credentials=channel_credentials, + options=options, + compression=compression) class ChannelCache: diff --git a/src/python/grpcio/grpc/experimental/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py index a4e2660fb4e..c820bc9db3b 100644 --- a/src/python/grpcio/grpc/experimental/__init__.py +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -22,6 +22,7 @@ import sys import warnings import grpc +from grpc._cython import cygrpc as _cygrpc _EXPERIMENTAL_APIS_USED = set() @@ -41,19 +42,16 @@ class UsageError(Exception): """Raised by the gRPC library to indicate usage not allowed by the API.""" -_insecure_channel_credentials_sentinel = object() +# It's important that there be a single insecure credentials object so that its +# hash is deterministic and can be used for indexing in the simple stubs cache. _insecure_channel_credentials = grpc.ChannelCredentials( - _insecure_channel_credentials_sentinel) + _cygrpc.channel_credentials_insecure()) def insecure_channel_credentials(): """Creates a ChannelCredentials for use with an insecure channel. THIS IS AN EXPERIMENTAL API. - - This is not for use with secure_channel function. Intead, this should be - used with grpc.unary_unary, grpc.unary_stream, grpc.stream_unary, or - grpc.stream_stream. """ return _insecure_channel_credentials diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 083b016748d..bd4139b4922 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -77,6 +77,7 @@ "unit._session_cache_test.SSLSessionCacheTest", "unit._signal_handling_test.SignalHandlingTest", "unit._version_test.VersionTest", + "unit._xds_credentials_test.XdsCredentialsTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 9969729ce94..d9338867f53 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -40,6 +40,7 @@ GRPCIO_TESTS_UNIT = [ "_server_shutdown_test.py", "_server_wait_for_termination_test.py", "_session_cache_test.py", + "_xds_credentials_test.py", ] py_library( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index b279f3d07c5..c84dff614ab 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -144,12 +144,10 @@ class CancelManyCallsTest(unittest.TestCase): test_constants.THREAD_CONCURRENCY) server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server([ - ( - b'grpc.so_reuseport', - 0, - ), - ]) + server = cygrpc.Server([( + b'grpc.so_reuseport', + 0, + )], False) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index d8210f36f80..42ec655feee 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -96,7 +96,7 @@ class RpcTest(object): def setUp(self): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)]) + self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)], False) self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 8a903bfaf91..4aad924427b 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -115,7 +115,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server = cygrpc.Server([( b'grpc.so_reuseport', 0, - )]) + )], False) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py index bbd25457b3e..60b068243c0 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -25,7 +25,7 @@ class Test(unittest.TestCase): def test_lonely_server(self): server_call_completion_queue = cygrpc.CompletionQueue() server_shutdown_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(None) + server = cygrpc.Server(None, False) server.register_completion_queue(server_call_completion_queue) server.register_completion_queue(server_shutdown_completion_queue) port = server.add_http2_port(b'[::]:0') diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 1182f83a425..30cd5078ffa 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -42,12 +42,10 @@ class TypeSmokeTest(unittest.TestCase): del completion_queue def testServerUpDown(self): - server = cygrpc.Server(set([ - ( - b'grpc.so_reuseport', - 0, - ), - ])) + server = cygrpc.Server(set([( + b'grpc.so_reuseport', + 0, + )]), False) del server def testChannelUpDown(self): @@ -59,12 +57,10 @@ class TypeSmokeTest(unittest.TestCase): b'test plugin name!') def testServerStartNoExplicitShutdown(self): - server = cygrpc.Server([ - ( - b'grpc.so_reuseport', - 0, - ), - ]) + server = cygrpc.Server([( + b'grpc.so_reuseport', + 0, + )], False) completion_queue = cygrpc.CompletionQueue() server.register_completion_queue(completion_queue) port = server.add_http2_port(b'[::]:0') @@ -79,7 +75,7 @@ class TypeSmokeTest(unittest.TestCase): b'grpc.so_reuseport', 0, ), - ]) + ], False) server.add_http2_port(b'[::]:0') server.register_completion_queue(completion_queue) server.start() @@ -97,12 +93,10 @@ class ServerClientMixin(object): def setUpMixin(self, server_credentials, client_credentials, host_override): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server([ - ( - b'grpc.so_reuseport', - 0, - ), - ]) + self.server = cygrpc.Server([( + b'grpc.so_reuseport', + 0, + )], False) self.server.register_completion_queue(self.server_completion_queue) if server_credentials: self.port = self.server.add_http2_port(b'[::]:0', diff --git a/src/python/grpcio_tests/tests/unit/_xds_credentials_test.py b/src/python/grpcio_tests/tests/unit/_xds_credentials_test.py new file mode 100644 index 00000000000..91d30c68ad5 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_xds_credentials_test.py @@ -0,0 +1,103 @@ +# Copyright 2021 The gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests xDS server and channel credentials.""" + +import unittest + +import logging +from concurrent import futures +import contextlib + +import grpc +import grpc.experimental +from tests.unit import test_common +from tests.unit import resources + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return grpc.unary_unary_rpc_method_handler( + lambda request, unused_context: request) + + +@contextlib.contextmanager +def xds_channel_server_without_xds(server_fallback_creds): + server = grpc.server(futures.ThreadPoolExecutor()) + server.add_generic_rpc_handlers((_GenericHandler(),)) + server_server_fallback_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) + server_creds = grpc.xds_server_credentials(server_fallback_creds) + port = server.add_secure_port("localhost:0", server_creds) + server.start() + try: + yield "localhost:{}".format(port) + finally: + server.stop(None) + + +class XdsCredentialsTest(unittest.TestCase): + + def test_xds_creds_fallback_ssl(self): + # Since there is no xDS server, the fallback credentials will be used. + # In this case, SSL credentials. + server_fallback_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) + with xds_channel_server_without_xds( + server_fallback_creds) as server_address: + override_options = (("grpc.ssl_target_name_override", + "foo.test.google.fr"),) + channel_fallback_creds = grpc.ssl_channel_credentials( + root_certificates=resources.test_root_certificates(), + private_key=resources.private_key(), + certificate_chain=resources.certificate_chain()) + channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) + with grpc.secure_channel(server_address, + channel_creds, + options=override_options) as channel: + request = b"abc" + response = channel.unary_unary("/test/method")( + request, wait_for_ready=True) + self.assertEqual(response, request) + + def test_xds_creds_fallback_insecure(self): + # Since there is no xDS server, the fallback credentials will be used. + # In this case, insecure. + server_fallback_creds = grpc.insecure_server_credentials() + with xds_channel_server_without_xds( + server_fallback_creds) as server_address: + channel_fallback_creds = grpc.experimental.insecure_channel_credentials( + ) + channel_creds = grpc.xds_channel_credentials(channel_fallback_creds) + with grpc.secure_channel(server_address, channel_creds) as channel: + request = b"abc" + response = channel.unary_unary("/test/method")( + request, wait_for_ready=True) + self.assertEqual(response, request) + + def test_start_xds_server(self): + server = grpc.server(futures.ThreadPoolExecutor(), xds=True) + server.add_generic_rpc_handlers((_GenericHandler(),)) + server_fallback_creds = grpc.insecure_server_credentials() + server_creds = grpc.xds_server_credentials(server_fallback_creds) + port = server.add_secure_port("localhost:0", server_creds) + server.start() + server.stop(None) + # No exceptions thrown. A more comprehensive suite of tests will be + # provided by the interop tests. + + +if __name__ == "__main__": + logging.basicConfig() + unittest.main()