Merge pull request #6503 from kpayson64/python_3_support

Initial Python3 support
pull/6879/head
Jan Tattermusch 9 years ago committed by GitHub
commit 68e5ecbee4
  1. 12
      setup.py
  2. 2
      src/python/grpcio/commands.py
  3. 10
      src/python/grpcio/grpc/_channel.py
  4. 13
      src/python/grpcio/grpc/_common.py
  5. 7
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  6. 24
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  7. 2
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
  8. 53
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  9. 39
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
  10. 68
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  11. 7
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  12. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  13. 6
      src/python/grpcio/grpc/_links/service.py
  14. 14
      src/python/grpcio/grpc/_server.py
  15. 17
      src/python/grpcio/grpc/beta/_client_adaptations.py
  16. 30
      src/python/grpcio/grpc/beta/_server_adaptations.py
  17. 5
      src/python/grpcio/tests/qps/benchmark_client.py
  18. 7
      src/python/grpcio/tests/qps/client_runner.py
  19. 6
      src/python/grpcio/tests/stress/client.py
  20. 43
      src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py
  21. 16
      src/python/grpcio/tests/unit/_adapter/_low_test.py
  22. 2
      src/python/grpcio/tests/unit/_cython/cygrpc_test.py
  23. 2
      src/python/grpcio/tests/unit/_links/_transmission_test.py
  24. 8
      src/python/grpcio/tests/unit/_rpc_test.py
  25. 4
      src/python/grpcio/tests/unit/beta/_beta_features_test.py
  26. 2
      src/python/grpcio/tests/unit/beta/_not_found_test.py
  27. 6
      src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py
  28. 4
      src/python/grpcio/tests/unit/test_common.py
  29. 4
      tools/gce/linux_performance_worker_init.sh
  30. 15
      tools/run_tests/build_python.sh
  31. 1
      tools/run_tests/run_python.sh
  32. 29
      tools/run_tests/run_tests.py

@ -33,9 +33,11 @@ import os
import os.path
import shutil
import sys
import sysconfig
from distutils import core as _core
from distutils import extension as _extension
import pkg_resources
import setuptools
from setuptools.command import egg_info
@ -110,6 +112,16 @@ if "linux" in sys.platform or "darwin" in sys.platform:
DEFINE_MACROS += (('PyMODINIT_FUNC', pymodinit),)
# By default, Python3 distutils enforces compatibility of
# c plugins (.so files) with the OSX version Python3 was built with.
# For Python3.4, this is OSX 10.6, but we need Thread Local Support (__thread)
if 'darwin' in sys.platform and PY3:
mac_target = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET')
if mac_target and (pkg_resources.parse_version(mac_target) <
pkg_resources.parse_version('10.7.0')):
os.environ['MACOSX_DEPLOYMENT_TARGET'] = '10.7'
def cython_extensions(module_names, extra_sources, include_dirs,
libraries, define_macros, build_with_cython=False):
# Set compiler directives linetrace argument only if we care about tracing;

@ -191,7 +191,7 @@ class BuildProtoModules(setuptools.Command):
except subprocess.CalledProcessError as e:
sys.stderr.write(
'warning: Command:\n{}\nMessage:\n{}\nOutput:\n{}'.format(
command, e.message, e.output))
command, str(e), e.output))
# Generated proto directories dont include __init__.py, but
# these are needed for python package resolution

@ -85,7 +85,7 @@ def _deadline(timeout):
def _unknown_code_details(unknown_cygrpc_code, details):
return b'Server sent unknown code {} and details "{}"'.format(
return 'Server sent unknown code {} and details "{}"'.format(
unknown_cygrpc_code, details)
@ -142,7 +142,7 @@ def _handle_event(event, state, response_deserializer):
response = _common.deserialize(
serialized_response, response_deserializer)
if response is None:
details = b'Exception deserializing response!'
details = 'Exception deserializing response!'
_abort(state, grpc.StatusCode.INTERNAL, details)
else:
state.response = response
@ -186,7 +186,7 @@ def _consume_request_iterator(
if state.code is None and not state.cancelled:
if serialized_request is None:
call.cancel()
details = b'Exception serializing request!'
details = 'Exception serializing request!'
_abort(state, grpc.StatusCode.INTERNAL, details)
return
else:
@ -230,7 +230,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
if self._state.code is None:
self._call.cancel()
self._state.cancelled = True
_abort(self._state, grpc.StatusCode.CANCELLED, b'Cancelled!')
_abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!')
self._state.condition.notify_all()
return False
@ -402,7 +402,7 @@ def _start_unary_request(request, timeout, request_serializer):
if serialized_request is None:
state = _RPCState(
(), _EMPTY_METADATA, _EMPTY_METADATA, grpc.StatusCode.INTERNAL,
b'Exception serializing request!')
'Exception serializing request!')
rendezvous = _Rendezvous(state, None, None, deadline)
return deadline, deadline_timespec, None, rendezvous
else:

@ -97,3 +97,16 @@ def serialize(message, serializer):
def deserialize(serialized_message, deserializer):
return _transform(serialized_message, deserializer,
'Exception deserializing message!')
def _encode(s):
if isinstance(s, bytes):
return s
else:
return s.encode('ascii')
def fully_qualified_method(group, method):
group = _encode(group)
method = _encode(method)
return b'/' + group + b'/' + method

@ -55,6 +55,7 @@ cdef class Call:
def cancel(
self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE,
details=None):
details = str_to_bytes(details)
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE):
@ -63,12 +64,6 @@ cdef class Call:
cdef grpc_call_error result
cdef char *c_details = NULL
if error_code != GRPC_STATUS__DO_NOT_USE:
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected details to be str or bytes")
self.references.append(details)
c_details = details
with nogil:

