@ -66,8 +66,8 @@ class _ServicerMethods(object):
def __init__ ( self , test_pb2 , delay ) :
self . _paused = False
self . _failed = False
self . test_pb2 = test_pb2
self . delay = delay
self . _ test_pb2 = test_pb2
self . _ delay = delay
@contextlib . contextmanager
def pause ( self ) : # pylint: disable=invalid-name
@ -84,27 +84,27 @@ class _ServicerMethods(object):
def _control ( self ) : # pylint: disable=invalid-name
if self . _failed :
raise ValueError ( )
time . sleep ( self . delay )
time . sleep ( self . _ delay)
while self . _paused :
time . sleep ( 0 )
def UnaryCall ( self , request , unused_context ) :
response = self . test_pb2 . SimpleResponse ( )
response . payload . payload_type = self . test_pb2 . COMPRESSABLE
def UnaryCall ( self , request , unused_rpc_ context ) :
response = self . _ test_pb2. SimpleResponse ( )
response . payload . payload_type = self . _ test_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * request . response_size
self . _control ( )
return response
def StreamingOutputCall ( self , request , unused_context ) :
def StreamingOutputCall ( self , request , unused_rpc_ context ) :
for parameter in request . response_parameters :
response = self . test_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . test_pb2 . COMPRESSABLE
response = self . _ test_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ test_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
def StreamingInputCall ( self , request_iter , unused_context ) :
response = self . test_pb2 . StreamingInputCallResponse ( )
def StreamingInputCall ( self , request_iter , unused_rpc_ context ) :
response = self . _ test_pb2. StreamingInputCallResponse ( )
aggregated_payload_size = 0
for request in request_iter :
aggregated_payload_size + = len ( request . payload . payload_compressable )
@ -112,21 +112,21 @@ class _ServicerMethods(object):
self . _control ( )
return response
def FullDuplexCall ( self , request_iter , unused_context ) :
def FullDuplexCall ( self , request_iter , unused_rpc_ context ) :
for request in request_iter :
for parameter in request . response_parameters :
response = self . test_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . test_pb2 . COMPRESSABLE
response = self . _ test_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ test_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
def HalfDuplexCall ( self , request_iter , unused_context ) :
def HalfDuplexCall ( self , request_iter , unused_rpc_ context ) :
responses = [ ]
for request in request_iter :
for parameter in request . response_parameters :
response = self . test_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = self . test_pb2 . COMPRESSABLE
response = self . _ test_pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ test_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
responses . append ( response )
@ -152,7 +152,7 @@ def _CreateService(test_pb2, delay):
timeout : how long the stub will wait for the servicer by default .
Yields :
A three - tuple ( servicer_methods , servicer , stub ) , where the servicer is
A ( servicer_methods , servicer , stub ) three - tuple where servicer_methods is
the back - end of the service bound to the stub and the server and stub
are both activated and ready for use .
"""
@ -185,7 +185,7 @@ def _CreateService(test_pb2, delay):
yield servicer_methods , stub , server
def StreamingInputRequest ( test_pb2 ) :
def _streaming_input_request_iterator ( test_pb2 ) :
for _ in range ( 3 ) :
request = test_pb2 . StreamingInputCallRequest ( )
request . payload . payload_type = test_pb2 . COMPRESSABLE
@ -193,7 +193,7 @@ def StreamingInputRequest(test_pb2):
yield request
def StreamingOutputR equest( test_pb2 ) :
def _streaming_output_r equest( test_pb2 ) :
request = test_pb2 . StreamingOutputCallRequest ( )
sizes = [ 1 , 2 , 3 ]
request . response_parameters . add ( size = sizes [ 0 ] , interval_us = 0 )
@ -202,7 +202,7 @@ def StreamingOutputRequest(test_pb2):
return request
def FullDuplexRequest ( test_pb2 ) :
def _full_duplex_request_iterator ( test_pb2 ) :
request = test_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
@ -270,32 +270,32 @@ class PythonPluginTest(unittest.TestCase):
def testUnaryCall ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
request = test_pb2 . SimpleRequest ( response_size = 13 )
response = stub . UnaryCall ( request , NORMAL_TIMEOUT )
expected_response = servicer . UnaryCall ( request , None )
expected_response = method s. UnaryCall ( request , ' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testUnaryCallAsync ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( test_pb2 , LONG_DELAY ) as (
servicer , stub , unused_server ) :
method s, stub , unused_server ) :
start_time = time . clock ( )
response_future = stub . UnaryCall . async ( request , LONG_TIMEOUT )
# Check that we didn't block on the asynchronous call.
self . assertGreater ( LONG_DELAY , time . clock ( ) - start_time )
response = response_future . result ( )
expected_response = servicer . UnaryCall ( request , None )
expected_response = method s. UnaryCall ( request , ' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testUnaryCallAsyncExpired ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
# set the timeout super low...
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
method s, stub , unused_server ) :
request = test_pb2 . SimpleRequest ( response_size = 13 )
with servicer . pause ( ) :
with method s. pause ( ) :
response_future = stub . UnaryCall . async ( request , SHORT_TIMEOUT )
with self . assertRaises ( exceptions . ExpirationError ) :
response_future . result ( )
@ -306,8 +306,8 @@ class PythonPluginTest(unittest.TestCase):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . pause ( ) :
method s, stub , unused_server ) :
with method s. pause ( ) :
response_future = stub . UnaryCall . async ( request , 1 )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
@ -316,29 +316,30 @@ class PythonPluginTest(unittest.TestCase):
import test_pb2 # pylint: disable=g-import-not-at-top
request = test_pb2 . SimpleRequest ( response_size = 13 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . fail ( ) :
method s, stub , unused_server ) :
with method s. fail ( ) :
response_future = stub . UnaryCall . async ( request , NORMAL_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testStreamingOutputCall ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputR equest( test_pb2 )
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
request = _streaming_output_r equest( test_pb2 )
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
responses = stub . StreamingOutputCall ( request , NORMAL_TIMEOUT )
expected_responses = servicer . StreamingOutputCall ( request , None )
for check in itertools . izip_longest ( expected_responses , responses ) :
expected_response , response = check
expected_responses = methods . StreamingOutputCall (
request , ' not a real RpcContext! ' )
for expected_response , response in itertools . izip_longest (
expected_responses , responses ) :
self . assertEqual ( expected_response , response )
@unittest . skip ( ' TODO(atash,nathaniel): figure out why this flakily hangs '
' forever and fix. ' )
def testStreamingOutputCallExpired ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputR equest( test_pb2 )
request = _streaming_output_r equest( test_pb2 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . pause ( ) :
method s, stub , unused_server ) :
with method s. pause ( ) :
responses = stub . StreamingOutputCall ( request , SHORT_TIMEOUT )
with self . assertRaises ( exceptions . ExpirationError ) :
list ( responses )
@ -347,9 +348,9 @@ class PythonPluginTest(unittest.TestCase):
' forever and fix. ' )
def testStreamingOutputCallCancelled ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputR equest( test_pb2 )
request = _streaming_output_r equest( test_pb2 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
unused_servicer , stub , unused_server ) :
unused_method s , stub , unused_server ) :
responses = stub . StreamingOutputCall ( request , SHORT_TIMEOUT )
next ( responses )
responses . cancel ( )
@ -360,10 +361,10 @@ class PythonPluginTest(unittest.TestCase):
' instead of raising the proper error. ' )
def testStreamingOutputCallFailed ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = StreamingOutputR equest( test_pb2 )
request = _streaming_output_r equest( test_pb2 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . fail ( ) :
method s, stub , unused_server ) :
with method s. fail ( ) :
responses = stub . StreamingOutputCall ( request , 1 )
self . assertIsNotNone ( responses )
with self . assertRaises ( exceptions . ServicerError ) :
@ -373,34 +374,34 @@ class PythonPluginTest(unittest.TestCase):
' forever and fix. ' )
def testStreamingInputCall ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
response = stub . StreamingInputCall ( StreamingInputRequest ( test_pb2 ) ,
NORMAL_TIMEOUT )
expected_response = servicer . StreamingInputCall (
StreamingInputRequest ( test_pb2 ) , None )
expected_response = method s. StreamingInputCall (
_streaming_input_request_iterator ( test_pb2 ) , ' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallAsync ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , LONG_DELAY ) as (
servicer , stub , unused_server ) :
method s, stub , unused_server ) :
start_time = time . clock ( )
response_future = stub . StreamingInputCall . async (
StreamingInputRequest ( test_pb2 ) , LONG_TIMEOUT )
_streaming_input_request_iterator ( test_pb2 ) , LONG_TIMEOUT )
self . assertGreater ( LONG_DELAY , time . clock ( ) - start_time )
response = response_future . result ( )
expected_response = servicer . StreamingInputCall (
StreamingInputRequest ( test_pb2 ) , None )
expected_response = method s. StreamingInputCall (
_streaming_input_request_iterator ( test_pb2 ) , ' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallAsyncExpired ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
# set the timeout super low...
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . pause ( ) :
method s, stub , unused_server ) :
with method s. pause ( ) :
response_future = stub . StreamingInputCall . async (
StreamingInputRequest ( test_pb2 ) , SHORT_TIMEOUT )
_streaming_input_request_iterator ( test_pb2 ) , SHORT_TIMEOUT )
with self . assertRaises ( exceptions . ExpirationError ) :
response_future . result ( )
self . assertIsInstance (
@ -409,10 +410,10 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncCancelled ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . pause ( ) :
method s, stub , unused_server ) :
with method s. pause ( ) :
response_future = stub . StreamingInputCall . async (
StreamingInputRequest ( test_pb2 ) , NORMAL_TIMEOUT )
_streaming_input_request_iterator ( test_pb2 ) , NORMAL_TIMEOUT )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
with self . assertRaises ( future . CancelledError ) :
@ -421,32 +422,32 @@ class PythonPluginTest(unittest.TestCase):
def testStreamingInputCallAsyncFailed ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . fail ( ) :
method s, stub , unused_server ) :
with method s. fail ( ) :
response_future = stub . StreamingInputCall . async (
StreamingInputRequest ( test_pb2 ) , SHORT_TIMEOUT )
_streaming_input_request_iterator ( test_pb2 ) , SHORT_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testFullDuplexCall ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
responses = stub . FullDuplexCall ( FullDuplexRequest ( test_pb2 ) ,
NORMAL_TIMEOUT )
expected_responses = servicer . FullDuplexCall ( FullDuplexRequest ( test_pb2 ) ,
None )
for check in itertools . izip_longest ( expected_responses , responses ) :
expected_response , response = check
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
responses = stub . FullDuplexCall (
_full_duplex_request_iterator ( test_pb2 ) , NORMAL_TIMEOUT )
expected_responses = method s. FullDuplexCall (
_full_duplex_request_iterator ( test_pb2 ) , ' not a real RpcContext! ' )
for expected_response , response in itertools . izip_longest (
expected_responses , responses ) :
self . assertEqual ( expected_response , response )
@unittest . skip ( ' TODO(atash,nathaniel): figure out why this flakily hangs '
' forever and fix. ' )
def testFullDuplexCallExpired ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest ( test_pb2 )
request_iterator = _full_duplex_request_iterator ( test_pb2 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . pause ( ) :
responses = stub . FullDuplexCall ( request , SHORT_TIMEOUT )
method s, stub , unused_server ) :
with method s. pause ( ) :
responses = stub . FullDuplexCall ( request_iterator , SHORT_TIMEOUT )
with self . assertRaises ( exceptions . ExpirationError ) :
list ( responses )
@ -454,9 +455,9 @@ class PythonPluginTest(unittest.TestCase):
' forever and fix. ' )
def testFullDuplexCallCancelled ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
request = FullDuplexRequest ( test_pb2 )
responses = stub . FullDuplexCall ( request , NORMAL_TIMEOUT )
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
request_iterator = _full_duplex_request_iterator ( test_pb2 )
responses = stub . FullDuplexCall ( request_iterator , NORMAL_TIMEOUT )
next ( responses )
responses . cancel ( )
with self . assertRaises ( future . CancelledError ) :
@ -466,11 +467,11 @@ class PythonPluginTest(unittest.TestCase):
' and fix. ' )
def testFullDuplexCallFailed ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
request = FullDuplexRequest ( test_pb2 )
request_iterator = _full_duplex_request_iterator ( test_pb2 )
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
with servicer . fail ( ) :
responses = stub . FullDuplexCall ( request , NORMAL_TIMEOUT )
method s, stub , unused_server ) :
with method s. fail ( ) :
responses = stub . FullDuplexCall ( request_iterator , NORMAL_TIMEOUT )
self . assertIsNotNone ( responses )
with self . assertRaises ( exceptions . ServicerError ) :
next ( responses )
@ -480,8 +481,8 @@ class PythonPluginTest(unittest.TestCase):
def testHalfDuplexCall ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
with _CreateService ( test_pb2 , DOES_NOT_MATTER_DELAY ) as (
servicer , stub , unused_server ) :
def HalfDuplexRequest ( ) :
method s, stub , unused_server ) :
def half_duplex_request_iterator ( ) :
request = test_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
@ -489,30 +490,33 @@ class PythonPluginTest(unittest.TestCase):
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
yield request
responses = stub . HalfDuplexCall ( HalfDuplexRequest ( ) , NORMAL_TIMEOUT )
expected_responses = servicer . HalfDuplexCall ( HalfDuplexRequest ( ) , None )
responses = stub . HalfDuplexCall (
half_duplex_request_iterator ( ) , NORMAL_TIMEOUT )
expected_responses = methods . HalfDuplexCall (
HalfDuplexRequest ( ) , ' not a real RpcContext! ' )
for check in itertools . izip_longest ( expected_responses , responses ) :
expected_response , response = check
self . assertEqual ( expected_response , response )
def testHalfDuplexCallWedged ( self ) :
import test_pb2 # pylint: disable=g-import-not-at-top
wait_flag = [ False ]
wait_cell = [ False ]
@contextlib . contextmanager
def wait ( ) : # pylint: disable=invalid-name
# Where's Python 3's 'nonlocal' statement when you need it?
wait_flag [ 0 ] = True
wait_cell [ 0 ] = True
yield
wait_flag [ 0 ] = False
def HalfDuplexRequest ( ) :
wait_cell [ 0 ] = False
def half_duplex_request_iterator ( ) :
request = test_pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
while wait_flag [ 0 ] :
while wait_cell [ 0 ] :
time . sleep ( 0.1 )
with _CreateService ( test_pb2 , NO_DELAY ) as ( servicer , stub , unused_server ) :
with _CreateService ( test_pb2 , NO_DELAY ) as ( method s, stub , unused_server ) :
with wait ( ) :
responses = stub . HalfDuplexCall ( HalfDuplexRequest ( ) , NORMAL_TIMEOUT )
responses = stub . HalfDuplexCall (
half_duplex_request_iterator ( ) , NORMAL_TIMEOUT )
# half-duplex waits for the client to send all info
with self . assertRaises ( exceptions . ExpirationError ) :
next ( responses )