@ -13,6 +13,7 @@
# limitations under the License.
# limitations under the License.
""" Tests server and client side compression. """
""" Tests server and client side compression. """
import itertools
import logging
import logging
import threading
import threading
import time
import time
@ -27,8 +28,11 @@ _BEAT = 0.5
_SOME_TIME = 5
_SOME_TIME = 5
_MORE_TIME = 10
_MORE_TIME = 10
_STREAM_URI = ' Meffod '
_UNARY_URI = ' MeffodMan '
class _MethodHandler ( grpc . RpcMethodHandler ) :
class _StreamingMethodHandler ( grpc . RpcMethodHandler ) :
request_streaming = True
request_streaming = True
response_streaming = True
response_streaming = True
@ -40,13 +44,28 @@ class _MethodHandler(grpc.RpcMethodHandler):
yield request * 2
yield request * 2
_METHOD_HANDLER = _MethodHandler ( )
class _UnaryMethodHandler ( grpc . RpcMethodHandler ) :
request_streaming = False
response_streaming = False
request_deserializer = None
response_serializer = None
def unary_unary ( self , request , servicer_context ) :
return request * 2
_STREAMING_METHOD_HANDLER = _StreamingMethodHandler ( )
_UNARY_METHOD_HANDLER = _UnaryMethodHandler ( )
class _GenericHandler ( grpc . GenericRpcHandler ) :
class _GenericHandler ( grpc . GenericRpcHandler ) :
def service ( self , handler_call_details ) :
def service ( self , handler_call_details ) :
return _METHOD_HANDLER
if handler_call_details . method == _STREAM_URI :
return _STREAMING_METHOD_HANDLER
else :
return _UNARY_METHOD_HANDLER
_GENERIC_HANDLER = _GenericHandler ( )
_GENERIC_HANDLER = _GenericHandler ( )
@ -108,7 +127,7 @@ class ChannelCloseTest(unittest.TestCase):
def test_close_immediately_after_call_invocation ( self ) :
def test_close_immediately_after_call_invocation ( self ) :
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
multi_callable = channel . stream_stream ( ' Meffod ' )
multi_callable = channel . stream_stream ( _STREAM_URI )
request_iterator = _Pipe ( ( ) )
request_iterator = _Pipe ( ( ) )
response_iterator = multi_callable ( request_iterator )
response_iterator = multi_callable ( request_iterator )
channel . close ( )
channel . close ( )
@ -118,7 +137,7 @@ class ChannelCloseTest(unittest.TestCase):
def test_close_while_call_active ( self ) :
def test_close_while_call_active ( self ) :
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
multi_callable = channel . stream_stream ( ' Meffod ' )
multi_callable = channel . stream_stream ( _STREAM_URI )
request_iterator = _Pipe ( ( b ' abc ' , ) )
request_iterator = _Pipe ( ( b ' abc ' , ) )
response_iterator = multi_callable ( request_iterator )
response_iterator = multi_callable ( request_iterator )
next ( response_iterator )
next ( response_iterator )
@ -130,7 +149,7 @@ class ChannelCloseTest(unittest.TestCase):
def test_context_manager_close_while_call_active ( self ) :
def test_context_manager_close_while_call_active ( self ) :
with grpc . insecure_channel ( ' localhost: {} ' . format (
with grpc . insecure_channel ( ' localhost: {} ' . format (
self . _port ) ) as channel : # pylint: disable=bad-continuation
self . _port ) ) as channel : # pylint: disable=bad-continuation
multi_callable = channel . stream_stream ( ' Meffod ' )
multi_callable = channel . stream_stream ( _STREAM_URI )
request_iterator = _Pipe ( ( b ' abc ' , ) )
request_iterator = _Pipe ( ( b ' abc ' , ) )
response_iterator = multi_callable ( request_iterator )
response_iterator = multi_callable ( request_iterator )
next ( response_iterator )
next ( response_iterator )
@ -141,7 +160,7 @@ class ChannelCloseTest(unittest.TestCase):
def test_context_manager_close_while_many_calls_active ( self ) :
def test_context_manager_close_while_many_calls_active ( self ) :
with grpc . insecure_channel ( ' localhost: {} ' . format (
with grpc . insecure_channel ( ' localhost: {} ' . format (
self . _port ) ) as channel : # pylint: disable=bad-continuation
self . _port ) ) as channel : # pylint: disable=bad-continuation
multi_callable = channel . stream_stream ( ' Meffod ' )
multi_callable = channel . stream_stream ( _STREAM_URI )
request_iterators = tuple (
request_iterators = tuple (
_Pipe ( ( b ' abc ' , ) )
_Pipe ( ( b ' abc ' , ) )
for _ in range ( test_constants . THREAD_CONCURRENCY ) )
for _ in range ( test_constants . THREAD_CONCURRENCY ) )
@ -158,7 +177,7 @@ class ChannelCloseTest(unittest.TestCase):
def test_many_concurrent_closes ( self ) :
def test_many_concurrent_closes ( self ) :
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
channel = grpc . insecure_channel ( ' localhost: {} ' . format ( self . _port ) )
multi_callable = channel . stream_stream ( ' Meffod ' )
multi_callable = channel . stream_stream ( _STREAM_URI )
request_iterator = _Pipe ( ( b ' abc ' , ) )
request_iterator = _Pipe ( ( b ' abc ' , ) )
response_iterator = multi_callable ( request_iterator )
response_iterator = multi_callable ( request_iterator )
next ( response_iterator )
next ( response_iterator )
@ -181,6 +200,20 @@ class ChannelCloseTest(unittest.TestCase):
self . assertIs ( response_iterator . code ( ) , grpc . StatusCode . CANCELLED )
self . assertIs ( response_iterator . code ( ) , grpc . StatusCode . CANCELLED )
def test_exception_in_callback ( self ) :
with grpc . insecure_channel ( ' localhost: {} ' . format (
self . _port ) ) as channel :
stream_multi_callable = channel . stream_stream ( _STREAM_URI )
endless_iterator = itertools . repeat ( b ' abc ' )
stream_response_iterator = stream_multi_callable ( endless_iterator )
future = channel . unary_unary ( _UNARY_URI ) . future ( b ' abc ' )
def on_done_callback ( future ) :
raise Exception ( " This should not cause a deadlock. " )
future . add_done_callback ( on_done_callback )
future . result ( )
if __name__ == ' __main__ ' :
if __name__ == ' __main__ ' :
logging . basicConfig ( )
logging . basicConfig ( )