Revert "Merge pull request #25365 from gnossen/python_xds_creds"

This reverts commit b80d374b93, reversing
changes made to df062598c0.
pull/25407/head
Richard Belleville 4 years ago
parent d262d6866d
commit 7bbefc8365
  1. 45
      src/python/grpcio/grpc/__init__.py
  2. 3
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  3. 7
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
  4. 32
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  5. 20
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  6. 4
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  7. 8
      src/python/grpcio/grpc/_server.py
  8. 11
      src/python/grpcio/grpc/_simple_stubs.py
  9. 10
      src/python/grpcio/grpc/experimental/__init__.py
  10. 1
      src/python/grpcio_tests/tests/tests.json
  11. 1
      src/python/grpcio_tests/tests/unit/BUILD.bazel
  12. 6
      src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
  13. 2
      src/python/grpcio_tests/tests/unit/_cython/_common.py
  14. 2
      src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
  15. 2
      src/python/grpcio_tests/tests/unit/_cython/_server_test.py
  16. 20
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
  17. 103
      src/python/grpcio_tests/tests/unit/_xds_credentials_test.py

@ -1607,21 +1607,6 @@ def ssl_channel_credentials(root_certificates=None,
certificate_chain)) certificate_chain))
def xds_channel_credentials(fallback_credentials=None):
"""Creates a ChannelCredentials for use with xDS. This is an EXPERIMENTAL
API.
Args:
fallback_credentials: Credentials to use in case it is not possible to
establish a secure connection via xDS. If no fallback_credentials
argument is supplied, a default SSLChannelCredentials is used.
"""
fallback_credentials = ssl_channel_credentials(
) if fallback_credentials is None else fallback_credentials
return ChannelCredentials(
_cygrpc.XDSChannelCredentials(fallback_credentials._credentials))
def metadata_call_credentials(metadata_plugin, name=None): def metadata_call_credentials(metadata_plugin, name=None):
"""Construct CallCredentials from an AuthMetadataPlugin. """Construct CallCredentials from an AuthMetadataPlugin.
@ -1721,29 +1706,6 @@ def ssl_server_credentials(private_key_certificate_chain_pairs,
], require_client_auth)) ], require_client_auth))
def xds_server_credentials(fallback_credentials):
"""Creates a ServerCredentials for use with xDS. This is an EXPERIMENTAL
API.
Args:
fallback_credentials: Credentials to use in case it is not possible to
establish a secure connection via xDS. No default value is provided.
"""
return ServerCredentials(
_cygrpc.xds_server_credentials(fallback_credentials._credentials))
def insecure_server_credentials():
"""Creates a credentials object directing the server to use no credentials.
This is an EXPERIMENTAL API.
This object cannot be used directly in a call to `add_secure_port`.
Instead, it should be used to construct other credentials objects, e.g.
with xds_server_credentials.
"""
return ServerCredentials(_cygrpc.insecure_server_credentials())
def ssl_server_certificate_configuration(private_key_certificate_chain_pairs, def ssl_server_certificate_configuration(private_key_certificate_chain_pairs,
root_certificates=None): root_certificates=None):
"""Creates a ServerCertificateConfiguration for use with a Server. """Creates a ServerCertificateConfiguration for use with a Server.
@ -2019,8 +1981,7 @@ def server(thread_pool,
interceptors=None, interceptors=None,
options=None, options=None,
maximum_concurrent_rpcs=None, maximum_concurrent_rpcs=None,
compression=None, compression=None):
xds=False):
"""Creates a Server with which RPCs can be serviced. """Creates a Server with which RPCs can be serviced.
Args: Args:
@ -2041,8 +2002,6 @@ def server(thread_pool,
compression: An element of grpc.compression, e.g. compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This compression algorithm will be used for the grpc.compression.Gzip. This compression algorithm will be used for the
lifetime of the server unless overridden. This is an EXPERIMENTAL option. lifetime of the server unless overridden. This is an EXPERIMENTAL option.
xds: If set to true, retrieves server configuration via xDS. This is an
EXPERIMENTAL option.
Returns: Returns:
A Server object. A Server object.
@ -2052,7 +2011,7 @@ def server(thread_pool,
() if handlers is None else handlers, () if handlers is None else handlers,
() if interceptors is None else interceptors, () if interceptors is None else interceptors,
() if options is None else options, () if options is None else options,
maximum_concurrent_rpcs, compression, xds) maximum_concurrent_rpcs, compression)
@contextlib.contextmanager @contextlib.contextmanager

