diff --git a/examples/python/multiprocessing/client.py b/examples/python/multiprocessing/client.py index fa3b394f394..6c583964fde 100644 --- a/examples/python/multiprocessing/client.py +++ b/examples/python/multiprocessing/client.py @@ -19,14 +19,13 @@ from __future__ import print_function import argparse import atexit -import grpc import logging import multiprocessing import operator -import os -import time import sys +import grpc + import prime_pb2 import prime_pb2_grpc @@ -34,15 +33,23 @@ _PROCESS_COUNT = 8 _MAXIMUM_CANDIDATE = 10000 # Each worker process initializes a single channel after forking. +# It's regrettable, but to ensure that each subprocess only has to instantiate +# a single channel to be reused across all RPCs, we use globals. _worker_channel_singleton = None _worker_stub_singleton = None _LOGGER = logging.getLogger(__name__) +def _shutdown_worker(): + _LOGGER.info('Shutting worker process down.') + if _worker_channel_singleton is not None: + _worker_channel_singleton.stop() + + def _initialize_worker(server_address): - global _worker_channel_singleton - global _worker_stub_singleton + global _worker_channel_singleton # pylint: disable=global-statement + global _worker_stub_singleton # pylint: disable=global-statement _LOGGER.info('Initializing worker process.') _worker_channel_singleton = grpc.insecure_channel(server_address) _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub( @@ -50,14 +57,8 @@ def _initialize_worker(server_address): atexit.register(_shutdown_worker) -def _shutdown_worker(): - _LOGGER.info('Shutting worker process down.') - if _worker_channel_singleton is not None: - _worker_channel_singleton.stop() - - def _run_worker_query(primality_candidate): - _LOGGER.info('Checking primality of {}.'.format(primality_candidate)) + _LOGGER.info('Checking primality of %s.', primality_candidate) return _worker_stub_singleton.check( prime_pb2.PrimeCandidate(candidate=primality_candidate)) diff --git a/examples/python/multiprocessing/server.py b/examples/python/multiprocessing/server.py index 588cd4734e9..d686d90559a 100644 --- a/examples/python/multiprocessing/server.py +++ b/examples/python/multiprocessing/server.py @@ -20,15 +20,15 @@ from __future__ import print_function from concurrent import futures import contextlib import datetime -import grpc import logging import math import multiprocessing -import os import time import socket import sys +import grpc + import prime_pb2 import prime_pb2_grpc @@ -50,7 +50,7 @@ def is_prime(n): class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): def check(self, request, context): - _LOGGER.info('Determining primality of {}'.format(request.candidate)) + _LOGGER.info('Determining primality of %s', request.candidate) return prime_pb2.Primality(isPrime=is_prime(request.candidate)) @@ -99,7 +99,7 @@ def _reserve_port(): def main(): with _reserve_port() as port: bind_address = '[::]:{}'.format(port) - _LOGGER.info("Binding to '{}'".format(bind_address)) + _LOGGER.info("Binding to '%s'", bind_address) sys.stdout.flush() workers = [] for _ in range(_PROCESS_COUNT): diff --git a/examples/python/multiprocessing/test/_multiprocessing_example_test.py b/examples/python/multiprocessing/test/_multiprocessing_example_test.py index 92e7c0a4b8d..2d8f8d49db4 100644 --- a/examples/python/multiprocessing/test/_multiprocessing_example_test.py +++ b/examples/python/multiprocessing/test/_multiprocessing_example_test.py @@ -13,14 +13,13 @@ # limitations under the License. """Test for multiprocessing example.""" -import datetime +import ast import logging import math import os import re import subprocess import tempfile -import time import unittest _BINARY_DIR = os.path.realpath( @@ -63,7 +62,7 @@ class MultiprocessingExampleTest(unittest.TestCase): client_process.wait() server_process.terminate() client_stdout.seek(0) - results = eval(client_stdout.read().strip().split('\n')[-1]) + results = ast.literal_eval(client_stdout.read().strip().split('\n')[-1]) values = tuple(result[0] for result in results) self.assertSequenceEqual(range(2, 10000), values) for result in results: