Merge pull request #18312 from grpc/multiprocessing-example

Multiprocessing Example
pull/18352/head
Richard Belleville 6 years ago committed by GitHub
commit b8241addc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 59
      examples/python/multiprocessing/BUILD
  2. 67
      examples/python/multiprocessing/README.md
  3. 95
      examples/python/multiprocessing/client.py
  4. 35
      examples/python/multiprocessing/prime.proto
  5. 123
      examples/python/multiprocessing/server.py
  6. 74
      examples/python/multiprocessing/test/_multiprocessing_example_test.py
  7. 2
      tools/distrib/pylint_code.sh
  8. 2
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh

@ -0,0 +1,59 @@
# gRPC Bazel BUILD file.
#
# Copyright 2019 The gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
py_proto_library(
name = "prime_proto",
protos = ["prime.proto",],
deps = [requirement("protobuf")],
)
py_binary(
name = "client",
testonly = 1,
srcs = ["client.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":prime_proto",
],
default_python_version = "PY3",
)
py_binary(
name = "server",
testonly = 1,
srcs = ["server.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":prime_proto"
] + select({
"//conditions:default": [requirement("futures")],
"//:python3": [],
}),
default_python_version = "PY3",
)
py_test(
name = "test/_multiprocessing_example_test",
srcs = ["test/_multiprocessing_example_test.py"],
data = [
":client",
":server"
],
size = "small",
)

@ -0,0 +1,67 @@
## Multiprocessing with gRPC Python
Multiprocessing allows application developers to sidestep the Python global
interpreter lock and achieve true concurrency on multicore systems.
Unfortunately, using multiprocessing and gRPC Python is not yet as simple as
instantiating your server with a `futures.ProcessPoolExecutor`.
The library is implemented as a C extension, maintaining much of the state that
drives the system in native code. As such, upon calling
[`fork`](http://man7.org/linux/man-pages/man2/fork.2.html), much of the
state copied into the child process is invalid, leading to hangs and crashes.
However, calling `fork` without `exec` in your python process is supported
*before* any gRPC servers have been instantiated. Application developers can
take advantage of this to parallelize their CPU-intensive operations.
## Calculating Prime Numbers with Multiple Processes
This example calculates the first 10,000 prime numbers as an RPC. We instantiate
one server per subprocess, balancing requests between the servers using the
[`SO_REUSEPORT`](https://lwn.net/Articles/542629/) socket option. Note that this
option is not available in `manylinux1` distributions, which are, as of the time
of writing, the only gRPC Python wheels available on PyPI. To take advantage of this
feature, you'll need to build from source, either using bazel (as we do for
these examples) or via pip, using `pip install grpcio --no-binary grpcio`.
```python
_PROCESS_COUNT = multiprocessing.cpu_count()
```
On the server side, we detect the number of CPUs available on the system and
spawn exactly that many child processes. If we spin up fewer, we won't be taking
full advantage of the hardware resources available.
## Running the Example
To run the server,
[ensure `bazel` is installed](https://docs.bazel.build/versions/master/install.html)
and run:
```
bazel run //examples/python/multiprocessing:server &
```
Note the address at which the server is running. For example,
```
...
[PID 107153] Binding to '[::]:33915'
[PID 107507] Starting new server.
[PID 107508] Starting new server.
...
```
Note that several servers have been started, each with its own PID.
Now, start the client by running
```
bazel run //examples/python/multiprocessing:client -- [SERVER_ADDRESS]
```
For example,
```
bazel run //examples/python/multiprocessing:client -- [::]:33915
```

@ -0,0 +1,95 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of multiprocessing concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import atexit
import logging
import multiprocessing
import operator
import sys
import grpc
from examples.python.multiprocessing import prime_pb2
from examples.python.multiprocessing import prime_pb2_grpc
_PROCESS_COUNT = 8
_MAXIMUM_CANDIDATE = 10000
# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None
_LOGGER = logging.getLogger(__name__)
def _shutdown_worker():
_LOGGER.info('Shutting worker process down.')
if _worker_channel_singleton is not None:
_worker_channel_singleton.stop()
def _initialize_worker(server_address):
global _worker_channel_singleton # pylint: disable=global-statement
global _worker_stub_singleton # pylint: disable=global-statement
_LOGGER.info('Initializing worker process.')
_worker_channel_singleton = grpc.insecure_channel(server_address)
_worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
_worker_channel_singleton)
atexit.register(_shutdown_worker)
def _run_worker_query(primality_candidate):
_LOGGER.info('Checking primality of %s.', primality_candidate)
return _worker_stub_singleton.check(
prime_pb2.PrimeCandidate(candidate=primality_candidate))
def _calculate_primes(server_address):
worker_pool = multiprocessing.Pool(
processes=_PROCESS_COUNT,
initializer=_initialize_worker,
initargs=(server_address,))
check_range = range(2, _MAXIMUM_CANDIDATE)
primality = worker_pool.map(_run_worker_query, check_range)
primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
return tuple(primes)
def main():
msg = 'Determine the primality of the first {} integers.'.format(
_MAXIMUM_CANDIDATE)
parser = argparse.ArgumentParser(description=msg)
parser.add_argument(
'server_address',
help='The address of the server (e.g. localhost:50051)')
args = parser.parse_args()
primes = _calculate_primes(args.server_address)
print(primes)
if __name__ == '__main__':
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('[PID %(process)d] %(message)s')
handler.setFormatter(formatter)
_LOGGER.addHandler(handler)
_LOGGER.setLevel(logging.INFO)
main()

@ -0,0 +1,35 @@
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prime;
// A candidate integer for primality testing.
message PrimeCandidate {
// The candidate.
int64 candidate = 1;
}
// The primality of the requested integer candidate.
message Primality {
// Is the candidate prime?
bool isPrime = 1;
}
// Service to check primality.
service PrimeChecker {
// Determines the primality of an integer.
rpc check (PrimeCandidate) returns (Primality) {}
}

@ -0,0 +1,123 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of multiprocess concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from concurrent import futures
import contextlib
import datetime
import logging
import math
import multiprocessing
import time
import socket
import sys
import grpc
from examples.python.multiprocessing import prime_pb2
from examples.python.multiprocessing import prime_pb2_grpc
_LOGGER = logging.getLogger(__name__)
_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT
def is_prime(n):
for i in range(2, int(math.ceil(math.sqrt(n)))):
if n % i == 0:
return False
else:
return True
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
def check(self, request, context):
_LOGGER.info('Determining primality of %s', request.candidate)
return prime_pb2.Primality(isPrime=is_prime(request.candidate))
def _wait_forever(server):
try:
while True:
time.sleep(_ONE_DAY.total_seconds())
except KeyboardInterrupt:
server.stop(None)
def _run_server(bind_address):
"""Start a server in a subprocess."""
_LOGGER.info('Starting new server.')
options = (('grpc.so_reuseport', 1),)
# WARNING: This example takes advantage of SO_REUSEPORT. Due to the
# limitations of manylinux1, none of our precompiled Linux wheels currently
# support this option. (https://github.com/grpc/grpc/issues/18210). To take
# advantage of this feature, install from source with
# `pip install grpcio --no-binary grpcio`.
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,),
options=options)
prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
server.add_insecure_port(bind_address)
server.start()
_wait_forever(server)
@contextlib.contextmanager
def _reserve_port():
"""Find and reserve a port for all subprocesses to use."""
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) != 1:
raise RuntimeError("Failed to set SO_REUSEPORT.")
sock.bind(('', 0))
try:
yield sock.getsockname()[1]
finally:
sock.close()
def main():
with _reserve_port() as port:
bind_address = 'localhost:{}'.format(port)
_LOGGER.info("Binding to '%s'", bind_address)
sys.stdout.flush()
workers = []
for _ in range(_PROCESS_COUNT):
# NOTE: It is imperative that the worker subprocesses be forked before
# any gRPC servers start up. See
# https://github.com/grpc/grpc/issues/16001 for more details.
worker = multiprocessing.Process(
target=_run_server, args=(bind_address,))
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
if __name__ == '__main__':
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('[PID %(process)d] %(message)s')
handler.setFormatter(formatter)
_LOGGER.addHandler(handler)
_LOGGER.setLevel(logging.INFO)
main()

