pull/18312/head
Richard Belleville 6 years ago
parent 23c5fb8ca4
commit d832738c08
  1. 20
      examples/python/multiprocessing/client.py
  2. 13
      examples/python/multiprocessing/server.py
  3. 13
      examples/python/multiprocessing/test/_multiprocessing_example_test.py

@ -46,7 +46,7 @@ def _initialize_worker(server_address):
_LOGGER.info('Initializing worker process.')
_worker_channel_singleton = grpc.insecure_channel(server_address)
_worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
_worker_channel_singleton)
_worker_channel_singleton)
atexit.register(_shutdown_worker)
@ -57,15 +57,16 @@ def _shutdown_worker():
def _run_worker_query(primality_candidate):
_LOGGER.info('Checking primality of {}.'.format(
primality_candidate))
_LOGGER.info('Checking primality of {}.'.format(primality_candidate))
return _worker_stub_singleton.check(
prime_pb2.PrimeCandidate(candidate=primality_candidate))
prime_pb2.PrimeCandidate(candidate=primality_candidate))
def _calculate_primes(server_address):
worker_pool = multiprocessing.Pool(processes=_PROCESS_COUNT,
initializer=_initialize_worker, initargs=(server_address,))
worker_pool = multiprocessing.Pool(
processes=_PROCESS_COUNT,
initializer=_initialize_worker,
initargs=(server_address,))
check_range = range(2, _MAXIMUM_CANDIDATE)
primality = worker_pool.map(_run_worker_query, check_range)
primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
@ -74,10 +75,11 @@ def _calculate_primes(server_address):
def main():
msg = 'Determine the primality of the first {} integers.'.format(
_MAXIMUM_CANDIDATE)
_MAXIMUM_CANDIDATE)
parser = argparse.ArgumentParser(description=msg)
parser.add_argument('server_address',
help='The address of the server (e.g. localhost:50051)')
parser.add_argument(
'server_address',
help='The address of the server (e.g. localhost:50051)')
args = parser.parse_args()
primes = _calculate_primes(args.server_address)
print(primes)

@ -50,9 +50,7 @@ def is_prime(n):
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
def check(self, request, context):
_LOGGER.info(
'Determining primality of {}'.format(
request.candidate))
_LOGGER.info('Determining primality of {}'.format(request.candidate))
return prime_pb2.Primality(isPrime=is_prime(request.candidate))
@ -76,9 +74,8 @@ def _run_server(bind_address):
# `pip install grpcio --no-binary grpcio`.
server = grpc.server(
futures.ThreadPoolExecutor(
max_workers=_THREAD_CONCURRENCY,),
options=options)
futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,),
options=options)
prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
server.add_insecure_port(bind_address)
server.start()
@ -109,12 +106,14 @@ def main():
# NOTE: It is imperative that the worker subprocesses be forked before
# any gRPC servers start up. See
# https://github.com/grpc/grpc/issues/16001 for more details.
worker = multiprocessing.Process(target=_run_server, args=(bind_address,))
worker = multiprocessing.Process(
target=_run_server, args=(bind_address,))
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
if __name__ == '__main__':
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('[PID %(process)d] %(message)s')

@ -24,8 +24,7 @@ import time
import unittest
_BINARY_DIR = os.path.realpath(
os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..'))
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
_SERVER_PATH = os.path.join(_BINARY_DIR, 'server')
_CLIENT_PATH = os.path.join(_BINARY_DIR, 'client')
@ -53,12 +52,14 @@ class MultiprocessingExampleTest(unittest.TestCase):
def test_multiprocessing_example(self):
server_stdout = tempfile.TemporaryFile(mode='r')
server_process = subprocess.Popen((_SERVER_PATH,),
stdout=server_stdout)
server_process = subprocess.Popen((_SERVER_PATH,), stdout=server_stdout)
server_address = _get_server_address(server_stdout)
client_stdout = tempfile.TemporaryFile(mode='r')
client_process = subprocess.Popen((_CLIENT_PATH, server_address,),
stdout=client_stdout)
client_process = subprocess.Popen(
(
_CLIENT_PATH,
server_address,
), stdout=client_stdout)
client_process.wait()
server_process.terminate()
client_stdout.seek(0)

Loading…
Cancel
Save