Migrate distrib, interop, and stress to GA API

pull/7702/head
Nathaniel Manista 9 years ago
parent baa8c31984
commit 5450f05e0c
  1. 14
      src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
  2. 24
      src/python/grpcio_tests/tests/interop/_secure_interop_test.py
  3. 56
      src/python/grpcio_tests/tests/interop/client.py
  4. 270
      src/python/grpcio_tests/tests/interop/methods.py
  5. 12
      src/python/grpcio_tests/tests/interop/server.py
  6. 21
      src/python/grpcio_tests/tests/stress/client.py
  7. 2
      src/python/grpcio_tests/tests/stress/metrics_server.py
  8. 4
      test/distrib/python/distribtest.py

@ -29,9 +29,10 @@
"""Insecure client-server interoperability as a unit test.""" """Insecure client-server interoperability as a unit test."""
from concurrent import futures
import unittest import unittest
from grpc.beta import implementations import grpc
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case from tests.interop import _interop_test_case
@ -44,14 +45,13 @@ class InsecureInteropTest(
unittest.TestCase): unittest.TestCase):
def setUp(self): def setUp(self):
self.server = test_pb2.beta_create_TestService_server(methods.TestService()) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(
methods.TestService(), self.server)
port = self.server.add_insecure_port('[::]:0') port = self.server.add_insecure_port('[::]:0')
self.server.start() self.server.start()
self.stub = test_pb2.beta_create_TestService_stub( self.stub = test_pb2.TestServiceStub(
implementations.insecure_channel('localhost', port)) grpc.insecure_channel('localhost:{}'.format(port)))
def tearDown(self):
self.server.stop(0)
if __name__ == '__main__': if __name__ == '__main__':