@ -822,8 +822,7 @@ cdef class AioServer:
init_grpc_aio() init_grpc_aio()
# NOTE(lidiz) Core objects won't be deallocated automatically. # NOTE(lidiz) Core objects won't be deallocated automatically.
# If AioServer.shutdown is not called, those objects will leak. # If AioServer.shutdown is not called, those objects will leak.
# TODO(rbellevi): Support xDS in aio server. self._server = Server(options)
self._server = Server(options, False)
grpc_server_register_completion_queue( grpc_server_register_completion_queue(
self._server.c_server, self._server.c_server,
global_completion_queue(), global_completion_queue(),

@ -76,13 +76,6 @@ cdef class CompositeChannelCredentials(ChannelCredentials):
cdef grpc_channel_credentials *c(self) except * cdef grpc_channel_credentials *c(self) except *
cdef class XDSChannelCredentials(ChannelCredentials):
cdef readonly ChannelCredentials _fallback_credentials
cdef grpc_channel_credentials *c(self) except *
cdef class ServerCertificateConfig: cdef class ServerCertificateConfig:
cdef grpc_ssl_server_certificate_config *c_cert_config cdef grpc_ssl_server_certificate_config *c_cert_config

@ -178,18 +178,6 @@ cdef class CompositeChannelCredentials(ChannelCredentials):
return c_composition return c_composition
cdef class XDSChannelCredentials(ChannelCredentials):
def __cinit__(self, fallback_credentials):
self._fallback_credentials = fallback_credentials
cdef grpc_channel_credentials *c(self) except *:
cdef grpc_channel_credentials *c_fallback_creds = self._fallback_credentials.c()
cdef grpc_channel_credentials *xds_creds = grpc_xds_credentials_create(c_fallback_creds)
grpc_channel_credentials_release(c_fallback_creds)
return xds_creds
cdef class ServerCertificateConfig: cdef class ServerCertificateConfig:
def __cinit__(self): def __cinit__(self):
@ -359,31 +347,11 @@ cdef class LocalChannelCredentials(ChannelCredentials):
def channel_credentials_local(grpc_local_connect_type local_connect_type): def channel_credentials_local(grpc_local_connect_type local_connect_type):
return LocalChannelCredentials(local_connect_type) return LocalChannelCredentials(local_connect_type)
cdef class InsecureChannelCredentials(ChannelCredentials):
cdef grpc_channel_credentials *c(self) except *:
return grpc_insecure_credentials_create()
def channel_credentials_insecure():
return InsecureChannelCredentials()
def server_credentials_local(grpc_local_connect_type local_connect_type): def server_credentials_local(grpc_local_connect_type local_connect_type):
cdef ServerCredentials credentials = ServerCredentials() cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type) credentials.c_credentials = grpc_local_server_credentials_create(local_connect_type)
return credentials return credentials
def xds_server_credentials(ServerCredentials fallback_credentials):
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_xds_server_credentials_create(fallback_credentials.c_credentials)
# NOTE: We do not need to call grpc_server_credentials_release on the
# fallback credentials here because this will be done by the __dealloc__
# method of its Cython wrapper.
return credentials
def insecure_server_credentials():
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = grpc_insecure_server_credentials_create()
return credentials
cdef class ALTSChannelCredentials(ChannelCredentials): cdef class ALTSChannelCredentials(ChannelCredentials):

@ -397,16 +397,6 @@ cdef extern from "grpc/grpc.h":
void grpc_server_register_completion_queue(grpc_server *server, void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq, grpc_completion_queue *cq,
void *reserved) nogil void *reserved) nogil
ctypedef struct grpc_server_config_fetcher:
pass
void grpc_server_set_config_fetcher(
grpc_server* server, grpc_server_config_fetcher* config_fetcher) nogil
grpc_server_config_fetcher* grpc_server_config_fetcher_xds_create() nogil
int grpc_server_add_insecure_http2_port( int grpc_server_add_insecure_http2_port(
grpc_server *server, const char *addr) nogil grpc_server *server, const char *addr) nogil
void grpc_server_start(grpc_server *server) nogil void grpc_server_start(grpc_server *server) nogil
@ -524,16 +514,6 @@ cdef extern from "grpc/grpc_security.h":
void *reserved) nogil void *reserved) nogil
void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
grpc_channel_credentials *grpc_xds_credentials_create(
grpc_channel_credentials *fallback_creds) nogil
grpc_channel_credentials *grpc_insecure_credentials_create() nogil
grpc_server_credentials *grpc_xds_server_credentials_create(
grpc_server_credentials *fallback_creds) nogil
grpc_server_credentials *grpc_insecure_server_credentials_create() nogil
grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *grpc_composite_call_credentials_create(
grpc_call_credentials *creds1, grpc_call_credentials *creds2, grpc_call_credentials *creds1, grpc_call_credentials *creds2,
void *reserved) nogil void *reserved) nogil

