@ -50,6 +50,11 @@ from grpc.framework.foundation import future
from grpc . framework . interfaces . face import face
from tests . unit . framework . common import test_constants
import tests . protoc_plugin . protos . payload . test_payload_pb2 as payload_pb2
import tests . protoc_plugin . protos . requests . r . test_requests_pb2 as request_pb2
import tests . protoc_plugin . protos . responses . test_responses_pb2 as response_pb2
import tests . protoc_plugin . protos . service . test_service_pb2 as service_pb2
# Identifiers of entities we expect to find in the generated module.
SERVICER_IDENTIFIER = ' BetaTestServiceServicer '
STUB_IDENTIFIER = ' BetaTestServiceStub '
@ -59,12 +64,10 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
class _ServicerMethods ( object ) :
def __init__ ( self , response_pb2 , payload_pb2 ) :
def __init__ ( self ) :
self . _condition = threading . Condition ( )
self . _paused = False
self . _fail = False
self . _response_pb2 = response_pb2
self . _payload_pb2 = payload_pb2
@contextlib . contextmanager
def pause ( self ) : # pylint: disable=invalid-name
@ -91,22 +94,22 @@ class _ServicerMethods(object):
self . _condition . wait ( )
def UnaryCall ( self , request , unused_rpc_context ) :
response = self . _ response_pb2. SimpleResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response = response_pb2 . SimpleResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * request . response_size
self . _control ( )
return response
def StreamingOutputCall ( self , request , unused_rpc_context ) :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
def StreamingInputCall ( self , request_iter , unused_rpc_context ) :
response = self . _ response_pb2. StreamingInputCallResponse ( )
response = response_pb2 . StreamingInputCallResponse ( )
aggregated_payload_size = 0
for request in request_iter :
aggregated_payload_size + = len ( request . payload . payload_compressable )
@ -117,8 +120,8 @@ class _ServicerMethods(object):
def FullDuplexCall ( self , request_iter , unused_rpc_context ) :
for request in request_iter :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
@ -127,8 +130,8 @@ class _ServicerMethods(object):
responses = [ ]
for request in request_iter :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
responses . append ( response )
@ -137,23 +140,18 @@ class _ServicerMethods(object):
@contextlib . contextmanager
def _CreateService ( service_pb2 , response_pb2 , payload_pb2 ) :
def _CreateService ( ) :
""" Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation ; the two are detached .
Args :
service_pb2 : The service_pb2 module generated by this test .
response_pb2 : The response_pb2 module generated by this test
payload_pb2 : The payload_pb2 module generated by this test
Yields :
A ( servicer_methods , stub ) pair where servicer_methods is the back - end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs .
"""
servicer_methods = _ServicerMethods ( response_pb2 , payload_pb2 )
servicer_methods = _ServicerMethods ( )
class Servicer ( getattr ( service_pb2 , SERVICER_IDENTIFIER ) ) :
@ -183,7 +181,7 @@ def _CreateService(service_pb2, response_pb2, payload_pb2):
@contextlib . contextmanager
def _CreateIncompleteService ( service_pb2 ) :
def _CreateIncompleteService ( ) :
""" Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
@ -209,7 +207,7 @@ def _CreateIncompleteService(service_pb2):
server . stop ( 0 )
def _streaming_input_request_iterator ( request_pb2 , payload_pb2 ) :
def _streaming_input_request_iterator ( ) :
for _ in range ( 3 ) :
request = request_pb2 . StreamingInputCallRequest ( )
request . payload . payload_type = payload_pb2 . COMPRESSABLE
@ -217,7 +215,7 @@ def _streaming_input_request_iterator(request_pb2, payload_pb2):
yield request
def _streaming_output_request ( request_pb2 ) :
def _streaming_output_request ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
sizes = [ 1 , 2 , 3 ]
request . response_parameters . add ( size = sizes [ 0 ] , interval_us = 0 )
@ -226,7 +224,7 @@ def _streaming_output_request(request_pb2):
return request
def _full_duplex_request_iterator ( request_pb2 ) :
def _full_duplex_request_iterator ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
@ -244,101 +242,39 @@ class PythonPluginTest(unittest.TestCase):
methods and does not exist for response - streaming methods .
"""
def setUp ( self ) :
# Assume that the appropriate protoc and grpc_python_plugins are on the
# path.
protoc_command = ' protoc '
protoc_plugin_filename = distutils . spawn . find_executable (
' grpc_python_plugin ' )
if not os . path . isfile ( protoc_command ) :
# Assume that if we haven't built protoc that it's on the system.
protoc_command = ' protoc '
# Ensure that the output directory exists.
self . outdir = tempfile . mkdtemp ( )
# Find all proto files
paths = [ ]
root_dir = os . path . dirname ( os . path . realpath ( __file__ ) )
proto_dir = os . path . join ( root_dir , ' protos ' )
for walk_root , _ , filenames in os . walk ( proto_dir ) :
for filename in filenames :
if filename . endswith ( ' .proto ' ) :
path = os . path . join ( walk_root , filename )
paths . append ( path )
# Invoke protoc with the plugin.
cmd = [
protoc_command ,
' --plugin=protoc-gen-python-grpc= %s ' % protoc_plugin_filename ,
' -I %s ' % root_dir ,
' --python_out= %s ' % self . outdir ,
' --python-grpc_out= %s ' % self . outdir
] + paths
subprocess . check_call ( ' ' . join ( cmd ) , shell = True , env = os . environ ,
cwd = os . path . dirname ( os . path . realpath ( __file__ ) ) )
# Generated proto directories dont include __init__.py, but
# these are needed for python package resolution
for walk_root , _ , _ in os . walk ( os . path . join ( self . outdir , ' protos ' ) ) :
path = os . path . join ( walk_root , ' __init__.py ' )
open ( path , ' a ' ) . close ( )
sys . path . insert ( 0 , self . outdir )
import protos . payload . test_payload_pb2 as payload_pb2 # pylint: disable=g-import-not-at-top
import protos . requests . r . test_requests_pb2 as request_pb2 # pylint: disable=g-import-not-at-top
import protos . responses . test_responses_pb2 as response_pb2 # pylint: disable=g-import-not-at-top
import protos . service . test_service_pb2 as service_pb2 # pylint: disable=g-import-not-at-top
self . _payload_pb2 = payload_pb2
self . _request_pb2 = request_pb2
self . _response_pb2 = response_pb2
self . _service_pb2 = service_pb2
def tearDown ( self ) :
try :
shutil . rmtree ( self . outdir )
except OSError as exc :
if exc . errno != errno . ENOENT :
raise
sys . path . remove ( self . outdir )
def testImportAttributes ( self ) :
# check that we can access the generated module and its members.
self . assertIsNotNone (
getattr ( self . _ service_pb2, SERVICER_IDENTIFIER , None ) )
getattr ( service_pb2 , SERVICER_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( self . _ service_pb2, STUB_IDENTIFIER , None ) )
getattr ( service_pb2 , STUB_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( self . _ service_pb2, SERVER_FACTORY_IDENTIFIER , None ) )
getattr ( service_pb2 , SERVER_FACTORY_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( self . _ service_pb2, STUB_FACTORY_IDENTIFIER , None ) )
getattr ( service_pb2 , STUB_FACTORY_IDENTIFIER , None ) )
def testUpDown ( self ) :
with _CreateService (
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 ) :
self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) :
request_pb2 . SimpleRequest ( response_size = 13 )
def testIncompleteServicer ( self ) :
with _CreateIncompleteService ( self . _service_pb2 ) as ( _ , stub ) :
request = self . _ request_pb2. SimpleRequest ( response_size = 13 )
with _CreateIncompleteService ( ) as ( _ , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
try :
stub . UnaryCall ( request , test_constants . LONG_TIMEOUT )
except face . AbortionError as error :
self . assertEqual ( interfaces . StatusCode . UNIMPLEMENTED , error . code )
def testUnaryCall ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
response = stub . UnaryCall ( request , test_constants . LONG_TIMEOUT )
expected_response = methods . UnaryCall ( request , ' not a real context! ' )
self . assertEqual ( expected_response , response )
def testUnaryCallFuture ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
# Check that the call does not block waiting for the server to respond.
with methods . pause ( ) :
response_future = stub . UnaryCall . future (
@ -348,9 +284,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
def testUnaryCallFutureExpired ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
with methods . pause ( ) :
response_future = stub . UnaryCall . future (
request , test_constants . SHORT_TIMEOUT )
@ -358,27 +293,24 @@ class PythonPluginTest(unittest.TestCase):
response_future . result ( )
def testUnaryCallFutureCancelled ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
with methods . pause ( ) :
response_future = stub . UnaryCall . future ( request , 1 )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
def testUnaryCallFutureFailed ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
with methods . fail ( ) :
response_future = stub . UnaryCall . future (
request , test_constants . LONG_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testStreamingOutputCall ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _request_pb2 )
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
responses = stub . StreamingOutputCall (
request , test_constants . LONG_TIMEOUT )
expected_responses = methods . StreamingOutputCall (
@ -388,9 +320,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
def testStreamingOutputCallExpired ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _request_pb2 )
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
with methods . pause ( ) :
responses = stub . StreamingOutputCall (
request , test_constants . SHORT_TIMEOUT )
@ -398,9 +329,8 @@ class PythonPluginTest(unittest.TestCase):
list ( responses )
def testStreamingOutputCallCancelled ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _request_pb2 )
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
responses = stub . StreamingOutputCall (
request , test_constants . LONG_TIMEOUT )
next ( responses )
@ -409,9 +339,8 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testStreamingOutputCallFailed ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _request_pb2 )
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
with methods . fail ( ) :
responses = stub . StreamingOutputCall ( request , 1 )
self . assertIsNotNone ( responses )
@ -419,38 +348,32 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testStreamingInputCall ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
response = stub . StreamingInputCall (
_streaming_input_request_iterator (
self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
expected_response = methods . StreamingInputCall (
_streaming_input_request_iterator ( self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFuture ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator (
self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
response = response_future . result ( )
expected_response = methods . StreamingInputCall (
_streaming_input_request_iterator ( self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFutureExpired ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator (
self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
test_constants . SHORT_TIMEOUT )
with self . assertRaises ( face . ExpirationError ) :
response_future . result ( )
@ -458,12 +381,10 @@ class PythonPluginTest(unittest.TestCase):
response_future . exception ( ) , face . ExpirationError )
def testStreamingInputCallFutureCancelled ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator (
self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
@ -471,32 +392,28 @@ class PythonPluginTest(unittest.TestCase):
response_future . result ( )
def testStreamingInputCallFutureFailed ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
with methods . fail ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator (
self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testFullDuplexCall ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
responses = stub . FullDuplexCall (
_full_duplex_request_iterator ( self . _request_pb2 ) ,
_full_duplex_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
expected_responses = methods . FullDuplexCall (
_full_duplex_request_iterator ( self . _request_pb2 ) ,
_full_duplex_request_iterator ( ) ,
' not a real RpcContext! ' )
for expected_response , response in moves . zip_longest (
expected_responses , responses ) :
self . assertEqual ( expected_response , response )
def testFullDuplexCallExpired ( self ) :
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( )
with _CreateService ( ) as ( methods , stub ) :
with methods . pause ( ) :
responses = stub . FullDuplexCall (
request_iterator , test_constants . SHORT_TIMEOUT )
@ -504,9 +421,8 @@ class PythonPluginTest(unittest.TestCase):
list ( responses )
def testFullDuplexCallCancelled ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
with _CreateService ( ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( )
responses = stub . FullDuplexCall (
request_iterator , test_constants . LONG_TIMEOUT )
next ( responses )
@ -515,9 +431,8 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testFullDuplexCallFailed ( self ) :
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( )
with _CreateService ( ) as ( methods , stub ) :
with methods . fail ( ) :
responses = stub . FullDuplexCall (
request_iterator , test_constants . LONG_TIMEOUT )
@ -526,13 +441,12 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testHalfDuplexCall ( self ) :
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
def half_duplex_request_iterator ( ) :
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
yield request
@ -557,14 +471,13 @@ class PythonPluginTest(unittest.TestCase):
wait_cell [ 0 ] = False
condition . notify_all ( )
def half_duplex_request_iterator ( ) :
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
with condition :
while wait_cell [ 0 ] :
condition . wait ( )
with _CreateService ( self . _service_pb2 , self . _response_pb2 ,
self . _payload_pb2 ) as ( methods , stub ) :
with _CreateService ( ) as ( methods , stub ) :
with wait ( ) :
responses = stub . HalfDuplexCall (
half_duplex_request_iterator ( ) , test_constants . SHORT_TIMEOUT )
@ -574,5 +487,4 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == ' __main__ ' :
#os.chdir(os.path.dirname(sys.argv[0]))
unittest . main ( verbosity = 2 )