@ -34,18 +34,13 @@ cdef class Channel:
def __cinit__(self, target, ChannelArgs arguments=None,
ChannelCredentials channel_credentials=None):
target = str_to_bytes(target)
cdef grpc_channel_args *c_arguments = NULL
cdef char *c_target = NULL
self.c_channel = NULL
self.references = []
if arguments is not None:
c_arguments = &arguments.c_args
if isinstance(target, bytes):
pass
elif isinstance(target, basestring):
target = target.encode()
else:
raise TypeError("expected target to be str or bytes")
c_target = target
if channel_credentials is None:
with nogil:
@ -62,25 +57,14 @@ cdef class Channel:
def create_call(self, Call parent, int flags,
CompletionQueue queue not None,
method, host, Timespec deadline not None):
method = str_to_bytes(method)
host = str_to_bytes(host)
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
if isinstance(method, bytes):
pass
elif isinstance(method, basestring):
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
cdef char *method_c_string = method
cdef char *host_c_string = NULL
if host is None:
pass
elif isinstance(host, bytes):
if host is not None:
host_c_string = host
elif isinstance(host, basestring):
host = host.encode()
host_c_string = host
else:
raise TypeError("expected host to be str, bytes, or None")
cdef Call operation_call = Call()
operation_call.references = [self, method, host, queue]
cdef grpc_call *parent_call = NULL

@ -54,7 +54,7 @@ cdef class ServerCredentials:
cdef class CredentialsMetadataPlugin:
cdef object plugin_callback
cdef str plugin_name
cdef bytes plugin_name
cdef grpc_metadata_credentials_plugin make_c_plugin(self)

