From 4a8a2e286ed10317673e46d0505311dc9f12d1ef Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Fri, 1 Mar 2019 11:14:40 -0800 Subject: [PATCH] Add basic multiprocessing-based server --- examples/python/multiprocessing/BUILD | 0 examples/python/multiprocessing/README.md | 0 examples/python/multiprocessing/client.py | 9 ++ examples/python/multiprocessing/prime.proto | 35 +++++ examples/python/multiprocessing/prime_pb2.py | 132 ++++++++++++++++++ .../python/multiprocessing/prime_pb2_grpc.py | 46 ++++++ examples/python/multiprocessing/server.py | 98 +++++++++++++ .../test/_multiprocessing_test.py | 0 8 files changed, 320 insertions(+) create mode 100644 examples/python/multiprocessing/BUILD create mode 100644 examples/python/multiprocessing/README.md create mode 100644 examples/python/multiprocessing/client.py create mode 100644 examples/python/multiprocessing/prime.proto create mode 100644 examples/python/multiprocessing/prime_pb2.py create mode 100644 examples/python/multiprocessing/prime_pb2_grpc.py create mode 100644 examples/python/multiprocessing/server.py create mode 100644 examples/python/multiprocessing/test/_multiprocessing_test.py diff --git a/examples/python/multiprocessing/BUILD b/examples/python/multiprocessing/BUILD new file mode 100644 index 00000000000..e69de29bb2d diff --git a/examples/python/multiprocessing/README.md b/examples/python/multiprocessing/README.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/examples/python/multiprocessing/client.py b/examples/python/multiprocessing/client.py new file mode 100644 index 00000000000..4ab33374ce2 --- /dev/null +++ b/examples/python/multiprocessing/client.py @@ -0,0 +1,9 @@ +# spin up multiple concurrent clients + +import logging +import multiprocessing +import os +import time + +import prime_pb2 +import prime_pb2_grpc diff --git a/examples/python/multiprocessing/prime.proto b/examples/python/multiprocessing/prime.proto new file mode 100644 index 00000000000..4ef232f86cb --- /dev/null +++ b/examples/python/multiprocessing/prime.proto @@ -0,0 +1,35 @@ +// Copyright 2019 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package prime; + +// A candidate integer for primality testing. +message PrimeCandidate { + // The candidate. + int64 candidate = 1; +} + +// The primality of the requested integer candidate. +message Primality { + // Is the candidate prime? + bool isPrime = 1; +} + +// Service to check primality. +service PrimeChecker { + // Determines the primality of an integer. + rpc check (PrimeCandidate) returns (Primality) {} +} diff --git a/examples/python/multiprocessing/prime_pb2.py b/examples/python/multiprocessing/prime_pb2.py new file mode 100644 index 00000000000..58e6e6a023a --- /dev/null +++ b/examples/python/multiprocessing/prime_pb2.py @@ -0,0 +1,132 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: prime.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='prime.proto', + package='prime', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\x0bprime.proto\x12\x05prime\"#\n\x0ePrimeCandidate\x12\x11\n\tcandidate\x18\x01 \x01(\x03\"\x1c\n\tPrimality\x12\x0f\n\x07isPrime\x18\x01 \x01(\x08\x32\x42\n\x0cPrimeChecker\x12\x32\n\x05\x63heck\x12\x15.prime.PrimeCandidate\x1a\x10.prime.Primality\"\x00\x62\x06proto3') +) + + + + +_PRIMECANDIDATE = _descriptor.Descriptor( + name='PrimeCandidate', + full_name='prime.PrimeCandidate', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='candidate', full_name='prime.PrimeCandidate.candidate', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=22, + serialized_end=57, +) + + +_PRIMALITY = _descriptor.Descriptor( + name='Primality', + full_name='prime.Primality', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='isPrime', full_name='prime.Primality.isPrime', index=0, + number=1, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=59, + serialized_end=87, +) + +DESCRIPTOR.message_types_by_name['PrimeCandidate'] = _PRIMECANDIDATE +DESCRIPTOR.message_types_by_name['Primality'] = _PRIMALITY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +PrimeCandidate = _reflection.GeneratedProtocolMessageType('PrimeCandidate', (_message.Message,), dict( + DESCRIPTOR = _PRIMECANDIDATE, + __module__ = 'prime_pb2' + # @@protoc_insertion_point(class_scope:prime.PrimeCandidate) + )) +_sym_db.RegisterMessage(PrimeCandidate) + +Primality = _reflection.GeneratedProtocolMessageType('Primality', (_message.Message,), dict( + DESCRIPTOR = _PRIMALITY, + __module__ = 'prime_pb2' + # @@protoc_insertion_point(class_scope:prime.Primality) + )) +_sym_db.RegisterMessage(Primality) + + + +_PRIMECHECKER = _descriptor.ServiceDescriptor( + name='PrimeChecker', + full_name='prime.PrimeChecker', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=89, + serialized_end=155, + methods=[ + _descriptor.MethodDescriptor( + name='check', + full_name='prime.PrimeChecker.check', + index=0, + containing_service=None, + input_type=_PRIMECANDIDATE, + output_type=_PRIMALITY, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_PRIMECHECKER) + +DESCRIPTOR.services_by_name['PrimeChecker'] = _PRIMECHECKER + +# @@protoc_insertion_point(module_scope) diff --git a/examples/python/multiprocessing/prime_pb2_grpc.py b/examples/python/multiprocessing/prime_pb2_grpc.py new file mode 100644 index 00000000000..dcc3a35706d --- /dev/null +++ b/examples/python/multiprocessing/prime_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import prime_pb2 as prime__pb2 + + +class PrimeCheckerStub(object): + """Service to check primality. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.check = channel.unary_unary( + '/prime.PrimeChecker/check', + request_serializer=prime__pb2.PrimeCandidate.SerializeToString, + response_deserializer=prime__pb2.Primality.FromString, + ) + + +class PrimeCheckerServicer(object): + """Service to check primality. + """ + + def check(self, request, context): + """Determines the primality of an integer. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_PrimeCheckerServicer_to_server(servicer, server): + rpc_method_handlers = { + 'check': grpc.unary_unary_rpc_method_handler( + servicer.check, + request_deserializer=prime__pb2.PrimeCandidate.FromString, + response_serializer=prime__pb2.Primality.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'prime.PrimeChecker', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/examples/python/multiprocessing/server.py b/examples/python/multiprocessing/server.py new file mode 100644 index 00000000000..d30f3b6734d --- /dev/null +++ b/examples/python/multiprocessing/server.py @@ -0,0 +1,98 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example of multiprocess concurrency with gRPC.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from concurrent import futures +import datetime +import grpc +import logging +import math +import multiprocessing +import os +import time + +import prime_pb2 +import prime_pb2_grpc + +_ONE_DAY = datetime.timedelta(days=1) +_NUM_PROCESSES = 8 +_THREAD_CONCURRENCY = 10 +_BIND_ADDRESS = '[::]:50051' + + +def is_prime(n): + for i in range(2, math.ceil(math.sqrt(n))): + if i % n == 0: + return False + else: + return True + + +class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): + + def check(self, request, context): + logging.info( + '[PID {}] Determining primality of {}'.format( + os.getpid(), request.candidate)) + return is_prime(request.candidate) + + +def _wait_forever(server): + try: + while True: + time.sleep(_ONE_DAY.total_seconds()) + except KeyboardInterrupt: + server.stop(None) + + +def _run_server(bind_address): + logging.warning( '[PID {}] Starting new server.'.format( os.getpid())) + options = (('grpc.so_reuseport', 1),) + + # WARNING: This example takes advantage of SO_REUSEPORT. Due to the + # limitations of manylinux1, none of our precompiled Linux wheels currently + # support this option. (https://github.com/grpc/grpc/issues/18210). To take + # advantage of this feature, install from source with + # `pip install grpcio --no-binary grpcio`. + + server = grpc.server( + futures.ThreadPoolExecutor( + max_workers=_THREAD_CONCURRENCY,), + options=options) + prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server) + server.add_insecure_port(bind_address) + server.start() + _wait_forever(server) + + +def main(): + workers = [] + for _ in range(_NUM_PROCESSES): + # NOTE: It is imperative that the worker subprocesses be forked before + # any gRPC servers start up. See + # https://github.com/grpc/grpc/issues/16001 for more details. + worker = multiprocessing.Process(target=_run_server, args=(_BIND_ADDRESS,)) + worker.start() + workers.append(worker) + for worker in workers: + worker.join() + + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/examples/python/multiprocessing/test/_multiprocessing_test.py b/examples/python/multiprocessing/test/_multiprocessing_test.py new file mode 100644 index 00000000000..e69de29bb2d