From 624839b7043e8ac3463765152e1d9326d474c691 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Thu, 20 Jun 2019 10:32:54 -0700 Subject: [PATCH] Add example Python server using compression. --- examples/python/compression/BUILD.bazel | 44 +++++++ examples/python/compression/README.md | 58 ++++++++++ examples/python/compression/client.py | 76 ++++++++++++ examples/python/compression/server.py | 109 ++++++++++++++++++ .../test/compression_example_test.py | 62 ++++++++++ 5 files changed, 349 insertions(+) create mode 100644 examples/python/compression/BUILD.bazel create mode 100644 examples/python/compression/README.md create mode 100644 examples/python/compression/client.py create mode 100644 examples/python/compression/server.py create mode 100644 examples/python/compression/test/compression_example_test.py diff --git a/examples/python/compression/BUILD.bazel b/examples/python/compression/BUILD.bazel new file mode 100644 index 00000000000..b95d7cccc77 --- /dev/null +++ b/examples/python/compression/BUILD.bazel @@ -0,0 +1,44 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +py_binary( + name = "server", + srcs = ["server.py"], + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//examples:py_helloworld", + ], + srcs_version = "PY2AND3", +) + +py_binary( + name = "client", + srcs = ["client.py"], + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//examples:py_helloworld", + ], + srcs_version = "PY2AND3", +) + +py_test( + name = "test/compression_example_test", + srcs = ["test/compression_example_test.py"], + srcs_version = "PY2AND3", + data = [ + ":client", + ":server", + ], + size = "small", +) diff --git a/examples/python/compression/README.md b/examples/python/compression/README.md new file mode 100644 index 00000000000..c719bba07f8 --- /dev/null +++ b/examples/python/compression/README.md @@ -0,0 +1,58 @@ +## Compression with gRPC Python + +gRPC offers lossless compression options in order to decrease the number of bits +transferred over the wire. Three levels of compression are available: + + - `grpc.Compression.NoCompression` - No compression is applied to the payload. (default) + - `grpc.Compression.Deflate` - The "Deflate" algorithm is applied to the payload. + - `grpc.Compression.Gzip` - The Gzip algorithm is applied to the payload. + +The default option on both clients and servers is `grpc.Compression.NoCompression`. + +See [the gRPC Compression Spec](https://github.com/grpc/grpc/blob/master/doc/compression.md) +for more information. + +### Client Side Compression + +Compression may be set at two levels on the client side. + +#### At the channel level + +```python +with grpc.insecure_channel('foo.bar:1234', compression=grpc.Compression.Gzip) as channel: + use_channel(channel) +``` + +#### At the call level + +Setting the compression method at the call level will override any settings on +the channel level. + +```python +stub = helloworld_pb2_grpc.GreeterStub(channel) +response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), + compression=grpc.Compression.Deflate) +``` + + +### Server Side Compression + +Additionally, compression may be set at two levels on the server side. + +#### On the entire server + +```python +server = grpc.server(futures.ThreadPoolExecutor(), + compression=grpc.Compression.Gzip) +``` + +#### For an individual RPC + +```python +def SayHello(self, request, context): + context.set_response_compression(grpc.Compression.NoCompression) + return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) +``` + +Setting the compression method for an individual RPC will override any setting +supplied at server creation time. diff --git a/examples/python/compression/client.py b/examples/python/compression/client.py new file mode 100644 index 00000000000..444f14c1f68 --- /dev/null +++ b/examples/python/compression/client.py @@ -0,0 +1,76 @@ +# Copyright 2019 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example of compression on the client side with gRPC.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import grpc + +from examples import helloworld_pb2 +from examples import helloworld_pb2_grpc + +_DESCRIPTION = 'A client capable of compression.' +_COMPRESSION_OPTIONS = { + "none": grpc.Compression.NoCompression, + "deflate": grpc.Compression.Deflate, + "gzip": grpc.Compression.Gzip, +} + +_LOGGER = logging.getLogger(__name__) + + +def run_client(channel_compression, call_compression, target): + with grpc.insecure_channel( + target, compression=channel_compression) as channel: + stub = helloworld_pb2_grpc.GreeterStub(channel) + response = stub.SayHello( + helloworld_pb2.HelloRequest(name='you'), + compression=call_compression, + wait_for_ready=True) + print("Response: {}".format(response)) + + +def main(): + parser = argparse.ArgumentParser(description=_DESCRIPTION) + parser.add_argument( + '--channel_compression', + default='none', + nargs='?', + choices=_COMPRESSION_OPTIONS.keys(), + help='The compression method to use for the channel.') + parser.add_argument( + '--call_compression', + default='none', + nargs='?', + choices=_COMPRESSION_OPTIONS.keys(), + help='The compression method to use for an individual call.') + parser.add_argument( + '--server', + default='localhost:50051', + type=str, + nargs='?', + help='The host-port pair at which to reach the server.') + args = parser.parse_args() + channel_compression = _COMPRESSION_OPTIONS[args.channel_compression] + call_compression = _COMPRESSION_OPTIONS[args.call_compression] + run_client(channel_compression, call_compression, args.server) + + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/examples/python/compression/server.py b/examples/python/compression/server.py new file mode 100644 index 00000000000..bc13e60cb5b --- /dev/null +++ b/examples/python/compression/server.py @@ -0,0 +1,109 @@ +# Copyright 2019 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example of compression on the server side with gRPC.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from concurrent import futures +import argparse +import logging +import threading +import time +import grpc + +from examples import helloworld_pb2 +from examples import helloworld_pb2_grpc + +_DESCRIPTION = 'A server capable of compression.' +_COMPRESSION_OPTIONS = { + "none": grpc.Compression.NoCompression, + "deflate": grpc.Compression.Deflate, + "gzip": grpc.Compression.Gzip, +} +_LOGGER = logging.getLogger(__name__) + +_SERVER_HOST = 'localhost' +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + + def __init__(self, no_compress_every_n): + super(Greeter, self).__init__() + self._no_compress_every_n = 0 + self._request_counter = 0 + self._counter_lock = threading.RLock() + + def _should_suppress_compression(self): + suppress_compression = False + with self._counter_lock: + if self._no_compress_every_n and self._request_counter % self._no_compress_every_n == 0: + suppress_compression = True + self._request_counter += 1 + return suppress_compression + + def SayHello(self, request, context): + if self._should_suppress_compression(): + context.set_response_compression(grpc.Compression.NoCompression) + return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) + + +def run_server(server_compression, no_compress_every_n, port): + server = grpc.server( + futures.ThreadPoolExecutor(), + compression=server_compression, + options=(('grpc.so_reuseport', 1),)) + helloworld_pb2_grpc.add_GreeterServicer_to_server( + Greeter(no_compress_every_n), server) + address = '{}:{}'.format(_SERVER_HOST, port) + server.add_insecure_port(address) + server.start() + print("Server listening at '{}'".format(address)) + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(None) + + +def main(): + parser = argparse.ArgumentParser(description=_DESCRIPTION) + parser.add_argument( + '--server_compression', + default='none', + nargs='?', + choices=_COMPRESSION_OPTIONS.keys(), + help='The default compression method for the server.') + parser.add_argument( + '--no_compress_every_n', + type=int, + default=0, + nargs='?', + help='If set, every nth reply will be uncompressed.') + parser.add_argument( + '--port', + type=int, + default=50051, + nargs='?', + help='The port on which the server will listen.') + args = parser.parse_args() + run_server(_COMPRESSION_OPTIONS[args.server_compression], + args.no_compress_every_n, args.port) + + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/examples/python/compression/test/compression_example_test.py b/examples/python/compression/test/compression_example_test.py new file mode 100644 index 00000000000..7d25379683f --- /dev/null +++ b/examples/python/compression/test/compression_example_test.py @@ -0,0 +1,62 @@ +# Copyright 2019 the gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test for compression example.""" + +import contextlib +import os +import socket +import subprocess +import unittest + +_BINARY_DIR = os.path.realpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) +_SERVER_PATH = os.path.join(_BINARY_DIR, 'server') +_CLIENT_PATH = os.path.join(_BINARY_DIR, 'client') + + +@contextlib.contextmanager +def _get_port(): + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: + raise RuntimeError("Failed to set SO_REUSEPORT.") + sock.bind(('', 0)) + try: + yield sock.getsockname()[1] + finally: + sock.close() + + +class CompressionExampleTest(unittest.TestCase): + + def test_compression_example(self): + with _get_port() as test_port: + server_process = subprocess.Popen((_SERVER_PATH, '--port', + str(test_port), + '--server_compression', 'gzip')) + try: + server_target = 'localhost:{}'.format(test_port) + client_process = subprocess.Popen( + (_CLIENT_PATH, '--server', server_target, + '--channel_compression', 'gzip')) + client_return_code = client_process.wait() + self.assertEqual(0, client_return_code) + self.assertIsNone(server_process.poll()) + finally: + server_process.kill() + server_process.wait() + + +if __name__ == '__main__': + unittest.main(verbosity=2)