@ -15,7 +15,7 @@
cdef class Server: cdef class Server:
def __cinit__(self, object arguments, bint xds): def __cinit__(self, object arguments):
fork_handlers_and_grpc_init() fork_handlers_and_grpc_init()
self.references = [] self.references = []
self.registered_completion_queues = [] self.registered_completion_queues = []
@ -25,8 +25,6 @@ cdef class Server:
self.c_server = NULL self.c_server = NULL
cdef _ChannelArgs channel_args = _ChannelArgs(arguments) cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
self.c_server = grpc_server_create(channel_args.c_args(), NULL) self.c_server = grpc_server_create(channel_args.c_args(), NULL)
if xds:
grpc_server_set_config_fetcher(self.c_server, grpc_server_config_fetcher_xds_create())
self.references.append(arguments) self.references.append(arguments)
def request_call( def request_call(

@ -945,9 +945,9 @@ class _Server(grpc.Server):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def __init__(self, thread_pool, generic_handlers, interceptors, options, def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs, compression, xds): maximum_concurrent_rpcs, compression):
completion_queue = cygrpc.CompletionQueue() completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_augment_options(options, compression), xds) server = cygrpc.Server(_augment_options(options, compression))
server.register_completion_queue(completion_queue) server.register_completion_queue(completion_queue)
self._state = _ServerState(completion_queue, server, generic_handlers, self._state = _ServerState(completion_queue, server, generic_handlers,
_interceptor.service_pipeline(interceptors), _interceptor.service_pipeline(interceptors),
@ -989,7 +989,7 @@ class _Server(grpc.Server):
def create_server(thread_pool, generic_rpc_handlers, interceptors, options, def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
maximum_concurrent_rpcs, compression, xds): maximum_concurrent_rpcs, compression):
_validate_generic_rpc_handlers(generic_rpc_handlers) _validate_generic_rpc_handlers(generic_rpc_handlers)
return _Server(thread_pool, generic_rpc_handlers, interceptors, options, return _Server(thread_pool, generic_rpc_handlers, interceptors, options,
maximum_concurrent_rpcs, compression, xds) maximum_concurrent_rpcs, compression)

@ -60,9 +60,16 @@ else:
def _create_channel(target: str, options: Sequence[Tuple[str, str]], def _create_channel(target: str, options: Sequence[Tuple[str, str]],
channel_credentials: Optional[grpc.ChannelCredentials], channel_credentials: Optional[grpc.ChannelCredentials],
compression: Optional[grpc.Compression]) -> grpc.Channel: compression: Optional[grpc.Compression]) -> grpc.Channel:
if channel_credentials is grpc.experimental.insecure_channel_credentials():
_LOGGER.debug(f"Creating insecure channel with options '{options}' " +
f"and compression '{compression}'")
return grpc.insecure_channel(target,
options=options,
compression=compression)
else:
_LOGGER.debug( _LOGGER.debug(
f"Creating secure channel with credentials '{channel_credentials}', " + f"Creating secure channel with credentials '{channel_credentials}', "
f"options '{options}' and compression '{compression}'") + f"options '{options}' and compression '{compression}'")
return grpc.secure_channel(target, return grpc.secure_channel(target,
credentials=channel_credentials, credentials=channel_credentials,
options=options, options=options,

@ -22,7 +22,6 @@ import sys
import warnings import warnings
import grpc import grpc
from grpc._cython import cygrpc as _cygrpc
_EXPERIMENTAL_APIS_USED = set() _EXPERIMENTAL_APIS_USED = set()
@ -42,16 +41,19 @@ class UsageError(Exception):
"""Raised by the gRPC library to indicate usage not allowed by the API.""" """Raised by the gRPC library to indicate usage not allowed by the API."""
# It's important that there be a single insecure credentials object so that its _insecure_channel_credentials_sentinel = object()
# hash is deterministic and can be used for indexing in the simple stubs cache.
_insecure_channel_credentials = grpc.ChannelCredentials( _insecure_channel_credentials = grpc.ChannelCredentials(
_cygrpc.channel_credentials_insecure()) _insecure_channel_credentials_sentinel)
def insecure_channel_credentials(): def insecure_channel_credentials():
"""Creates a ChannelCredentials for use with an insecure channel. """Creates a ChannelCredentials for use with an insecure channel.
THIS IS AN EXPERIMENTAL API. THIS IS AN EXPERIMENTAL API.
This is not for use with secure_channel function. Intead, this should be
used with grpc.unary_unary, grpc.unary_stream, grpc.stream_unary, or
grpc.stream_stream.
""" """
return _insecure_channel_credentials return _insecure_channel_credentials

@ -77,7 +77,6 @@
"unit._session_cache_test.SSLSessionCacheTest", "unit._session_cache_test.SSLSessionCacheTest",
"unit._signal_handling_test.SignalHandlingTest", "unit._signal_handling_test.SignalHandlingTest",
"unit._version_test.VersionTest", "unit._version_test.VersionTest",
"unit._xds_credentials_test.XdsCredentialsTest",
"unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.BetaFeaturesTest",
"unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest",
"unit.beta._connectivity_channel_test.ConnectivityStatesTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest",

@ -40,7 +40,6 @@ GRPCIO_TESTS_UNIT = [
"_server_shutdown_test.py", "_server_shutdown_test.py",
"_server_wait_for_termination_test.py", "_server_wait_for_termination_test.py",
"_session_cache_test.py", "_session_cache_test.py",
"_xds_credentials_test.py",
] ]
py_library( py_library(

@ -144,10 +144,12 @@ class CancelManyCallsTest(unittest.TestCase):
test_constants.THREAD_CONCURRENCY) test_constants.THREAD_CONCURRENCY)
server_completion_queue = cygrpc.CompletionQueue() server_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server([( server = cygrpc.Server([
(
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
)], False) ),
])
server.register_completion_queue(server_completion_queue) server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0') port = server.add_http2_port(b'[::]:0')
server.start() server.start()

@ -96,7 +96,7 @@ class RpcTest(object):
def setUp(self): def setUp(self):
self.server_completion_queue = cygrpc.CompletionQueue() self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)], False) self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)])
self.server.register_completion_queue(self.server_completion_queue) self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0') port = self.server.add_http2_port(b'[::]:0')
self.server.start() self.server.start()