@ -29,17 +29,16 @@
"""Secure client-server interoperability as a unit test.""" """Secure client-server interoperability as a unit test."""
from concurrent import futures
import unittest import unittest
from grpc.beta import implementations import grpc
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case from tests.interop import _interop_test_case
from tests.interop import methods from tests.interop import methods
from tests.interop import resources from tests.interop import resources
from tests.unit.beta import test_utilities
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
@ -48,19 +47,18 @@ class SecureInteropTest(
unittest.TestCase): unittest.TestCase):
def setUp(self): def setUp(self):
self.server = test_pb2.beta_create_TestService_server(methods.TestService()) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(
methods.TestService(), self.server)
port = self.server.add_secure_port( port = self.server.add_secure_port(
'[::]:0', implementations.ssl_server_credentials( '[::]:0', grpc.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain())])) [(resources.private_key(), resources.certificate_chain())]))
self.server.start() self.server.start()
self.stub = test_pb2.beta_create_TestService_stub( self.stub = test_pb2.TestServiceStub(
test_utilities.not_really_secure_channel( grpc.secure_channel(
'localhost', port, implementations.ssl_channel_credentials( 'localhost:{}'.format(port),
resources.test_root_certificates()), grpc.ssl_channel_credentials(resources.test_root_certificates()),
_SERVER_HOST_OVERRIDE)) (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
def tearDown(self):
self.server.stop(0)
if __name__ == '__main__': if __name__ == '__main__':

@ -32,14 +32,12 @@
import argparse import argparse
from oauth2client import client as oauth2client_client from oauth2client import client as oauth2client_client
import grpc
from grpc.beta import implementations from grpc.beta import implementations
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
from tests.interop import methods from tests.interop import methods
from tests.interop import resources from tests.interop import resources
from tests.unit.beta import test_utilities
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def _args(): def _args():
@ -66,41 +64,49 @@ def _args():
return parser.parse_args() return parser.parse_args()
def _application_default_credentials():
return oauth2client_client.GoogleCredentials.get_application_default()
def _stub(args): def _stub(args):
target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token': if args.test_case == 'oauth2_auth_token':
creds = oauth2client_client.GoogleCredentials.get_application_default() google_credentials = _application_default_credentials()
scoped_creds = creds.create_scoped([args.oauth_scope]) scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
access_token = scoped_creds.get_access_token().access_token access_token = scoped_credentials.get_access_token().access_token
call_creds = implementations.access_token_call_credentials(access_token) call_credentials = grpc.access_token_call_credentials(access_token)
elif args.test_case == 'compute_engine_creds': elif args.test_case == 'compute_engine_creds':
creds = oauth2client_client.GoogleCredentials.get_application_default() google_credentials = _application_default_credentials()
scoped_creds = creds.create_scoped([args.oauth_scope]) scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
call_creds = implementations.google_call_credentials(scoped_creds) # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations.google_call_credentials(
scoped_credentials)
elif args.test_case == 'jwt_token_creds': elif args.test_case == 'jwt_token_creds':
creds = oauth2client_client.GoogleCredentials.get_application_default() google_credentials = _application_default_credentials()
call_creds = implementations.google_call_credentials(creds) # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations.google_call_credentials(
google_credentials)
else: else:
call_creds = None call_credentials = None
if args.use_tls: if args.use_tls:
if args.use_test_ca: if args.use_test_ca:
root_certificates = resources.test_root_certificates() root_certificates = resources.test_root_certificates()
else: else:
root_certificates = None # will load default roots. root_certificates = None # will load default roots.
channel_creds = implementations.ssl_channel_credentials(root_certificates) channel_credentials = grpc.ssl_channel_credentials(root_certificates)
if call_creds is not None: if call_credentials is not None:
channel_creds = implementations.composite_channel_credentials( channel_credentials = grpc.composite_channel_credentials(
channel_creds, call_creds) channel_credentials, call_credentials)
channel = test_utilities.not_really_secure_channel( channel = grpc.secure_channel(
args.server_host, args.server_port, channel_creds, target, channel_credentials,
args.server_host_override) (('grpc.ssl_target_name_override', args.server_host_override,),))
stub = test_pb2.beta_create_TestService_stub(channel)
else: else:
channel = implementations.insecure_channel( channel = grpc.insecure_channel(target)
args.server_host, args.server_port) return test_pb2.TestServiceStub(channel)
stub = test_pb2.beta_create_TestService_stub(channel)
return stub
def _test_case_from_arg(test_case_arg): def _test_case_from_arg(test_case_arg):

@ -29,8 +29,6 @@
"""Implementations of interoperability test methods.""" """Implementations of interoperability test methods."""
from __future__ import print_function
import enum import enum
import json import json
import os import os
@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
import grpc import grpc
from grpc.beta import implementations from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
_TIMEOUT = 7
class TestService(test_pb2.BetaTestServiceServicer): class TestService(test_pb2.TestServiceServicer):
def EmptyCall(self, request, context): def EmptyCall(self, request, context):
return empty_pb2.Empty() return empty_pb2.Empty()
def UnaryCall(self, request, context): def UnaryCall(self, request, context):
if request.HasField('response_status'): if request.HasField('response_status'):
context.code(request.response_status.code) context.set_code(request.response_status.code)
context.details(request.response_status.message) context.set_details(request.response_status.message)
return messages_pb2.SimpleResponse( return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload( payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE, type=messages_pb2.COMPRESSABLE,
@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingOutputCall(self, request, context): def StreamingOutputCall(self, request, context):
if request.HasField('response_status'): if request.HasField('response_status'):
context.code(request.response_status.code) context.set_code(request.response_status.code)
context.details(request.response_status.message) context.set_details(request.response_status.message)
for response_parameters in request.response_parameters: for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse( yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload( payload=messages_pb2.Payload(
@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingInputCall(self, request_iterator, context): def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0 aggregate_size = 0
for request in request_iterator: for request in request_iterator:
if request.payload and request.payload.body: if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body) aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse( return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size) aggregated_payload_size=aggregate_size)
@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def FullDuplexCall(self, request_iterator, context): def FullDuplexCall(self, request_iterator, context):
for request in request_iterator: for request in request_iterator:
if request.HasField('response_status'): if request.HasField('response_status'):
context.code(request.response_status.code) context.set_code(request.response_status.code)
context.details(request.response_status.message) context.set_details(request.response_status.message)
for response_parameters in request.response_parameters: for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse( yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload( payload=messages_pb2.Payload(
@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
return self.FullDuplexCall(request_iterator, context) return self.FullDuplexCall(request_iterator, context)
def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope, def _large_unary_common_behavior(
protocol_options=None): stub, fill_username, fill_oauth_scope, call_credentials):
with stub: request = messages_pb2.SimpleRequest(
request = messages_pb2.SimpleRequest( response_type=messages_pb2.COMPRESSABLE, response_size=314159,
response_type=messages_pb2.COMPRESSABLE, response_size=314159, payload=messages_pb2.Payload(body=b'\x00' * 271828),
payload=messages_pb2.Payload(body=b'\x00' * 271828), fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
fill_username=fill_username, fill_oauth_scope=fill_oauth_scope) response_future = stub.UnaryCall.future(
response_future = stub.UnaryCall.future(request, _TIMEOUT, request, credentials=call_credentials)
protocol_options=protocol_options) response = response_future.result()
response = response_future.result() if response.payload.type is not messages_pb2.COMPRESSABLE:
if response.payload.type is not messages_pb2.COMPRESSABLE: raise ValueError(
raise ValueError( 'response payload type is "%s"!' % type(response.payload.type))
'response payload type is "%s"!' % type(response.payload.type)) elif len(response.payload.body) != 314159:
if len(response.payload.body) != 314159: raise ValueError(
raise ValueError( 'response body of incorrect size %d!' % len(response.payload.body))
'response body of incorrect size %d!' % len(response.payload.body)) else:
return response return response
def _empty_unary(stub): def _empty_unary(stub):
with stub: response = stub.EmptyCall(empty_pb2.Empty())
response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT) if not isinstance(response, empty_pb2.Empty):
if not isinstance(response, empty_pb2.Empty): raise TypeError(
raise TypeError( 'response is of type "%s", not empty_pb2.Empty!', type(response))
'response is of type "%s", not empty_pb2.Empty!', type(response))
def _large_unary(stub): def _large_unary(stub):
_large_unary_common_behavior(stub, False, False) _large_unary_common_behavior(stub, False, False, None)
def _client_streaming(stub): def _client_streaming(stub):
with stub: payload_body_sizes = (27182, 8, 1828, 45904,)
payload_body_sizes = (27182, 8, 1828, 45904) payloads = (
payloads = ( messages_pb2.Payload(body=b'\x00' * size)
messages_pb2.Payload(body=b'\x00' * size) for size in payload_body_sizes)
for size in payload_body_sizes) requests = (
requests = ( messages_pb2.StreamingInputCallRequest(payload=payload)
messages_pb2.StreamingInputCallRequest(payload=payload) for payload in payloads)
for payload in payloads) response = stub.StreamingInputCall(requests)
response = stub.StreamingInputCall(requests, _TIMEOUT) if response.aggregated_payload_size != 74922:
if response.aggregated_payload_size != 74922: raise ValueError(
raise ValueError( 'incorrect size %d!' % response.aggregated_payload_size)
'incorrect size %d!' % response.aggregated_payload_size)
def _server_streaming(stub): def _server_streaming(stub):
sizes = (31415, 9, 2653, 58979) sizes = (31415, 9, 2653, 58979,)
with stub: request = messages_pb2.StreamingOutputCallRequest(
request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE,
response_type=messages_pb2.COMPRESSABLE, response_parameters=(
response_parameters=( messages_pb2.ResponseParameters(size=sizes[0]),
messages_pb2.ResponseParameters(size=sizes[0]), messages_pb2.ResponseParameters(size=sizes[1]),
messages_pb2.ResponseParameters(size=sizes[1]), messages_pb2.ResponseParameters(size=sizes[2]),
messages_pb2.ResponseParameters(size=sizes[2]), messages_pb2.ResponseParameters(size=sizes[3]),
messages_pb2.ResponseParameters(size=sizes[3]), )
)) )
response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) response_iterator = stub.StreamingOutputCall(request)
for index, response in enumerate(response_iterator): for index, response in enumerate(response_iterator):
if response.payload.type != messages_pb2.COMPRESSABLE: if response.payload.type != messages_pb2.COMPRESSABLE:
raise ValueError( raise ValueError(
'response body of invalid type %s!' % response.payload.type) 'response body of invalid type %s!' % response.payload.type)
if len(response.payload.body) != sizes[index]: elif len(response.payload.body) != sizes[index]:
raise ValueError( raise ValueError(
'response body of invalid size %d!' % len(response.payload.body)) 'response body of invalid size %d!' % len(response.payload.body))
def _cancel_after_begin(stub): def _cancel_after_begin(stub):
with stub: sizes = (27182, 8, 1828, 45904,)
sizes = (27182, 8, 1828, 45904) payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes] requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
requests = [messages_pb2.StreamingInputCallRequest(payload=payload) for payload in payloads)
for payload in payloads] response_future = stub.StreamingInputCall.future(requests)
responses = stub.StreamingInputCall.future(requests, _TIMEOUT) response_future.cancel()
responses.cancel() if not response_future.cancelled():
if not responses.cancelled(): raise ValueError('expected call to be cancelled')
raise ValueError('expected call to be cancelled')
class _Pipe(object): class _Pipe(object):
@ -220,18 +210,17 @@ class _Pipe(object):
def _ping_pong(stub): def _ping_pong(stub):
request_response_sizes = (31415, 9, 2653, 58979) request_response_sizes = (31415, 9, 2653, 58979,)
request_payload_sizes = (27182, 8, 1828, 45904) request_payload_sizes = (27182, 8, 1828, 45904,)
with stub, _Pipe() as pipe: with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) response_iterator = stub.FullDuplexCall(pipe)
print('Starting ping-pong with response iterator %s' % response_iterator)
for response_size, payload_size in zip( for response_size, payload_size in zip(
request_response_sizes, request_payload_sizes): request_response_sizes, request_payload_sizes):
request = messages_pb2.StreamingOutputCallRequest( request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE, response_type=messages_pb2.COMPRESSABLE,
response_parameters=(messages_pb2.ResponseParameters( response_parameters=(
size=response_size),), messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size)) payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request) pipe.add(request)
response = next(response_iterator) response = next(response_iterator)
@ -244,17 +233,17 @@ def _ping_pong(stub):
def _cancel_after_first_response(stub): def _cancel_after_first_response(stub):
request_response_sizes = (31415, 9, 2653, 58979) request_response_sizes = (31415, 9, 2653, 58979,)
request_payload_sizes = (27182, 8, 1828, 45904) request_payload_sizes = (27182, 8, 1828, 45904,)
with stub, _Pipe() as pipe: with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) response_iterator = stub.FullDuplexCall(pipe)
response_size = request_response_sizes[0] response_size = request_response_sizes[0]
payload_size = request_payload_sizes[0] payload_size = request_payload_sizes[0]
request = messages_pb2.StreamingOutputCallRequest( request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE, response_type=messages_pb2.COMPRESSABLE,
response_parameters=(messages_pb2.ResponseParameters( response_parameters=(
size=response_size),), messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size)) payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request) pipe.add(request)
response = next(response_iterator) response = next(response_iterator)
@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
try: try:
next(response_iterator) next(response_iterator)
except Exception: except grpc.RpcError as rpc_error:
pass if rpc_error.code() is not grpc.StatusCode.CANCELLED:
raise
else: else:
raise ValueError('expected call to be cancelled') raise ValueError('expected call to be cancelled')
def _timeout_on_sleeping_server(stub): def _timeout_on_sleeping_server(stub):
request_payload_size = 27182 request_payload_size = 27182
with stub, _Pipe() as pipe: with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, 0.001) response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
request = messages_pb2.StreamingOutputCallRequest( request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE, response_type=messages_pb2.COMPRESSABLE,
@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
time.sleep(0.1) time.sleep(0.1)
try: try:
next(response_iterator) next(response_iterator)
except face.ExpirationError: except grpc.RpcError as rpc_error:
pass if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
raise
else: else:
raise ValueError('expected call to exceed deadline') raise ValueError('expected call to exceed deadline')
def _empty_stream(stub): def _empty_stream(stub):
with stub, _Pipe() as pipe: with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) response_iterator = stub.FullDuplexCall(pipe)
pipe.close() pipe.close()
try: try:
next(response_iterator) next(response_iterator)
@ -300,65 +291,64 @@ def _empty_stream(stub):
def _status_code_and_message(stub): def _status_code_and_message(stub):
with stub: message = 'test status message'
message = 'test status message' code = 2
code = 2 status = grpc.StatusCode.UNKNOWN # code = 2
status = grpc.StatusCode.UNKNOWN # code = 2 request = messages_pb2.SimpleRequest(
request = messages_pb2.SimpleRequest( response_type=messages_pb2.COMPRESSABLE,
response_type=messages_pb2.COMPRESSABLE, response_size=1,
response_size=1, payload=messages_pb2.Payload(body=b'\x00'),
payload=messages_pb2.Payload(body=b'\x00'), response_status=messages_pb2.EchoStatus(code=code, message=message)
response_status=messages_pb2.EchoStatus(code=code, message=message) )
) response_future = stub.UnaryCall.future(request)
response_future = stub.UnaryCall.future(request, _TIMEOUT) if response_future.code() != status:
if response_future.code() != status: raise ValueError(
raise ValueError( 'expected code %s, got %s' % (status, response_future.code()))
'expected code %s, got %s' % (status, response_future.code())) elif response_future.details() != message:
if response_future.details() != message: raise ValueError(
raise ValueError( 'expected message %s, got %s' % (message, response_future.details()))
'expected message %s, got %s' % (message, response_future.details()))
request = messages_pb2.StreamingOutputCallRequest(
request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE,
response_type=messages_pb2.COMPRESSABLE, response_parameters=(
response_parameters=( messages_pb2.ResponseParameters(size=1),),
messages_pb2.ResponseParameters(size=1),), response_status=messages_pb2.EchoStatus(code=code, message=message))
response_status=messages_pb2.EchoStatus(code=code, message=message)) response_iterator = stub.StreamingOutputCall(request)
response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) if response_future.code() != status:
if response_future.code() != status: raise ValueError(
raise ValueError( 'expected code %s, got %s' % (status, response_iterator.code()))
'expected code %s, got %s' % (status, response_iterator.code())) elif response_future.details() != message:
if response_future.details() != message: raise ValueError(
raise ValueError( 'expected message %s, got %s' % (message, response_iterator.details()))
'expected message %s, got %s' % (message, response_iterator.details()))
def _compute_engine_creds(stub, args): def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True) response = _large_unary_common_behavior(stub, True, True, None)
if args.default_service_account != response.username: if args.default_service_account != response.username:
raise ValueError( raise ValueError(
'expected username %s, got %s' % (args.default_service_account, 'expected username %s, got %s' % (
response.username)) args.default_service_account, response.username))
def _oauth2_auth_token(stub, args): def _oauth2_auth_token(stub, args):
json_key_filename = os.environ[ json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, True) response = _large_unary_common_behavior(stub, True, True, None)
if wanted_email != response.username: if wanted_email != response.username:
raise ValueError( raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username)) 'expected username %s, got %s' % (wanted_email, response.username))
if args.oauth_scope.find(response.oauth_scope) == -1: if args.oauth_scope.find(response.oauth_scope) == -1:
raise ValueError( raise ValueError(
'expected to find oauth scope "%s" in received "%s"' % 'expected to find oauth scope "{}" in received "{}"'.format(
(response.oauth_scope, args.oauth_scope)) response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args): def _jwt_token_creds(stub, args):
json_key_filename = os.environ[ json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, False) response = _large_unary_common_behavior(stub, True, False, None)
if wanted_email != response.username: if wanted_email != response.username:
raise ValueError( raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username)) 'expected username %s, got %s' % (wanted_email, response.username))
@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
credentials = oauth2client_client.GoogleCredentials.get_application_default() credentials = oauth2client_client.GoogleCredentials.get_application_default()
scoped_credentials = credentials.create_scoped([args.oauth_scope]) scoped_credentials = credentials.create_scoped([args.oauth_scope])
call_creds = implementations.google_call_credentials(scoped_credentials) # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
options = interfaces.grpc_call_options(disable_compression=False, # remaining use of the Beta API.
credentials=call_creds) call_credentials = implementations.google_call_credentials(
response = _large_unary_common_behavior(stub, True, False, scoped_credentials)
protocol_options=options) response = _large_unary_common_behavior(stub, True, False, call_credentials)
if wanted_email != response.username: if wanted_email != response.username:
raise ValueError( raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username)) 'expected username %s, got %s' % (wanted_email, response.username))

