pull/18312/head
Richard Belleville 6 years ago
parent b2a75fdc65
commit 67ca10b4f9
  1. 54
      examples/python/multiprocessing/README.md
  2. 132
      examples/python/multiprocessing/prime_pb2.py
  3. 46
      examples/python/multiprocessing/prime_pb2_grpc.py

@ -1,3 +1,51 @@
TODO: Describe the example. ## Multiprocessing with gRPC Python
TODO: Describe how to run the example.
TODO: Describe how to run the test. Multiprocessing allows application developers to sidestep the Python global
interpreter lock and achieve true concurrency on multicore systems.
Unfortunately, using multiprocessing and gRPC Python is not yet as simple as
instantiating your server with a `futures.ProcessPoolExecutor`.
The library is implemented as a C extension, maintaining much of the state that
drives the system in native code. As such, upon calling
[`fork`](http://man7.org/linux/man-pages/man2/fork.2.html), much of the
state copied into the child process is invalid, leading to hangs and crashes.
However, calling `fork` without `exec` in your python process is supported
*before* any gRPC servers have been instantiated. Application developers can
take advantage of this to parallelize their CPU-intensive operations.
## Running the Example
This example calculates the first 10,000 prime numbers as an RPC. We instantiate
one server per subprocess, balancing requests between the servers using the
[`SO_REUSEPORT`](https://lwn.net/Articles/542629/) socket option.
To run the server,
[ensure `bazel` is installed](https://docs.bazel.build/versions/master/install.html)
and run:
```
bazel run //examples/python/multiprocessing:server &
```
Note the address at which the server is running. For example,
```
...
[PID 107153] Binding to '[::]:33915'
[PID 107507] Starting new server.
[PID 107508] Starting new server.
...
```
Now, start the client by running
```
bazel run //examples/python/multiprocessing:client -- [SERVER_ADDRESS]
```
For example,
```
bazel run //examples/python/multiprocessing:client -- [::]:33915
```

@ -1,132 +0,0 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: prime.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='prime.proto',
package='prime',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0bprime.proto\x12\x05prime\"#\n\x0ePrimeCandidate\x12\x11\n\tcandidate\x18\x01 \x01(\x03\"\x1c\n\tPrimality\x12\x0f\n\x07isPrime\x18\x01 \x01(\x08\x32\x42\n\x0cPrimeChecker\x12\x32\n\x05\x63heck\x12\x15.prime.PrimeCandidate\x1a\x10.prime.Primality\"\x00\x62\x06proto3')
)
_PRIMECANDIDATE = _descriptor.Descriptor(
name='PrimeCandidate',
full_name='prime.PrimeCandidate',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='candidate', full_name='prime.PrimeCandidate.candidate', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=22,
serialized_end=57,
)
_PRIMALITY = _descriptor.Descriptor(
name='Primality',
full_name='prime.Primality',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='isPrime', full_name='prime.Primality.isPrime', index=0,
number=1, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=59,
serialized_end=87,
)
DESCRIPTOR.message_types_by_name['PrimeCandidate'] = _PRIMECANDIDATE
DESCRIPTOR.message_types_by_name['Primality'] = _PRIMALITY
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
PrimeCandidate = _reflection.GeneratedProtocolMessageType('PrimeCandidate', (_message.Message,), dict(
DESCRIPTOR = _PRIMECANDIDATE,
__module__ = 'prime_pb2'
# @@protoc_insertion_point(class_scope:prime.PrimeCandidate)
))
_sym_db.RegisterMessage(PrimeCandidate)
Primality = _reflection.GeneratedProtocolMessageType('Primality', (_message.Message,), dict(
DESCRIPTOR = _PRIMALITY,
__module__ = 'prime_pb2'
# @@protoc_insertion_point(class_scope:prime.Primality)
))
_sym_db.RegisterMessage(Primality)
_PRIMECHECKER = _descriptor.ServiceDescriptor(
name='PrimeChecker',
full_name='prime.PrimeChecker',
file=DESCRIPTOR,
index=0,
serialized_options=None,
serialized_start=89,
serialized_end=155,
methods=[
_descriptor.MethodDescriptor(
name='check',
full_name='prime.PrimeChecker.check',
index=0,
containing_service=None,
input_type=_PRIMECANDIDATE,
output_type=_PRIMALITY,
serialized_options=None,
),
])
_sym_db.RegisterServiceDescriptor(_PRIMECHECKER)
DESCRIPTOR.services_by_name['PrimeChecker'] = _PRIMECHECKER
# @@protoc_insertion_point(module_scope)

@ -1,46 +0,0 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import prime_pb2 as prime__pb2
class PrimeCheckerStub(object):
"""Service to check primality.
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.check = channel.unary_unary(
'/prime.PrimeChecker/check',
request_serializer=prime__pb2.PrimeCandidate.SerializeToString,
response_deserializer=prime__pb2.Primality.FromString,
)
class PrimeCheckerServicer(object):
"""Service to check primality.
"""
def check(self, request, context):
"""Determines the primality of an integer.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_PrimeCheckerServicer_to_server(servicer, server):
rpc_method_handlers = {
'check': grpc.unary_unary_rpc_method_handler(
servicer.check,
request_deserializer=prime__pb2.PrimeCandidate.FromString,
response_serializer=prime__pb2.Primality.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'prime.PrimeChecker', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
Loading…
Cancel
Save