@ -115,7 +115,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server = cygrpc.Server([( server = cygrpc.Server([(
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
)], False) )])
server.register_completion_queue(server_completion_queue) server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0') port = server.add_http2_port(b'[::]:0')
server.start() server.start()

@ -25,7 +25,7 @@ class Test(unittest.TestCase):
def test_lonely_server(self): def test_lonely_server(self):
server_call_completion_queue = cygrpc.CompletionQueue() server_call_completion_queue = cygrpc.CompletionQueue()
server_shutdown_completion_queue = cygrpc.CompletionQueue() server_shutdown_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(None, False) server = cygrpc.Server(None)
server.register_completion_queue(server_call_completion_queue) server.register_completion_queue(server_call_completion_queue)
server.register_completion_queue(server_shutdown_completion_queue) server.register_completion_queue(server_shutdown_completion_queue)
port = server.add_http2_port(b'[::]:0') port = server.add_http2_port(b'[::]:0')

@ -42,10 +42,12 @@ class TypeSmokeTest(unittest.TestCase):
del completion_queue del completion_queue
def testServerUpDown(self): def testServerUpDown(self):
server = cygrpc.Server(set([( server = cygrpc.Server(set([
(
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
)]), False) ),
]))
del server del server
def testChannelUpDown(self): def testChannelUpDown(self):
@ -57,10 +59,12 @@ class TypeSmokeTest(unittest.TestCase):
b'test plugin name!') b'test plugin name!')
def testServerStartNoExplicitShutdown(self): def testServerStartNoExplicitShutdown(self):
server = cygrpc.Server([( server = cygrpc.Server([
(
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
)], False) ),
])
completion_queue = cygrpc.CompletionQueue() completion_queue = cygrpc.CompletionQueue()
server.register_completion_queue(completion_queue) server.register_completion_queue(completion_queue)
port = server.add_http2_port(b'[::]:0') port = server.add_http2_port(b'[::]:0')
@ -75,7 +79,7 @@ class TypeSmokeTest(unittest.TestCase):
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
), ),
], False) ])
server.add_http2_port(b'[::]:0') server.add_http2_port(b'[::]:0')
server.register_completion_queue(completion_queue) server.register_completion_queue(completion_queue)
server.start() server.start()
@ -93,10 +97,12 @@ class ServerClientMixin(object):
def setUpMixin(self, server_credentials, client_credentials, host_override): def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue() self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server([( self.server = cygrpc.Server([
(
b'grpc.so_reuseport', b'grpc.so_reuseport',
0, 0,
)], False) ),
])
self.server.register_completion_queue(self.server_completion_queue) self.server.register_completion_queue(self.server_completion_queue)
if server_credentials: if server_credentials:
self.port = self.server.add_http2_port(b'[::]:0', self.port = self.server.add_http2_port(b'[::]:0',

@ -1,103 +0,0 @@
# Copyright 2021 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests xDS server and channel credentials."""
import unittest
import logging
from concurrent import futures
import contextlib
import grpc
import grpc.experimental
from tests.unit import test_common
from tests.unit import resources
class _GenericHandler(grpc.GenericRpcHandler):
def service(self, handler_call_details):
return grpc.unary_unary_rpc_method_handler(
lambda request, unused_context: request)
@contextlib.contextmanager
def xds_channel_server_without_xds(server_fallback_creds):
server = grpc.server(futures.ThreadPoolExecutor())
server.add_generic_rpc_handlers((_GenericHandler(),))
server_server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
server_creds = grpc.xds_server_credentials(server_fallback_creds)
port = server.add_secure_port("localhost:0", server_creds)
server.start()
try:
yield "localhost:{}".format(port)
finally:
server.stop(None)
class XdsCredentialsTest(unittest.TestCase):
def test_xds_creds_fallback_ssl(self):
# Since there is no xDS server, the fallback credentials will be used.
# In this case, SSL credentials.
server_fallback_creds = grpc.ssl_server_credentials(
((resources.private_key(), resources.certificate_chain()),))
with xds_channel_server_without_xds(
server_fallback_creds) as server_address:
override_options = (("grpc.ssl_target_name_override",
"foo.test.google.fr"),)
channel_fallback_creds = grpc.ssl_channel_credentials(
root_certificates=resources.test_root_certificates(),
private_key=resources.private_key(),
certificate_chain=resources.certificate_chain())
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
with grpc.secure_channel(server_address,
channel_creds,
options=override_options) as channel:
request = b"abc"
response = channel.unary_unary("/test/method")(
request, wait_for_ready=True)
self.assertEqual(response, request)
def test_xds_creds_fallback_insecure(self):
# Since there is no xDS server, the fallback credentials will be used.
# In this case, insecure.
server_fallback_creds = grpc.insecure_server_credentials()
with xds_channel_server_without_xds(
server_fallback_creds) as server_address:
channel_fallback_creds = grpc.experimental.insecure_channel_credentials(
)
channel_creds = grpc.xds_channel_credentials(channel_fallback_creds)
with grpc.secure_channel(server_address, channel_creds) as channel:
request = b"abc"
response = channel.unary_unary("/test/method")(
request, wait_for_ready=True)
self.assertEqual(response, request)
def test_start_xds_server(self):
server = grpc.server(futures.ThreadPoolExecutor(), xds=True)
server.add_generic_rpc_handlers((_GenericHandler(),))
server_fallback_creds = grpc.insecure_server_credentials()
server_creds = grpc.xds_server_credentials(server_fallback_creds)
port = server.add_secure_port("localhost:0", server_creds)
server.start()
server.stop(None)
# No exceptions thrown. A more comprehensive suite of tests will be
# provided by the interop tests.
if __name__ == "__main__":
logging.basicConfig()
unittest.main()
Loading…
Cancel
Save