@ -82,7 +82,7 @@ cdef class ServerCredentials:
cdef class CredentialsMetadataPlugin:
def __cinit__(self, object plugin_callback, str name):
def __cinit__(self, object plugin_callback, name):
"""
Args:
plugin_callback (callable): Callback accepting a service URL (str/bytes)
@ -93,6 +93,7 @@ cdef class CredentialsMetadataPlugin:
successful).
name (str): Plugin name.
"""
name = str_to_bytes(name)
if not callable(plugin_callback):
raise ValueError('expected callable plugin_callback')
self.plugin_callback = plugin_callback
@ -129,7 +130,8 @@ cdef void plugin_get_metadata(
grpc_credentials_plugin_metadata_cb cb, void *user_data) with gil:
def python_callback(
Metadata metadata, grpc_status_code status,
const char *error_details):
error_details):
error_details = str_to_bytes(error_details)
cb(user_data, metadata.c_metadata_array.metadata,
metadata.c_metadata_array.count, status, error_details)
cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
@ -148,14 +150,7 @@ def channel_credentials_google_default():
def channel_credentials_ssl(pem_root_certificates,
SslPemKeyCertPair ssl_pem_key_cert_pair):
if pem_root_certificates is None:
pass
elif isinstance(pem_root_certificates, bytes):
pass
elif isinstance(pem_root_certificates, basestring):
pem_root_certificates = pem_root_certificates.encode()
else:
raise TypeError("expected str or bytes for pem_root_certificates")
pem_root_certificates = str_to_bytes(pem_root_certificates)
cdef ChannelCredentials credentials = ChannelCredentials()
cdef const char *c_pem_root_certificates = NULL
if pem_root_certificates is not None:
@ -207,12 +202,7 @@ def call_credentials_google_compute_engine():
def call_credentials_service_account_jwt_access(
json_key, Timespec token_lifetime not None):
if isinstance(json_key, bytes):
pass
elif isinstance(json_key, basestring):
json_key = json_key.encode()
else:
raise TypeError("expected json_key to be str or bytes")
json_key = str_to_bytes(json_key)
cdef CallCredentials credentials = CallCredentials()
cdef char *json_key_c_string = json_key
with nogil:
@ -223,12 +213,7 @@ def call_credentials_service_account_jwt_access(
return credentials
def call_credentials_google_refresh_token(json_refresh_token):
if isinstance(json_refresh_token, bytes):
pass
elif isinstance(json_refresh_token, basestring):
json_refresh_token = json_refresh_token.encode()
else:
raise TypeError("expected json_refresh_token to be str or bytes")
json_refresh_token = str_to_bytes(json_refresh_token)
cdef CallCredentials credentials = CallCredentials()
cdef char *json_refresh_token_c_string = json_refresh_token
with nogil:
@ -238,18 +223,8 @@ def call_credentials_google_refresh_token(json_refresh_token):
return credentials
def call_credentials_google_iam(authorization_token, authority_selector):
if isinstance(authorization_token, bytes):
pass
elif isinstance(authorization_token, basestring):
authorization_token = authorization_token.encode()
else:
raise TypeError("expected authorization_token to be str or bytes")
if isinstance(authority_selector, bytes):
pass
elif isinstance(authority_selector, basestring):
authority_selector = authority_selector.encode()
else:
raise TypeError("expected authority_selector to be str or bytes")
authorization_token = str_to_bytes(authorization_token)
authority_selector = str_to_bytes(authority_selector)
cdef CallCredentials credentials = CallCredentials()
cdef char *authorization_token_c_string = authorization_token
cdef char *authority_selector_c_string = authority_selector
@ -272,16 +247,10 @@ def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
bint force_client_auth):
pem_root_certs = str_to_bytes(pem_root_certs)
cdef char *c_pem_root_certs = NULL
if pem_root_certs is None:
pass
elif isinstance(pem_root_certs, bytes):
c_pem_root_certs = pem_root_certs
elif isinstance(pem_root_certs, basestring):
pem_root_certs = pem_root_certs.encode()
if pem_root_certs is not None:
c_pem_root_certs = pem_root_certs
else:
raise TypeError("expected pem_root_certs to be str or bytes")
pem_key_cert_pairs = list(pem_key_cert_pairs)
for pair in pem_key_cert_pairs:
if not isinstance(pair, SslPemKeyCertPair):

@ -0,0 +1,39 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# This function will ascii encode unicode string inputs if neccesary.
# In Python3, unicode strings are the default str type.
cdef bytes str_to_bytes(object s):
if s is None or isinstance(s, bytes):
return s
elif isinstance(s, unicode):
return s.encode('ascii')
else:
raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s)))