@ -30,10 +30,11 @@
"""The Python implementation of the GRPC interoperability test server.""" """The Python implementation of the GRPC interoperability test server."""
import argparse import argparse
from concurrent import futures
import logging import logging
import time import time
from grpc.beta import implementations import grpc
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
from tests.interop import methods from tests.interop import methods
@ -51,12 +52,13 @@ def serve():
default=False, type=resources.parse_bool) default=False, type=resources.parse_bool)
args = parser.parse_args() args = parser.parse_args()
server = test_pb2.beta_create_TestService_server(methods.TestService()) server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
if args.use_tls: if args.use_tls:
private_key = resources.private_key() private_key = resources.private_key()
certificate_chain = resources.certificate_chain() certificate_chain = resources.certificate_chain()
credentials = implementations.ssl_server_credentials( credentials = grpc.ssl_server_credentials(
[(private_key, certificate_chain)]) ((private_key, certificate_chain),))
server.add_secure_port('[::]:{}'.format(args.port), credentials) server.add_secure_port('[::]:{}'.format(args.port), credentials)
else: else:
server.add_insecure_port('[::]:{}'.format(args.port)) server.add_insecure_port('[::]:{}'.format(args.port))
@ -68,7 +70,7 @@ def serve():
time.sleep(_ONE_DAY_IN_SECONDS) time.sleep(_ONE_DAY_IN_SECONDS)
except BaseException as e: except BaseException as e:
logging.info('Caught exception "%s"; stopping server...', e) logging.info('Caught exception "%s"; stopping server...', e)
server.stop(0) server.stop(None)
logging.info('Server stopped; exiting.') logging.info('Server stopped; exiting.')
if __name__ == '__main__': if __name__ == '__main__':

