diff --git a/examples/python/multiprocessing/client.py b/examples/python/multiprocessing/client.py index 4ab33374ce2..788820916ea 100644 --- a/examples/python/multiprocessing/client.py +++ b/examples/python/multiprocessing/client.py @@ -1,9 +1,76 @@ -# spin up multiple concurrent clients +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example of multiprocessing concurrency with gRPC.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import atexit +import grpc import logging import multiprocessing +import operator import os import time import prime_pb2 import prime_pb2_grpc + +_PROCESS_COUNT = 8 +_SERVER_ADDRESS = 'localhost:50051' +_MAXIMUM_CANDIDATE = 10000 + +# Each worker process initializes a single channel after forking. +_worker_channel_singleton = None +_worker_stub_singleton = None + + +def _initialize_worker(server_address): + global _worker_channel_singleton + global _worker_stub_singleton + logging.warning('[PID {}] Initializing worker process.'.format( + os.getpid())) + _worker_channel_singleton = grpc.insecure_channel(server_address) + _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub( + _worker_channel_singleton) + atexit.register(_shutdown_worker) + + +def _shutdown_worker(): + logging.warning('[PID {}] Shutting worker process down.'.format( + os.getpid())) + if _worker_channel_singleton is not None: + _worker_channel_singleton.stop() + + +def _run_worker_query(primality_candidate): + logging.warning('[PID {}] Checking primality of {}.'.format( + os.getpid(), primality_candidate)) + return _worker_stub_singleton.check( + prime_pb2.PrimeCandidate(candidate=primality_candidate)) + + +def main(): + worker_pool = multiprocessing.Pool(processes=_PROCESS_COUNT, + initializer=_initialize_worker, initargs=(_SERVER_ADDRESS,)) + check_range = range(2, _MAXIMUM_CANDIDATE) + primality = worker_pool.map(_run_worker_query, check_range) + primes = zip(check_range, map(operator.attrgetter('isPrime'), primality)) + logging.warning(tuple(primes)) + + +if __name__ == '__main__': + logging.basicConfig() + main() diff --git a/examples/python/multiprocessing/server.py b/examples/python/multiprocessing/server.py index d30f3b6734d..d0ca6a0cdfb 100644 --- a/examples/python/multiprocessing/server.py +++ b/examples/python/multiprocessing/server.py @@ -30,14 +30,14 @@ import prime_pb2 import prime_pb2_grpc _ONE_DAY = datetime.timedelta(days=1) -_NUM_PROCESSES = 8 +_PROCESS_COUNT = 8 _THREAD_CONCURRENCY = 10 _BIND_ADDRESS = '[::]:50051' def is_prime(n): - for i in range(2, math.ceil(math.sqrt(n))): - if i % n == 0: + for i in range(2, int(math.ceil(math.sqrt(n)))): + if n % i == 0: return False else: return True @@ -46,10 +46,10 @@ def is_prime(n): class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): def check(self, request, context): - logging.info( + logging.warning( '[PID {}] Determining primality of {}'.format( os.getpid(), request.candidate)) - return is_prime(request.candidate) + return prime_pb2.Primality(isPrime=is_prime(request.candidate)) def _wait_forever(server): @@ -82,7 +82,7 @@ def _run_server(bind_address): def main(): workers = [] - for _ in range(_NUM_PROCESSES): + for _ in range(_PROCESS_COUNT): # NOTE: It is imperative that the worker subprocesses be forked before # any gRPC servers start up. See # https://github.com/grpc/grpc/issues/16001 for more details. @@ -93,6 +93,6 @@ def main(): worker.join() -if __name__ == "__main__": +if __name__ == '__main__': logging.basicConfig() main()