@ -235,18 +235,13 @@ cdef class ByteBuffer:
if data is None:
self.c_byte_buffer = NULL
return
if isinstance(data, bytes):
pass
elif isinstance(data, basestring):
data = data.encode()
elif isinstance(data, ByteBuffer):
if isinstance(data, ByteBuffer):
data = (<ByteBuffer>data).bytes()
if data is None:
self.c_byte_buffer = NULL
return
else:
raise TypeError("expected value to be of type str, bytes, or "
"ByteBuffer, not {}".format(type(data)))
data = str_to_bytes(data)
cdef char *c_data = data
cdef gpr_slice data_slice
@ -302,19 +297,8 @@ cdef class ByteBuffer:
cdef class SslPemKeyCertPair:
def __cinit__(self, private_key, certificate_chain):
if isinstance(private_key, bytes):
self.private_key = private_key
elif isinstance(private_key, basestring):
self.private_key = private_key.encode()
else:
raise TypeError("expected private_key to be of type str or bytes")
if isinstance(certificate_chain, bytes):
self.certificate_chain = certificate_chain
elif isinstance(certificate_chain, basestring):
self.certificate_chain = certificate_chain.encode()
else:
raise TypeError("expected certificate_chain to be of type str or bytes "
"or int")
self.private_key = str_to_bytes(private_key)
self.certificate_chain = str_to_bytes(certificate_chain)
self.c_pair.private_key = self.private_key
self.c_pair.certificate_chain = self.certificate_chain
@ -322,27 +306,16 @@ cdef class SslPemKeyCertPair:
cdef class ChannelArg:
def __cinit__(self, key, value):
if isinstance(key, bytes):
self.key = key
elif isinstance(key, basestring):
self.key = key.encode()
else:
raise TypeError("expected key to be of type str or bytes")
if isinstance(value, bytes):
self.value = value
self.c_arg.type = GRPC_ARG_STRING
self.c_arg.value.string = self.value
elif isinstance(value, basestring):
self.value = value.encode()
self.c_arg.type = GRPC_ARG_STRING
self.c_arg.value.string = self.value
elif isinstance(value, int):
self.key = str_to_bytes(key)
self.c_arg.key = self.key
if isinstance(value, int):
self.value = int(value)
self.c_arg.type = GRPC_ARG_INTEGER
self.c_arg.value.integer = self.value
else:
raise TypeError("expected value to be of type str or bytes or int")
self.c_arg.key = self.key
self.value = str_to_bytes(value)
self.c_arg.type = GRPC_ARG_STRING
self.c_arg.value.string = self.value
cdef class ChannelArgs:
@ -375,18 +348,8 @@ cdef class ChannelArgs:
cdef class Metadatum:
def __cinit__(self, key, value):
if isinstance(key, bytes):
self._key = key
elif isinstance(key, basestring):
self._key = key.encode()
else:
raise TypeError("expected key to be of type str or bytes")
if isinstance(value, bytes):
self._value = value
elif isinstance(value, basestring):
self._value = value.encode()
else:
raise TypeError("expected value to be of type str or bytes")
self._key = str_to_bytes(key)
self._value = str_to_bytes(value)
self.c_metadata.key = self._key
self.c_metadata.value = self._value
self.c_metadata.value_length = len(self._value)
@ -601,12 +564,7 @@ def operation_send_close_from_client(int flags):
def operation_send_status_from_server(
Metadata metadata, grpc_status_code code, details, int flags):
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected a str or bytes object for details")
details = str_to_bytes(details)
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags

@ -103,12 +103,7 @@ cdef class Server:
def add_http2_port(self, address,
ServerCredentials server_credentials=None):
if isinstance(address, bytes):
pass
elif isinstance(address, basestring):
address = address.encode()
else:
raise TypeError("expected address to be a str or bytes")
address = str_to_bytes(address)
self.references.append(address)
cdef int result
cdef char *address_c_string = address

@ -35,6 +35,7 @@ import sys
# TODO(atash): figure out why the coverage tool gets confused about the Cython
# coverage plugin when the following files don't have a '.pxi' suffix.
include "grpc/_cython/_cygrpc/grpc_string.pyx.pxi"
include "grpc/_cython/_cygrpc/call.pyx.pxi"
include "grpc/_cython/_cygrpc/channel.pyx.pxi"
include "grpc/_cython/_cygrpc/credentials.pyx.pxi"

@ -33,6 +33,7 @@ import abc
import enum
import logging
import threading
import six
import time
from grpc._adapter import _intermediary_low
@ -177,7 +178,10 @@ class _Kernel(object):
call = service_acceptance.call
call.accept(self._completion_queue, call)
try:
group, method = service_acceptance.method.split(b'/')[1:3]
service_method = service_acceptance.method
if six.PY3:
service_method = service_method.decode('latin1')
group, method = service_method.split('/')[1:3]
except ValueError:
logging.info('Illegal path "%s"!', service_acceptance.method)
return

@ -85,7 +85,7 @@ def _abortion_code(state, code):
def _details(state):
return b'' if state.details is None else state.details
return '' if state.details is None else state.details
class _HandlerCallDetails(
@ -189,7 +189,7 @@ def _receive_message(state, call, request_deserializer):
if request is None:
_abort(
state, call, cygrpc.StatusCode.internal,
b'Exception deserializing request!')
'Exception deserializing request!')
else:
state.request = request
state.condition.notify_all()
@ -340,7 +340,7 @@ def _unary_request(rpc_event, state, request_deserializer):
state.condition.wait()
if state.request is None:
if state.client is _CLOSED:
details = b'"{}" requires exactly one request message.'.format(
details = '"{}" requires exactly one request message.'.format(
rpc_event.request_call_details.method)
# TODO(5992#issuecomment-220761992): really, what status code?
_abort(
@ -363,7 +363,7 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = b'Exception calling application: {}'.format(e)
details = 'Exception calling application: {}'.format(e)
logging.exception(details)
_abort(
state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details)
@ -378,7 +378,7 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = b'Exception iterating responses: {}'.format(e)
details = 'Exception iterating responses: {}'.format(e)
logging.exception(details)
_abort(
state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details)
@ -391,7 +391,7 @@ def _serialize_response(rpc_event, state, response, response_serializer):
with state.condition:
_abort(
state, rpc_event.operation_call, cygrpc.StatusCode.internal,
b'Failed to serialize response!')
'Failed to serialize response!')
return None
else:
return serialized_response
@ -544,7 +544,7 @@ def _handle_unrecognized_method(rpc_event):
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_EMPTY_METADATA, cygrpc.StatusCode.unimplemented,
b'Method not found!', _EMPTY_FLAGS),
'Method not found!', _EMPTY_FLAGS),
)
rpc_state = _RPCState()
rpc_event.operation_call.start_batch(

@ -30,6 +30,7 @@
"""Translates gRPC's client-side API into gRPC's client-side Beta API."""
import grpc
from grpc import _common
from grpc._cython import cygrpc
from grpc.beta import interfaces
from grpc.framework.common import cardinality
@ -48,10 +49,6 @@ _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
}
def _fully_qualified_method(group, method):
return b'/{}/{}'.format(group, method)
def _effective_metadata(metadata, metadata_transformer):
non_none_metadata = () if metadata is None else metadata
if metadata_transformer is None:
@ -184,7 +181,7 @@ def _blocking_unary_unary(
metadata_transformer, request, request_serializer, response_deserializer):
try:
multi_callable = channel.unary_unary(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
@ -205,7 +202,7 @@ def _future_unary_unary(
channel, group, method, timeout, protocol_options, metadata,
metadata_transformer, request, request_serializer, response_deserializer):
multi_callable = channel.unary_unary(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
@ -219,7 +216,7 @@ def _unary_stream(
channel, group, method, timeout, protocol_options, metadata,
metadata_transformer, request, request_serializer, response_deserializer):
multi_callable = channel.unary_stream(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
@ -235,7 +232,7 @@ def _blocking_stream_unary(
response_deserializer):
try:
multi_callable = channel.stream_unary(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
@ -257,7 +254,7 @@ def _future_stream_unary(
metadata_transformer, request_iterator, request_serializer,
response_deserializer):
multi_callable = channel.stream_unary(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
@ -272,7 +269,7 @@ def _stream_stream(
metadata_transformer, request_iterator, request_serializer,
response_deserializer):
multi_callable = channel.stream_stream(
_fully_qualified_method(group, method),
_common.fully_qualified_method(group, method),
request_serializer=request_serializer,
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)

@ -33,6 +33,7 @@ import collections
import threading
import grpc
from grpc import _common
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.common import style
@ -287,29 +288,36 @@ def _simple_method_handler(
None, _adapt_stream_stream_event(implementation.stream_stream_event))
def _flatten_method_pair_map(method_pair_map):
method_pair_map = method_pair_map or {}
flat_map = {}
for method_pair in method_pair_map:
method = _common.fully_qualified_method(method_pair[0], method_pair[1])
flat_map[method] = method_pair_map[method_pair]
return flat_map
class _GenericRpcHandler(grpc.GenericRpcHandler):
def __init__(
self, method_implementations, multi_method_implementation,
request_deserializers, response_serializers):
self._method_implementations = method_implementations
self._method_implementations = _flatten_method_pair_map(
method_implementations)
self._request_deserializers = _flatten_method_pair_map(
request_deserializers)
self._response_serializers = _flatten_method_pair_map(
response_serializers)
self._multi_method_implementation = multi_method_implementation
self._request_deserializers = request_deserializers or {}
self._response_serializers = response_serializers or {}
def service(self, handler_call_details):
try:
group_name, method_name = handler_call_details.method.split(b'/')[1:3]
except ValueError:
return None
else:
method_implementation = self._method_implementations.get(
(group_name, method_name,))
handler_call_details.method)
if method_implementation is not None:
return _simple_method_handler(
method_implementation,
self._request_deserializers.get((group_name, method_name,)),
self._response_serializers.get((group_name, method_name,)))
self._request_deserializers.get(handler_call_details.method),
self._response_serializers.get(handler_call_details.method))
elif self._multi_method_implementation is None:
return None
else:

@ -31,12 +31,9 @@
import abc
import time
try:
import Queue as queue # Python 2.x
except ImportError:
import queue # Python 3
from concurrent import futures
from six.moves import queue
from grpc.beta import implementations
from grpc.framework.interfaces.face import face

@ -34,7 +34,7 @@ ClientRunner invokes either periodically or in response to some event.
"""
import abc
import thread
import threading
import time
@ -61,15 +61,18 @@ class OpenLoopClientRunner(ClientRunner):
super(OpenLoopClientRunner, self).__init__(client)
self._is_running = False
self._interval_generator = interval_generator
self._dispatch_thread = threading.Thread(
target=self._dispatch_requests, args=())
def start(self):
self._is_running = True
self._client.start()
thread.start_new_thread(self._dispatch_requests, ())
self._dispatch_thread.start()
def stop(self):
self._is_running = False
self._client.stop()
self._dispatch_thread.join()
self._client = None
def _dispatch_requests(self):

@ -30,10 +30,10 @@
"""Entry point for running stress tests."""
import argparse
import Queue
import threading
from grpc.beta import implementations
from six.moves import queue
from src.proto.grpc.testing import metrics_pb2
from src.proto.grpc.testing import test_pb2
@ -94,7 +94,7 @@ def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases)
test_servers = args.server_addresses.split(',')
# Propagate any client exceptions with a queue
exception_queue = Queue.Queue()
exception_queue = queue.Queue()
stop_event = threading.Event()
hist = histogram.Histogram(1, 1)
runners = []
@ -121,7 +121,7 @@ def run_test(args):
if timeout_secs < 0:
timeout_secs = None
raise exception_queue.get(block=True, timeout=timeout_secs)
except Queue.Empty:
except queue.Empty:
# No exceptions thrown, success
pass
finally:

@ -164,15 +164,15 @@ class EchoTest(unittest.TestCase):
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
self.assertEqual(method, service_accepted.service_acceptance.method)
self.assertEqual(self.host, service_accepted.service_acceptance.host)
self.assertEqual(method.encode(), service_accepted.service_acceptance.method)
self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host)
self.assertIsNotNone(service_accepted.service_acceptance.call)
metadata = dict(service_accepted.metadata)
self.assertIn(client_metadata_key, metadata)
self.assertEqual(client_metadata_value, metadata[client_metadata_key])
self.assertIn(client_binary_metadata_key, metadata)
self.assertIn(client_metadata_key.encode(), metadata)
self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()])
self.assertIn(client_binary_metadata_key.encode(), metadata)
self.assertEqual(client_binary_metadata_value,
metadata[client_binary_metadata_key])
metadata[client_binary_metadata_key.encode()])
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.add_metadata(server_leading_metadata_key,
@ -186,12 +186,12 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
metadata = dict(metadata_accepted.metadata)
self.assertIn(server_leading_metadata_key, metadata)
self.assertEqual(server_leading_metadata_value,
metadata[server_leading_metadata_key])
self.assertIn(server_leading_binary_metadata_key, metadata)
self.assertIn(server_leading_metadata_key.encode(), metadata)
self.assertEqual(server_leading_metadata_value.encode(),
metadata[server_leading_metadata_key.encode()])
self.assertIn(server_leading_binary_metadata_key.encode(), metadata)
self.assertEqual(server_leading_binary_metadata_value,
metadata[server_leading_binary_metadata_key])
metadata[server_leading_binary_metadata_key.encode()])
for datum in test_data:
client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS)
@ -277,17 +277,17 @@ class EchoTest(unittest.TestCase):
self.assertIsNone(read_accepted.bytes)
self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
self.assertEqual(finish_tag, finish_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status)
metadata = dict(finish_accepted.metadata)
self.assertIn(server_trailing_metadata_key, metadata)
self.assertEqual(server_trailing_metadata_value,
metadata[server_trailing_metadata_key])
self.assertIn(server_trailing_binary_metadata_key, metadata)
self.assertIn(server_trailing_metadata_key.encode(), metadata)
self.assertEqual(server_trailing_metadata_value.encode(),
metadata[server_trailing_metadata_key.encode()])
self.assertIn(server_trailing_binary_metadata_key.encode(), metadata)
self.assertEqual(server_trailing_binary_metadata_value,
metadata[server_trailing_binary_metadata_key])
metadata[server_trailing_binary_metadata_key.encode()])
self.assertSetEqual(set(key for key, _ in finish_accepted.metadata),
set((server_trailing_metadata_key,
server_trailing_binary_metadata_key,)))
set((server_trailing_metadata_key.encode(),
server_trailing_binary_metadata_key.encode(),)))
self.assertSequenceEqual(test_data, server_data)
self.assertSequenceEqual(test_data, client_data)
@ -302,7 +302,8 @@ class EchoTest(unittest.TestCase):
self._perform_echo_test([_BYTE_SEQUENCE])
def testManyOneByteEchoes(self):
self._perform_echo_test(_BYTE_SEQUENCE)
self._perform_echo_test(
[_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))])
def testManyManyByteEchoes(self):
self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
@ -409,7 +410,7 @@ class CancellationTest(unittest.TestCase):
finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'),
finish_event.status)
self.assertSequenceEqual(test_data, server_data)

@ -148,11 +148,11 @@ class InsecureServerInsecureClient(unittest.TestCase):
# Check that Python's user agent string is a part of the full user agent
# string
received_initial_metadata_dict = dict(received_initial_metadata)
self.assertIn('user-agent', received_initial_metadata_dict)
self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
received_initial_metadata_dict['user-agent'])
self.assertEqual(method, request_event.call_details.method)
self.assertEqual(host, request_event.call_details.host)
self.assertIn(b'user-agent', received_initial_metadata_dict)
self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(),
received_initial_metadata_dict[b'user-agent'])
self.assertEqual(method.encode(), request_event.call_details.method)
self.assertEqual(host.encode(), request_event.call_details.host)
self.assertLess(abs(deadline - request_event.call_details.deadline),
deadline_tolerance)
@ -198,12 +198,12 @@ class InsecureServerInsecureClient(unittest.TestCase):
test_common.metadata_transmitted(server_initial_metadata,
client_result.initial_metadata))
elif client_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(response, client_result.message)
self.assertEqual(response.encode(), client_result.message)
elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
self.assertTrue(
test_common.metadata_transmitted(server_trailing_metadata,
client_result.trailing_metadata))
self.assertEqual(server_status_details, client_result.status.details)
self.assertEqual(server_status_details.encode(), client_result.status.details)
self.assertEqual(server_status_code, client_result.status.code)
self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
@ -220,7 +220,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(request, server_result.message)
self.assertEqual(request.encode(), server_result.message)
elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
self.assertFalse(server_result.cancelled)
self.assertEqual(set([

@ -37,7 +37,7 @@ from tests.unit import test_common
from tests.unit import resources
_SSL_HOST_OVERRIDE = 'foo.test.google.fr'
_SSL_HOST_OVERRIDE = b'foo.test.google.fr'
_CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key'
_CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
_EMPTY_FLAGS = 0

@ -153,7 +153,7 @@ class RoundTripTest(unittest.TestCase):
invocation_mate.tickets()[-1].termination,
links.Ticket.Termination.COMPLETION)
self.assertIs(invocation_mate.tickets()[-1].code, test_code)
self.assertEqual(invocation_mate.tickets()[-1].message, test_message)
self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode())
def _perform_scenario_test(self, scenario):
test_operation_id = object()

@ -29,6 +29,8 @@
"""Test of gRPC Python's application-layer API."""
from __future__ import division
import itertools
import threading
import unittest
@ -41,9 +43,9 @@ from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) / 2:]
_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) / 3]
_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
_UNARY_UNARY = b'/test/UnaryUnary'
_UNARY_STREAM = b'/test/UnaryStream'
@ -189,7 +191,7 @@ class RPCTest(unittest.TestCase):
self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
self._server.start()
self._channel = grpc.insecure_channel(b'localhost:%d' % port)
self._channel = grpc.insecure_channel('localhost:%d' % port)
# TODO(nathaniel): Why is this necessary, and only in some development
# environments?

