# Copyright 2019 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """An example of multiprocess concurrency with gRPC.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function from concurrent import futures import contextlib import datetime import logging import math import multiprocessing import time import socket import sys import grpc from examples.python.multiprocessing import prime_pb2 from examples.python.multiprocessing import prime_pb2_grpc _LOGGER = logging.getLogger(__name__) _ONE_DAY = datetime.timedelta(days=1) _PROCESS_COUNT = multiprocessing.cpu_count() _THREAD_CONCURRENCY = _PROCESS_COUNT def is_prime(n): for i in range(2, int(math.ceil(math.sqrt(n)))): if n % i == 0: return False else: return True class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): def check(self, request, context): _LOGGER.info('Determining primality of %s', request.candidate) return prime_pb2.Primality(isPrime=is_prime(request.candidate)) def _wait_forever(server): try: while True: time.sleep(_ONE_DAY.total_seconds()) except KeyboardInterrupt: server.stop(None) def _run_server(bind_address): """Start a server in a subprocess.""" _LOGGER.info('Starting new server.') options = (('grpc.so_reuseport', 1),) # WARNING: This example takes advantage of SO_REUSEPORT. Due to the # limitations of manylinux1, none of our precompiled Linux wheels currently # support this option. (https://github.com/grpc/grpc/issues/18210). To take # advantage of this feature, install from source with # `pip install grpcio --no-binary grpcio`. server = grpc.server( futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY,), options=options) prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server) server.add_insecure_port(bind_address) server.start() _wait_forever(server) @contextlib.contextmanager def _reserve_port(): """Find and reserve a port for all subprocesses to use.""" sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) != 1: raise RuntimeError("Failed to set SO_REUSEPORT.") sock.bind(('', 0)) try: yield sock.getsockname()[1] finally: sock.close() def main(): with _reserve_port() as port: bind_address = 'localhost:{}'.format(port) _LOGGER.info("Binding to '%s'", bind_address) sys.stdout.flush() workers = [] for _ in range(_PROCESS_COUNT): # NOTE: It is imperative that the worker subprocesses be forked before # any gRPC servers start up. See # https://github.com/grpc/grpc/issues/16001 for more details. worker = multiprocessing.Process( target=_run_server, args=(bind_address,)) worker.start() workers.append(worker) for worker in workers: worker.join() if __name__ == '__main__': handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter('[PID %(process)d] %(message)s') handler.setFormatter(formatter) _LOGGER.addHandler(handler) _LOGGER.setLevel(logging.INFO) main()