Merge pull request #25410 from gnossen/unrevert_xds_creds

Reintroduce xDS Creds
pull/25438/head
Richard Belleville 4 years ago committed by GitHub
commit ee915d8a15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      src/python/grpcio/grpc/__init__.py
  2. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  3. 7
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
  4. 32
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  5. 20
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  6. 4
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  7. 8
      src/python/grpcio/grpc/_server.py
  8. 21
      src/python/grpcio/grpc/_simple_stubs.py
  9. 10
      src/python/grpcio/grpc/experimental/__init__.py
  10. 1
      src/python/grpcio_tests/tests/tests.json
  11. 1
      src/python/grpcio_tests/tests/unit/BUILD.bazel
  12. 3
      src/python/grpcio_tests/tests/unit/_api_test.py
  13. 10
      src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
  14. 2
      src/python/grpcio_tests/tests/unit/_cython/_common.py
  15. 2
      src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
  16. 2
      src/python/grpcio_tests/tests/unit/_cython/_server_test.py
  17. 32
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
  18. 103
      src/python/grpcio_tests/tests/unit/_xds_credentials_test.py

@ -1609,6 +1609,21 @@ def ssl_channel_credentials(root_certificates=None,
certificate_chain))
def xds_channel_credentials(fallback_credentials=None):
"""Creates a ChannelCredentials for use with xDS. This is an EXPERIMENTAL
API.
Args:
fallback_credentials: Credentials to use in case it is not possible to
establish a secure connection via xDS. If no fallback_credentials
argument is supplied, a default SSLChannelCredentials is used.
"""
fallback_credentials = ssl_channel_credentials(
) if fallback_credentials is None else fallback_credentials
return ChannelCredentials(
_cygrpc.XDSChannelCredentials(fallback_credentials._credentials))
def metadata_call_credentials(metadata_plugin, name=None):
"""Construct CallCredentials from an AuthMetadataPlugin.
@ -1708,6 +1723,29 @@ def ssl_server_credentials(private_key_certificate_chain_pairs,
], require_client_auth))
def xds_server_credentials(fallback_credentials):
"""Creates a ServerCredentials for use with xDS. This is an EXPERIMENTAL
API.
Args:
fallback_credentials: Credentials to use in case it is not possible to
establish a secure connection via xDS. No default value is provided.
"""
return ServerCredentials(
_cygrpc.xds_server_credentials(fallback_credentials._credentials))
def insecure_server_credentials():
"""Creates a credentials object directing the server to use no credentials.
This is an EXPERIMENTAL API.
This object cannot be used directly in a call to `add_secure_port`.
Instead, it should be used to construct other credentials objects, e.g.
with xds_server_credentials.
"""
return ServerCredentials(_cygrpc.insecure_server_credentials())
def ssl_server_certificate_configuration(private_key_certificate_chain_pairs,
root_certificates=None):
"""Creates a ServerCertificateConfiguration for use with a Server.
@ -1983,7 +2021,8 @@ def server(thread_pool,
interceptors=None,
options=None,
maximum_concurrent_rpcs=None,
compression=None):
compression=None,
xds=False):
"""Creates a Server with which RPCs can be serviced.
Args:
@ -2004,6 +2043,8 @@ def server(thread_pool,
compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This compression algorithm will be used for the
lifetime of the server unless overridden. This is an EXPERIMENTAL option.
xds: If set to true, retrieves server configuration via xDS. This is an
EXPERIMENTAL option.
Returns:
A Server object.
@ -2013,7 +2054,7 @@ def server(thread_pool,
() if handlers is None else handlers,
() if interceptors is None else interceptors,
() if options is None else options,
maximum_concurrent_rpcs, compression)
maximum_concurrent_rpcs, compression, xds)
@contextlib.contextmanager
@ -2105,6 +2146,9 @@ __all__ = (
'protos',
'services',
'protos_and_services',
'xds_channel_credentials',
'xds_server_credentials',
'insecure_server_credentials',
)
############################### Extension Shims ################################

@ -822,7 +822,8 @@ cdef class AioServer:
init_grpc_aio()
# NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak.
self._server = Server(options)
# TODO(rbellevi): Support xDS in aio server.
self._server = Server(options, False)
grpc_server_register_completion_queue(
self._server.c_server,
global_completion_queue(),

@ -76,6 +76,13 @@ cdef class CompositeChannelCredentials(ChannelCredentials):
cdef grpc_channel_credentials *c(self) except *
cdef class XDSChannelCredentials(ChannelCredentials):
cdef readonly ChannelCredentials _fallback_credentials
cdef grpc_channel_credentials *c(self) except *
cdef class ServerCertificateConfig:
cdef grpc_ssl_server_certificate_config *c_cert_config

@ -178,6 +178,18 @@ cdef class CompositeChannelCredentials(ChannelCredentials):
return c_composition
cdef class XDSChannelCredentials(ChannelCredentials):
def __cinit__(self, fallback_credentials):
self._fallback_credentials = fallback_credentials
cdef grpc_channel_credentials *c(self) except *:
cdef grpc_channel_credentials *c_fallback_creds = self._fallback_credentials.c()
cdef grpc_channel_credentials *xds_creds = grpc_xds_credentials_create(c_fallback_creds)
grpc_channel_credentials_release(c_fallback_creds)
return xds_creds
cdef class ServerCertificateConfig:
def __cinit__(self):
@ -347,11 +359,31 @@ cdef class LocalChannelCredentials(ChannelCredentials):
def channel_credentials_local(grpc_local_connect_type local_connect_type):
return LocalChannelCredentials(local_connect_type)
cdef class InsecureChannelCredentials(ChannelCredentials):
cdef grpc_channel_credentials *c(self) except *:
return grpc_insecure_credentials_create()
def channel_credentials_insecure():
return InsecureChannelCredentials()
def server_credentials_local(grpc_local_connect_type local_connect_type):
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type)
return credentials
def xds_server_credentials(ServerCredentials fallback_credentials):
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_xds_server_credentials_create(fallback_credentials.c_credentials)
# NOTE: We do not need to call grpc_server_credentials_release on the
# fallback credentials here because this will be done by the __dealloc__
# method of its Cython wrapper.
return credentials
def insecure_server_credentials():
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_insecure_server_credentials_create()
return credentials
cdef class ALTSChannelCredentials(ChannelCredentials):

@ -397,6 +397,16 @@ cdef extern from "grpc/grpc.h":
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
void *reserved) nogil
ctypedef struct grpc_server_config_fetcher:
pass
void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() nogil
int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil
@ -514,6 +524,16 @@ cdef extern from "grpc/grpc_security.h":
void *reserved) nogil
void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
grpc_channel_credentials *grpc_xds_credentials_create(
grpc_channel_credentials *fallback_creds) nogil
grpc_channel_credentials *grpc_insecure_credentials_create() nogil
grpc_server_credentials *grpc_xds_server_credentials_create(
grpc_server_credentials *fallback_creds) nogil
grpc_server_credentials *grpc_insecure_server_credentials_create() nogil
grpc_call_credentials *grpc_composite_call_credentials_create(
grpc_call_credentials *creds1, grpc_call_credentials *creds2,
void *reserved) nogil

@ -15,7 +15,7 @@
cdef class Server:
def __cinit__(self, object arguments):
def __cinit__(self, object arguments, bint xds):
fork_handlers_and_grpc_init()
self.references = []
self.registered_completion_queues = []
@ -25,6 +25,8 @@ cdef class Server:
self.c_server = NULL
cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
self.c_server = grpc_server_create(channel_args.c_args(), NULL)
if xds:
grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create())
self.references.append(arguments)
def request_call(

@ -945,9 +945,9 @@ class _Server(grpc.Server):
# pylint: disable=too-many-arguments
def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs, compression):
maximum_concurrent_rpcs, compression, xds):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_augment_options(options, compression))
server = cygrpc.Server(_augment_options(options, compression), xds)
server.register_completion_queue(completion_queue)
self._state = _ServerState(completion_queue, server, generic_handlers,
_interceptor.service_pipeline(interceptors),
@ -989,7 +989,7 @@ class _Server(grpc.Server):
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
maximum_concurrent_rpcs, compression):
maximum_concurrent_rpcs, compression, xds):
_validate_generic_rpc_handlers(generic_rpc_handlers)
return _Server(thread_pool, generic_rpc_handlers, interceptors, options,
maximum_concurrent_rpcs, compression)
maximum_concurrent_rpcs, compression, xds)

@ -60,20 +60,13 @@ else:
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
channel_credentials: Optional[grpc.ChannelCredentials],
compression: Optional[grpc.Compression]) -> grpc.Channel:
if channel_credentials is grpc.experimental.insecure_channel_credentials():
_LOGGER.debug(f"Creating insecure channel with options '{options}' " +
f"and compression '{compression}'")
return grpc.insecure_channel(target,
options=options,
compression=compression)
else:
_LOGGER.debug(
f"Creating secure channel with credentials '{channel_credentials}', "
+ f"options '{options}' and compression '{compression}'")
return grpc.secure_channel(target,
credentials=channel_credentials,
options=options,
compression=compression)
_LOGGER.debug(
f"Creating secure channel with credentials '{channel_credentials}', " +
f"options '{options}' and compression '{compression}'")
return grpc.secure_channel(target,
credentials=channel_credentials,
options=options,
compression=compression)
class ChannelCache:

@ -22,6 +22,7 @@ import sys
import warnings
import grpc
from grpc._cython import cygrpc as _cygrpc
_EXPERIMENTAL_APIS_USED = set()
@ -41,19 +42,16 @@ class UsageError(Exception):
"""Raised by the gRPC library to indicate usage not allowed by the API."""
_insecure_channel_credentials_sentinel = object()
# It's important that there be a single insecure credentials object so that its
# hash is deterministic and can be used for indexing in the simple stubs cache.
_insecure_channel_credentials = grpc.ChannelCredentials(
_insecure_channel_credentials_sentinel)
_cygrpc.channel_credentials_insecure())
def insecure_channel_credentials():
"""Creates a ChannelCredentials for use with an insecure channel.
THIS IS AN EXPERIMENTAL API.
This is not for use with secure_channel function. Intead, this should be
used with grpc.unary_unary, grpc.unary_stream, grpc.stream_unary, or
grpc.stream_stream.
"""
return _insecure_channel_credentials

@ -77,6 +77,7 @@
"unit._session_cache_test.SSLSessionCacheTest",
"unit._signal_handling_test.SignalHandlingTest",
"unit._version_test.VersionTest",
"unit._xds_credentials_test.XdsCredentialsTest",
"unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest",

@ -40,6 +40,7 @@ GRPCIO_TESTS_UNIT = [
"_server_shutdown_test.py",
"_server_wait_for_termination_test.py",
"_session_cache_test.py",
"_xds_credentials_test.py",
]
py_library(

@ -87,6 +87,9 @@ class AllTest(unittest.TestCase):
'protos',
'services',
'protos_and_services',
'xds_channel_credentials',
'xds_server_credentials',
'insecure_server_credentials',
)
six.assertCountEqual(self, expected_grpc_code_elements,

@ -144,12 +144,10 @@ class CancelManyCallsTest(unittest.TestCase):
test_constants.THREAD_CONCURRENCY)
server_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server([
(
b'grpc.so_reuseport',
0,
),
])
server = cygrpc.Server([(
b'grpc.so_reuseport',
0,
)], False)
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()

@ -96,7 +96,7 @@ class RpcTest(object):
def setUp(self):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)])
self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)], False)
self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0')
self.server.start()

@ -115,7 +115,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server = cygrpc.Server([(
b'grpc.so_reuseport',
0,
)])
)], False)
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()

@ -25,7 +25,7 @@ class Test(unittest.TestCase):
def test_lonely_server(self):
server_call_completion_queue = cygrpc.CompletionQueue()
server_shutdown_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(None)
server = cygrpc.Server(None, False)
server.register_completion_queue(server_call_completion_queue)
server.register_completion_queue(server_shutdown_completion_queue)
port = server.add_http2_port(b'[::]:0')

@ -42,12 +42,10 @@ class TypeSmokeTest(unittest.TestCase):
del completion_queue
def testServerUpDown(self):
server = cygrpc.Server(set([
(
b'grpc.so_reuseport',
0,
),
]))
server = cygrpc.Server(set([(
b'grpc.so_reuseport',
0,
)]), False)
del server
def testChannelUpDown(self):
@ -59,12 +57,10 @@ class TypeSmokeTest(unittest.TestCase):
b'test plugin name!')
def testServerStartNoExplicitShutdown(self):
server = cygrpc.Server([
(
b'grpc.so_reuseport',
0,
),
])
server = cygrpc.Server([(
b'grpc.so_reuseport',
0,
)], False)
completion_queue = cygrpc.CompletionQueue()
server.register_completion_queue(completion_queue)
port = server.add_http2_port(b'[::]:0')
@ -79,7 +75,7 @@ class TypeSmokeTest(unittest.TestCase):
b'grpc.so_reuseport',
0,
),
])
], False)
server.add_http2_port(b'[::]:0')
server.register_completion_queue(completion_queue)
server.start()
@ -97,12 +93,10 @@ class ServerClientMixin(object):
def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server([
(
b'grpc.so_reuseport',
0,
),
])
self.server = cygrpc.Server([(
b'grpc.so_reuseport',
0,
)], False)
self.server.register_completion_queue(self.server_completion_queue)
if server_credentials:
self.port = self.server.add_http2_port(b'[::]:0',

@ -0,0 +1,103 @@
# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests xDS server and channel credentials."""
import unittest
import logging
from concurrent import futures
import contextlib
import grpc
import grpc.experimental
from tests.unit import test_common
from tests.unit import resources
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
return grpc.unary_unary_rpc_method_handler(
lambda request, unused_context: request)
@contextlib.contextmanager
def xds_channel_server_without_xds(server_fallback_creds):
server = grpc.server(futures.ThreadPoolExecutor())
server.add_generic_rpc_handlers((_GenericHandler(),))
server_server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
server_creds = grpc.xds_server_credentials(server_fallback_creds)
port = server.add_secure_port("localhost:0", server_creds)
server.start()
try:
yield "localhost:{}".format(port)
finally:
server.stop(None)
class XdsCredentialsTest(unittest.TestCase):
def test_xds_creds_fallback_ssl(self):
# Since there is no xDS server, the fallback credentials will be used.
# In this case, SSL credentials.
server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
with xds_channel_server_without_xds(
server_fallback_creds) as server_address:
override_options = (("grpc.ssl_target_name_override",
"foo.test.google.fr"),)
channel_fallback_creds = grpc.ssl_channel_credentials(
root_certificates=resources.test_root_certificates(),
private_key=resources.private_key(),
certificate_chain=resources.certificate_chain())
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
with grpc.secure_channel(server_address,
channel_creds,
options=override_options) as channel:
request = b"abc"
response = channel.unary_unary("/test/method")(
request, wait_for_ready=True)
self.assertEqual(response, request)
def test_xds_creds_fallback_insecure(self):
# Since there is no xDS server, the fallback credentials will be used.
# In this case, insecure.
server_fallback_creds = grpc.insecure_server_credentials()
with xds_channel_server_without_xds(
server_fallback_creds) as server_address:
channel_fallback_creds = grpc.experimental.insecure_channel_credentials(
)
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
with grpc.secure_channel(server_address, channel_creds) as channel:
request = b"abc"
response = channel.unary_unary("/test/method")(
request, wait_for_ready=True)
self.assertEqual(response, request)
def test_start_xds_server(self):
server = grpc.server(futures.ThreadPoolExecutor(), xds=True)
server.add_generic_rpc_handlers((_GenericHandler(),))
server_fallback_creds = grpc.insecure_server_credentials()
server_creds = grpc.xds_server_credentials(server_fallback_creds)
port = server.add_secure_port("localhost:0", server_creds)
server.start()
server.stop(None)
# No exceptions thrown. A more comprehensive suite of tests will be
# provided by the interop tests.
if __name__ == "__main__":
logging.basicConfig()
unittest.main()
Loading…
Cancel
Save