@ -45,6 +45,11 @@ from six import moves
import grpc
import grpc
from tests . unit . framework . common import test_constants
from tests . unit . framework . common import test_constants
import tests . protoc_plugin . protos . payload . test_payload_pb2 as payload_pb2
import tests . protoc_plugin . protos . requests . r . test_requests_pb2 as request_pb2
import tests . protoc_plugin . protos . responses . test_responses_pb2 as response_pb2
import tests . protoc_plugin . protos . service . test_service_pb2 as service_pb2
# Identifiers of entities we expect to find in the generated module.
# Identifiers of entities we expect to find in the generated module.
STUB_IDENTIFIER = ' TestServiceStub '
STUB_IDENTIFIER = ' TestServiceStub '
SERVICER_IDENTIFIER = ' TestServiceServicer '
SERVICER_IDENTIFIER = ' TestServiceServicer '
@ -53,12 +58,10 @@ ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server'
class _ServicerMethods ( object ) :
class _ServicerMethods ( object ) :
def __init__ ( self , response_pb2 , payload_pb2 ) :
def __init__ ( self ) :
self . _condition = threading . Condition ( )
self . _condition = threading . Condition ( )
self . _paused = False
self . _paused = False
self . _fail = False
self . _fail = False
self . _response_pb2 = response_pb2
self . _payload_pb2 = payload_pb2
@contextlib . contextmanager
@contextlib . contextmanager
def pause ( self ) : # pylint: disable=invalid-name
def pause ( self ) : # pylint: disable=invalid-name
@ -85,22 +88,22 @@ class _ServicerMethods(object):
self . _condition . wait ( )
self . _condition . wait ( )
def UnaryCall ( self , request , unused_rpc_context ) :
def UnaryCall ( self , request , unused_rpc_context ) :
response = self . _ response_pb2. SimpleResponse ( )
response = response_pb2 . SimpleResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * request . response_size
response . payload . payload_compressable = ' a ' * request . response_size
self . _control ( )
self . _control ( )
return response
return response
def StreamingOutputCall ( self , request , unused_rpc_context ) :
def StreamingOutputCall ( self , request , unused_rpc_context ) :
for parameter in request . response_parameters :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
self . _control ( )
yield response
yield response
def StreamingInputCall ( self , request_iter , unused_rpc_context ) :
def StreamingInputCall ( self , request_iter , unused_rpc_context ) :
response = self . _ response_pb2. StreamingInputCallResponse ( )
response = response_pb2 . StreamingInputCallResponse ( )
aggregated_payload_size = 0
aggregated_payload_size = 0
for request in request_iter :
for request in request_iter :
aggregated_payload_size + = len ( request . payload . payload_compressable )
aggregated_payload_size + = len ( request . payload . payload_compressable )
@ -111,8 +114,8 @@ class _ServicerMethods(object):
def FullDuplexCall ( self , request_iter , unused_rpc_context ) :
def FullDuplexCall ( self , request_iter , unused_rpc_context ) :
for request in request_iter :
for request in request_iter :
for parameter in request . response_parameters :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
self . _control ( )
yield response
yield response
@ -121,8 +124,8 @@ class _ServicerMethods(object):
responses = [ ]
responses = [ ]
for request in request_iter :
for request in request_iter :
for parameter in request . response_parameters :
for parameter in request . response_parameters :
response = self . _ response_pb2. StreamingOutputCallResponse ( )
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
self . _control ( )
responses . append ( response )
responses . append ( response )
@ -142,18 +145,13 @@ class _Service(
"""
"""
def _CreateService ( service_pb2 , response_pb2 , payload_pb2 ) :
def _CreateService ( ) :
""" Provides a servicer backend and a stub.
""" Provides a servicer backend and a stub.
Args :
service_pb2 : The service_pb2 module generated by this test .
response_pb2 : The response_pb2 module generated by this test .
payload_pb2 : The payload_pb2 module generated by this test .
Returns :
Returns :
A _Service with which to test RPCs .
A _Service with which to test RPCs .
"""
"""
servicer_methods = _ServicerMethods ( response_pb2 , payload_pb2 )
servicer_methods = _ServicerMethods ( )
class Servicer ( getattr ( service_pb2 , SERVICER_IDENTIFIER ) ) :
class Servicer ( getattr ( service_pb2 , SERVICER_IDENTIFIER ) ) :
@ -182,12 +180,9 @@ def _CreateService(service_pb2, response_pb2, payload_pb2):
return _Service ( servicer_methods , server , stub )
return _Service ( servicer_methods , server , stub )
def _CreateIncompleteService ( service_pb2 ) :
def _CreateIncompleteService ( ) :
""" Provides a servicer backend that fails to implement methods and its stub.
""" Provides a servicer backend that fails to implement methods and its stub.
Args :
service_pb2 : The service_pb2 module generated by this test .
Returns :
Returns :
A _Service with which to test RPCs . The returned _Service ' s
A _Service with which to test RPCs . The returned _Service ' s
servicer_methods implements none of the methods required of it .
servicer_methods implements none of the methods required of it .
@ -206,7 +201,7 @@ def _CreateIncompleteService(service_pb2):
return _Service ( None , server , stub )
return _Service ( None , server , stub )
def _streaming_input_request_iterator ( request_pb2 , payload_pb2 ) :
def _streaming_input_request_iterator ( ) :
for _ in range ( 3 ) :
for _ in range ( 3 ) :
request = request_pb2 . StreamingInputCallRequest ( )
request = request_pb2 . StreamingInputCallRequest ( )
request . payload . payload_type = payload_pb2 . COMPRESSABLE
request . payload . payload_type = payload_pb2 . COMPRESSABLE
@ -214,7 +209,7 @@ def _streaming_input_request_iterator(request_pb2, payload_pb2):
yield request
yield request
def _streaming_output_request ( request_pb2 ) :
def _streaming_output_request ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
sizes = [ 1 , 2 , 3 ]
sizes = [ 1 , 2 , 3 ]
request . response_parameters . add ( size = sizes [ 0 ] , interval_us = 0 )
request . response_parameters . add ( size = sizes [ 0 ] , interval_us = 0 )
@ -223,7 +218,7 @@ def _streaming_output_request(request_pb2):
return request
return request
def _full_duplex_request_iterator ( request_pb2 ) :
def _full_duplex_request_iterator ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
yield request
@ -241,102 +236,40 @@ class PythonPluginTest(unittest.TestCase):
methods and does not exist for response - streaming methods .
methods and does not exist for response - streaming methods .
"""
"""
def setUp ( self ) :
# Assume that the appropriate protoc and grpc_python_plugins are on the
# path.
protoc_command = ' protoc '
protoc_plugin_filename = distutils . spawn . find_executable (
' grpc_python_plugin ' )
if not os . path . isfile ( protoc_command ) :
# Assume that if we haven't built protoc that it's on the system.
protoc_command = ' protoc '
# Ensure that the output directory exists.
self . outdir = tempfile . mkdtemp ( )
# Find all proto files
paths = [ ]
root_dir = os . path . dirname ( os . path . realpath ( __file__ ) )
proto_dir = os . path . join ( root_dir , ' protos ' )
for walk_root , _ , filenames in os . walk ( proto_dir ) :
for filename in filenames :
if filename . endswith ( ' .proto ' ) :
path = os . path . join ( walk_root , filename )
paths . append ( path )
# Invoke protoc with the plugin.
cmd = [
protoc_command ,
' --plugin=protoc-gen-python-grpc= %s ' % protoc_plugin_filename ,
' -I %s ' % root_dir ,
' --python_out= %s ' % self . outdir ,
' --python-grpc_out= %s ' % self . outdir
] + paths
subprocess . check_call ( ' ' . join ( cmd ) , shell = True , env = os . environ ,
cwd = os . path . dirname ( os . path . realpath ( __file__ ) ) )
# Generated proto directories dont include __init__.py, but
# these are needed for python package resolution
for walk_root , _ , _ in os . walk ( os . path . join ( self . outdir , ' protos ' ) ) :
path = os . path . join ( walk_root , ' __init__.py ' )
open ( path , ' a ' ) . close ( )
sys . path . insert ( 0 , self . outdir )
import protos . payload . test_payload_pb2 as payload_pb2
import protos . requests . r . test_requests_pb2 as request_pb2
import protos . responses . test_responses_pb2 as response_pb2
import protos . service . test_service_pb2 as service_pb2
self . _payload_pb2 = payload_pb2
self . _request_pb2 = request_pb2
self . _response_pb2 = response_pb2
self . _service_pb2 = service_pb2
def tearDown ( self ) :
try :
shutil . rmtree ( self . outdir )
except OSError as exc :
if exc . errno != errno . ENOENT :
raise
sys . path . remove ( self . outdir )
def testImportAttributes ( self ) :
def testImportAttributes ( self ) :
# check that we can access the generated module and its members.
# check that we can access the generated module and its members.
self . assertIsNotNone (
self . assertIsNotNone (
getattr ( self . _ service_pb2, STUB_IDENTIFIER , None ) )
getattr ( service_pb2 , STUB_IDENTIFIER , None ) )
self . assertIsNotNone (
self . assertIsNotNone (
getattr ( self . _ service_pb2, SERVICER_IDENTIFIER , None ) )
getattr ( service_pb2 , SERVICER_IDENTIFIER , None ) )
self . assertIsNotNone (
self . assertIsNotNone (
getattr ( self . _ service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER , None ) )
getattr ( service_pb2 , ADD_SERVICER_TO_SERVER_IDENTIFIER , None ) )
def testUpDown ( self ) :
def testUpDown ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
self . assertIsNotNone ( service . servicer_methods )
self . assertIsNotNone ( service . servicer_methods )
self . assertIsNotNone ( service . server )
self . assertIsNotNone ( service . server )
self . assertIsNotNone ( service . stub )
self . assertIsNotNone ( service . stub )
def testIncompleteServicer ( self ) :
def testIncompleteServicer ( self ) :
service = _CreateIncompleteService ( self . _service_pb2 )
service = _CreateIncompleteService ( )
request = self . _ request_pb2. SimpleRequest ( response_size = 13 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
with self . assertRaises ( grpc . RpcError ) as exception_context :
with self . assertRaises ( grpc . RpcError ) as exception_context :
service . stub . UnaryCall ( request )
service . stub . UnaryCall ( request )
self . assertIs (
self . assertIs (
exception_context . exception . code ( ) , grpc . StatusCode . UNIMPLEMENTED )
exception_context . exception . code ( ) , grpc . StatusCode . UNIMPLEMENTED )
def testUnaryCall ( self ) :
def testUnaryCall ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
response = service . stub . UnaryCall ( request )
response = service . stub . UnaryCall ( request )
expected_response = service . servicer_methods . UnaryCall (
expected_response = service . servicer_methods . UnaryCall (
request , ' not a real context! ' )
request , ' not a real context! ' )
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testUnaryCallFuture ( self ) :
def testUnaryCallFuture ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
# Check that the call does not block waiting for the server to respond.
# Check that the call does not block waiting for the server to respond.
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . UnaryCall . future ( request )
response_future = service . stub . UnaryCall . future ( request )
@ -346,9 +279,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testUnaryCallFutureExpired ( self ) :
def testUnaryCallFutureExpired ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . UnaryCall . future (
response_future = service . stub . UnaryCall . future (
request , timeout = test_constants . SHORT_TIMEOUT )
request , timeout = test_constants . SHORT_TIMEOUT )
@ -359,9 +291,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertIs ( response_future . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
def testUnaryCallFutureCancelled ( self ) :
def testUnaryCallFutureCancelled ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . UnaryCall . future ( request )
response_future = service . stub . UnaryCall . future ( request )
response_future . cancel ( )
response_future . cancel ( )
@ -369,18 +300,16 @@ class PythonPluginTest(unittest.TestCase):
self . assertIs ( response_future . code ( ) , grpc . StatusCode . CANCELLED )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . CANCELLED )
def testUnaryCallFutureFailed ( self ) :
def testUnaryCallFutureFailed ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = request_pb2 . SimpleRequest ( response_size = 13 )
request = self . _request_pb2 . SimpleRequest ( response_size = 13 )
with service . servicer_methods . fail ( ) :
with service . servicer_methods . fail ( ) :
response_future = service . stub . UnaryCall . future ( request )
response_future = service . stub . UnaryCall . future ( request )
self . assertIsNotNone ( response_future . exception ( ) )
self . assertIsNotNone ( response_future . exception ( ) )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . UNKNOWN )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . UNKNOWN )
def testStreamingOutputCall ( self ) :
def testStreamingOutputCall ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = _streaming_output_request ( )
request = _streaming_output_request ( self . _request_pb2 )
responses = service . stub . StreamingOutputCall ( request )
responses = service . stub . StreamingOutputCall ( request )
expected_responses = service . servicer_methods . StreamingOutputCall (
expected_responses = service . servicer_methods . StreamingOutputCall (
request , ' not a real RpcContext! ' )
request , ' not a real RpcContext! ' )
@ -389,9 +318,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testStreamingOutputCallExpired ( self ) :
def testStreamingOutputCallExpired ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = _streaming_output_request ( )
request = _streaming_output_request ( self . _request_pb2 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
responses = service . stub . StreamingOutputCall (
responses = service . stub . StreamingOutputCall (
request , timeout = test_constants . SHORT_TIMEOUT )
request , timeout = test_constants . SHORT_TIMEOUT )
@ -401,9 +329,8 @@ class PythonPluginTest(unittest.TestCase):
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
def testStreamingOutputCallCancelled ( self ) :
def testStreamingOutputCallCancelled ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = _streaming_output_request ( )
request = _streaming_output_request ( self . _request_pb2 )
responses = service . stub . StreamingOutputCall ( request )
responses = service . stub . StreamingOutputCall ( request )
next ( responses )
next ( responses )
responses . cancel ( )
responses . cancel ( )
@ -412,9 +339,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertIs ( responses . code ( ) , grpc . StatusCode . CANCELLED )
self . assertIs ( responses . code ( ) , grpc . StatusCode . CANCELLED )
def testStreamingOutputCallFailed ( self ) :
def testStreamingOutputCallFailed ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request = _streaming_output_request ( )
request = _streaming_output_request ( self . _request_pb2 )
with service . servicer_methods . fail ( ) :
with service . servicer_methods . fail ( ) :
responses = service . stub . StreamingOutputCall ( request )
responses = service . stub . StreamingOutputCall ( request )
self . assertIsNotNone ( responses )
self . assertIsNotNone ( responses )
@ -423,36 +349,30 @@ class PythonPluginTest(unittest.TestCase):
self . assertIs ( exception_context . exception . code ( ) , grpc . StatusCode . UNKNOWN )
self . assertIs ( exception_context . exception . code ( ) , grpc . StatusCode . UNKNOWN )
def testStreamingInputCall ( self ) :
def testStreamingInputCall ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
response = service . stub . StreamingInputCall (
response = service . stub . StreamingInputCall (
_streaming_input_request_iterator (
_streaming_input_request_iterator ( ) )
self . _request_pb2 , self . _payload_pb2 ) )
expected_response = service . servicer_methods . StreamingInputCall (
expected_response = service . servicer_methods . StreamingInputCall (
_streaming_input_request_iterator ( self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
' not a real RpcContext! ' )
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFuture ( self ) :
def testStreamingInputCallFuture ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . StreamingInputCall . future (
response_future = service . stub . StreamingInputCall . future (
_streaming_input_request_iterator (
_streaming_input_request_iterator ( ) )
self . _request_pb2 , self . _payload_pb2 ) )
response = response_future . result ( )
response = response_future . result ( )
expected_response = service . servicer_methods . StreamingInputCall (
expected_response = service . servicer_methods . StreamingInputCall (
_streaming_input_request_iterator ( self . _request_pb2 , self . _payload_pb2 ) ,
_streaming_input_request_iterator ( ) ,
' not a real RpcContext! ' )
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFutureExpired ( self ) :
def testStreamingInputCallFutureExpired ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . StreamingInputCall . future (
response_future = service . stub . StreamingInputCall . future (
_streaming_input_request_iterator (
_streaming_input_request_iterator ( ) ,
self . _request_pb2 , self . _payload_pb2 ) ,
timeout = test_constants . SHORT_TIMEOUT )
timeout = test_constants . SHORT_TIMEOUT )
with self . assertRaises ( grpc . RpcError ) as exception_context :
with self . assertRaises ( grpc . RpcError ) as exception_context :
response_future . result ( )
response_future . result ( )
@ -463,43 +383,37 @@ class PythonPluginTest(unittest.TestCase):
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
def testStreamingInputCallFutureCancelled ( self ) :
def testStreamingInputCallFutureCancelled ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
response_future = service . stub . StreamingInputCall . future (
response_future = service . stub . StreamingInputCall . future (
_streaming_input_request_iterator (
_streaming_input_request_iterator ( ) )
self . _request_pb2 , self . _payload_pb2 ) )
response_future . cancel ( )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
self . assertTrue ( response_future . cancelled ( ) )
with self . assertRaises ( grpc . FutureCancelledError ) :
with self . assertRaises ( grpc . FutureCancelledError ) :
response_future . result ( )
response_future . result ( )
def testStreamingInputCallFutureFailed ( self ) :
def testStreamingInputCallFutureFailed ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . fail ( ) :
with service . servicer_methods . fail ( ) :
response_future = service . stub . StreamingInputCall . future (
response_future = service . stub . StreamingInputCall . future (
_streaming_input_request_iterator (
_streaming_input_request_iterator ( ) )
self . _request_pb2 , self . _payload_pb2 ) )
self . assertIsNotNone ( response_future . exception ( ) )
self . assertIsNotNone ( response_future . exception ( ) )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . UNKNOWN )
self . assertIs ( response_future . code ( ) , grpc . StatusCode . UNKNOWN )
def testFullDuplexCall ( self ) :
def testFullDuplexCall ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
responses = service . stub . FullDuplexCall (
responses = service . stub . FullDuplexCall (
_full_duplex_request_iterator ( self . _request_pb2 ) )
_full_duplex_request_iterator ( ) )
expected_responses = service . servicer_methods . FullDuplexCall (
expected_responses = service . servicer_methods . FullDuplexCall (
_full_duplex_request_iterator ( self . _request_pb2 ) ,
_full_duplex_request_iterator ( ) ,
' not a real RpcContext! ' )
' not a real RpcContext! ' )
for expected_response , response in moves . zip_longest (
for expected_response , response in moves . zip_longest (
expected_responses , responses ) :
expected_responses , responses ) :
self . assertEqual ( expected_response , response )
self . assertEqual ( expected_response , response )
def testFullDuplexCallExpired ( self ) :
def testFullDuplexCallExpired ( self ) :
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
request_iterator = _full_duplex_request_iterator ( )
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . pause ( ) :
with service . servicer_methods . pause ( ) :
responses = service . stub . FullDuplexCall (
responses = service . stub . FullDuplexCall (
request_iterator , timeout = test_constants . SHORT_TIMEOUT )
request_iterator , timeout = test_constants . SHORT_TIMEOUT )
@ -509,9 +423,8 @@ class PythonPluginTest(unittest.TestCase):
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
exception_context . exception . code ( ) , grpc . StatusCode . DEADLINE_EXCEEDED )
def testFullDuplexCallCancelled ( self ) :
def testFullDuplexCallCancelled ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
request_iterator = _full_duplex_request_iterator ( )
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
responses = service . stub . FullDuplexCall ( request_iterator )
responses = service . stub . FullDuplexCall ( request_iterator )
next ( responses )
next ( responses )
responses . cancel ( )
responses . cancel ( )
@ -521,9 +434,8 @@ class PythonPluginTest(unittest.TestCase):
exception_context . exception . code ( ) , grpc . StatusCode . CANCELLED )
exception_context . exception . code ( ) , grpc . StatusCode . CANCELLED )
def testFullDuplexCallFailed ( self ) :
def testFullDuplexCallFailed ( self ) :
request_iterator = _full_duplex_request_iterator ( self . _request_pb2 )
request_iterator = _full_duplex_request_iterator ( )
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with service . servicer_methods . fail ( ) :
with service . servicer_methods . fail ( ) :
responses = service . stub . FullDuplexCall ( request_iterator )
responses = service . stub . FullDuplexCall ( request_iterator )
with self . assertRaises ( grpc . RpcError ) as exception_context :
with self . assertRaises ( grpc . RpcError ) as exception_context :
@ -531,13 +443,12 @@ class PythonPluginTest(unittest.TestCase):
self . assertIs ( exception_context . exception . code ( ) , grpc . StatusCode . UNKNOWN )
self . assertIs ( exception_context . exception . code ( ) , grpc . StatusCode . UNKNOWN )
def testHalfDuplexCall ( self ) :
def testHalfDuplexCall ( self ) :
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
def half_duplex_request_iterator ( ) :
def half_duplex_request_iterator ( ) :
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
yield request
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
yield request
yield request
@ -561,14 +472,13 @@ class PythonPluginTest(unittest.TestCase):
wait_cell [ 0 ] = False
wait_cell [ 0 ] = False
condition . notify_all ( )
condition . notify_all ( )
def half_duplex_request_iterator ( ) :
def half_duplex_request_iterator ( ) :
request = self . _ request_pb2. StreamingOutputCallRequest ( )
request = request_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
yield request
with condition :
with condition :
while wait_cell [ 0 ] :
while wait_cell [ 0 ] :
condition . wait ( )
condition . wait ( )
service = _CreateService (
service = _CreateService ( )
self . _service_pb2 , self . _response_pb2 , self . _payload_pb2 )
with wait ( ) :
with wait ( ) :
responses = service . stub . HalfDuplexCall (
responses = service . stub . HalfDuplexCall (
half_duplex_request_iterator ( ) , timeout = test_constants . SHORT_TIMEOUT )
half_duplex_request_iterator ( ) , timeout = test_constants . SHORT_TIMEOUT )