diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py new file mode 100644 index 00000000000..b0c9ec62d07 --- /dev/null +++ b/test/compiler/python_plugin_test.py @@ -0,0 +1,480 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import contextlib +import errno +import itertools +import os +import subprocess +import sys +import time +import unittest + +from grpc.framework.face import exceptions +from grpc.framework.foundation import future + +# Assigned in __main__. +_build_mode = None + + +class _ServicerMethods(object): + + def __init__(self, test_pb2, delay): + self._paused = False + self._failed = False + self.test_pb2 = test_pb2 + self.delay = delay + + @contextlib.contextmanager + def pause(self): # pylint: disable=invalid-name + self._paused = True + yield + self._paused = False + + @contextlib.contextmanager + def fail(self): # pylint: disable=invalid-name + self._failed = True + yield + self._failed = False + + def _control(self): # pylint: disable=invalid-name + if self._failed: + raise ValueError() + time.sleep(self.delay) + while self._paused: + time.sleep(0) + + def UnaryCall(self, request): + response = self.test_pb2.SimpleResponse() + response.payload.payload_type = self.test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * request.response_size + self._control() + return response + + def StreamingOutputCall(self, request): + for parameter in request.response_parameters: + response = self.test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self.test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def StreamingInputCall(self, request_iter): + response = self.test_pb2.StreamingInputCallResponse() + aggregated_payload_size = 0 + for request in request_iter: + aggregated_payload_size += len(request.payload.payload_compressable) + response.aggregated_payload_size = aggregated_payload_size + self._control() + return response + + def FullDuplexCall(self, request_iter): + for request in request_iter: + for parameter in request.response_parameters: + response = self.test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self.test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def HalfDuplexCall(self, request_iter): + responses = [] + for request in request_iter: + for parameter in request.response_parameters: + response = self.test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self.test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + responses.append(response) + for response in responses: + yield response + + +def CreateService(test_pb2, delay=0, timeout=1): + """Provides a servicer backend and a stub. + + The servicer is just the implementation + of the actual servicer passed to the face player of the python RPC + implementation; the two are detached. + + Non-zero delay puts a delay on each call to the servicer, representative of + communication latency. Timeout is the default timeout for the stub while + waiting for the service. + + Args: + test_pb2: the test_pb2 module generated by this test + delay: delay in seconds per response from the servicer + timeout: how long the stub will wait for the servicer by default. + Returns: + A two-tuple (servicer, stub), where the servicer is the back-end of the + service bound to the stub. + """ + class Servicer(test_pb2.TestServiceServicer): + + def UnaryCall(self, request): + return servicer_methods.UnaryCall(request) + + def StreamingOutputCall(self, request): + return servicer_methods.StreamingOutputCall(request) + + def StreamingInputCall(self, request_iter): + return servicer_methods.StreamingInputCall(request_iter) + + def FullDuplexCall(self, request_iter): + return servicer_methods.FullDuplexCall(request_iter) + + def HalfDuplexCall(self, request_iter): + return servicer_methods.HalfDuplexCall(request_iter) + + servicer_methods = _ServicerMethods(test_pb2, delay) + servicer = Servicer() + linked_pair = test_pb2.mock_TestService(servicer, timeout) + stub = linked_pair.stub + return servicer_methods, stub + + +def StreamingInputRequest(test_pb2): + for _ in range(3): + request = test_pb2.StreamingInputCallRequest() + request.payload.payload_type = test_pb2.COMPRESSABLE + request.payload.payload_compressable = 'a' + yield request + + +def StreamingOutputRequest(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + sizes = [1, 2, 3] + request.response_parameters.add(size=sizes[0], interval_us=0) + request.response_parameters.add(size=sizes[1], interval_us=0) + request.response_parameters.add(size=sizes[2], interval_us=0) + return request + + +def FullDuplexRequest(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + +class PythonPluginTest(unittest.TestCase): + """Test case for the gRPC Python protoc-plugin. + + While reading these tests, remember that the futures API + (`stub.method.async()`) only gives futures for the *non-streaming* responses, + else it behaves like its blocking cousin. + """ + + def setUp(self): + protoc_command = '../../bins/%s/protobuf/protoc' % _build_mode + protoc_plugin_filename = '../../bins/%s/grpc_python_plugin' % _build_mode + test_proto_filename = '../cpp/interop/test.proto' + if not os.path.isfile(protoc_command): + # Assume that if we haven't built protoc that it's on the system. + protoc_command = 'protoc' + + # ensure that the output directory exists + outdir = '../../gens/test/compiler/python/' + try: + os.makedirs(outdir) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + + cmd = [ + protoc_command, + '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, + '-I %s' % os.path.dirname(test_proto_filename), + '--python_out=%s' % outdir, + '--python-grpc_out=%s' % outdir, + os.path.basename(test_proto_filename), + ] + subprocess.call(' '.join(cmd), shell=True) + sys.path.append(outdir) + + self.delay = 1 # seconds + self.timeout = 2 # seconds + + def testImportAttributes(self): + # check that we can access the members + import test_pb2 # pylint: disable=g-import-not-at-top + self.assertIsNotNone(getattr(test_pb2, 'TestServiceServicer', None)) + self.assertIsNotNone(getattr(test_pb2, 'TestServiceService', None)) + self.assertIsNotNone(getattr(test_pb2, 'TestServiceStub', None)) + + def testUnaryCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + request = test_pb2.SimpleRequest(response_size=13) + response = stub.UnaryCall(request) + expected_response = servicer.UnaryCall(request) + self.assertEqual(expected_response, response) + + def testUnaryCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService( + test_pb2, delay=self.delay, timeout=self.timeout) + request = test_pb2.SimpleRequest(response_size=13) + # TODO(atash): consider using the 'profile' module? Does it even work here? + start_time = time.clock() + response_future = stub.UnaryCall.async(request) + self.assertGreater(self.delay, time.clock() - start_time) + response = response_future.result() + expected_response = servicer.UnaryCall(request) + self.assertEqual(expected_response, response) + + def testUnaryCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + # set the timeout super low... + servicer, stub = CreateService(test_pb2, delay=1, timeout=0.1) + request = test_pb2.SimpleRequest(response_size=13) + with servicer.pause(): + response_future = stub.UnaryCall.async(request) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + def testUnaryCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + request = test_pb2.SimpleRequest(response_size=13) + with servicer.pause(): + response_future = stub.UnaryCall.async(request) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + + def testUnaryCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + request = test_pb2.SimpleRequest(response_size=13) + with servicer.fail(): + response_future = stub.UnaryCall.async(request) + self.assertIsNotNone(response_future.exception()) + + def testStreamingOutputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + request = StreamingOutputRequest(test_pb2) + responses = stub.StreamingOutputCall(request) + expected_responses = servicer.StreamingOutputCall(request) + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testStreamingOutputCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=self.timeout) + request = StreamingOutputRequest(test_pb2) + responses = stub.StreamingOutputCall.async(request) + expected_responses = servicer.StreamingOutputCall(request) + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testStreamingOutputCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=0.1) + request = StreamingOutputRequest(test_pb2) + with servicer.pause(): + responses = stub.StreamingOutputCall.async(request) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + def testStreamingOutputCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + _, stub = CreateService(test_pb2, timeout=0.1) + request = StreamingOutputRequest(test_pb2) + responses = stub.StreamingOutputCall.async(request) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + def testStreamingOutputCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=0.1) + request = StreamingOutputRequest(test_pb2) + with servicer.fail(): + responses = stub.StreamingOutputCall.async(request) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + def testStreamingInputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + response = stub.StreamingInputCall(StreamingInputRequest(test_pb2)) + expected_response = servicer.StreamingInputCall( + StreamingInputRequest(test_pb2)) + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService( + test_pb2, delay=self.delay, timeout=self.timeout) + start_time = time.clock() + response_future = stub.StreamingInputCall.async( + StreamingInputRequest(test_pb2)) + self.assertGreater(self.delay, time.clock() - start_time) + response = response_future.result() + expected_response = servicer.StreamingInputCall( + StreamingInputRequest(test_pb2)) + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + # set the timeout super low... + servicer, stub = CreateService(test_pb2, delay=1, timeout=0.1) + with servicer.pause(): + response_future = stub.StreamingInputCall.async( + StreamingInputRequest(test_pb2)) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + + def testStreamingInputCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + with servicer.pause(): + response_future = stub.StreamingInputCall.async( + StreamingInputRequest(test_pb2)) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + + def testStreamingInputCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + with servicer.fail(): + response_future = stub.StreamingInputCall.async( + StreamingInputRequest(test_pb2)) + self.assertIsNotNone(response_future.exception()) + + def testFullDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + responses = stub.FullDuplexCall(FullDuplexRequest(test_pb2)) + expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2)) + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testFullDuplexCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=self.timeout) + responses = stub.FullDuplexCall.async(FullDuplexRequest(test_pb2)) + expected_responses = servicer.FullDuplexCall(FullDuplexRequest(test_pb2)) + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testFullDuplexCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=0.1) + request = FullDuplexRequest(test_pb2) + with servicer.pause(): + responses = stub.FullDuplexCall.async(request) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + def testFullDuplexCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + _, stub = CreateService(test_pb2, timeout=0.1) + request = FullDuplexRequest(test_pb2) + responses = stub.FullDuplexCall.async(request) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + def testFullDuplexCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2, timeout=0.1) + request = FullDuplexRequest(test_pb2) + with servicer.fail(): + responses = stub.FullDuplexCall.async(request) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + def testHalfDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + servicer, stub = CreateService(test_pb2) + def HalfDuplexRequest(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + responses = stub.HalfDuplexCall(HalfDuplexRequest()) + expected_responses = servicer.HalfDuplexCall(HalfDuplexRequest()) + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testHalfDuplexCallAsyncWedged(self): + import test_pb2 # pylint: disable=g-import-not-at-top + _, stub = CreateService(test_pb2, timeout=1) + wait_flag = [False] + @contextlib.contextmanager + def wait(): # pylint: disable=invalid-name + # Where's Python 3's 'nonlocal' statement when you need it? + wait_flag[0] = True + yield + wait_flag[0] = False + def HalfDuplexRequest(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + while wait_flag[0]: + time.sleep(0.1) + with wait(): + responses = stub.HalfDuplexCall.async(HalfDuplexRequest()) + # half-duplex waits for the client to send all info + with self.assertRaises(exceptions.ExpirationError): + next(responses) + + +if __name__ == '__main__': + os.chdir(os.path.dirname(sys.argv[0])) + parser = argparse.ArgumentParser(description='Run Python compiler plugin test.') + parser.add_argument('--build_mode', dest='build_mode', type=str, default='dbg', + help='The build mode of the targets to test, e.g. ' + '"dbg", "opt", "asan", etc.') + args, remainder = parser.parse_known_args() + _build_mode = args.build_mode + sys.argv[1:] = remainder + unittest.main() diff --git a/test/compiler/test.proto b/test/compiler/test.proto new file mode 100644 index 00000000000..ed7c6a7b797 --- /dev/null +++ b/test/compiler/test.proto @@ -0,0 +1,139 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +// This file is duplicated around the code base. See GitHub issue #526. +syntax = "proto2"; + +package grpc.testing; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message Payload { + required PayloadType payload_type = 1; + oneof payload_body { + string payload_compressable = 2; + bytes payload_uncompressable = 3; + } +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh index 1b8fe1982d1..fe40b511860 100755 --- a/tools/run_tests/run_python.sh +++ b/tools/run_tests/run_python.sh @@ -37,6 +37,7 @@ root=`pwd` export LD_LIBRARY_PATH=$root/libs/opt source python2.7_virtual_environment/bin/activate # TODO(issue 215): Properly itemize these in run_tests.py so that they can be parallelized. +python2.7 -B test/compiler/python_plugin_test.py python2.7 -B -m grpc._adapter._blocking_invocation_inline_service_test python2.7 -B -m grpc._adapter._c_test python2.7 -B -m grpc._adapter._event_invocation_synchronous_event_service_test