@ -30,9 +30,10 @@
"""Entry point for running stress tests.""" """Entry point for running stress tests."""
import argparse import argparse
from concurrent import futures
import threading import threading
from grpc.beta import implementations import grpc
from six.moves import queue from six.moves import queue
from src.proto.grpc.testing import metrics_pb2 from src.proto.grpc.testing import metrics_pb2
from src.proto.grpc.testing import test_pb2 from src.proto.grpc.testing import test_pb2
@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args):
def run_test(args): def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases) test_cases = _parse_weighted_test_cases(args.test_cases)
test_servers = args.server_addresses.split(',') test_server_targets = args.server_addresses.split(',')
# Propagate any client exceptions with a queue # Propagate any client exceptions with a queue
exception_queue = queue.Queue() exception_queue = queue.Queue()
stop_event = threading.Event() stop_event = threading.Event()
hist = histogram.Histogram(1, 1) hist = histogram.Histogram(1, 1)
runners = [] runners = []
server = metrics_pb2.beta_create_MetricsService_server( server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
metrics_server.MetricsServer(hist)) metrics_pb2.add_MetricsServiceServicer_to_server(
metrics_server.MetricsServer(hist), server)
server.add_insecure_port('[::]:{}'.format(args.metrics_port)) server.add_insecure_port('[::]:{}'.format(args.metrics_port))
server.start() server.start()
for test_server in test_servers: for test_server_target in test_server_targets:
host, port = test_server.split(':', 1)
for _ in xrange(args.num_channels_per_server): for _ in xrange(args.num_channels_per_server):
channel = implementations.insecure_channel(host, int(port)) channel = grpc.insecure_channel(test_server_target)
for _ in xrange(args.num_stubs_per_channel): for _ in xrange(args.num_stubs_per_channel):
stub = test_pb2.beta_create_TestService_stub(channel) stub = test_pb2.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist, runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event) exception_queue, stop_event)
runners.append(runner) runners.append(runner)
@ -128,8 +129,8 @@ def run_test(args):
stop_event.set() stop_event.set()
for runner in runners: for runner in runners:
runner.join() runner.join()
runner = None runner = None
server.stop(0) server.stop(None)
if __name__ == '__main__': if __name__ == '__main__':
run_test(_args()) run_test(_args())

@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2
GAUGE_NAME = 'python_overall_qps' GAUGE_NAME = 'python_overall_qps'
class MetricsServer(metrics_pb2.BetaMetricsServiceServicer): class MetricsServer(metrics_pb2.MetricsServiceServicer):
def __init__(self, histogram): def __init__(self, histogram):
self._start_time = time.time() self._start_time = time.time()

@ -27,10 +27,10 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc.beta import implementations import grpc
# This code doesn't do much but makes sure the native extension is loaded # This code doesn't do much but makes sure the native extension is loaded
# which is what we are testing here. # which is what we are testing here.
channel = implementations.insecure_channel('localhost', 1000) channel = grpc.insecure_channel('localhost:1000')
del channel del channel
print 'Success!' print 'Success!'

Loading…
Cancel
Save