diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc index 2c76fae7fd6..ffadf0b1abd 100644 --- a/src/core/lib/iomgr/tcp_server_uv.cc +++ b/src/core/lib/iomgr/tcp_server_uv.cc @@ -260,15 +260,36 @@ static void on_connect(uv_stream_t* server, int status) { grpc_exec_ctx_finish(&exec_ctx); } -static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, - const grpc_resolved_address* addr, - unsigned port_index, - grpc_tcp_listener** listener) { +static grpc_error* add_addr_to_server(grpc_tcp_server* s, + const grpc_resolved_address* addr, + unsigned port_index, + grpc_tcp_listener** listener) { grpc_tcp_listener* sp = NULL; int port = -1; int status; grpc_error* error; grpc_resolved_address sockname_temp; + uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); + int family = grpc_sockaddr_get_family(addr); + + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t*)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to initialize UV tcp handle"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + return error; + } // The last argument to uv_tcp_bind is flags status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0); @@ -325,20 +346,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle, return GRPC_ERROR_NONE; } +static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s, + unsigned port_index, + int requested_port, + grpc_tcp_listener** listener) { + grpc_resolved_address wild4; + grpc_resolved_address wild6; + grpc_tcp_listener* sp = nullptr; + grpc_tcp_listener* sp2 = nullptr; + grpc_error* v6_err = GRPC_ERROR_NONE; + grpc_error* v4_err = GRPC_ERROR_NONE; + + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); + /* Try listening on IPv6 first. */ + if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) == + GRPC_ERROR_NONE) { + *listener = sp; + return GRPC_ERROR_NONE; + } + + if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) == + GRPC_ERROR_NONE) { + *listener = sp2; + return GRPC_ERROR_NONE; + } + + grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Failed to add any wildcard listeners"); + root_err = grpc_error_add_child(root_err, v6_err); + root_err = grpc_error_add_child(root_err, v4_err); + return root_err; +} + grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, const grpc_resolved_address* addr, int* port) { // This function is mostly copied from tcp_server_windows.c grpc_tcp_listener* sp = NULL; - uv_tcp_t* handle; grpc_resolved_address addr6_v4mapped; - grpc_resolved_address wildcard; grpc_resolved_address* allocated_addr = NULL; grpc_resolved_address sockname_temp; unsigned port_index = 0; - int status; grpc_error* error = GRPC_ERROR_NONE; - int family; GRPC_UV_ASSERT_SAME_THREAD(); @@ -367,38 +416,15 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s, } } - if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = &addr6_v4mapped; - } - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, port)) { - grpc_sockaddr_make_wildcard6(*port, &wildcard); - - addr = &wildcard; - } - - handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t)); - - family = grpc_sockaddr_get_family(addr); - status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); -#if defined(GPR_LINUX) && defined(SO_REUSEPORT) - if (family == AF_INET || family == AF_INET6) { - int fd; - uv_fileno((uv_handle_t*)handle, &fd); - int enable = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); - } -#endif /* GPR_LINUX && SO_REUSEPORT */ - - if (status == 0) { - error = add_socket_to_server(s, handle, addr, port_index, &sp); + error = add_wildcard_addrs_to_server(s, port_index, *port, &sp); } else { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Failed to initialize UV tcp handle"); - error = - grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, - grpc_slice_from_static_string(uv_strerror(status))); + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = &addr6_v4mapped; + } + + error = add_addr_to_server(s, addr, port_index, &sp); } gpr_free(allocated_addr); diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 558ce42129b..564772527e8 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -348,26 +348,25 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): class ChannelCredentials(object): """An encapsulation of the data required to create a secure Channel. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. For - example, ssl_channel_credentials returns an instance, and secure_channel - consumes an instance of this class. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. For + example, ssl_channel_credentials returns an instance of this class and + secure_channel requires an instance of this class. + """ def __init__(self, credentials): self._credentials = credentials class CallCredentials(object): - """An encapsulation of the data required to assert an identity over a - channel. + """An encapsulation of the data required to assert an identity over a call. - A CallCredentials may be composed with ChannelCredentials to always assert - identity for every call over that Channel. + A CallCredentials may be composed with ChannelCredentials to always assert + identity for every call over that Channel. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. + """ def __init__(self, credentials): self._credentials = credentials @@ -376,23 +375,22 @@ class CallCredentials(object): class AuthMetadataContext(six.with_metaclass(abc.ABCMeta)): """Provides information to call credentials metadata plugins. - Attributes: - service_url: A string URL of the service being called into. - method_name: A string of the fully qualified method name being called. - """ + Attributes: + service_url: A string URL of the service being called into. + method_name: A string of the fully qualified method name being called. + """ class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)): """Callback object received by a metadata plugin.""" def __call__(self, metadata, error): - """Inform the gRPC runtime of the metadata to construct a - CallCredentials. + """Passes to the gRPC runtime authentication metadata for an RPC. - Args: - metadata: The :term:`metadata` used to construct the CallCredentials. - error: An Exception to indicate error or None to indicate success. - """ + Args: + metadata: The :term:`metadata` used to construct the CallCredentials. + error: An Exception to indicate error or None to indicate success. + """ raise NotImplementedError() @@ -402,14 +400,14 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)): def __call__(self, context, callback): """Implements authentication by passing metadata to a callback. - Implementations of this method must not block. + Implementations of this method must not block. - Args: - context: An AuthMetadataContext providing information on the RPC that the - plugin is being called to authenticate. - callback: An AuthMetadataPluginCallback to be invoked either synchronously - or asynchronously. - """ + Args: + context: An AuthMetadataContext providing information on the RPC that + the plugin is being called to authenticate. + callback: An AuthMetadataPluginCallback to be invoked either + synchronously or asynchronously. + """ raise NotImplementedError() @@ -1138,99 +1136,86 @@ def ssl_channel_credentials(root_certificates=None, certificate_chain=None): """Creates a ChannelCredentials for use with an SSL-enabled Channel. - Args: - root_certificates: The PEM-encoded root certificates as a byte string, - or None to retrieve them from a default location chosen by gRPC runtime. - private_key: The PEM-encoded private key as a byte string, or None if no - private key should be used. - certificate_chain: The PEM-encoded certificate chain as a byte string - to use or or None if no certificate chain should be used. + Args: + root_certificates: The PEM-encoded root certificates as a byte string, + or None to retrieve them from a default location chosen by gRPC + runtime. + private_key: The PEM-encoded private key as a byte string, or None if no + private key should be used. + certificate_chain: The PEM-encoded certificate chain as a byte string + to use or or None if no certificate chain should be used. - Returns: - A ChannelCredentials for use with an SSL-enabled Channel. - """ - if private_key is not None or certificate_chain is not None: - pair = _cygrpc.SslPemKeyCertPair(private_key, certificate_chain) - else: - pair = None + Returns: + A ChannelCredentials for use with an SSL-enabled Channel. + """ return ChannelCredentials( - _cygrpc.channel_credentials_ssl(root_certificates, pair)) + _cygrpc.SSLChannelCredentials(root_certificates, private_key, + certificate_chain)) def metadata_call_credentials(metadata_plugin, name=None): """Construct CallCredentials from an AuthMetadataPlugin. - Args: - metadata_plugin: An AuthMetadataPlugin to use for authentication. - name: An optional name for the plugin. + Args: + metadata_plugin: An AuthMetadataPlugin to use for authentication. + name: An optional name for the plugin. - Returns: - A CallCredentials. - """ + Returns: + A CallCredentials. + """ from grpc import _plugin_wrapping # pylint: disable=cyclic-import - if name is None: - try: - effective_name = metadata_plugin.__name__ - except AttributeError: - effective_name = metadata_plugin.__class__.__name__ - else: - effective_name = name - return CallCredentials( - _plugin_wrapping.call_credentials_metadata_plugin(metadata_plugin, - effective_name)) + return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin, + name) def access_token_call_credentials(access_token): """Construct CallCredentials from an access token. - Args: - access_token: A string to place directly in the http request - authorization header, for example - "authorization: Bearer ". + Args: + access_token: A string to place directly in the http request + authorization header, for example + "authorization: Bearer ". - Returns: - A CallCredentials. - """ + Returns: + A CallCredentials. + """ from grpc import _auth # pylint: disable=cyclic-import - return metadata_call_credentials( - _auth.AccessTokenCallCredentials(access_token)) + from grpc import _plugin_wrapping # pylint: disable=cyclic-import + return _plugin_wrapping.metadata_plugin_call_credentials( + _auth.AccessTokenAuthMetadataPlugin(access_token), None) def composite_call_credentials(*call_credentials): """Compose multiple CallCredentials to make a new CallCredentials. - Args: - *call_credentials: At least two CallCredentials objects. + Args: + *call_credentials: At least two CallCredentials objects. - Returns: - A CallCredentials object composed of the given CallCredentials objects. - """ - from grpc import _credential_composition # pylint: disable=cyclic-import - cygrpc_call_credentials = tuple( - single_call_credentials._credentials - for single_call_credentials in call_credentials) + Returns: + A CallCredentials object composed of the given CallCredentials objects. + """ return CallCredentials( - _credential_composition.call(cygrpc_call_credentials)) + _cygrpc.CompositeCallCredentials( + tuple(single_call_credentials._credentials + for single_call_credentials in call_credentials))) def composite_channel_credentials(channel_credentials, *call_credentials): """Compose a ChannelCredentials and one or more CallCredentials objects. - Args: - channel_credentials: A ChannelCredentials object. - *call_credentials: One or more CallCredentials objects. + Args: + channel_credentials: A ChannelCredentials object. + *call_credentials: One or more CallCredentials objects. - Returns: - A ChannelCredentials composed of the given ChannelCredentials and - CallCredentials objects. - """ - from grpc import _credential_composition # pylint: disable=cyclic-import - cygrpc_call_credentials = tuple( - single_call_credentials._credentials - for single_call_credentials in call_credentials) + Returns: + A ChannelCredentials composed of the given ChannelCredentials and + CallCredentials objects. + """ return ChannelCredentials( - _credential_composition.channel(channel_credentials._credentials, - cygrpc_call_credentials)) + _cygrpc.CompositeChannelCredentials( + tuple(single_call_credentials._credentials + for single_call_credentials in call_credentials), + channel_credentials._credentials)) def ssl_server_credentials(private_key_certificate_chain_pairs, diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index c6542d01354..9a339b59004 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -63,7 +63,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): self._pool.shutdown(wait=False) -class AccessTokenCallCredentials(grpc.AuthMetadataPlugin): +class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin): """Metadata wrapper for raw access token credentials.""" def __init__(self, access_token): diff --git a/src/python/grpcio/grpc/_credential_composition.py b/src/python/grpcio/grpc/_credential_composition.py deleted file mode 100644 index f652cc3ae79..00000000000 --- a/src/python/grpcio/grpc/_credential_composition.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from grpc._cython import cygrpc - - -def _call(call_credentialses): - call_credentials_iterator = iter(call_credentialses) - composition = next(call_credentials_iterator) - for additional_call_credentials in call_credentials_iterator: - composition = cygrpc.call_credentials_composite( - composition, additional_call_credentials) - return composition - - -def call(call_credentialses): - return _call(call_credentialses) - - -def channel(channel_credentials, call_credentialses): - return cygrpc.channel_credentials_composite(channel_credentials, - _call(call_credentialses)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 752fb330d01..6b3a276097f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -72,13 +72,12 @@ cdef class Call: result = grpc_call_cancel(self.c_call, NULL) return result - def set_credentials( - self, CallCredentials call_credentials not None): - cdef grpc_call_error result - with nogil: - result = grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) - return result + def set_credentials(self, CallCredentials call_credentials not None): + cdef grpc_call_credentials *c_call_credentials = call_credentials.c() + cdef grpc_call_error call_error = grpc_call_set_credentials( + self.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + return call_error def peer(self): cdef char *peer = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index aeabdba021d..4c397f8f644 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -33,10 +33,10 @@ cdef class Channel: self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL) else: - with nogil: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, c_target, c_arguments, NULL) - self.references.append(channel_credentials) + c_channel_credentials = channel_credentials.c() + self.c_channel = grpc_secure_channel_create( + c_channel_credentials, c_target, c_arguments, NULL) + grpc_channel_credentials_release(c_channel_credentials) self.references.append(target) self.references.append(arguments) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index bc0f185c77d..7e9ea33ca04 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -12,20 +12,66 @@ # See the License for the specific language governing permissions and # limitations under the License. -cimport cpython + +cdef class CallCredentials: + + cdef grpc_call_credentials *c(self) + + # TODO(https://github.com/grpc/grpc/issues/12531): remove. + cdef grpc_call_credentials *c_credentials + + +cdef int _get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil + +cdef void _destroy(void *state) with gil + + +cdef class MetadataPluginCallCredentials(CallCredentials): + + cdef readonly object _metadata_plugin + cdef readonly bytes _name + + cdef grpc_call_credentials *c(self) + + +cdef grpc_call_credentials *_composition(call_credentialses) + + +cdef class CompositeCallCredentials(CallCredentials): + + cdef readonly tuple _call_credentialses + + cdef grpc_call_credentials *c(self) cdef class ChannelCredentials: + cdef grpc_channel_credentials *c(self) + + # TODO(https://github.com/grpc/grpc/issues/12531): remove. cdef grpc_channel_credentials *c_credentials - cdef grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair - cdef list references -cdef class CallCredentials: +cdef class SSLChannelCredentials(ChannelCredentials): - cdef grpc_call_credentials *c_credentials - cdef list references + cdef readonly object _pem_root_certificates + cdef readonly object _private_key + cdef readonly object _certificate_chain + + cdef grpc_channel_credentials *c(self) + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + cdef readonly tuple _call_credentialses + cdef readonly ChannelCredentials _channel_credentials + + cdef grpc_channel_credentials *c(self) cdef class ServerCertificateConfig: @@ -49,27 +95,3 @@ cdef class ServerCredentials: cdef object cert_config_fetcher # whether C-core has asked for the initial_cert_config cdef bint initial_cert_config_fetched - - -cdef class CredentialsMetadataPlugin: - - cdef object plugin_callback - cdef bytes plugin_name - - -cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin) - - -cdef class AuthMetadataContext: - - cdef grpc_auth_metadata_context context - - -cdef int plugin_get_metadata( - void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data, - grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], - size_t *num_creds_md, grpc_status_code *status, - const char **error_details) with gil - -cdef void plugin_destroy_c_plugin_state(void *state) with gil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index ee1328d2e9f..246a2718934 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -16,47 +16,123 @@ cimport cpython import grpc import threading -import traceback -cdef class ChannelCredentials: +cdef class CallCredentials: - def __cinit__(self): - grpc_init() - self.c_credentials = NULL - self.c_ssl_pem_key_cert_pair.private_key = NULL - self.c_ssl_pem_key_cert_pair.certificate_chain = NULL - self.references = [] + cdef grpc_call_credentials *c(self): + raise NotImplementedError() - # The object *can* be invalid in Python if we fail to make the credentials - # (and the core thus returns NULL credentials). Used primarily for debugging. - @property - def is_valid(self): - return self.c_credentials != NULL - def __dealloc__(self): - if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) - grpc_shutdown() +cdef int _get_metadata( + void *state, grpc_auth_metadata_context context, + grpc_credentials_plugin_metadata_cb cb, void *user_data, + grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], + size_t *num_creds_md, grpc_status_code *status, + const char **error_details) with gil: + def callback(Metadata metadata, grpc_status_code status, bytes error_details): + if status is StatusCode.ok: + cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL) + else: + cb(user_data, NULL, 0, status, error_details) + args = context.service_url, context.method_name, callback, + threading.Thread(target=state, args=args).start() + return 0 # Asynchronous return -cdef class CallCredentials: +cdef void _destroy(void *state) with gil: + cpython.Py_DECREF(state) - def __cinit__(self): - grpc_init() - self.c_credentials = NULL - self.references = [] - # The object *can* be invalid in Python if we fail to make the credentials - # (and the core thus returns NULL credentials). Used primarily for debugging. - @property - def is_valid(self): - return self.c_credentials != NULL +cdef class MetadataPluginCallCredentials(CallCredentials): - def __dealloc__(self): - if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) - grpc_shutdown() + def __cinit__(self, metadata_plugin, name): + self._metadata_plugin = metadata_plugin + self._name = name + + cdef grpc_call_credentials *c(self): + cdef grpc_metadata_credentials_plugin c_metadata_plugin + c_metadata_plugin.get_metadata = _get_metadata + c_metadata_plugin.destroy = _destroy + c_metadata_plugin.state = self._metadata_plugin + c_metadata_plugin.type = self._name + cpython.Py_INCREF(self._metadata_plugin) + return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, NULL) + + +cdef grpc_call_credentials *_composition(call_credentialses): + call_credentials_iterator = iter(call_credentialses) + cdef CallCredentials composition = next(call_credentials_iterator) + cdef grpc_call_credentials *c_composition = composition.c() + cdef CallCredentials additional_call_credentials + cdef grpc_call_credentials *c_additional_call_credentials + cdef grpc_call_credentials *c_next_composition + for additional_call_credentials in call_credentials_iterator: + c_additional_call_credentials = additional_call_credentials.c() + c_next_composition = grpc_composite_call_credentials_create( + c_composition, c_additional_call_credentials, NULL) + grpc_call_credentials_release(c_composition) + grpc_call_credentials_release(c_additional_call_credentials) + c_composition = c_next_composition + return c_composition + + +cdef class CompositeCallCredentials(CallCredentials): + + def __cinit__(self, call_credentialses): + self._call_credentialses = call_credentialses + + cdef grpc_call_credentials *c(self): + return _composition(self._call_credentialses) + + +cdef class ChannelCredentials: + + cdef grpc_channel_credentials *c(self): + raise NotImplementedError() + + +cdef class SSLChannelCredentials(ChannelCredentials): + + def __cinit__(self, pem_root_certificates, private_key, certificate_chain): + self._pem_root_certificates = pem_root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain + + cdef grpc_channel_credentials *c(self): + cdef const char *c_pem_root_certificates + cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair + if self._pem_root_certificates is None: + c_pem_root_certificates = NULL + else: + c_pem_root_certificates = self._pem_root_certificates + if self._private_key is None and self._certificate_chain is None: + return grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) + else: + c_pem_key_certificate_pair.private_key = self._private_key + c_pem_key_certificate_pair.certificate_chain = self._certificate_chain + return grpc_ssl_credentials_create( + c_pem_root_certificates, &c_pem_key_certificate_pair, NULL) + + +cdef class CompositeChannelCredentials(ChannelCredentials): + + def __cinit__(self, call_credentialses, channel_credentials): + self._call_credentialses = call_credentialses + self._channel_credentials = channel_credentials + + cdef grpc_channel_credentials *c(self): + cdef grpc_channel_credentials *c_channel_credentials + c_channel_credentials = self._channel_credentials.c() + cdef grpc_call_credentials *c_call_credentials_composition = _composition( + self._call_credentialses) + cdef grpc_channel_credentials *composition + c_composition = grpc_composite_channel_credentials_create( + c_channel_credentials, c_call_credentials_composition, NULL) + grpc_channel_credentials_release(c_channel_credentials) + grpc_call_credentials_release(c_call_credentials_composition) + return c_composition cdef class ServerCertificateConfig: @@ -89,190 +165,6 @@ cdef class ServerCredentials: grpc_server_credentials_release(self.c_credentials) grpc_shutdown() - -cdef class CredentialsMetadataPlugin: - - def __cinit__(self, object plugin_callback, bytes name): - """ - Args: - plugin_callback (callable): Callback accepting a service URL (str/bytes) - and callback object (accepting a MetadataArray, - grpc_status_code, and a str/bytes error message). This argument - when called should be non-blocking and eventually call the callback - object with the appropriate status code/details and metadata (if - successful). - name (bytes): Plugin name. - """ - grpc_init() - if not callable(plugin_callback): - raise ValueError('expected callable plugin_callback') - self.plugin_callback = plugin_callback - self.plugin_name = name - - def __dealloc__(self): - grpc_shutdown() - - -cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin): - cdef grpc_metadata_credentials_plugin c_plugin - c_plugin.get_metadata = plugin_get_metadata - c_plugin.destroy = plugin_destroy_c_plugin_state - c_plugin.state = plugin - c_plugin.type = plugin.plugin_name - cpython.Py_INCREF(plugin) - return c_plugin - - -cdef class AuthMetadataContext: - - def __cinit__(self): - grpc_init() - self.context.service_url = NULL - self.context.method_name = NULL - - @property - def service_url(self): - return self.context.service_url - - @property - def method_name(self): - return self.context.method_name - - def __dealloc__(self): - grpc_shutdown() - - -cdef int plugin_get_metadata( - void *state, grpc_auth_metadata_context context, - grpc_credentials_plugin_metadata_cb cb, void *user_data, - grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], - size_t *num_creds_md, grpc_status_code *status, - const char **error_details) with gil: - called_flag = [False] - def python_callback( - Metadata metadata, grpc_status_code status, - bytes error_details): - cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details) - called_flag[0] = True - cdef CredentialsMetadataPlugin self = state - cdef AuthMetadataContext cy_context = AuthMetadataContext() - cy_context.context = context - def async_callback(): - try: - self.plugin_callback(cy_context, python_callback) - except Exception as error: - if not called_flag[0]: - cb(user_data, NULL, 0, StatusCode.unknown, - traceback.format_exc().encode()) - threading.Thread(group=None, target=async_callback).start() - return 0 # Asynchronous return - -cdef void plugin_destroy_c_plugin_state(void *state) with gil: - cpython.Py_DECREF(state) - -def channel_credentials_google_default(): - cdef ChannelCredentials credentials = ChannelCredentials(); - with nogil: - credentials.c_credentials = grpc_google_default_credentials_create() - return credentials - -def channel_credentials_ssl(pem_root_certificates, - SslPemKeyCertPair ssl_pem_key_cert_pair): - pem_root_certificates = str_to_bytes(pem_root_certificates) - cdef ChannelCredentials credentials = ChannelCredentials() - cdef const char *c_pem_root_certificates = NULL - if pem_root_certificates is not None: - c_pem_root_certificates = pem_root_certificates - credentials.references.append(pem_root_certificates) - if ssl_pem_key_cert_pair is not None: - with nogil: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) - credentials.references.append(ssl_pem_key_cert_pair) - else: - with nogil: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) - return credentials - -def channel_credentials_composite( - ChannelCredentials credentials_1 not None, - CallCredentials credentials_2 not None): - if not credentials_1.is_valid or not credentials_2.is_valid: - raise ValueError("passed credentials must both be valid") - cdef ChannelCredentials credentials = ChannelCredentials() - with nogil: - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) - credentials.references.append(credentials_1) - credentials.references.append(credentials_2) - return credentials - -def call_credentials_composite( - CallCredentials credentials_1 not None, - CallCredentials credentials_2 not None): - if not credentials_1.is_valid or not credentials_2.is_valid: - raise ValueError("passed credentials must both be valid") - cdef CallCredentials credentials = CallCredentials() - with nogil: - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) - credentials.references.append(credentials_1) - credentials.references.append(credentials_2) - return credentials - -def call_credentials_google_compute_engine(): - cdef CallCredentials credentials = CallCredentials() - with nogil: - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) - return credentials - -def call_credentials_service_account_jwt_access( - json_key, Timespec token_lifetime not None): - json_key = str_to_bytes(json_key) - cdef CallCredentials credentials = CallCredentials() - cdef char *json_key_c_string = json_key - with nogil: - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key_c_string, token_lifetime.c_time, NULL)) - credentials.references.append(json_key) - return credentials - -def call_credentials_google_refresh_token(json_refresh_token): - json_refresh_token = str_to_bytes(json_refresh_token) - cdef CallCredentials credentials = CallCredentials() - cdef char *json_refresh_token_c_string = json_refresh_token - with nogil: - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token_c_string, NULL) - credentials.references.append(json_refresh_token) - return credentials - -def call_credentials_google_iam(authorization_token, authority_selector): - authorization_token = str_to_bytes(authorization_token) - authority_selector = str_to_bytes(authority_selector) - cdef CallCredentials credentials = CallCredentials() - cdef char *authorization_token_c_string = authorization_token - cdef char *authority_selector_c_string = authority_selector - with nogil: - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token_c_string, authority_selector_c_string, NULL) - credentials.references.append(authorization_token) - credentials.references.append(authority_selector) - return credentials - -def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): - cdef CallCredentials credentials = CallCredentials() - cdef grpc_metadata_credentials_plugin c_plugin = _c_plugin(plugin) - with nogil: - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) - # TODO(atash): the following held reference is *probably* never necessary - credentials.references.append(plugin) - return credentials - cdef const char* _get_c_pem_root_certs(pem_root_certs): if pem_root_certs is None: return NULL diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index bea2c0f1396..cd17f4a0493 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import logging import threading import grpc @@ -20,89 +21,79 @@ from grpc import _common from grpc._cython import cygrpc -class AuthMetadataContext( +class _AuthMetadataContext( collections.namedtuple('AuthMetadataContext', ( 'service_url', 'method_name',)), grpc.AuthMetadataContext): pass -class AuthMetadataPluginCallback(grpc.AuthMetadataContext): +class _CallbackState(object): - def __init__(self, callback): - self._callback = callback - - def __call__(self, metadata, error): - self._callback(metadata, error) + def __init__(self): + self.lock = threading.Lock() + self.called = False + self.exception = None -class _WrappedCygrpcCallback(object): +class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback): - def __init__(self, cygrpc_callback): - self.is_called = False - self.error = None - self.is_called_lock = threading.Lock() - self.cygrpc_callback = cygrpc_callback - - def _invoke_failure(self, error): - # TODO(atash) translate different Exception superclasses into different - # status codes. - self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal, - _common.encode(str(error))) - - def _invoke_success(self, metadata): - try: - cygrpc_metadata = _common.to_cygrpc_metadata(metadata) - except Exception as exception: # pylint: disable=broad-except - self._invoke_failure(exception) - return - self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'') + def __init__(self, state, callback): + self._state = state + self._callback = callback def __call__(self, metadata, error): - with self.is_called_lock: - if self.is_called: - raise RuntimeError('callback should only ever be invoked once') - if self.error: - self._invoke_failure(self.error) - return - self.is_called = True + with self._state.lock: + if self._state.exception is None: + if self._state.called: + raise RuntimeError( + 'AuthMetadataPluginCallback invoked more than once!') + else: + self._state.called = True + else: + raise RuntimeError( + 'AuthMetadataPluginCallback raised exception "{}"!'.format( + self._state.exception)) if error is None: - self._invoke_success(metadata) + self._callback( + _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok, + None) else: - self._invoke_failure(error) - - def notify_failure(self, error): - with self.is_called_lock: - if not self.is_called: - self.error = error + self._callback(None, cygrpc.StatusCode.internal, + _common.encode(str(error))) -class _WrappedPlugin(object): +class _Plugin(object): - def __init__(self, plugin): - self.plugin = plugin + def __init__(self, metadata_plugin): + self._metadata_plugin = metadata_plugin - def __call__(self, context, cygrpc_callback): - wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback) - wrapped_context = AuthMetadataContext( - _common.decode(context.service_url), - _common.decode(context.method_name)) + def __call__(self, service_url, method_name, callback): + context = _AuthMetadataContext( + _common.decode(service_url), _common.decode(method_name)) + callback_state = _CallbackState() + try: + self._metadata_plugin( + context, _AuthMetadataPluginCallback(callback_state, callback)) + except Exception as exception: # pylint: disable=broad-except + logging.exception( + 'AuthMetadataPluginCallback "%s" raised exception!', + self._metadata_plugin) + with callback_state.lock: + callback_state.exception = exception + if callback_state.called: + return + callback(None, cygrpc.StatusCode.internal, + _common.encode(str(exception))) + + +def metadata_plugin_call_credentials(metadata_plugin, name): + if name is None: try: - self.plugin(wrapped_context, - AuthMetadataPluginCallback(wrapped_cygrpc_callback)) - except Exception as error: - wrapped_cygrpc_callback.notify_failure(error) - raise - - -def call_credentials_metadata_plugin(plugin, name): - """ - Args: - plugin: A callable accepting a grpc.AuthMetadataContext - object and a callback (itself accepting a list of metadata key/value - 2-tuples and a None-able exception value). The callback must be eventually - called, but need not be called in plugin's invocation. - plugin's invocation must be non-blocking. - """ - return cygrpc.call_credentials_metadata_plugin( - cygrpc.CredentialsMetadataPlugin( - _WrappedPlugin(plugin), _common.encode(name))) + effective_name = metadata_plugin.__name__ + except AttributeError: + effective_name = metadata_plugin.__class__.__name__ + else: + effective_name = name + return grpc.CallCredentials( + cygrpc.MetadataPluginCallCredentials( + _Plugin(metadata_plugin), _common.encode(effective_name))) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index cd59b07c04c..5b4812bffe8 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -374,10 +374,10 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): context = _Context(rpc_event, state, request_deserializer) try: return behavior(argument, context), True - except Exception as e: # pylint: disable=broad-except + except Exception as exception: # pylint: disable=broad-except with state.condition: - if e not in state.rpc_errors: - details = 'Exception calling application: {}'.format(e) + if exception not in state.rpc_errors: + details = 'Exception calling application: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, cygrpc.StatusCode.unknown, _common.encode(details)) @@ -389,10 +389,10 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): return next(response_iterator), True except StopIteration: return None, True - except Exception as e: # pylint: disable=broad-except + except Exception as exception: # pylint: disable=broad-except with state.condition: - if e not in state.rpc_errors: - details = 'Exception iterating responses: {}'.format(e) + if exception not in state.rpc_errors: + details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, cygrpc.StatusCode.unknown, _common.encode(details)) @@ -591,7 +591,13 @@ def _handle_call(rpc_event, generic_handlers, thread_pool, if not rpc_event.success: return None, None if rpc_event.request_call_details.method is not None: - method_handler = _find_method_handler(rpc_event, generic_handlers) + try: + method_handler = _find_method_handler(rpc_event, generic_handlers) + except Exception as exception: # pylint: disable=broad-except + details = 'Exception servicing handler: {}'.format(exception) + logging.exception(details) + return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown, + b'Error in service handler!'), None if method_handler is None: return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented, b'Method not found!'), None diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index e277a3ea1d1..34cbade92c3 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -22,7 +22,7 @@ "unit._api_test.ChannelConnectivityTest", "unit._api_test.ChannelTest", "unit._auth_context_test.AuthContextTest", - "unit._auth_test.AccessTokenCallCredentialsTest", + "unit._auth_test.AccessTokenAuthMetadataPluginTest", "unit._auth_test.GoogleCallCredentialsTest", "unit._channel_args_test.ChannelArgsTest", "unit._channel_connectivity_test.ChannelConnectivityTest", diff --git a/src/python/grpcio_tests/tests/unit/_auth_test.py b/src/python/grpcio_tests/tests/unit/_auth_test.py index f61951b80a0..e2cb9389368 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_test.py @@ -61,7 +61,7 @@ class GoogleCallCredentialsTest(unittest.TestCase): self.assertTrue(callback_event.wait(1.0)) -class AccessTokenCallCredentialsTest(unittest.TestCase): +class AccessTokenAuthMetadataPluginTest(unittest.TestCase): def test_google_call_credentials_success(self): callback_event = threading.Event() @@ -71,8 +71,8 @@ class AccessTokenCallCredentialsTest(unittest.TestCase): self.assertIsNone(error) callback_event.set() - call_creds = _auth.AccessTokenCallCredentials('token') - call_creds(None, mock_callback) + metadata_plugin = _auth.AccessTokenAuthMetadataPlugin('token') + metadata_plugin(None, mock_callback) self.assertTrue(callback_event.wait(1.0)) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 18d4a6df64c..da94cf80286 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -28,7 +28,7 @@ _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' _EMPTY_FLAGS = 0 -def _metadata_plugin_callback(context, callback): +def _metadata_plugin(context, callback): callback( cygrpc.Metadata([ cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY, @@ -105,17 +105,9 @@ class TypeSmokeTest(unittest.TestCase): channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([])) del channel - def testCredentialsMetadataPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin( - lambda ignored_a, ignored_b: None, b'') - del plugin - - def testCallCredentialsFromPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, - b'') - call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) - del plugin - del call_credentials + def test_metadata_plugin_call_credentials_up_down(self): + cygrpc.MetadataPluginCallCredentials(_metadata_plugin, + b'test plugin name!') def testServerStartNoExplicitShutdown(self): server = cygrpc.Server(cygrpc.ChannelArgs([])) @@ -205,7 +197,7 @@ class ServerClientMixin(object): return test_utilities.SimpleFuture(performer) - def testEcho(self): + def test_echo(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 CLIENT_METADATA_ASCII_KEY = b'key' @@ -439,8 +431,8 @@ class SecureServerSecureClient(unittest.TestCase, ServerClientMixin): cygrpc.SslPemKeyCertPair(resources.private_key(), resources.certificate_chain()) ], False) - client_credentials = cygrpc.channel_credentials_ssl( - resources.test_root_certificates(), None) + client_credentials = cygrpc.SSLChannelCredentials( + resources.test_root_certificates(), None, None) self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 0a1e50c94cb..2a1a49ce747 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -32,6 +32,7 @@ _UNARY_UNARY = '/test/UnaryUnary' _UNARY_STREAM = '/test/UnaryStream' _STREAM_UNARY = '/test/StreamUnary' _STREAM_STREAM = '/test/StreamStream' +_DEFECTIVE_GENERIC_RPC_HANDLER = '/test/DefectiveGenericRpcHandler' class _Callback(object): @@ -95,6 +96,9 @@ class _Handler(object): yield request self._control.control() + def defective_generic_rpc_handler(self): + raise test_control.Defect() + class _MethodHandler(grpc.RpcMethodHandler): @@ -132,6 +136,8 @@ class _GenericHandler(grpc.GenericRpcHandler): elif handler_call_details.method == _STREAM_STREAM: return _MethodHandler(True, True, None, None, None, None, None, self._handler.handle_stream_stream) + elif handler_call_details.method == _DEFECTIVE_GENERIC_RPC_HANDLER: + return self._handler.defective_generic_rpc_handler() else: return None @@ -176,6 +182,10 @@ def _stream_stream_multi_callable(channel): return channel.stream_stream(_STREAM_STREAM) +def _defective_handler_multi_callable(channel): + return channel.unary_unary(_DEFECTIVE_GENERIC_RPC_HANDLER) + + class InvocationDefectsTest(unittest.TestCase): def setUp(self): @@ -235,6 +245,18 @@ class InvocationDefectsTest(unittest.TestCase): for _ in range(test_constants.STREAM_LENGTH // 2 + 1): next(response_iterator) + def testDefectiveGenericRpcHandlerUnaryResponse(self): + request = b'\x07\x08' + multi_callable = _defective_handler_multi_callable(self._channel) + + with self.assertRaises(grpc.RpcError) as exception_context: + response = multi_callable( + request, + metadata=(('test', 'DefectiveGenericRpcHandlerUnary'),)) + + self.assertIs(grpc.StatusCode.UNKNOWN, + exception_context.exception.code()) + if __name__ == '__main__': unittest.main(verbosity=2)