@ -29,8 +29,6 @@
""" Implementations of interoperability test methods. """
from __future__ import print_function
import enum
import json
import os
@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
import grpc
from grpc . beta import implementations
from grpc . beta import interfaces
from grpc . framework . common import cardinality
from grpc . framework . interfaces . face import face
from src . proto . grpc . testing import empty_pb2
from src . proto . grpc . testing import messages_pb2
from src . proto . grpc . testing import test_pb2
_TIMEOUT = 7
class TestService ( test_pb2 . Beta TestServiceServicer) :
class TestService ( test_pb2 . TestServiceServicer ) :
def EmptyCall ( self , request , context ) :
return empty_pb2 . Empty ( )
def UnaryCall ( self , request , context ) :
if request . HasField ( ' response_status ' ) :
context . code ( request . response_status . code )
context . details ( request . response_status . message )
context . set_ code( request . response_status . code )
context . set_ details( request . response_status . message )
return messages_pb2 . SimpleResponse (
payload = messages_pb2 . Payload (
type = messages_pb2 . COMPRESSABLE ,
@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingOutputCall ( self , request , context ) :
if request . HasField ( ' response_status ' ) :
context . code ( request . response_status . code )
context . details ( request . response_status . message )
context . set_ code( request . response_status . code )
context . set_ details( request . response_status . message )
for response_parameters in request . response_parameters :
yield messages_pb2 . StreamingOutputCallResponse (
payload = messages_pb2 . Payload (
@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingInputCall ( self , request_iterator , context ) :
aggregate_size = 0
for request in request_iterator :
if request . payload and request . payload . body :
if request . payload is not None and request . payload . body :
aggregate_size + = len ( request . payload . body )
return messages_pb2 . StreamingInputCallResponse (
aggregated_payload_size = aggregate_size )
@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def FullDuplexCall ( self , request_iterator , context ) :
for request in request_iterator :
if request . HasField ( ' response_status ' ) :
context . code ( request . response_status . code )
context . details ( request . response_status . message )
context . set_ code( request . response_status . code )
context . set_ details( request . response_status . message )
for response_parameters in request . response_parameters :
yield messages_pb2 . StreamingOutputCallResponse (
payload = messages_pb2 . Payload (
@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
return self . FullDuplexCall ( request_iterator , context )
def _large_unary_common_behavior ( stub , fill_username , fill_oauth_scope ,
protocol_options = None ) :
with stub :
reque st = messages_pb2 . SimpleRequest (
response_type = messages_pb2 . COMPRESSABLE , response_size = 314159 ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' * 271828 ) ,
fill_username = fill_username , fill_oauth_scope = fill_oauth_scope )
response_future = stub . UnaryCall . future ( request , _TIMEOUT ,
protocol_options = protocol_options )
response = response_future . result ( )
if response . payload . type is not messages_pb2 . COMPRESSABLE :
raise ValueError (
' response payload type is " %s " ! ' % type ( response . payload . type ) )
if len ( response . payload . body ) != 314159 :
raise ValueError (
' response body of incorrect size %d ! ' % len ( response . payload . body ) )
def _large_unary_common_behavior (
stub , fill_username , fill_oauth_scope , call_credentials ) :
request = messages_pb2 . SimpleRequest (
response_ type = messages_pb2 . COMPRESSABLE , response_size = 314159 ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' * 271828 ) ,
fill_username = fill_username , fill_oauth_scope = fill_oauth_scope )
response_future = stub . UnaryCall . future (
request , credentials = call_credentials )
response = response_future . result ( )
if response . payload . type is not messages_pb2 . COMPRESSABLE :
raise ValueError (
' response payload type is " %s " ! ' % type ( response . payload . type ) )
elif len ( response . payload . body ) != 314159 :
raise ValueError (
' response body of incorrect size %d ! ' % len ( response . payload . body ) )
else :
return response
def _empty_unary ( stub ) :
with stub :
response = stub . EmptyCall ( empty_pb2 . Empty ( ) , _TIMEOUT )
if not isinstance ( response , empty_pb2 . Empty ) :
raise TypeError (
' response is of type " %s " , not empty_pb2.Empty! ' , type ( response ) )
response = stub . EmptyCall ( empty_pb2 . Empty ( ) )
if not isinstance ( response , empty_pb2 . Empty ) :
raise TypeError (
' response is of type " %s " , not empty_pb2.Empty! ' , type ( response ) )
def _large_unary ( stub ) :
_large_unary_common_behavior ( stub , False , False )
_large_unary_common_behavior ( stub , False , False , None )
def _client_streaming ( stub ) :
with stub :
payload_body_sizes = ( 27182 , 8 , 1828 , 45904 )
payloads = (
messages_pb2 . Payload ( body = b ' \x00 ' * size )
for size in payload_body_sizes )
requests = (
messages_pb2 . StreamingInputCallRequest ( payload = payload )
for payload in payloads )
response = stub . StreamingInputCall ( requests , _TIMEOUT )
if response . aggregated_payload_size != 74922 :
raise ValueError (
' incorrect size %d ! ' % response . aggregated_payload_size )
payload_body_sizes = ( 27182 , 8 , 1828 , 45904 , )
payloads = (
messages_pb2 . Payload ( body = b ' \x00 ' * size )
for size in payload_body_sizes )
requests = (
messages_pb2 . StreamingInputCallRequest ( payload = payload )
for payload in payloads )
response = stub . StreamingInputCall ( requests )
if response . aggregated_payload_size != 74922 :
raise ValueError (
' incorrect size %d ! ' % response . aggregated_payload_size )
def _server_streaming ( stub ) :
sizes = ( 31415 , 9 , 2653 , 58979 )
with stub :
reque st = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = sizes [ 0 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 1 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 2 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 3 ] ) ,
) )
response_iterator = stub . StreamingOutputCall ( request , _TIMEOUT )
for index , response in enumerate ( response_iterator ) :
if response . payload . type != messages_pb2 . COMPRESSABLE :
raise ValueError (
' response body of invalid type %s ! ' % response . payload . type )
if len ( response . payload . body ) != sizes [ index ] :
raise ValueError (
' response body of invalid size %d ! ' % len ( response . payload . body ) )
sizes = ( 31415 , 9 , 2653 , 58979 , )
request = messages_pb2 . StreamingOutputCallRequest (
response_ type = messages_pb2 . COMPRESSABLE ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = sizes [ 0 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 1 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 2 ] ) ,
messages_pb2 . ResponseParameters ( size = sizes [ 3 ] ) ,
)
)
response_iterator = stub . StreamingOutputCall ( request )
for index , response in enumerate ( response_iterator ) :
if response . payload . type != messages_pb2 . COMPRESSABLE :
raise ValueError (
' response body of invalid type %s ! ' % response . payload . type )
el if len ( response . payload . body ) != sizes [ index ] :
raise ValueError (
' response body of invalid size %d ! ' % len ( response . payload . body ) )
def _cancel_after_begin ( stub ) :
with stub :
sizes = ( 27182 , 8 , 1828 , 45904 )
payloads = [ messages_pb2 . Payload ( body = b ' \x00 ' * size ) for size in sizes ]
requests = [ messages_pb2 . StreamingInputCallRequest ( payload = payload )
for payload in payloads ]
responses = stub . StreamingInputCall . future ( requests , _TIMEOUT )
responses . cancel ( )
if not responses . cancelled ( ) :
raise ValueError ( ' expected call to be cancelled ' )
sizes = ( 27182 , 8 , 1828 , 45904 , )
payloads = ( messages_pb2 . Payload ( body = b ' \x00 ' * size ) for size in sizes )
requests = ( messages_pb2 . StreamingInputCallRequest ( payload = payload )
for payload in payloads )
response_future = stub . StreamingInputCall . future ( requests )
response_future . cancel ( )
if not response_future . cancelled ( ) :
raise ValueError ( ' expected call to be cancelled ' )
class _Pipe ( object ) :
@ -220,18 +210,17 @@ class _Pipe(object):
def _ping_pong ( stub ) :
request_response_sizes = ( 31415 , 9 , 2653 , 58979 )
request_payload_sizes = ( 27182 , 8 , 1828 , 45904 )
request_response_sizes = ( 31415 , 9 , 2653 , 58979 , )
request_payload_sizes = ( 27182 , 8 , 1828 , 45904 , )
with stub , _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe , _TIMEOUT )
print ( ' Starting ping-pong with response iterator %s ' % response_iterator )
with _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe )
for response_size , payload_size in zip (
request_response_sizes , request_payload_sizes ) :
request = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_parameters = ( messages_pb2 . ResponseParameters (
size = response_size ) , ) ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = response_size ) , ) ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' * payload_size ) )
pipe . add ( request )
response = next ( response_iterator )
@ -244,17 +233,17 @@ def _ping_pong(stub):
def _cancel_after_first_response ( stub ) :
request_response_sizes = ( 31415 , 9 , 2653 , 58979 )
request_payload_sizes = ( 27182 , 8 , 1828 , 45904 )
with stub , _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe , _TIMEOUT )
request_response_sizes = ( 31415 , 9 , 2653 , 58979 , )
request_payload_sizes = ( 27182 , 8 , 1828 , 45904 , )
with _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe )
response_size = request_response_sizes [ 0 ]
payload_size = request_payload_sizes [ 0 ]
request = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_parameters = ( messages_pb2 . ResponseParameters (
size = response_size ) , ) ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = response_size ) , ) ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' * payload_size ) )
pipe . add ( request )
response = next ( response_iterator )
@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
try :
next ( response_iterator )
except Exception :
pass
except grpc . RpcError as rpc_error :
if rpc_error . code ( ) is not grpc . StatusCode . CANCELLED :
raise
else :
raise ValueError ( ' expected call to be cancelled ' )
def _timeout_on_sleeping_server ( stub ) :
request_payload_size = 27182
with stub , _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe , 0.001 )
with _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe , timeout = 0.001 )
request = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
time . sleep ( 0.1 )
try :
next ( response_iterator )
except face . ExpirationError :
pass
except grpc . RpcError as rpc_error :
if rpc_error . code ( ) is not grpc . StatusCode . DEADLINE_EXCEEDED :
raise
else :
raise ValueError ( ' expected call to exceed deadline ' )
def _empty_stream ( stub ) :
with stub , _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe , _TIMEOUT )
with _Pipe ( ) as pipe :
response_iterator = stub . FullDuplexCall ( pipe )
pipe . close ( )
try :
next ( response_iterator )
@ -300,65 +291,64 @@ def _empty_stream(stub):
def _status_code_and_message ( stub ) :
with stub :
message = ' test status message '
code = 2
status = grpc . StatusCode . UNKNOWN # code = 2
request = messages_pb2 . SimpleRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_size = 1 ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' ) ,
response_status = messages_pb2 . EchoStatus ( code = code , message = message )
)
response_future = stub . UnaryCall . future ( request , _TIMEOUT )
if response_future . code ( ) != status :
raise ValueError (
' expected code %s , got %s ' % ( status , response_future . code ( ) ) )
if response_future . details ( ) != message :
raise ValueError (
' expected message %s , got %s ' % ( message , response_future . details ( ) ) )
request = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = 1 ) , ) ,
response_status = messages_pb2 . EchoStatus ( code = code , message = message ) )
response_iterator = stub . StreamingOutputCall ( request , _TIMEOUT )
if response_future . code ( ) != status :
raise ValueError (
' expected code %s , got %s ' % ( status , response_iterator . code ( ) ) )
if response_future . details ( ) != message :
raise ValueError (
' expected message %s , got %s ' % ( message , response_iterator . details ( ) ) )
message = ' test status message '
code = 2
status = grpc . StatusCode . UNKNOWN # code = 2
request = messages_pb2 . SimpleRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_size = 1 ,
payload = messages_pb2 . Payload ( body = b ' \x00 ' ) ,
response_status = messages_pb2 . EchoStatus ( code = code , message = message )
)
response_future = stub . UnaryCall . future ( request )
if response_future . code ( ) != status :
raise ValueError (
' expected code %s , got %s ' % ( status , response_future . code ( ) ) )
elif response_future . details ( ) != message :
raise ValueError (
' expected message %s , got %s ' % ( message , response_future . details ( ) ) )
request = messages_pb2 . StreamingOutputCallRequest (
response_type = messages_pb2 . COMPRESSABLE ,
response_parameters = (
messages_pb2 . ResponseParameters ( size = 1 ) , ) ,
response_status = messages_pb2 . EchoStatus ( code = code , message = message ) )
response_iterator = stub . StreamingOutputCall ( request )
if response_future . code ( ) != status :
raise ValueError (
' expected code %s , got %s ' % ( status , response_iterator . code ( ) ) )
elif response_future . details ( ) != message :
raise ValueError (
' expected message %s , got %s ' % ( message , response_iterator . details ( ) ) )
def _compute_engine_creds ( stub , args ) :
response = _large_unary_common_behavior ( stub , True , True )
response = _large_unary_common_behavior ( stub , True , True , None )
if args . default_service_account != response . username :
raise ValueError (
' expected username %s , got %s ' % ( args . default_service_account ,
response . username ) )
' expected username %s , got %s ' % (
args . default_service_account , response . username ) )
def _oauth2_auth_token ( stub , args ) :
json_key_filename = os . environ [
oauth2client_client . GOOGLE_APPLICATION_CREDENTIALS ]
wanted_email = json . load ( open ( json_key_filename , ' rb ' ) ) [ ' client_email ' ]
response = _large_unary_common_behavior ( stub , True , True )
response = _large_unary_common_behavior ( stub , True , True , None )
if wanted_email != response . username :
raise ValueError (
' expected username %s , got %s ' % ( wanted_email , response . username ) )
if args . oauth_scope . find ( response . oauth_scope ) == - 1 :
raise ValueError (
' expected to find oauth scope " %s " in received " %s " ' %
( response . oauth_scope , args . oauth_scope ) )
' expected to find oauth scope " {} " in received " {} " ' . format (
response . oauth_scope , args . oauth_scope ) )
def _jwt_token_creds ( stub , args ) :
json_key_filename = os . environ [
oauth2client_client . GOOGLE_APPLICATION_CREDENTIALS ]
wanted_email = json . load ( open ( json_key_filename , ' rb ' ) ) [ ' client_email ' ]
response = _large_unary_common_behavior ( stub , True , False )
response = _large_unary_common_behavior ( stub , True , False , None )
if wanted_email != response . username :
raise ValueError (
' expected username %s , got %s ' % ( wanted_email , response . username ) )
@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
wanted_email = json . load ( open ( json_key_filename , ' rb ' ) ) [ ' client_email ' ]
credentials = oauth2client_client . GoogleCredentials . get_application_default ( )
scoped_credentials = credentials . create_scoped ( [ args . oauth_scope ] )
call_creds = implementations . google_call_credentials ( scoped_credentials )
options = interfaces . grpc_call_options ( disable_compression = False ,
credentials = call_creds )
response = _large_unary_common_behavior ( stub , True , False ,
protocol_options = option s )
# TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations . google_call_credentials (
scoped_credentials )
response = _large_unary_common_behavior ( stub , True , False , call_credential s)
if wanted_email != response . username :
raise ValueError (
' expected username %s , got %s ' % ( wanted_email , response . username ) )