@ -12,19 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import contextlib
import distutils . spawn
import errno
import itertools
import importlib
import os
import pkg_resources
from os import path
import pkgutil
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import unittest
from six import moves
@ -33,12 +29,22 @@ from grpc.beta import implementations
from grpc . beta import interfaces
from grpc . framework . foundation import future
from grpc . framework . interfaces . face import face
from grpc_tools import protoc
from tests . unit . framework . common import test_constants
import tests . protoc_plugin . protos . payload . test_payload_pb2 as payload_pb2
import tests . protoc_plugin . protos . requests . r . test_requests_pb2 as request_pb2
import tests . protoc_plugin . protos . responses . test_responses_pb2 as response_pb2
import tests . protoc_plugin . protos . service . test_service_pb2 as service_pb2
_RELATIVE_PROTO_PATH = ' relative_proto_path '
_RELATIVE_PYTHON_OUT = ' relative_python_out '
_PROTO_FILES_PATH_COMPONENTS = (
( ' beta_grpc_plugin_test ' , ' payload ' , ' test_payload.proto ' , ) ,
( ' beta_grpc_plugin_test ' , ' requests ' , ' r ' , ' test_requests.proto ' , ) ,
( ' beta_grpc_plugin_test ' , ' responses ' , ' test_responses.proto ' , ) ,
( ' beta_grpc_plugin_test ' , ' service ' , ' test_service.proto ' , ) , )
_PAYLOAD_PB2 = ' beta_grpc_plugin_test.payload.test_payload_pb2 '
_REQUESTS_PB2 = ' beta_grpc_plugin_test.requests.r.test_requests_pb2 '
_RESPONSES_PB2 = ' beta_grpc_plugin_test.responses.test_responses_pb2 '
_SERVICE_PB2 = ' beta_grpc_plugin_test.service.test_service_pb2 '
# Identifiers of entities we expect to find in the generated module.
SERVICER_IDENTIFIER = ' BetaTestServiceServicer '
@ -47,12 +53,50 @@ SERVER_FACTORY_IDENTIFIER = 'beta_create_TestService_server'
STUB_FACTORY_IDENTIFIER = ' beta_create_TestService_stub '
@contextlib . contextmanager
def _system_path ( path_insertion ) :
old_system_path = sys . path [ : ]
sys . path = sys . path [ 0 : 1 ] + path_insertion + sys . path [ 1 : ]
yield
sys . path = old_system_path
def _create_directory_tree ( root , path_components_sequence ) :
created = set ( )
for path_components in path_components_sequence :
thus_far = ' '
for path_component in path_components :
relative_path = path . join ( thus_far , path_component )
if relative_path not in created :
os . makedirs ( path . join ( root , relative_path ) )
created . add ( relative_path )
thus_far = path . join ( thus_far , path_component )
def _massage_proto_content ( raw_proto_content ) :
imports_substituted = raw_proto_content . replace (
b ' import " tests/protoc_plugin/protos/ ' ,
b ' import " beta_grpc_plugin_test/ ' )
package_statement_substituted = imports_substituted . replace (
b ' package grpc_protoc_plugin; ' , b ' package beta_grpc_protoc_plugin; ' )
return package_statement_substituted
def _packagify ( directory ) :
for subdirectory , _ , _ in os . walk ( directory ) :
init_file_name = path . join ( subdirectory , ' __init__.py ' )
with open ( init_file_name , ' wb ' ) as init_file :
init_file . write ( b ' ' )
class _ServicerMethods ( object ) :
def __init__ ( self ) :
def __init__ ( self , payload_pb2 , responses_pb2 ) :
self . _condition = threading . Condition ( )
self . _paused = False
self . _fail = False
self . _payload_pb2 = payload_pb2
self . _responses_pb2 = responses_pb2
@contextlib . contextmanager
def pause ( self ) : # pylint: disable=invalid-name
@ -79,22 +123,22 @@ class _ServicerMethods(object):
self . _condition . wait ( )
def UnaryCall ( self , request , unused_rpc_context ) :
response = response_pb2 . SimpleResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response = self . _ responses _pb2. SimpleResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * request . response_size
self . _control ( )
return response
def StreamingOutputCall ( self , request , unused_rpc_context ) :
for parameter in request . response_parameters :
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response = self . _ responses _pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
def StreamingInputCall ( self , request_iter , unused_rpc_context ) :
response = response_pb2 . StreamingInputCallResponse ( )
response = self . _ responses _pb2. StreamingInputCallResponse ( )
aggregated_payload_size = 0
for request in request_iter :
aggregated_payload_size + = len ( request . payload . payload_compressable )
@ -105,8 +149,8 @@ class _ServicerMethods(object):
def FullDuplexCall ( self , request_iter , unused_rpc_context ) :
for request in request_iter :
for parameter in request . response_parameters :
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response = self . _ responses _pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
yield response
@ -115,8 +159,8 @@ class _ServicerMethods(object):
responses = [ ]
for request in request_iter :
for parameter in request . response_parameters :
response = response_pb2 . StreamingOutputCallResponse ( )
response . payload . payload_type = payload_pb2 . COMPRESSABLE
response = self . _ responses _pb2. StreamingOutputCallResponse ( )
response . payload . payload_type = self . _ payload_pb2. COMPRESSABLE
response . payload . payload_compressable = ' a ' * parameter . size
self . _control ( )
responses . append ( response )
@ -125,7 +169,7 @@ class _ServicerMethods(object):
@contextlib . contextmanager
def _CreateService ( ) :
def _CreateService ( payload_pb2 , responses_pb2 , service_pb2 ) :
""" Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
@ -136,7 +180,7 @@ def _CreateService():
the service bound to the stub and and stub is the stub on which to invoke
RPCs .
"""
servicer_methods = _ServicerMethods ( )
servicer_methods = _ServicerMethods ( payload_pb2 , responses_pb2 )
class Servicer ( getattr ( service_pb2 , SERVICER_IDENTIFIER ) ) :
@ -161,12 +205,12 @@ def _CreateService():
server . start ( )
channel = implementations . insecure_channel ( ' localhost ' , port )
stub = getattr ( service_pb2 , STUB_FACTORY_IDENTIFIER ) ( channel )
yield ( servicer_methods , stub )
yield servicer_methods , stub ,
server . stop ( 0 )
@contextlib . contextmanager
def _CreateIncompleteService ( ) :
def _CreateIncompleteService ( service_pb2 ) :
""" Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
@ -192,16 +236,16 @@ def _CreateIncompleteService():
server . stop ( 0 )
def _streaming_input_request_iterator ( ) :
def _streaming_input_request_iterator ( payload_pb2 , requests_pb2 ) :
for _ in range ( 3 ) :
request = request_pb2 . StreamingInputCallRequest ( )
request = requests _pb2 . StreamingInputCallRequest ( )
request . payload . payload_type = payload_pb2 . COMPRESSABLE
request . payload . payload_compressable = ' a '
yield request
def _streaming_output_request ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
def _streaming_output_request ( requests_pb2 ) :
request = requests _pb2 . StreamingOutputCallRequest ( )
sizes = [ 1 , 2 , 3 ]
request . response_parameters . add ( size = sizes [ 0 ] , interval_us = 0 )
request . response_parameters . add ( size = sizes [ 1 ] , interval_us = 0 )
@ -209,11 +253,11 @@ def _streaming_output_request():
return request
def _full_duplex_request_iterator ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
def _full_duplex_request_iterator ( requests_pb2 ) :
request = requests _pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
request = request_pb2 . StreamingOutputCallRequest ( )
request = requests _pb2 . StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
yield request
@ -227,22 +271,78 @@ class PythonPluginTest(unittest.TestCase):
methods and does not exist for response - streaming methods .
"""
def setUp ( self ) :
self . _directory = tempfile . mkdtemp ( dir = ' . ' )
self . _proto_path = path . join ( self . _directory , _RELATIVE_PROTO_PATH )
self . _python_out = path . join ( self . _directory , _RELATIVE_PYTHON_OUT )
os . makedirs ( self . _proto_path )
os . makedirs ( self . _python_out )
directories_path_components = {
proto_file_path_components [ : - 1 ]
for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS
}
_create_directory_tree ( self . _proto_path , directories_path_components )
self . _proto_file_names = set ( )
for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS :
raw_proto_content = pkgutil . get_data (
' tests.protoc_plugin.protos ' ,
path . join ( * proto_file_path_components [ 1 : ] ) )
massaged_proto_content = _massage_proto_content ( raw_proto_content )
proto_file_name = path . join ( self . _proto_path ,
* proto_file_path_components )
with open ( proto_file_name , ' wb ' ) as proto_file :
proto_file . write ( massaged_proto_content )
self . _proto_file_names . add ( proto_file_name )
def tearDown ( self ) :
shutil . rmtree ( self . _directory )
def _protoc ( self ) :
args = [
' ' ,
' --proto_path= {} ' . format ( self . _proto_path ) ,
' --python_out= {} ' . format ( self . _python_out ) ,
' --grpc_python_out=grpc_1_0: {} ' . format ( self . _python_out ) ,
] + list ( self . _proto_file_names )
protoc_exit_code = protoc . main ( args )
self . assertEqual ( 0 , protoc_exit_code )
_packagify ( self . _python_out )
with _system_path ( [
self . _python_out ,
] ) :
self . _payload_pb2 = importlib . import_module ( _PAYLOAD_PB2 )
self . _requests_pb2 = importlib . import_module ( _REQUESTS_PB2 )
self . _responses_pb2 = importlib . import_module ( _RESPONSES_PB2 )
self . _service_pb2 = importlib . import_module ( _SERVICE_PB2 )
def testImportAttributes ( self ) :
self . _protoc ( )
# check that we can access the generated module and its members.
self . assertIsNotNone ( getattr ( service_pb2 , SERVICER_IDENTIFIER , None ) )
self . assertIsNotNone ( getattr ( service_pb2 , STUB_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( service_pb2 , SERVER_FACTORY_IDENTIFIER , None ) )
getattr ( self . _service_pb2 , SERVICER_IDENTIFIER , None ) )
self . assertIsNotNone ( getattr ( self . _service_pb2 , STUB_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( service_pb2 , STUB_FACTORY_IDENTIFIER , None ) )
getattr ( self . _service_pb2 , SERVER_FACTORY_IDENTIFIER , None ) )
self . assertIsNotNone (
getattr ( self . _service_pb2 , STUB_FACTORY_IDENTIFIER , None ) )
def testUpDown ( self ) :
with _CreateService ( ) :
request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) :
self . _requests_pb2 . SimpleRequest ( response_size = 13 )
def testIncompleteServicer ( self ) :
with _CreateIncompleteService ( ) as ( _ , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateIncompleteService ( self . _service_pb2 ) as ( _ , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
try :
stub . UnaryCall ( request , test_constants . LONG_TIMEOUT )
except face . AbortionError as error :
@ -250,15 +350,21 @@ class PythonPluginTest(unittest.TestCase):
error . code )
def testUnaryCall ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
response = stub . UnaryCall ( request , test_constants . LONG_TIMEOUT )
expected_response = methods . UnaryCall ( request , ' not a real context! ' )
self . assertEqual ( expected_response , response )
def testUnaryCallFuture ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
# Check that the call does not block waiting for the server to respond.
with methods . pause ( ) :
response_future = stub . UnaryCall . future (
@ -268,8 +374,11 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
def testUnaryCallFutureExpired ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
with methods . pause ( ) :
response_future = stub . UnaryCall . future (
request , test_constants . SHORT_TIMEOUT )
@ -277,24 +386,33 @@ class PythonPluginTest(unittest.TestCase):
response_future . result ( )
def testUnaryCallFutureCancelled ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
with methods . pause ( ) :
response_future = stub . UnaryCall . future ( request , 1 )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
def testUnaryCallFutureFailed ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = request_pb2 . SimpleRequest ( response_size = 13 )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = self . _requests_pb2 . SimpleRequest ( response_size = 13 )
with methods . fail ( ) :
response_future = stub . UnaryCall . future (
request , test_constants . LONG_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testStreamingOutputCall ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _requests_pb2 )
responses = stub . StreamingOutputCall ( request ,
test_constants . LONG_TIMEOUT )
expected_responses = methods . StreamingOutputCall (
@ -304,8 +422,11 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
def testStreamingOutputCallExpired ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _requests_pb2 )
with methods . pause ( ) :
responses = stub . StreamingOutputCall (
request , test_constants . SHORT_TIMEOUT )
@ -313,8 +434,11 @@ class PythonPluginTest(unittest.TestCase):
list ( responses )
def testStreamingOutputCallCancelled ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _requests_pb2 )
responses = stub . StreamingOutputCall ( request ,
test_constants . LONG_TIMEOUT )
next ( responses )
@ -323,8 +447,11 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testStreamingOutputCallFailed ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request = _streaming_output_request ( )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request = _streaming_output_request ( self . _requests_pb2 )
with methods . fail ( ) :
responses = stub . StreamingOutputCall ( request , 1 )
self . assertIsNotNone ( responses )
@ -332,30 +459,46 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testStreamingInputCall ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
response = stub . StreamingInputCall (
_streaming_input_request_iterator ( ) ,
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
test_constants . LONG_TIMEOUT )
expected_response = methods . StreamingInputCall (
_streaming_input_request_iterator ( ) , ' not a real RpcContext! ' )
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFuture ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator ( ) ,
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
test_constants . LONG_TIMEOUT )
response = response_future . result ( )
expected_response = methods . StreamingInputCall (
_streaming_input_request_iterator ( ) , ' not a real RpcContext! ' )
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
' not a real RpcContext! ' )
self . assertEqual ( expected_response , response )
def testStreamingInputCallFutureExpired ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator ( ) ,
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
test_constants . SHORT_TIMEOUT )
with self . assertRaises ( face . ExpirationError ) :
response_future . result ( )
@ -363,10 +506,14 @@ class PythonPluginTest(unittest.TestCase):
face . ExpirationError )
def testStreamingInputCallFutureCancelled ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . pause ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator ( ) ,
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
test_constants . LONG_TIMEOUT )
response_future . cancel ( )
self . assertTrue ( response_future . cancelled ( ) )
@ -374,26 +521,38 @@ class PythonPluginTest(unittest.TestCase):
response_future . result ( )
def testStreamingInputCallFutureFailed ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . fail ( ) :
response_future = stub . StreamingInputCall . future (
_streaming_input_request_iterator ( ) ,
_streaming_input_request_iterator ( self . _payload_pb2 ,
self . _requests_pb2 ) ,
test_constants . LONG_TIMEOUT )
self . assertIsNotNone ( response_future . exception ( ) )
def testFullDuplexCall ( self ) :
with _CreateService ( ) as ( methods , stub ) :
responses = stub . FullDuplexCall ( _full_duplex_request_iterator ( ) ,
test_constants . LONG_TIMEOUT )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
responses = stub . FullDuplexCall (
_full_duplex_request_iterator ( self . _requests_pb2 ) ,
test_constants . LONG_TIMEOUT )
expected_responses = methods . FullDuplexCall (
_full_duplex_request_iterator ( ) , ' not a real RpcContext! ' )
_full_duplex_request_iterator ( self . _requests_pb2 ) ,
' not a real RpcContext! ' )
for expected_response , response in moves . zip_longest (
expected_responses , responses ) :
self . assertEqual ( expected_response , response )
def testFullDuplexCallExpired ( self ) :
request_iterator = _full_duplex_request_iterator ( )
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
request_iterator = _full_duplex_request_iterator ( self . _requests_pb2 )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . pause ( ) :
responses = stub . FullDuplexCall ( request_iterator ,
test_constants . SHORT_TIMEOUT )
@ -401,8 +560,11 @@ class PythonPluginTest(unittest.TestCase):
list ( responses )
def testFullDuplexCallCancelled ( self ) :
with _CreateService ( ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( )
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
request_iterator = _full_duplex_request_iterator ( self . _requests_pb2 )
responses = stub . FullDuplexCall ( request_iterator ,
test_constants . LONG_TIMEOUT )
next ( responses )
@ -411,8 +573,11 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testFullDuplexCallFailed ( self ) :
request_iterator = _full_duplex_request_iterator ( )
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
request_iterator = _full_duplex_request_iterator ( self . _requests_pb2 )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with methods . fail ( ) :
responses = stub . FullDuplexCall ( request_iterator ,
test_constants . LONG_TIMEOUT )
@ -421,13 +586,16 @@ class PythonPluginTest(unittest.TestCase):
next ( responses )
def testHalfDuplexCall ( self ) :
with _CreateService ( ) as ( methods , stub ) :
self . _protoc ( )
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
def half_duplex_request_iterator ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
request = self . _ requests _pb2. StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
request = request_pb2 . StreamingOutputCallRequest ( )
request = self . _ requests _pb2. StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 2 , interval_us = 0 )
request . response_parameters . add ( size = 3 , interval_us = 0 )
yield request
@ -441,6 +609,8 @@ class PythonPluginTest(unittest.TestCase):
self . assertEqual ( expected_response , response )
def testHalfDuplexCallWedged ( self ) :
self . _protoc ( )
condition = threading . Condition ( )
wait_cell = [ False ]
@ -455,14 +625,15 @@ class PythonPluginTest(unittest.TestCase):
condition . notify_all ( )
def half_duplex_request_iterator ( ) :
request = request_pb2 . StreamingOutputCallRequest ( )
request = self . _ requests _pb2. StreamingOutputCallRequest ( )
request . response_parameters . add ( size = 1 , interval_us = 0 )
yield request
with condition :
while wait_cell [ 0 ] :
condition . wait ( )
with _CreateService ( ) as ( methods , stub ) :
with _CreateService ( self . _payload_pb2 , self . _responses_pb2 ,
self . _service_pb2 ) as ( methods , stub ) :
with wait ( ) :
responses = stub . HalfDuplexCall ( half_duplex_request_iterator ( ) ,
test_constants . SHORT_TIMEOUT )