mirror of https://github.com/grpc/grpc.git
Merge pull request #19421 from gnossen/python_compression_example
Python Compression Examplepull/19440/head
commit
00ad30c804
5 changed files with 349 additions and 0 deletions
@ -0,0 +1,44 @@ |
||||
# Copyright 2019 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
py_binary( |
||||
name = "server", |
||||
srcs = ["server.py"], |
||||
deps = [ |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
"//examples:py_helloworld", |
||||
], |
||||
srcs_version = "PY2AND3", |
||||
) |
||||
|
||||
py_binary( |
||||
name = "client", |
||||
srcs = ["client.py"], |
||||
deps = [ |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
"//examples:py_helloworld", |
||||
], |
||||
srcs_version = "PY2AND3", |
||||
) |
||||
|
||||
py_test( |
||||
name = "test/compression_example_test", |
||||
srcs = ["test/compression_example_test.py"], |
||||
srcs_version = "PY2AND3", |
||||
data = [ |
||||
":client", |
||||
":server", |
||||
], |
||||
size = "small", |
||||
) |
@ -0,0 +1,58 @@ |
||||
## Compression with gRPC Python |
||||
|
||||
gRPC offers lossless compression options in order to decrease the number of bits |
||||
transferred over the wire. Three levels of compression are available: |
||||
|
||||
- `grpc.Compression.NoCompression` - No compression is applied to the payload. (default) |
||||
- `grpc.Compression.Deflate` - The "Deflate" algorithm is applied to the payload. |
||||
- `grpc.Compression.Gzip` - The Gzip algorithm is applied to the payload. |
||||
|
||||
The default option on both clients and servers is `grpc.Compression.NoCompression`. |
||||
|
||||
See [the gRPC Compression Spec](https://github.com/grpc/grpc/blob/master/doc/compression.md) |
||||
for more information. |
||||
|
||||
### Client Side Compression |
||||
|
||||
Compression may be set at two levels on the client side. |
||||
|
||||
#### At the channel level |
||||
|
||||
```python |
||||
with grpc.insecure_channel('foo.bar:1234', compression=grpc.Compression.Gzip) as channel: |
||||
use_channel(channel) |
||||
``` |
||||
|
||||
#### At the call level |
||||
|
||||
Setting the compression method at the call level will override any settings on |
||||
the channel level. |
||||
|
||||
```python |
||||
stub = helloworld_pb2_grpc.GreeterStub(channel) |
||||
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), |
||||
compression=grpc.Compression.Deflate) |
||||
``` |
||||
|
||||
|
||||
### Server Side Compression |
||||
|
||||
Additionally, compression may be set at two levels on the server side. |
||||
|
||||
#### On the entire server |
||||
|
||||
```python |
||||
server = grpc.server(futures.ThreadPoolExecutor(), |
||||
compression=grpc.Compression.Gzip) |
||||
``` |
||||
|
||||
#### For an individual RPC |
||||
|
||||
```python |
||||
def SayHello(self, request, context): |
||||
context.set_response_compression(grpc.Compression.NoCompression) |
||||
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) |
||||
``` |
||||
|
||||
Setting the compression method for an individual RPC will override any setting |
||||
supplied at server creation time. |
@ -0,0 +1,76 @@ |
||||
# Copyright 2019 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""An example of compression on the client side with gRPC.""" |
||||
|
||||
from __future__ import absolute_import |
||||
from __future__ import division |
||||
from __future__ import print_function |
||||
|
||||
import argparse |
||||
import logging |
||||
import grpc |
||||
|
||||
from examples import helloworld_pb2 |
||||
from examples import helloworld_pb2_grpc |
||||
|
||||
_DESCRIPTION = 'A client capable of compression.' |
||||
_COMPRESSION_OPTIONS = { |
||||
"none": grpc.Compression.NoCompression, |
||||
"deflate": grpc.Compression.Deflate, |
||||
"gzip": grpc.Compression.Gzip, |
||||
} |
||||
|
||||
_LOGGER = logging.getLogger(__name__) |
||||
|
||||
|
||||
def run_client(channel_compression, call_compression, target): |
||||
with grpc.insecure_channel( |
||||
target, compression=channel_compression) as channel: |
||||
stub = helloworld_pb2_grpc.GreeterStub(channel) |
||||
response = stub.SayHello( |
||||
helloworld_pb2.HelloRequest(name='you'), |
||||
compression=call_compression, |
||||
wait_for_ready=True) |
||||
print("Response: {}".format(response)) |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser(description=_DESCRIPTION) |
||||
parser.add_argument( |
||||
'--channel_compression', |
||||
default='none', |
||||
nargs='?', |
||||
choices=_COMPRESSION_OPTIONS.keys(), |
||||
help='The compression method to use for the channel.') |
||||
parser.add_argument( |
||||
'--call_compression', |
||||
default='none', |
||||
nargs='?', |
||||
choices=_COMPRESSION_OPTIONS.keys(), |
||||
help='The compression method to use for an individual call.') |
||||
parser.add_argument( |
||||
'--server', |
||||
default='localhost:50051', |
||||
type=str, |
||||
nargs='?', |
||||
help='The host-port pair at which to reach the server.') |
||||
args = parser.parse_args() |
||||
channel_compression = _COMPRESSION_OPTIONS[args.channel_compression] |
||||
call_compression = _COMPRESSION_OPTIONS[args.call_compression] |
||||
run_client(channel_compression, call_compression, args.server) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
logging.basicConfig() |
||||
main() |
@ -0,0 +1,109 @@ |
||||
# Copyright 2019 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""An example of compression on the server side with gRPC.""" |
||||
|
||||
from __future__ import absolute_import |
||||
from __future__ import division |
||||
from __future__ import print_function |
||||
|
||||
from concurrent import futures |
||||
import argparse |
||||
import logging |
||||
import threading |
||||
import time |
||||
import grpc |
||||
|
||||
from examples import helloworld_pb2 |
||||
from examples import helloworld_pb2_grpc |
||||
|
||||
_DESCRIPTION = 'A server capable of compression.' |
||||
_COMPRESSION_OPTIONS = { |
||||
"none": grpc.Compression.NoCompression, |
||||
"deflate": grpc.Compression.Deflate, |
||||
"gzip": grpc.Compression.Gzip, |
||||
} |
||||
_LOGGER = logging.getLogger(__name__) |
||||
|
||||
_SERVER_HOST = 'localhost' |
||||
_ONE_DAY_IN_SECONDS = 60 * 60 * 24 |
||||
|
||||
|
||||
class Greeter(helloworld_pb2_grpc.GreeterServicer): |
||||
|
||||
def __init__(self, no_compress_every_n): |
||||
super(Greeter, self).__init__() |
||||
self._no_compress_every_n = 0 |
||||
self._request_counter = 0 |
||||
self._counter_lock = threading.RLock() |
||||
|
||||
def _should_suppress_compression(self): |
||||
suppress_compression = False |
||||
with self._counter_lock: |
||||
if self._no_compress_every_n and self._request_counter % self._no_compress_every_n == 0: |
||||
suppress_compression = True |
||||
self._request_counter += 1 |
||||
return suppress_compression |
||||
|
||||
def SayHello(self, request, context): |
||||
if self._should_suppress_compression(): |
||||
context.set_response_compression(grpc.Compression.NoCompression) |
||||
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name) |
||||
|
||||
|
||||
def run_server(server_compression, no_compress_every_n, port): |
||||
server = grpc.server( |
||||
futures.ThreadPoolExecutor(), |
||||
compression=server_compression, |
||||
options=(('grpc.so_reuseport', 1),)) |
||||
helloworld_pb2_grpc.add_GreeterServicer_to_server( |
||||
Greeter(no_compress_every_n), server) |
||||
address = '{}:{}'.format(_SERVER_HOST, port) |
||||
server.add_insecure_port(address) |
||||
server.start() |
||||
print("Server listening at '{}'".format(address)) |
||||
try: |
||||
while True: |
||||
time.sleep(_ONE_DAY_IN_SECONDS) |
||||
except KeyboardInterrupt: |
||||
server.stop(None) |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser(description=_DESCRIPTION) |
||||
parser.add_argument( |
||||
'--server_compression', |
||||
default='none', |
||||
nargs='?', |
||||
choices=_COMPRESSION_OPTIONS.keys(), |
||||
help='The default compression method for the server.') |
||||
parser.add_argument( |
||||
'--no_compress_every_n', |
||||
type=int, |
||||
default=0, |
||||
nargs='?', |
||||
help='If set, every nth reply will be uncompressed.') |
||||
parser.add_argument( |
||||
'--port', |
||||
type=int, |
||||
default=50051, |
||||
nargs='?', |
||||
help='The port on which the server will listen.') |
||||
args = parser.parse_args() |
||||
run_server(_COMPRESSION_OPTIONS[args.server_compression], |
||||
args.no_compress_every_n, args.port) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
logging.basicConfig() |
||||
main() |
@ -0,0 +1,62 @@ |
||||
# Copyright 2019 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Test for compression example.""" |
||||
|
||||
import contextlib |
||||
import os |
||||
import socket |
||||
import subprocess |
||||
import unittest |
||||
|
||||
_BINARY_DIR = os.path.realpath( |
||||
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) |
||||
_SERVER_PATH = os.path.join(_BINARY_DIR, 'server') |
||||
_CLIENT_PATH = os.path.join(_BINARY_DIR, 'client') |
||||
|
||||
|
||||
@contextlib.contextmanager |
||||
def _get_port(): |
||||
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) |
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) |
||||
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: |
||||
raise RuntimeError("Failed to set SO_REUSEPORT.") |
||||
sock.bind(('', 0)) |
||||
try: |
||||
yield sock.getsockname()[1] |
||||
finally: |
||||
sock.close() |
||||
|
||||
|
||||
class CompressionExampleTest(unittest.TestCase): |
||||
|
||||
def test_compression_example(self): |
||||
with _get_port() as test_port: |
||||
server_process = subprocess.Popen((_SERVER_PATH, '--port', |
||||
str(test_port), |
||||
'--server_compression', 'gzip')) |
||||
try: |
||||
server_target = 'localhost:{}'.format(test_port) |
||||
client_process = subprocess.Popen( |
||||
(_CLIENT_PATH, '--server', server_target, |
||||
'--channel_compression', 'gzip')) |
||||
client_return_code = client_process.wait() |
||||
self.assertEqual(0, client_return_code) |
||||
self.assertIsNone(server_process.poll()) |
||||
finally: |
||||
server_process.kill() |
||||
server_process.wait() |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
unittest.main(verbosity=2) |
Loading…
Reference in new issue