@ -0,0 +1,74 @@
# Copyright 2019 the gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test for multiprocessing example."""
import ast
import logging
import math
import os
import re
import subprocess
import tempfile
import unittest
_BINARY_DIR = os.path.realpath(
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
_SERVER_PATH = os.path.join(_BINARY_DIR, 'server')
_CLIENT_PATH = os.path.join(_BINARY_DIR, 'client')
def is_prime(n):
for i in range(2, int(math.ceil(math.sqrt(n)))):
if n % i == 0:
return False
else:
return True
def _get_server_address(server_stream):
while True:
server_stream.seek(0)
line = server_stream.readline()
while line:
matches = re.search('Binding to \'(.+)\'', line)
if matches is not None:
return matches.groups()[0]
line = server_stream.readline()
class MultiprocessingExampleTest(unittest.TestCase):
def test_multiprocessing_example(self):
server_stdout = tempfile.TemporaryFile(mode='r')
server_process = subprocess.Popen((_SERVER_PATH,), stdout=server_stdout)
server_address = _get_server_address(server_stdout)
client_stdout = tempfile.TemporaryFile(mode='r')
client_process = subprocess.Popen(
(
_CLIENT_PATH,
server_address,
), stdout=client_stdout)
client_process.wait()
server_process.terminate()
client_stdout.seek(0)
results = ast.literal_eval(client_stdout.read().strip().split('\n')[-1])
values = tuple(result[0] for result in results)
self.assertSequenceEqual(range(2, 10000), values)
for result in results:
self.assertEqual(is_prime(result[0]), result[1])
if __name__ == '__main__':
logging.basicConfig()
unittest.main(verbosity=2)

@ -32,7 +32,7 @@ TEST_DIRS=(
)
VIRTUALENV=python_pylint_venv
python3 -m virtualenv $VIRTUALENV
python3 -m virtualenv $VIRTUALENV -p $(which python3)
PYTHON=$VIRTUALENV/bin/python

@ -25,5 +25,7 @@ git clone /var/local/jenkins/grpc /var/local/git/grpc
${name}')
cd /var/local/git/grpc/test
bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //src/python/...
bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //examples/python/...
bazel clean --expunge
bazel test --config=python3 --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //src/python/...
bazel test --config=python3 --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //examples/python/...

Loading…
Cancel
Save