@ -42,8 +42,8 @@ from tests.unit.framework.common import test_constants
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
_PER_RPC_CREDENTIALS_METADATA_KEY = 'my-call-credentials-metadata-key'
_PER_RPC_CREDENTIALS_METADATA_VALUE = 'my-call-credentials-metadata-value'
_PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key'
_PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value'
_GROUP = 'group'
_UNARY_UNARY = 'unary-unary'

@ -61,7 +61,7 @@ class NotFoundTest(unittest.TestCase):
def test_future_stream_unary_not_found(self):
rpc_future = self._generic_stub.future_stream_unary(
'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT)
'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT)
with self.assertRaises(face.LocalError) as exception_assertion_context:
rpc_future.result()
self.assertIs(

@ -29,6 +29,8 @@
"""Tests of the base interface of RPC Framework."""
from __future__ import division
import logging
import random
import threading
@ -54,13 +56,13 @@ class _Serialization(test_interfaces.Serialization):
return request + request
def deserialize_request(self, serialized_request):
return serialized_request[:len(serialized_request) / 2]
return serialized_request[:len(serialized_request) // 2]
def serialize_response(self, response):
return response * 3
def deserialize_response(self, serialized_response):
return serialized_response[2 * len(serialized_response) / 3:]
return serialized_response[2 * len(serialized_response) // 3:]
def _advance(quadruples, operator, controller):

@ -61,6 +61,10 @@ def metadata_transmitted(original_metadata, transmitted_metadata):
original = collections.defaultdict(list)
for key_value_pair in original_metadata:
key, value = tuple(key_value_pair)
if not isinstance(key, bytes):
key = key.encode()
if not isinstance(value, bytes):
value = value.encode()
original[key].append(value)
transmitted = collections.defaultdict(list)
for key_value_pair in transmitted_metadata:

@ -69,6 +69,10 @@ sudo apt-get install -y \
python-pip \
python-setuptools \
python-yaml \
python3-dev \
python3-pip \
python3-setuptools \
python3-yaml \
telnet \
unzip \
wget \

@ -34,6 +34,7 @@ set -ex
cd $(dirname $0)/../..
TOX_PYTHON_ENV="$1"
PY_VERSION="${TOX_PYTHON_ENV: -2}"
ROOT=`pwd`
export LD_LIBRARY_PATH=$ROOT/libs/$CONFIG
@ -51,11 +52,25 @@ fi
tox -e ${TOX_PYTHON_ENV} --notest
# We force the .so naming convention in PEP 3149 for side by side installation support
# Note this is the default in Python3, but explicitly disabled for Darwin, so we only
# use this hack for our testing environment.
if [ "$PY_VERSION" -gt "27" ]
then
mv $ROOT/src/python/grpcio/grpc/_cython/cygrpc.so $ROOT/src/python/grpcio/grpc/_cython/cygrpc.so.backup || true
fi
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build_py
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build_ext --inplace
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py gather --test
if [ "$PY_VERSION" -gt "27" ]
then
mv $ROOT/src/python/grpcio/grpc/_cython/cygrpc.so $ROOT/src/python/grpcio/grpc/_cython/cygrpc.cpython-${PY_VERSION}m.so || true
mv $ROOT/src/python/grpcio/grpc/_cython/cygrpc.so.backup $ROOT/src/python/grpcio/grpc/_cython/cygrpc.so || true
fi
# Build the health checker
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/src/python/grpcio_health_checking/setup.py build
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/src/python/grpcio_health_checking/setup.py build_py

@ -55,3 +55,4 @@ fi
mkdir -p $ROOT/reports
rm -rf $ROOT/reports/python-coverage
(mv -T $ROOT/htmlcov $ROOT/reports/python-coverage) || true

@ -387,7 +387,7 @@ class PythonLanguage(object):
def configure(self, config, args):
self.config = config
self.args = args
self._tox_env = self._get_tox_env(self.args.compiler)
self._tox_envs = self._get_tox_envs(self.args.compiler)
def test_specs(self):
# load list of known test suites
@ -399,19 +399,21 @@ class PythonLanguage(object):
os.path.abspath('src/python/grpcio_health_checking'))
if self.config.build_config != 'gcov':
return [self.config.job_spec(
['tools/run_tests/run_python.sh', self._tox_env],
['tools/run_tests/run_python.sh', tox_env],
None,
environ=dict(environment.items() +
[('GRPC_PYTHON_TESTRUNNER_FILTER', suite_name)]),
shortname='py.test.%s' % suite_name,
shortname='%s.test.%s' % (tox_env, suite_name),
timeout_seconds=5*60)
for suite_name in tests_json]
for suite_name in tests_json
for tox_env in self._tox_envs]
else:
return [self.config.job_spec(['tools/run_tests/run_python.sh'],
return [self.config.job_spec(['tools/run_tests/run_python.sh', tox_env],
None,
environ=environment,
shortname='py.test.coverage',
timeout_seconds=15*60)]
shortname='%s.test.coverage' % tox_env,
timeout_seconds=15*60)
for tox_env in self._tox_envs]
def pre_build_steps(self):
@ -424,7 +426,8 @@ class PythonLanguage(object):
return []
def build_steps(self):
return [['tools/run_tests/build_python.sh', self._tox_env]]
return [['tools/run_tests/build_python.sh', tox_env]
for tox_env in self._tox_envs]
def post_tests_steps(self):
return []
@ -435,12 +438,14 @@ class PythonLanguage(object):
def dockerfile_dir(self):
return 'tools/dockerfile/test/python_jessie_%s' % _docker_arch_suffix(self.args.arch)
def _get_tox_env(self, compiler):
def _get_tox_envs(self, compiler):
"""Returns name of tox environment based on selected compiler."""
if compiler == 'python2.7' or compiler == 'default':
return 'py27'
if compiler == 'default':
return ('py27', 'py34')
elif compiler == 'python2.7':
return ('py27',)
elif compiler == 'python3.4':
return 'py34'
return ('py34',)
else:
raise Exception('Compiler %s not supported.' % compiler)

Loading…
Cancel
Save