Merge pull request #7702 from nathanielmanistaatgoogle/distrib-interop-stress-ga-api

Migrate distrib, interop, and stress to GA API
pull/7741/head^2
kpayson64 authored 9 years ago · committed by GitHub
commit e3ce8bd503
8 changed files:

  1. src/python/grpcio_tests/tests/interop/_insecure_interop_test.py (14 changes)
  2. src/python/grpcio_tests/tests/interop/_secure_interop_test.py (24 changes)
  3. src/python/grpcio_tests/tests/interop/client.py (56 changes)
  4. src/python/grpcio_tests/tests/interop/methods.py (270 changes)
  5. src/python/grpcio_tests/tests/interop/server.py (12 changes)
  6. src/python/grpcio_tests/tests/stress/client.py (21 changes)
  7. src/python/grpcio_tests/tests/stress/metrics_server.py (2 changes)
  8. test/distrib/python/distribtest.py (4 changes)

@@ -29,9 +29,10 @@
"""Insecure client-server interoperability as a unit test."""
from concurrent import futures
import unittest
from grpc.beta import implementations
import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
@@ -44,14 +45,13 @@ class InsecureInteropTest(
unittest.TestCase):
def setUp(self):
self.server = test_pb2.beta_create_TestService_server(methods.TestService())
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(
methods.TestService(), self.server)
port = self.server.add_insecure_port('[::]:0')
self.server.start()
self.stub = test_pb2.beta_create_TestService_stub(
implementations.insecure_channel('localhost', port))
def tearDown(self):
self.server.stop(0)
self.stub = test_pb2.TestServiceStub(
grpc.insecure_channel('localhost:{}'.format(port)))
if __name__ == '__main__':
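
For orientation, the GA-API pattern the rewritten setUp/tearDown follows is roughly the following; a minimal sketch, assuming the generated test_pb2 module and the methods module from this repository are importable:

    from concurrent import futures

    import grpc
    from src.proto.grpc.testing import test_pb2
    from tests.interop import methods

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
    port = server.add_insecure_port('[::]:0')  # port 0: let the runtime pick a free port
    server.start()
    stub = test_pb2.TestServiceStub(
        grpc.insecure_channel('localhost:{}'.format(port)))
    server.stop(0)  # grace of 0 seconds, as in tearDown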

@@ -29,17 +29,16 @@
"""Secure client-server interoperability as a unit test."""
from concurrent import futures
import unittest
from grpc.beta import implementations
import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
from tests.interop import methods
from tests.interop import resources
from tests.unit.beta import test_utilities
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
@@ -48,19 +47,18 @@ class SecureInteropTest(
unittest.TestCase):
def setUp(self):
self.server = test_pb2.beta_create_TestService_server(methods.TestService())
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(
methods.TestService(), self.server)
port = self.server.add_secure_port(
'[::]:0', implementations.ssl_server_credentials(
'[::]:0', grpc.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain())]))
self.server.start()
self.stub = test_pb2.beta_create_TestService_stub(
test_utilities.not_really_secure_channel(
'localhost', port, implementations.ssl_channel_credentials(
resources.test_root_certificates()),
_SERVER_HOST_OVERRIDE))
def tearDown(self):
self.server.stop(0)
self.stub = test_pb2.TestServiceStub(
grpc.secure_channel(
'localhost:{}'.format(port),
grpc.ssl_channel_credentials(resources.test_root_certificates()),
(('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
if __name__ == '__main__':
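
The secure variant follows the same construction, but with SSL credentials on both ends and a target-name override so the test certificate validates against 'foo.test.google.fr'; a minimal sketch, assuming the resources helpers from tests/interop supply the key material:

    from concurrent import futures

    import grpc
    from src.proto.grpc.testing import test_pb2
    from tests.interop import methods, resources

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
    server_credentials = grpc.ssl_server_credentials(
        ((resources.private_key(), resources.certificate_chain()),))
    port = server.add_secure_port('[::]:0', server_credentials)
    server.start()
    channel = grpc.secure_channel(
        'localhost:{}'.format(port),
        grpc.ssl_channel_credentials(resources.test_root_certificates()),
        (('grpc.ssl_target_name_override', 'foo.test.google.fr',),))
    stub = test_pb2.TestServiceStub(channel)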

@@ -32,14 +32,12 @@
import argparse
from oauth2client import client as oauth2client_client
import grpc
from grpc.beta import implementations
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
from tests.interop import resources
from tests.unit.beta import test_utilities
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def _args():
@@ -66,41 +64,49 @@ def _args():
return parser.parse_args()
def _application_default_credentials():
return oauth2client_client.GoogleCredentials.get_application_default()
def _stub(args):
target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
creds = oauth2client_client.GoogleCredentials.get_application_default()
scoped_creds = creds.create_scoped([args.oauth_scope])
access_token = scoped_creds.get_access_token().access_token
call_creds = implementations.access_token_call_credentials(access_token)
google_credentials = _application_default_credentials()
scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
access_token = scoped_credentials.get_access_token().access_token
call_credentials = grpc.access_token_call_credentials(access_token)
elif args.test_case == 'compute_engine_creds':
creds = oauth2client_client.GoogleCredentials.get_application_default()
scoped_creds = creds.create_scoped([args.oauth_scope])
call_creds = implementations.google_call_credentials(scoped_creds)
google_credentials = _application_default_credentials()
scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
# TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations.google_call_credentials(
scoped_credentials)
elif args.test_case == 'jwt_token_creds':
creds = oauth2client_client.GoogleCredentials.get_application_default()
call_creds = implementations.google_call_credentials(creds)
google_credentials = _application_default_credentials()
# TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations.google_call_credentials(
google_credentials)
else:
call_creds = None
call_credentials = None
if args.use_tls:
if args.use_test_ca:
root_certificates = resources.test_root_certificates()
else:
root_certificates = None # will load default roots.
channel_creds = implementations.ssl_channel_credentials(root_certificates)
if call_creds is not None:
channel_creds = implementations.composite_channel_credentials(
channel_creds, call_creds)
channel_credentials = grpc.ssl_channel_credentials(root_certificates)
if call_credentials is not None:
channel_credentials = grpc.composite_channel_credentials(
channel_credentials, call_credentials)
channel = test_utilities.not_really_secure_channel(
args.server_host, args.server_port, channel_creds,
args.server_host_override)
stub = test_pb2.beta_create_TestService_stub(channel)
channel = grpc.secure_channel(
target, channel_credentials,
(('grpc.ssl_target_name_override', args.server_host_override,),))
else:
channel = implementations.insecure_channel(
args.server_host, args.server_port)
stub = test_pb2.beta_create_TestService_stub(channel)
return stub
channel = grpc.insecure_channel(target)
return test_pb2.TestServiceStub(channel)
def _test_case_from_arg(test_case_arg):
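
Put together, the GA credential flow in _stub is: build per-call credentials when the test case needs them, build SSL channel credentials, compose the two, and open a single secure channel to the host:port target. A minimal sketch, where the token and target are placeholders rather than values taken from this client:

    import grpc
    from src.proto.grpc.testing import test_pb2

    target = 'interop-server.example.com:443'  # placeholder target
    call_credentials = grpc.access_token_call_credentials('placeholder-access-token')
    channel_credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),  # no roots given: load the default roots
        call_credentials)
    channel = grpc.secure_channel(
        target, channel_credentials,
        (('grpc.ssl_target_name_override', 'interop-server.example.com',),))
    stub = test_pb2.TestServiceStub(channel)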

@@ -29,8 +29,6 @@
"""Implementations of interoperability test methods."""
from __future__ import print_function
import enum
import json
import os
@@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
import grpc
from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2
_TIMEOUT = 7
class TestService(test_pb2.BetaTestServiceServicer):
class TestService(test_pb2.TestServiceServicer):
def EmptyCall(self, request, context):
return empty_pb2.Empty()
def UnaryCall(self, request, context):
if request.HasField('response_status'):
context.code(request.response_status.code)
context.details(request.response_status.message)
context.set_code(request.response_status.code)
context.set_details(request.response_status.message)
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
@@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingOutputCall(self, request, context):
if request.HasField('response_status'):
context.code(request.response_status.code)
context.details(request.response_status.message)
context.set_code(request.response_status.code)
context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0
for request in request_iterator:
if request.payload and request.payload.body:
if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
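
On the server side the change is mechanical: the Beta context's code() and details() setters become set_code() and set_details() on the GA ServicerContext. A minimal sketch of the echo-status pattern used by these handlers (the servicer class name here is illustrative only):

    from src.proto.grpc.testing import messages_pb2

    class _EchoStatusService(object):  # hypothetical servicer for illustration
        def UnaryCall(self, request, context):
            if request.HasField('response_status'):
                context.set_code(request.response_status.code)        # was context.code(...)
                context.set_details(request.response_status.message)  # was context.details(...)
            return messages_pb2.SimpleResponse(
                payload=messages_pb2.Payload(
                    type=messages_pb2.COMPRESSABLE,
                    body=b'\x00' * request.response_size))
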
@@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def FullDuplexCall(self, request_iterator, context):
for request in request_iterator:
if request.HasField('response_status'):
context.code(request.response_status.code)
context.details(request.response_status.message)
context.set_code(request.response_status.code)
context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
return self.FullDuplexCall(request_iterator, context)
def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope,
protocol_options=None):
with stub:
request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE, response_size=314159,
payload=messages_pb2.Payload(body=b'\x00' * 271828),
fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
response_future = stub.UnaryCall.future(request, _TIMEOUT,
protocol_options=protocol_options)
response = response_future.result()
if response.payload.type is not messages_pb2.COMPRESSABLE:
raise ValueError(
'response payload type is "%s"!' % type(response.payload.type))
if len(response.payload.body) != 314159:
raise ValueError(
'response body of incorrect size %d!' % len(response.payload.body))
def _large_unary_common_behavior(
stub, fill_username, fill_oauth_scope, call_credentials):
request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE, response_size=314159,
payload=messages_pb2.Payload(body=b'\x00' * 271828),
fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
response_future = stub.UnaryCall.future(
request, credentials=call_credentials)
response = response_future.result()
if response.payload.type is not messages_pb2.COMPRESSABLE:
raise ValueError(
'response payload type is "%s"!' % type(response.payload.type))
elif len(response.payload.body) != 314159:
raise ValueError(
'response body of incorrect size %d!' % len(response.payload.body))
else:
return response
def _empty_unary(stub):
with stub:
response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
if not isinstance(response, empty_pb2.Empty):
raise TypeError(
'response is of type "%s", not empty_pb2.Empty!', type(response))
response = stub.EmptyCall(empty_pb2.Empty())
if not isinstance(response, empty_pb2.Empty):
raise TypeError(
'response is of type "%s", not empty_pb2.Empty!', type(response))
def _large_unary(stub):
_large_unary_common_behavior(stub, False, False)
_large_unary_common_behavior(stub, False, False, None)
def _client_streaming(stub):
with stub:
payload_body_sizes = (27182, 8, 1828, 45904)
payloads = (
messages_pb2.Payload(body=b'\x00' * size)
for size in payload_body_sizes)
requests = (
messages_pb2.StreamingInputCallRequest(payload=payload)
for payload in payloads)
response = stub.StreamingInputCall(requests, _TIMEOUT)
if response.aggregated_payload_size != 74922:
raise ValueError(
'incorrect size %d!' % response.aggregated_payload_size)
payload_body_sizes = (27182, 8, 1828, 45904,)
payloads = (
messages_pb2.Payload(body=b'\x00' * size)
for size in payload_body_sizes)
requests = (
messages_pb2.StreamingInputCallRequest(payload=payload)
for payload in payloads)
response = stub.StreamingInputCall(requests)
if response.aggregated_payload_size != 74922:
raise ValueError(
'incorrect size %d!' % response.aggregated_payload_size)
def _server_streaming(stub):
sizes = (31415, 9, 2653, 58979)
with stub:
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(
messages_pb2.ResponseParameters(size=sizes[0]),
messages_pb2.ResponseParameters(size=sizes[1]),
messages_pb2.ResponseParameters(size=sizes[2]),
messages_pb2.ResponseParameters(size=sizes[3]),
))
response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
for index, response in enumerate(response_iterator):
if response.payload.type != messages_pb2.COMPRESSABLE:
raise ValueError(
'response body of invalid type %s!' % response.payload.type)
if len(response.payload.body) != sizes[index]:
raise ValueError(
'response body of invalid size %d!' % len(response.payload.body))
sizes = (31415, 9, 2653, 58979,)
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(
messages_pb2.ResponseParameters(size=sizes[0]),
messages_pb2.ResponseParameters(size=sizes[1]),
messages_pb2.ResponseParameters(size=sizes[2]),
messages_pb2.ResponseParameters(size=sizes[3]),
)
)
response_iterator = stub.StreamingOutputCall(request)
for index, response in enumerate(response_iterator):
if response.payload.type != messages_pb2.COMPRESSABLE:
raise ValueError(
'response body of invalid type %s!' % response.payload.type)
elif len(response.payload.body) != sizes[index]:
raise ValueError(
'response body of invalid size %d!' % len(response.payload.body))
def _cancel_after_begin(stub):
with stub:
sizes = (27182, 8, 1828, 45904)
payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
for payload in payloads]
responses = stub.StreamingInputCall.future(requests, _TIMEOUT)
responses.cancel()
if not responses.cancelled():
raise ValueError('expected call to be cancelled')
sizes = (27182, 8, 1828, 45904,)
payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
for payload in payloads)
response_future = stub.StreamingInputCall.future(requests)
response_future.cancel()
if not response_future.cancelled():
raise ValueError('expected call to be cancelled')
class _Pipe(object):
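
In the GA API a stream-unary method invoked through .future() returns a single Future/Call object, so cancellation is requested and observed on that object rather than on a Beta response object. A minimal sketch, assuming a stub as in the surrounding test:

    from src.proto.grpc.testing import messages_pb2

    requests = (
        messages_pb2.StreamingInputCallRequest(
            payload=messages_pb2.Payload(body=b'\x00' * size))
        for size in (27182, 8, 1828, 45904,))
    response_future = stub.StreamingInputCall.future(requests)  # stub from the test fixture
    response_future.cancel()
    if not response_future.cancelled():
        raise ValueError('expected call to be cancelled')
    # Once cancelled, response_future.code() reports grpc.StatusCode.CANCELLED.
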
@@ -220,18 +210,17 @@ class _Pipe(object):
def _ping_pong(stub):
request_response_sizes = (31415, 9, 2653, 58979)
request_payload_sizes = (27182, 8, 1828, 45904)
request_response_sizes = (31415, 9, 2653, 58979,)
request_payload_sizes = (27182, 8, 1828, 45904,)
with stub, _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
print('Starting ping-pong with response iterator %s' % response_iterator)
with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe)
for response_size, payload_size in zip(
request_response_sizes, request_payload_sizes):
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(messages_pb2.ResponseParameters(
size=response_size),),
response_parameters=(
messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -244,17 +233,17 @@ def _ping_pong(stub):
def _cancel_after_first_response(stub):
request_response_sizes = (31415, 9, 2653, 58979)
request_payload_sizes = (27182, 8, 1828, 45904)
with stub, _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
request_response_sizes = (31415, 9, 2653, 58979,)
request_payload_sizes = (27182, 8, 1828, 45904,)
with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe)
response_size = request_response_sizes[0]
payload_size = request_payload_sizes[0]
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(messages_pb2.ResponseParameters(
size=response_size),),
response_parameters=(
messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
try:
next(response_iterator)
except Exception:
pass
except grpc.RpcError as rpc_error:
if rpc_error.code() is not grpc.StatusCode.CANCELLED:
raise
else:
raise ValueError('expected call to be cancelled')
def _timeout_on_sleeping_server(stub):
request_payload_size = 27182
with stub, _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, 0.001)
with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
@@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
time.sleep(0.1)
try:
next(response_iterator)
except face.ExpirationError:
pass
except grpc.RpcError as rpc_error:
if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
raise
else:
raise ValueError('expected call to exceed deadline')
def _empty_stream(stub):
with stub, _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
with _Pipe() as pipe:
response_iterator = stub.FullDuplexCall(pipe)
pipe.close()
try:
next(response_iterator)
@@ -300,65 +291,64 @@ def _empty_stream(stub):
def _status_code_and_message(stub):
with stub:
message = 'test status message'
code = 2
status = grpc.StatusCode.UNKNOWN # code = 2
request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
response_size=1,
payload=messages_pb2.Payload(body=b'\x00'),
response_status=messages_pb2.EchoStatus(code=code, message=message)
)
response_future = stub.UnaryCall.future(request, _TIMEOUT)
if response_future.code() != status:
raise ValueError(
'expected code %s, got %s' % (status, response_future.code()))
if response_future.details() != message:
raise ValueError(
'expected message %s, got %s' % (message, response_future.details()))
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(
messages_pb2.ResponseParameters(size=1),),
response_status=messages_pb2.EchoStatus(code=code, message=message))
response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
if response_future.code() != status:
raise ValueError(
'expected code %s, got %s' % (status, response_iterator.code()))
if response_future.details() != message:
raise ValueError(
'expected message %s, got %s' % (message, response_iterator.details()))
message = 'test status message'
code = 2
status = grpc.StatusCode.UNKNOWN # code = 2
request = messages_pb2.SimpleRequest(
response_type=messages_pb2.COMPRESSABLE,
response_size=1,
payload=messages_pb2.Payload(body=b'\x00'),
response_status=messages_pb2.EchoStatus(code=code, message=message)
)
response_future = stub.UnaryCall.future(request)
if response_future.code() != status:
raise ValueError(
'expected code %s, got %s' % (status, response_future.code()))
elif response_future.details() != message:
raise ValueError(
'expected message %s, got %s' % (message, response_future.details()))
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
response_parameters=(
messages_pb2.ResponseParameters(size=1),),
response_status=messages_pb2.EchoStatus(code=code, message=message))
response_iterator = stub.StreamingOutputCall(request)
if response_future.code() != status:
raise ValueError(
'expected code %s, got %s' % (status, response_iterator.code()))
elif response_future.details() != message:
raise ValueError(
'expected message %s, got %s' % (message, response_iterator.details()))
def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True)
response = _large_unary_common_behavior(stub, True, True, None)
if args.default_service_account != response.username:
raise ValueError(
'expected username %s, got %s' % (args.default_service_account,
response.username))
'expected username %s, got %s' % (
args.default_service_account, response.username))
def _oauth2_auth_token(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, True)
response = _large_unary_common_behavior(stub, True, True, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
if args.oauth_scope.find(response.oauth_scope) == -1:
raise ValueError(
'expected to find oauth scope "%s" in received "%s"' %
(response.oauth_scope, args.oauth_scope))
'expected to find oauth scope "{}" in received "{}"'.format(
response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, False)
response = _large_unary_common_behavior(stub, True, False, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
@@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
credentials = oauth2client_client.GoogleCredentials.get_application_default()
scoped_credentials = credentials.create_scoped([args.oauth_scope])
call_creds = implementations.google_call_credentials(scoped_credentials)
options = interfaces.grpc_call_options(disable_compression=False,
credentials=call_creds)
response = _large_unary_common_behavior(stub, True, False,
protocol_options=options)
# TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
# remaining use of the Beta API.
call_credentials = implementations.google_call_credentials(
scoped_credentials)
response = _large_unary_common_behavior(stub, True, False, call_credentials)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
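
For per-RPC credentials the GA API passes a grpc.CallCredentials object at call time (stub.UnaryCall.future(request, credentials=call_credentials), as _large_unary_common_behavior now does); only the construction of Google call credentials still goes through the Beta implementations.google_call_credentials helper, per the TODO. A minimal sketch of the GA side of that call, where the metadata plugin is a stand-in for whatever actually produces the auth metadata:

    import grpc
    from src.proto.grpc.testing import messages_pb2

    def _auth_plugin(context, callback):  # hypothetical plugin for illustration
        callback((('authorization', 'Bearer placeholder-token'),), None)

    call_credentials = grpc.metadata_call_credentials(_auth_plugin)
    request = messages_pb2.SimpleRequest(fill_username=True, fill_oauth_scope=True)
    response_future = stub.UnaryCall.future(request, credentials=call_credentials)  # stub as above
    response = response_future.result()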

@@ -30,10 +30,11 @@
"""The Python implementation of the GRPC interoperability test server."""
import argparse
from concurrent import futures
import logging
import time
from grpc.beta import implementations
import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
@@ -51,12 +52,13 @@ def serve():
default=False, type=resources.parse_bool)
args = parser.parse_args()
server = test_pb2.beta_create_TestService_server(methods.TestService())
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
if args.use_tls:
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
credentials = implementations.ssl_server_credentials(
[(private_key, certificate_chain)])
credentials = grpc.ssl_server_credentials(
((private_key, certificate_chain),))
server.add_secure_port('[::]:{}'.format(args.port), credentials)
else:
server.add_insecure_port('[::]:{}'.format(args.port))
@@ -68,7 +70,7 @@ def serve():
time.sleep(_ONE_DAY_IN_SECONDS)
except BaseException as e:
logging.info('Caught exception "%s"; stopping server...', e)
server.stop(0)
server.stop(None)
logging.info('Server stopped; exiting.')
if __name__ == '__main__':
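
The grace semantics also shift with the GA stop(): as used here, stop(None) shuts the server down without a grace period (active RPCs are aborted), while a numeric grace in seconds would let in-flight RPCs drain first. A minimal sketch of the serve loop under that API, with a placeholder port:

    from concurrent import futures
    import time

    import grpc

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    server.add_insecure_port('[::]:8080')  # placeholder port
    server.start()
    try:
        while True:
            time.sleep(60 * 60 * 24)
    except KeyboardInterrupt:
        server.stop(None)  # no grace period: stop without waiting for active RPCs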

@@ -30,9 +30,10 @@
"""Entry point for running stress tests."""
import argparse
from concurrent import futures
import threading
from grpc.beta import implementations
import grpc
from six.moves import queue
from src.proto.grpc.testing import metrics_pb2
from src.proto.grpc.testing import test_pb2
@@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args):
def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases)
test_servers = args.server_addresses.split(',')
test_server_targets = args.server_addresses.split(',')
# Propagate any client exceptions with a queue
exception_queue = queue.Queue()
stop_event = threading.Event()
hist = histogram.Histogram(1, 1)
runners = []
server = metrics_pb2.beta_create_MetricsService_server(
metrics_server.MetricsServer(hist))
server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
metrics_pb2.add_MetricsServiceServicer_to_server(
metrics_server.MetricsServer(hist), server)
server.add_insecure_port('[::]:{}'.format(args.metrics_port))
server.start()
for test_server in test_servers:
host, port = test_server.split(':', 1)
for test_server_target in test_server_targets:
for _ in xrange(args.num_channels_per_server):
channel = implementations.insecure_channel(host, int(port))
channel = grpc.insecure_channel(test_server_target)
for _ in xrange(args.num_stubs_per_channel):
stub = test_pb2.beta_create_TestService_stub(channel)
stub = test_pb2.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event)
runners.append(runner)
@@ -128,8 +129,8 @@ def run_test(args):
stop_event.set()
for runner in runners:
runner.join()
runner = None
server.stop(0)
runner = None
server.stop(None)
if __name__ == '__main__':
run_test(_args())
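
In GA terms the stress client treats each entry in --server_addresses as a complete channel target (a host:port string), so the old host/port split disappears. A minimal sketch of the channel/stub fan-out, with placeholder addresses and counts:

    import grpc
    from src.proto.grpc.testing import test_pb2

    server_targets = 'localhost:8080,localhost:8081'.split(',')  # placeholder addresses
    channels_per_server = 2  # placeholder for args.num_channels_per_server
    stubs_per_channel = 3    # placeholder for args.num_stubs_per_channel
    stubs = []
    for target in server_targets:
        for _ in range(channels_per_server):
            channel = grpc.insecure_channel(target)
            for _ in range(stubs_per_channel):
                stubs.append(test_pb2.TestServiceStub(channel))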

@@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2
GAUGE_NAME = 'python_overall_qps'
class MetricsServer(metrics_pb2.BetaMetricsServiceServicer):
class MetricsServer(metrics_pb2.MetricsServiceServicer):
def __init__(self, histogram):
self._start_time = time.time()
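
The metrics servicer itself only needs its base class swapped: with GA-generated code it derives from metrics_pb2.MetricsServiceServicer and is attached through the generated add_MetricsServiceServicer_to_server helper. A minimal sketch (the handler body shown is a placeholder, not the real gauge logic):

    from concurrent import futures

    import grpc
    from src.proto.grpc.testing import metrics_pb2

    class MetricsServer(metrics_pb2.MetricsServiceServicer):
        def GetAllGauges(self, request, context):
            return iter(())  # placeholder: the real server streams one response per gauge

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
    metrics_pb2.add_MetricsServiceServicer_to_server(MetricsServer(), server)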

@@ -27,10 +27,10 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc.beta import implementations
import grpc
# This code doesn't do much but makes sure the native extension is loaded
# which is what we are testing here.
channel = implementations.insecure_channel('localhost', 1000)
channel = grpc.insecure_channel('localhost:1000')
del channel
print 'Success!'
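
A slightly stronger variant of the same smoke test could also exercise the GA connectivity API without needing a live server; a minimal sketch (nothing is expected to be listening on the placeholder port):

    import grpc

    channel = grpc.insecure_channel('localhost:1000')
    try:
        # Times out because no server is listening; the point is only that the
        # native extension loaded and the channel machinery runs.
        grpc.channel_ready_future(channel).result(timeout=0.5)
    except grpc.FutureTimeoutError:
        pass
    del channel
    print('Success!')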
