The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#) https://grpc.io/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
3.5 KiB

# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of multiprocess concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from concurrent import futures
import contextlib
import datetime
import logging
import math
import multiprocessing
import socket
import sys
import time
6 years ago
import grpc
import prime_pb2
import prime_pb2_grpc
_LOGGER = logging.getLogger(__name__)
_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT
def is_prime(n):
for i in range(2, int(math.ceil(math.sqrt(n)))):
if n % i == 0:
return False
else:
return True
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
def check(self, request, context):
_LOGGER.info("Determining primality of %s", request.candidate)
return prime_pb2.Primality(isPrime=is_prime(request.candidate))
def _wait_forever(server):
try:
while True:
time.sleep(_ONE_DAY.total_seconds())
except KeyboardInterrupt:
server.stop(None)
def _run_server(bind_address):
"""Start a server in a subprocess."""
_LOGGER.info("Starting new server.")
options = (("grpc.so_reuseport", 1),)
server = grpc.server(
futures.ThreadPoolExecutor(
max_workers=_THREAD_CONCURRENCY,
),
options=options,
)
prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
server.add_insecure_port(bind_address)
server.start()
_wait_forever(server)
@contextlib.contextmanager
def _reserve_port():
"""Find and reserve a port for all subprocesses to use."""
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
6 years ago
raise RuntimeError("Failed to set SO_REUSEPORT.")
sock.bind(("", 0))
try:
yield sock.getsockname()[1]
finally:
sock.close()
def main():
with _reserve_port() as port:
bind_address = "localhost:{}".format(port)
6 years ago
_LOGGER.info("Binding to '%s'", bind_address)
sys.stdout.flush()
workers = []
for _ in range(_PROCESS_COUNT):
# NOTE: It is imperative that the worker subprocesses be forked before
# any gRPC servers start up. See
# https://github.com/grpc/grpc/issues/16001 for more details.
worker = multiprocessing.Process(
target=_run_server, args=(bind_address,)
)
worker.start()
workers.append(worker)
for worker in workers:
worker.join()
6 years ago
if __name__ == "__main__":
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("[PID %(process)d] %(message)s")
handler.setFormatter(formatter)
_LOGGER.addHandler(handler)
_LOGGER.setLevel(logging.INFO)
main()