Cython refactor of Python C wrapping layer

Also set up environment-related details needed for a smooth Cython
experience: have the test script avoid rebuilding all dependencies if the
virtualenv directory already exists, have the PyPI distribution script
distribute the Cython generated C code rather than the .pyx files.
pull/2243/head
Masood Malekghassemi 10 years ago
parent 9b67ccfde0
commit 743c10ccd5
  1. 1
      src/python/requirements.txt
  2. 1
      src/python/src/.gitignore
  3. 5
      src/python/src/grpc/_adapter/.gitignore
  4. 7
      src/python/src/grpc/_cython/.gitignore
  5. 52
      src/python/src/grpc/_cython/README.rst
  6. 28
      src/python/src/grpc/_cython/__init__.py
  7. 28
      src/python/src/grpc/_cython/_cygrpc/__init__.py
  8. 37
      src/python/src/grpc/_cython/_cygrpc/call.pxd
  9. 82
      src/python/src/grpc/_cython/_cygrpc/call.pyx
  10. 36
      src/python/src/grpc/_cython/_cygrpc/channel.pxd
  11. 84
      src/python/src/grpc/_cython/_cygrpc/channel.pyx
  12. 39
      src/python/src/grpc/_cython/_cygrpc/completion_queue.pxd
  13. 117
      src/python/src/grpc/_cython/_cygrpc/completion_queue.pyx
  14. 45
      src/python/src/grpc/_cython/_cygrpc/credentials.pxd
  15. 217
      src/python/src/grpc/_cython/_cygrpc/credentials.pyx
  16. 344
      src/python/src/grpc/_cython/_cygrpc/grpc.pxd
  17. 129
      src/python/src/grpc/_cython/_cygrpc/records.pxd
  18. 575
      src/python/src/grpc/_cython/_cygrpc/records.pyx
  19. 45
      src/python/src/grpc/_cython/_cygrpc/server.pxd
  20. 158
      src/python/src/grpc/_cython/_cygrpc/server.pyx
  21. 114
      src/python/src/grpc/_cython/adapter_low.py
  22. 187
      src/python/src/grpc/_cython/adapter_low_test.py
  23. 111
      src/python/src/grpc/_cython/cygrpc.pyx
  24. 276
      src/python/src/grpc/_cython/cygrpc_test.py
  25. 46
      src/python/src/grpc/_cython/test_utilities.py
  26. 65
      src/python/src/setup.py
  27. 6
      tools/distrib/python/submit.py
  28. 22
      tools/run_tests/build_python.sh
  29. 6
      tools/run_tests/python_tests.json

@ -1,3 +1,4 @@
enum34==1.0.4
futures==2.2.0
protobuf==3.0.0a3
cython>=0.22

@ -1,3 +1,4 @@
MANIFEST
grpcio.egg-info/
build/
dist/

@ -0,0 +1,5 @@
*.a
*.so
*.dll
*.pyc
*.pyd

@ -0,0 +1,7 @@
*.h
*.c
*.a
*.so
*.dll
*.pyc
*.pyd

@ -0,0 +1,52 @@
GRPC Python Cython layer
========================
Package for the GRPC Python Cython layer.
What is Cython?
---------------
Cython is both a superset of the Python language with extensions for dealing
with C types and a tool that transpiles this superset into C code. It provides
convenient means of statically typing expressions and of converting Python
strings to pointers (among other niceties), thus dramatically smoothing the
Python/C interop by allowing fluid use of APIs in both from the same source.
See the wonderful `Cython website`_.
Why Cython?
-----------
- **Python 2 and 3 support**
Cython generated C code has precompiler macros to target both Python 2 and
Python 3 C APIs, even while acting as a superset of just the Python 2
language (e.g. using ``basestring``).
- **Significantly less semantic noise**
A lot of CPython code is just glue, especially human-error-prone
``Py_INCREF``-ing and ``Py_DECREF``-ing around error handlers and such.
Cython takes care of that automagically.
- **Possible PyPy support**
One of the major developments in Cython over the past few years was the
addition of support for PyPy. We might soon be able to provide such support
ourselves through our use of Cython.
- **Less Python glue code**
There existed several adapter layers in and around the original CPython code
to smooth the surface exposed to Python due to how much trouble it was to
make such a smooth surface via the CPython API alone. Cython makes writing
such a surface incredibly easy, so these adapter layers may be removed.
Implications for Users
----------------------
Nothing additional will be required for users. PyPI packages will contain
Cython generated C code and thus not necessitate a Cython installation.
Implications for GRPC Developers
--------------------------------
A typical edit-compile-debug cycle now requires Cython. We install Cython in
the ``virtualenv`` generated for the Python tests in this repository, so
initial test runs may take an extra 2+ minutes to complete. Subsequent test
runs won't reinstall ``Cython`` (unless required versions change and the
``virtualenv`` doesn't have installed versions that satisfy the change).
.. _`Cython website`: http://cython.org/

@ -0,0 +1,28 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,28 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,37 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
cdef class Call:
cdef grpc.grpc_call *c_call
cdef list references

@ -0,0 +1,82 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport cpython
from grpc._cython._cygrpc cimport records
cdef class Call:
def __cinit__(self):
# Create an *empty* call
self.c_call = NULL
self.references = []
def start_batch(self, operations, tag):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
cdef records.Operations cy_operations = records.Operations(operations)
cdef records.OperationTag operation_tag = records.OperationTag(tag)
operation_tag.operation_call = self
operation_tag.batch_operations = cy_operations
cpython.Py_INCREF(operation_tag)
return grpc.grpc_call_start_batch(
self.c_call, cy_operations.c_ops, cy_operations.c_nops,
<cpython.PyObject *>operation_tag)
def cancel(self,
grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE,
details=None):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
if (details is None) != (error_code == grpc.GRPC_STATUS__DO_NOT_USE):
raise ValueError("if error_code is specified, so must details "
"(and vice-versa)")
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected details to be str or bytes")
if error_code != grpc.GRPC_STATUS__DO_NOT_USE:
self.references.append(details)
return grpc.grpc_call_cancel_with_status(self.c_call, error_code, details)
else:
return grpc.grpc_call_cancel(self.c_call)
def __dealloc__(self):
if self.c_call != NULL:
grpc.grpc_call_destroy(self.c_call)
# The object *should* always be valid from Python. Used for debugging.
@property
def is_valid(self):
return self.c_call != NULL

@ -0,0 +1,36 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
cdef class Channel:
cdef grpc.grpc_channel *c_channel
cdef list references

@ -0,0 +1,84 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport completion_queue
from grpc._cython._cygrpc cimport credentials
from grpc._cython._cygrpc cimport records
cdef class Channel:
def __cinit__(self, target, records.ChannelArgs arguments=None,
credentials.ClientCredentials client_credentials=None):
cdef grpc.grpc_channel_args *c_arguments = NULL
self.c_channel = NULL
self.references = []
if arguments is not None:
c_arguments = &arguments.c_args
if isinstance(target, bytes):
pass
elif isinstance(target, basestring):
target = target.encode()
else:
raise TypeError("expected target to be str or bytes")
if client_credentials is None:
self.c_channel = grpc.grpc_channel_create(target, c_arguments)
else:
self.c_channel = grpc.grpc_secure_channel_create(
client_credentials.c_credentials, target, c_arguments)
self.references.append(client_credentials)
self.references.append(target)
self.references.append(arguments)
def create_call(self, completion_queue.CompletionQueue queue not None,
method, host, records.Timespec deadline not None):
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
if isinstance(method, bytes):
pass
elif isinstance(method, basestring):
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
if isinstance(host, bytes):
pass
elif isinstance(host, basestring):
host = host.encode()
else:
raise TypeError("expected host to be str or bytes")
cdef call.Call operation_call = call.Call()
operation_call.references = [self, method, host, queue]
operation_call.c_call = grpc.grpc_channel_create_call(
self.c_channel, queue.c_completion_queue, method, host, deadline.c_time)
return operation_call
def __dealloc__(self):
if self.c_channel != NULL:
grpc.grpc_channel_destroy(self.c_channel)

@ -0,0 +1,39 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
cdef class CompletionQueue:
cdef grpc.grpc_completion_queue *c_completion_queue
cdef object poll_condition
cdef bint is_polling
cdef bint is_shutting_down
cdef bint is_shutdown

@ -0,0 +1,117 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport cpython
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport records
import threading
import time
cdef class CompletionQueue:
def __cinit__(self):
self.c_completion_queue = grpc.grpc_completion_queue_create()
self.is_shutting_down = False
self.is_shutdown = False
self.poll_condition = threading.Condition()
self.is_polling = False
def poll(self, records.Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
cdef grpc.gpr_timespec c_deadline = grpc.gpr_inf_future
cdef records.OperationTag tag = None
cdef object user_tag = None
cdef call.Call operation_call = None
cdef records.CallDetails request_call_details = None
cdef records.Metadata request_metadata = None
cdef records.Operations batch_operations = None
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc.grpc_event event
# Poll within a critical section
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
self.is_polling = True
with nogil:
event = grpc.grpc_completion_queue_next(
self.c_completion_queue, c_deadline)
with self.poll_condition:
self.is_polling = False
self.poll_condition.notify()
if event.type == grpc.GRPC_QUEUE_TIMEOUT:
return records.Event(event.type, False, None, None, None, None, None)
elif event.type == grpc.GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
return records.Event(event.type, True, None, None, None, None, None)
else:
if event.tag != NULL:
tag = <records.OperationTag>event.tag
# We receive event tags only after they've been inc-ref'd elsewhere in
# the code.
cpython.Py_DECREF(tag)
if tag.shutting_down_server is not None:
tag.shutting_down_server.notify_shutdown_complete()
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
request_metadata = tag.request_metadata
batch_operations = tag.batch_operations
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
operation_call.references.extend(tag.references)
return records.Event(
event.type, event.success, user_tag, operation_call,
request_call_details, request_metadata, batch_operations)
def shutdown(self):
grpc.grpc_completion_queue_shutdown(self.c_completion_queue)
self.is_shutting_down = True
def clear(self):
if not self.is_shutting_down:
raise ValueError('queue must be shutting down to be cleared')
while self.poll().type != grpc.GRPC_QUEUE_SHUTDOWN:
pass
def __dealloc__(self):
if self.c_completion_queue != NULL:
# Ensure shutdown, pump the queue
if not self.is_shutting_down:
self.shutdown()
while not self.is_shutdown:
self.poll()
grpc.grpc_completion_queue_destroy(self.c_completion_queue)

@ -0,0 +1,45 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
cdef class ClientCredentials:
cdef grpc.grpc_credentials *c_credentials
cdef grpc.grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair
cdef list references
cdef class ServerCredentials:
cdef grpc.grpc_server_credentials *c_credentials
cdef grpc.grpc_ssl_pem_key_cert_pair *c_ssl_pem_key_cert_pairs
cdef size_t c_ssl_pem_key_cert_pairs_count
cdef list references

@ -0,0 +1,217 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport records
cdef class ClientCredentials:
def __cinit__(self):
self.c_credentials = NULL
self.c_ssl_pem_key_cert_pair.private_key = NULL
self.c_ssl_pem_key_cert_pair.certificate_chain = NULL
self.references = []
# The object *can* be invalid in Python if we fail to make the credentials
# (and the core thus returns NULL credentials). Used primarily for debugging.
@property
def is_valid(self):
return self.c_credentials != NULL
def __dealloc__(self):
if self.c_credentials != NULL:
grpc.grpc_credentials_release(self.c_credentials)
cdef class ServerCredentials:
def __cinit__(self):
self.c_credentials = NULL
def __dealloc__(self):
if self.c_credentials != NULL:
grpc.grpc_server_credentials_release(self.c_credentials)
def client_credentials_google_default():
cdef ClientCredentials credentials = ClientCredentials();
credentials.c_credentials = grpc.grpc_google_default_credentials_create()
return credentials
def client_credentials_ssl(pem_root_certificates,
records.SslPemKeyCertPair ssl_pem_key_cert_pair):
if pem_root_certificates is None:
pass
elif isinstance(pem_root_certificates, bytes):
pass
elif isinstance(pem_root_certificates, basestring):
pem_root_certificates = pem_root_certificates.encode()
else:
raise TypeError("expected str or bytes for pem_root_certificates")
cdef ClientCredentials credentials = ClientCredentials()
cdef const char *c_pem_root_certificates = NULL
if pem_root_certificates is not None:
c_pem_root_certificates = pem_root_certificates
credentials.references.append(pem_root_certificates)
if ssl_pem_key_cert_pair is not None:
credentials.c_credentials = grpc.grpc_ssl_credentials_create(
c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair
)
credentials.references.append(ssl_pem_key_cert_pair)
else:
credentials.c_credentials = grpc.grpc_ssl_credentials_create(
c_pem_root_certificates, NULL
)
def client_credentials_composite_credentials(
ClientCredentials credentials_1 not None,
ClientCredentials credentials_2 not None):
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_composite_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def client_credentials_compute_engine():
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_compute_engine_credentials_create()
return credentials
def client_credentials_service_account(
json_key, scope, records.Timespec token_lifetime not None):
if isinstance(json_key, bytes):
pass
elif isinstance(json_key, basestring):
json_key = json_key.encode()
else:
raise TypeError("expected json_key to be str or bytes")
if isinstance(scope, bytes):
pass
elif isinstance(scope, basestring):
scope = scope.encode()
else:
raise TypeError("expected scope to be str or bytes")
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_service_account_credentials_create(
json_key, scope, token_lifetime.c_time)
credentials.references.extend([json_key, scope])
return credentials
def client_credentials_jwt(json_key, records.Timespec token_lifetime not None):
if isinstance(json_key, bytes):
pass
elif isinstance(json_key, basestring):
json_key = json_key.encode()
else:
raise TypeError("expected json_key to be str or bytes")
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_jwt_credentials_create(
json_key, token_lifetime.c_time)
credentials.references.append(json_key)
return credentials
def client_credentials_refresh_token(json_refresh_token):
if isinstance(json_refresh_token, bytes):
pass
elif isinstance(json_refresh_token, basestring):
json_refresh_token = json_refresh_token.encode()
else:
raise TypeError("expected json_refresh_token to be str or bytes")
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_refresh_token_credentials_create(
json_refresh_token)
credentials.references.append(json_refresh_token)
return credentials
def client_credentials_fake_transport_security():
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = (
grpc.grpc_fake_transport_security_credentials_create())
return credentials
def client_credentials_iam(authorization_token, authority_selector):
if isinstance(authorization_token, bytes):
pass
elif isinstance(authorization_token, basestring):
authorization_token = authorization_token.encode()
else:
raise TypeError("expected authorization_token to be str or bytes")
if isinstance(authority_selector, bytes):
pass
elif isinstance(authority_selector, basestring):
authority_selector = authority_selector.encode()
else:
raise TypeError("expected authority_selector to be str or bytes")
cdef ClientCredentials credentials = ClientCredentials()
credentials.c_credentials = grpc.grpc_iam_credentials_create(
authorization_token, authority_selector)
credentials.references.append(authorization_token)
credentials.references.append(authority_selector)
return credentials
def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs):
if pem_root_certs is None:
pass
elif isinstance(pem_root_certs, bytes):
pass
elif isinstance(pem_root_certs, basestring):
pem_root_certs = pem_root_certs.encode()
else:
raise TypeError("expected pem_root_certs to be str or bytes")
pem_key_cert_pairs = list(pem_key_cert_pairs)
for pair in pem_key_cert_pairs:
if not isinstance(pair, records.SslPemKeyCertPair):
raise TypeError("expected pem_key_cert_pairs to be sequence of "
"records.SslPemKeyCertPair")
cdef ServerCredentials credentials = ServerCredentials()
credentials.references.append(pem_key_cert_pairs)
credentials.references.append(pem_root_certs)
credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
credentials.c_ssl_pem_key_cert_pairs = (
<grpc.grpc_ssl_pem_key_cert_pair *>grpc.gpr_malloc(
sizeof(grpc.grpc_ssl_pem_key_cert_pair) *
credentials.c_ssl_pem_key_cert_pairs_count
))
for i in range(credentials.c_ssl_pem_key_cert_pairs_count):
credentials.c_ssl_pem_key_cert_pairs[i] = (
(<records.SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
credentials.c_credentials = grpc.grpc_ssl_server_credentials_create(
pem_root_certs, credentials.c_ssl_pem_key_cert_pairs,
credentials.c_ssl_pem_key_cert_pairs_count
)
return credentials
def server_credentials_fake_transport_security():
cdef ServerCredentials credentials = ServerCredentials()
credentials.c_credentials = (
grpc.grpc_fake_transport_security_server_credentials_create())
return credentials

@ -0,0 +1,344 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport libc.time
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size)
void gpr_free(void *ptr)
void *gpr_realloc(void *p, size_t size)
cdef extern from "grpc/support/slice.h":
ctypedef struct gpr_slice:
# don't worry about writing out the members of gpr_slice; we never access
# them directly.
pass
gpr_slice gpr_slice_ref(gpr_slice s)
void gpr_slice_unref(gpr_slice s)
gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *))
gpr_slice gpr_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t))
gpr_slice gpr_slice_malloc(size_t length)
gpr_slice gpr_slice_from_copied_string(const char *source)
gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len)
# Declare functions for function-like macros (because Cython)...
void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s)
size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s)
cdef extern from "grpc/support/port_platform.h":
# As long as the header file gets this type right, we don't need to get this
# type exactly; just close enough that the operations will be supported in the
# underlying C layers.
ctypedef unsigned int gpr_uint32
cdef extern from "grpc/support/time.h":
ctypedef struct gpr_timespec:
libc.time.time_t seconds "tv_sec"
int nanoseconds "tv_nsec"
cdef gpr_timespec gpr_time_0
cdef gpr_timespec gpr_inf_future
cdef gpr_timespec gpr_inf_past
gpr_timespec gpr_now()
cdef extern from "grpc/status.h":
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
GRPC_STATUS_CANCELLED
GRPC_STATUS_UNKNOWN
GRPC_STATUS_INVALID_ARGUMENT
GRPC_STATUS_DEADLINE_EXCEEDED
GRPC_STATUS_NOT_FOUND
GRPC_STATUS_ALREADY_EXISTS
GRPC_STATUS_PERMISSION_DENIED
GRPC_STATUS_UNAUTHENTICATED
GRPC_STATUS_RESOURCE_EXHAUSTED
GRPC_STATUS_FAILED_PRECONDITION
GRPC_STATUS_ABORTED
GRPC_STATUS_OUT_OF_RANGE
GRPC_STATUS_UNIMPLEMENTED
GRPC_STATUS_INTERNAL
GRPC_STATUS_UNAVAILABLE
GRPC_STATUS_DATA_LOSS
GRPC_STATUS__DO_NOT_USE
cdef extern from "grpc/byte_buffer_reader.h":
struct grpc_byte_buffer_reader:
# We don't care about the internals
pass
cdef extern from "grpc/byte_buffer.h":
ctypedef struct grpc_byte_buffer:
# We don't care about the internals.
pass
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
size_t nslices)
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb)
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer)
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer)
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice)
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_completion_queue:
# We don't care about the internals (and in fact don't know them)
pass
ctypedef struct grpc_channel:
# We don't care about the internals (and in fact don't know them)
pass
ctypedef struct grpc_server:
# We don't care about the internals (and in fact don't know them)
pass
ctypedef struct grpc_call:
# We don't care about the internals (and in fact don't know them)
pass
ctypedef enum grpc_arg_type:
grpc_arg_string "GRPC_ARG_STRING"
grpc_arg_integer "GRPC_ARG_INTEGER"
grpc_arg_pointer "GRPC_ARG_POINTER"
ctypedef struct grpc_arg_value_pointer:
void *address "p"
void *(*copy)(void *)
void (*destroy)(void *)
union grpc_arg_value:
char *string
int integer
grpc_arg_value_pointer pointer
ctypedef struct grpc_arg:
grpc_arg_type type
char *key
grpc_arg_value value
ctypedef struct grpc_channel_args:
size_t arguments_length "num_args"
grpc_arg *arguments "args"
ctypedef enum grpc_call_error:
GRPC_CALL_OK
GRPC_CALL_ERROR
GRPC_CALL_ERROR_NOT_ON_SERVER
GRPC_CALL_ERROR_NOT_ON_CLIENT
GRPC_CALL_ERROR_ALREADY_ACCEPTED
GRPC_CALL_ERROR_ALREADY_INVOKED
GRPC_CALL_ERROR_NOT_INVOKED
GRPC_CALL_ERROR_ALREADY_FINISHED
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS
GRPC_CALL_ERROR_INVALID_FLAGS
GRPC_CALL_ERROR_INVALID_METADATA
ctypedef struct grpc_metadata:
const char *key
const char *value
size_t value_length
# ignore the 'internal_data.obfuscated' fields.
ctypedef enum grpc_completion_type:
GRPC_QUEUE_SHUTDOWN
GRPC_QUEUE_TIMEOUT
GRPC_OP_COMPLETE
ctypedef struct grpc_event:
grpc_completion_type type
int success
void *tag
ctypedef struct grpc_metadata_array:
size_t count
size_t capacity
grpc_metadata *metadata
void grpc_metadata_array_init(grpc_metadata_array *array)
void grpc_metadata_array_destroy(grpc_metadata_array *array)
ctypedef struct grpc_call_details:
char *method
size_t method_capacity
char *host
size_t host_capacity
gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details)
void grpc_call_details_destroy(grpc_call_details *details)
ctypedef enum grpc_op_type:
GRPC_OP_SEND_INITIAL_METADATA
GRPC_OP_SEND_MESSAGE
GRPC_OP_SEND_CLOSE_FROM_CLIENT
GRPC_OP_SEND_STATUS_FROM_SERVER
GRPC_OP_RECV_INITIAL_METADATA
GRPC_OP_RECV_MESSAGE
GRPC_OP_RECV_STATUS_ON_CLIENT
GRPC_OP_RECV_CLOSE_ON_SERVER
ctypedef struct grpc_op_data_send_initial_metadata:
size_t count
grpc_metadata *metadata
ctypedef struct grpc_op_data_send_status_from_server:
size_t trailing_metadata_count
grpc_metadata *trailing_metadata
grpc_status_code status
const char *status_details
ctypedef struct grpc_op_data_recv_status_on_client:
grpc_metadata_array *trailing_metadata
grpc_status_code *status
char **status_details
size_t *status_details_capacity
ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled
union grpc_op_data:
grpc_op_data_send_initial_metadata send_initial_metadata
grpc_byte_buffer *send_message
grpc_op_data_send_status_from_server send_status_from_server
grpc_metadata_array *receive_initial_metadata "recv_initial_metadata"
grpc_byte_buffer **receive_message "recv_message"
grpc_op_data_recv_status_on_client receive_status_on_client "recv_status_on_client"
grpc_op_data_recv_close_on_server receive_close_on_server "recv_close_on_server"
ctypedef struct grpc_op:
grpc_op_type type "op"
gpr_uint32 flags
grpc_op_data data
void grpc_init()
void grpc_shutdown()
grpc_completion_queue *grpc_completion_queue_create()
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline) nogil
void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
void grpc_completion_queue_destroy(grpc_completion_queue *cq)
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag)
grpc_call_error grpc_call_cancel(grpc_call *call)
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description)
void grpc_call_destroy(grpc_call *call)
grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args)
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline)
void grpc_channel_destroy(grpc_channel *channel)
grpc_server *grpc_server_create(const grpc_channel_args *args)
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_completion_queue
*cq_bound_to_call, grpc_completion_queue *cq_for_notification, void
*tag_new)
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq)
int grpc_server_add_http2_port(grpc_server *server, const char *addr)
void grpc_server_start(grpc_server *server)
void grpc_server_shutdown_and_notify(
grpc_server *server, grpc_completion_queue *cq, void *tag)
void grpc_server_cancel_all_calls(grpc_server *server)
void grpc_server_destroy(grpc_server *server)
cdef extern from "grpc/grpc_security.h":
ctypedef struct grpc_ssl_pem_key_cert_pair:
const char *private_key
const char *certificate_chain "cert_chain"
ctypedef struct grpc_credentials:
# We don't care about the internals (and in fact don't know them)
pass
grpc_credentials *grpc_google_default_credentials_create()
grpc_credentials *grpc_ssl_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair)
grpc_credentials *grpc_composite_credentials_create(grpc_credentials *creds1,
grpc_credentials *creds2)
grpc_credentials *grpc_compute_engine_credentials_create()
grpc_credentials *grpc_service_account_credentials_create(
const char *json_key, const char *scope, gpr_timespec token_lifetime)
grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
gpr_timespec token_lifetime)
grpc_credentials *grpc_refresh_token_credentials_create(
const char *json_refresh_token)
grpc_credentials *grpc_fake_transport_security_credentials_create()
grpc_credentials *grpc_iam_credentials_create(const char *authorization_token,
const char *authority_selector)
void grpc_credentials_release(grpc_credentials *creds)
grpc_channel *grpc_secure_channel_create(
grpc_credentials *creds, const char *target,
const grpc_channel_args *args)
ctypedef struct grpc_server_credentials:
# We don't care about the internals (and in fact don't know them)
pass
grpc_server_credentials *grpc_ssl_server_credentials_create(
const char *pem_root_certs,
grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs);
grpc_server_credentials *grpc_fake_transport_security_server_credentials_create()
void grpc_server_credentials_release(grpc_server_credentials *creds)
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_server_credentials *creds)
grpc_call_error grpc_call_set_credentials(grpc_call *call,
grpc_credentials *creds)

@ -0,0 +1,129 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport server
cdef class Timespec:
cdef grpc.gpr_timespec c_time
cdef class CallDetails:
cdef grpc.grpc_call_details c_details
cdef class OperationTag:
cdef object user_tag
cdef list references
# This allows CompletionQueue to notify the Python Server object that the
# underlying GRPC core server has shutdown
cdef server.Server shutting_down_server
cdef call.Call operation_call
cdef CallDetails request_call_details
cdef Metadata request_metadata
cdef Operations batch_operations
cdef bint is_new_request
cdef class Event:
cdef readonly grpc.grpc_completion_type type
cdef readonly bint success
cdef readonly object tag
# For operations with calls
cdef readonly call.Call operation_call
# For Server.request_call
cdef readonly CallDetails request_call_details
cdef readonly Metadata request_metadata
# For Call.start_batch
cdef readonly Operations batch_operations
cdef class ByteBuffer:
cdef grpc.grpc_byte_buffer *c_byte_buffer
cdef class SslPemKeyCertPair:
cdef grpc.grpc_ssl_pem_key_cert_pair c_pair
cdef readonly object private_key, certificate_chain
cdef class ChannelArg:
cdef grpc.grpc_arg c_arg
cdef readonly object key, value
cdef class ChannelArgs:
cdef grpc.grpc_channel_args c_args
cdef list args
cdef class Metadatum:
cdef grpc.grpc_metadata c_metadata
cdef object _key, _value
cdef class Metadata:
cdef grpc.grpc_metadata_array c_metadata_array
cdef object metadata
cdef class Operation:
cdef grpc.grpc_op c_op
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef grpc.grpc_status_code _received_status_code
cdef char *_received_status_details
cdef size_t _received_status_details_capacity
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references
cdef class Operations:
cdef grpc.grpc_op *c_ops
cdef size_t c_nops
cdef list operations

@ -0,0 +1,575 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport server
class StatusCode:
ok = grpc.GRPC_STATUS_OK
cancelled = grpc.GRPC_STATUS_CANCELLED
unknown = grpc.GRPC_STATUS_UNKNOWN
invalid_argument = grpc.GRPC_STATUS_INVALID_ARGUMENT
deadline_exceeded = grpc.GRPC_STATUS_DEADLINE_EXCEEDED
not_found = grpc.GRPC_STATUS_NOT_FOUND
already_exists = grpc.GRPC_STATUS_ALREADY_EXISTS
permission_denied = grpc.GRPC_STATUS_PERMISSION_DENIED
unauthenticated = grpc.GRPC_STATUS_UNAUTHENTICATED
resource_exhausted = grpc.GRPC_STATUS_RESOURCE_EXHAUSTED
failed_precondition = grpc.GRPC_STATUS_FAILED_PRECONDITION
aborted = grpc.GRPC_STATUS_ABORTED
out_of_range = grpc.GRPC_STATUS_OUT_OF_RANGE
unimplemented = grpc.GRPC_STATUS_UNIMPLEMENTED
internal = grpc.GRPC_STATUS_INTERNAL
unavailable = grpc.GRPC_STATUS_UNAVAILABLE
data_loss = grpc.GRPC_STATUS_DATA_LOSS
class CallError:
ok = grpc.GRPC_CALL_OK
error = grpc.GRPC_CALL_ERROR
not_on_server = grpc.GRPC_CALL_ERROR_NOT_ON_SERVER
not_on_client = grpc.GRPC_CALL_ERROR_NOT_ON_CLIENT
already_accepted = grpc.GRPC_CALL_ERROR_ALREADY_ACCEPTED
already_invoked = grpc.GRPC_CALL_ERROR_ALREADY_INVOKED
not_invoked = grpc.GRPC_CALL_ERROR_NOT_INVOKED
already_finished = grpc.GRPC_CALL_ERROR_ALREADY_FINISHED
too_many_operations = grpc.GRPC_CALL_ERROR_TOO_MANY_OPERATIONS
invalid_flags = grpc.GRPC_CALL_ERROR_INVALID_FLAGS
invalid_metadata = grpc.GRPC_CALL_ERROR_INVALID_METADATA
class CompletionType:
queue_shutdown = grpc.GRPC_QUEUE_SHUTDOWN
queue_timeout = grpc.GRPC_QUEUE_TIMEOUT
operation_complete = grpc.GRPC_OP_COMPLETE
class OperationType:
send_initial_metadata = grpc.GRPC_OP_SEND_INITIAL_METADATA
send_message = grpc.GRPC_OP_SEND_MESSAGE
send_close_from_client = grpc.GRPC_OP_SEND_CLOSE_FROM_CLIENT
send_status_from_server = grpc.GRPC_OP_SEND_STATUS_FROM_SERVER
receive_initial_metadata = grpc.GRPC_OP_RECV_INITIAL_METADATA
receive_message = grpc.GRPC_OP_RECV_MESSAGE
receive_status_on_client = grpc.GRPC_OP_RECV_STATUS_ON_CLIENT
receive_close_on_server = grpc.GRPC_OP_RECV_CLOSE_ON_SERVER
cdef class Timespec:
def __cinit__(self, time):
if time is None:
self.c_time = grpc.gpr_now()
elif isinstance(time, float):
if time == float("+inf"):
self.c_time = grpc.gpr_inf_future
elif time == float("-inf"):
self.c_time = grpc.gpr_inf_past
else:
self.c_time.seconds = time
self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9
else:
raise TypeError("expected time to be float")
@property
def seconds(self):
return self.c_time.seconds
@property
def nanoseconds(self):
return self.c_time.nanoseconds
def __float__(self):
return <double>self.c_time.seconds + <double>self.c_time.nanoseconds / 1e9
infinite_future = Timespec(float("+inf"))
infinite_past = Timespec(float("-inf"))
cdef class CallDetails:
def __cinit__(self):
grpc.grpc_call_details_init(&self.c_details)
def __dealloc__(self):
grpc.grpc_call_details_destroy(&self.c_details)
@property
def method(self):
if self.c_details.method != NULL:
return <bytes>self.c_details.method
else:
return None
@property
def host(self):
if self.c_details.host != NULL:
return <bytes>self.c_details.host
else:
return None
@property
def deadline(self):
timespec = Timespec(float("-inf"))
timespec.c_time = self.c_details.deadline
return timespec
cdef class OperationTag:
def __cinit__(self, user_tag):
self.user_tag = user_tag
self.references = []
cdef class Event:
def __cinit__(self, grpc.grpc_completion_type type, bint success,
object tag, call.Call operation_call,
CallDetails request_call_details,
Metadata request_metadata,
Operations batch_operations):
self.type = type
self.success = success
self.tag = tag
self.operation_call = operation_call
self.request_call_details = request_call_details
self.request_metadata = request_metadata
self.batch_operations = batch_operations
cdef class ByteBuffer:
def __cinit__(self, data):
if data is None:
self.c_byte_buffer = NULL
return
if isinstance(data, bytes):
pass
elif isinstance(data, basestring):
data = data.encode()
else:
raise TypeError("expected value to be of type str or bytes")
cdef char *c_data = data
data_slice = grpc.gpr_slice_from_copied_buffer(c_data, len(data))
self.c_byte_buffer = grpc.grpc_raw_byte_buffer_create(
&data_slice, 1)
grpc.gpr_slice_unref(data_slice)
def bytes(self):
cdef grpc.grpc_byte_buffer_reader reader
cdef grpc.gpr_slice data_slice
cdef size_t data_slice_length
cdef void *data_slice_pointer
if self.c_byte_buffer != NULL:
grpc.grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
result = b""
while grpc.grpc_byte_buffer_reader_next(&reader, &data_slice):
data_slice_pointer = grpc.gpr_slice_start_ptr(data_slice)
data_slice_length = grpc.gpr_slice_length(data_slice)
result += (<char *>data_slice_pointer)[:data_slice_length]
grpc.grpc_byte_buffer_reader_destroy(&reader)
return result
else:
return None
def __len__(self):
if self.c_byte_buffer != NULL:
return grpc.grpc_byte_buffer_length(self.c_byte_buffer)
else:
return 0
def __str__(self):
return self.bytes()
def __dealloc__(self):
if self.c_byte_buffer != NULL:
grpc.grpc_byte_buffer_destroy(self.c_byte_buffer)
cdef class SslPemKeyCertPair:
def __cinit__(self, private_key, certificate_chain):
if isinstance(private_key, bytes):
self.private_key = private_key
elif isinstance(private_key, basestring):
self.private_key = private_key.encode()
else:
raise TypeError("expected private_key to be of type str or bytes")
if isinstance(certificate_chain, bytes):
self.certificate_chain = certificate_chain
elif isinstance(certificate_chain, basestring):
self.certificate_chain = certificate_chain.encode()
else:
raise TypeError("expected certificate_chain to be of type str or bytes "
"or int")
self.c_pair.private_key = self.private_key
self.c_pair.certificate_chain = self.certificate_chain
cdef class ChannelArg:
def __cinit__(self, key, value):
if isinstance(key, bytes):
self.key = key
elif isinstance(key, basestring):
self.key = key.encode()
else:
raise TypeError("expected key to be of type str or bytes")
if isinstance(value, bytes):
self.value = value
self.c_arg.type = grpc.GRPC_ARG_STRING
self.c_arg.value.string = self.value
elif isinstance(value, basestring):
self.value = value.encode()
self.c_arg.type = grpc.GRPC_ARG_STRING
self.c_arg.value.string = self.value
elif isinstance(value, int):
self.value = int(value)
self.c_arg.type = grpc.GRPC_ARG_INTEGER
self.c_arg.value.integer = self.value
else:
raise TypeError("expected value to be of type str or bytes or int")
self.c_arg.key = self.key
cdef class ChannelArgs:
def __cinit__(self, args):
self.args = list(args)
for arg in self.args:
if not isinstance(arg, ChannelArg):
raise TypeError("expected list of ChannelArg")
self.c_args.arguments_length = len(self.args)
self.c_args.arguments = <grpc.grpc_arg *>grpc.gpr_malloc(
self.c_args.arguments_length*sizeof(grpc.grpc_arg)
)
for i in range(self.c_args.arguments_length):
self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg
def __dealloc__(self):
grpc.gpr_free(self.c_args.arguments)
def __len__(self):
# self.args is never stale; it's only updated from this file
return len(self.args)
def __getitem__(self, size_t i):
# self.args is never stale; it's only updated from this file
return self.args[i]
cdef class Metadatum:
def __cinit__(self, key, value):
if isinstance(key, bytes):
self._key = key
elif isinstance(key, basestring):
self._key = key.encode()
else:
raise TypeError("expected key to be of type str or bytes")
if isinstance(value, bytes):
self._value = value
elif isinstance(value, basestring):
self._value = value.encode()
else:
raise TypeError("expected value to be of type str or bytes")
self.c_metadata.key = self._key
self.c_metadata.value = self._value
self.c_metadata.value_length = len(self._value)
@property
def key(self):
return <bytes>self.c_metadata.key
@property
def value(self):
return <bytes>self.c_metadata.value[:self.c_metadata.value_length]
def __len__(self):
return 2
def __getitem__(self, size_t i):
if i == 0:
return self.key
elif i == 1:
return self.value
else:
raise IndexError("index must be 0 (key) or 1 (value)")
def __iter__(self):
return iter((self.key, self.value))
cdef class _MetadataIterator:
cdef size_t i
cdef Metadata metadata
def __cinit__(self, Metadata metadata not None):
self.i = 0
self.metadata = metadata
def __next__(self):
if self.i < len(self.metadata):
result = self.metadata[self.i]
self.i = self.i + 1
return result
else:
raise StopIteration()
cdef class Metadata:
def __cinit__(self, metadata):
self.metadata = list(metadata)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
grpc.grpc_metadata_array_init(&self.c_metadata_array)
self.c_metadata_array.count = len(self.metadata)
self.c_metadata_array.capacity = len(self.metadata)
self.c_metadata_array.metadata = <grpc.grpc_metadata *>grpc.gpr_malloc(
self.c_metadata_array.count*sizeof(grpc.grpc_metadata)
)
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i] = (
(<Metadatum>self.metadata[i]).c_metadata)
def __dealloc__(self):
# this frees the allocated memory for the grpc_metadata_array (although
# it'd be nice if that were documented somewhere...) TODO(atash): document
# this in the C core
grpc.grpc_metadata_array_destroy(&self.c_metadata_array)
def __len__(self):
return self.c_metadata_array.count
def __getitem__(self, size_t i):
return Metadatum(
key=<bytes>self.c_metadata_array.metadata[i].key,
value=<bytes>self.c_metadata_array.metadata[i].value[
:self.c_metadata_array.metadata[i].value_length])
def __iter__(self):
return _MetadataIterator(self)
cdef class Operation:
def __cinit__(self):
self.references = []
self._received_status_details = NULL
self._received_status_details_capacity = 0
self.is_valid = False
@property
def type(self):
return self.c_op.type
@property
def received_message(self):
if self.c_op.type != grpc.GRPC_OP_RECV_MESSAGE:
raise TypeError("self must be an operation receiving a message")
return self._received_message
@property
def received_metadata(self):
if (self.c_op.type != grpc.GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
return self._received_metadata
@property
def received_status_code(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving a status code")
return self._received_status_code
@property
def received_status_details(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
if self._received_status_details:
return self._received_status_details
else:
return None
@property
def received_cancelled(self):
if self.c_op.type != grpc.GRPC_OP_RECV_CLOSE_ON_SERVER:
raise TypeError("self must be an operation receiving cancellation "
"information")
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
# We *almost* don't need to do anything; most of the objects are handled by
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
grpc.gpr_free(self._received_status_details)
def operation_send_initial_metadata(Metadata metadata):
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_SEND_INITIAL_METADATA
op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count
op.c_op.data.send_initial_metadata.metadata = (
metadata.c_metadata_array.metadata)
op.references.append(metadata)
op.is_valid = True
return op
def operation_send_message(data):
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_SEND_MESSAGE
byte_buffer = ByteBuffer(data)
op.c_op.data.send_message = byte_buffer.c_byte_buffer
op.references.append(byte_buffer)
op.is_valid = True
return op
def operation_send_close_from_client():
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_SEND_CLOSE_FROM_CLIENT
op.is_valid = True
return op
def operation_send_status_from_server(
Metadata metadata, grpc.grpc_status_code code, details):
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected a str or bytes object for details")
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.data.send_status_from_server.trailing_metadata_count = (
metadata.c_metadata_array.count)
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code
op.c_op.data.send_status_from_server.status_details = details
op.references.append(metadata)
op.references.append(details)
op.is_valid = True
return op
def operation_receive_initial_metadata():
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_RECV_INITIAL_METADATA
op._received_metadata = Metadata([])
op.c_op.data.receive_initial_metadata = (
&op._received_metadata.c_metadata_array)
op.is_valid = True
return op
def operation_receive_message():
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_RECV_MESSAGE
op._received_message = ByteBuffer(None)
# n.b. the c_op.data.receive_message field needs to be deleted by us,
# anyway, so we just let that be handled by the ByteBuffer() we allocated
# the line before.
op.c_op.data.receive_message = &op._received_message.c_byte_buffer
op.is_valid = True
return op
def operation_receive_status_on_client():
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_RECV_STATUS_ON_CLIENT
op._received_metadata = Metadata([])
op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._received_metadata.c_metadata_array)
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
&op._received_status_details)
op.c_op.data.receive_status_on_client.status_details_capacity = (
&op._received_status_details_capacity)
op.is_valid = True
return op
def operation_receive_close_on_server():
cdef Operation op = Operation()
op.c_op.type = grpc.GRPC_OP_RECV_CLOSE_ON_SERVER
op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
op.is_valid = True
return op
cdef class _OperationsIterator:
cdef size_t i
cdef Operations operations
def __cinit__(self, Operations operations not None):
self.i = 0
self.operations = operations
def __next__(self):
if self.i < len(self.operations):
result = self.operations[self.i]
self.i = self.i + 1
return result
else:
raise StopIteration()
cdef class Operations:
def __cinit__(self, operations):
self.operations = list(operations) # normalize iterable
self.c_ops = NULL
self.c_nops = 0
for operation in self.operations:
if not isinstance(operation, Operation):
raise TypeError("expected operations to be iterable of Operation")
self.c_nops = len(self.operations)
self.c_ops = <grpc.grpc_op *>grpc.gpr_malloc(
sizeof(grpc.grpc_op)*self.c_nops)
for i in range(self.c_nops):
self.c_ops[i] = (<Operation>(self.operations[i])).c_op
def __len__(self):
return self.c_nops
def __getitem__(self, size_t i):
# self.operations is never stale; it's only updated from this file
return self.operations[i]
def __dealloc__(self):
grpc.gpr_free(self.c_ops)
def __iter__(self):
return _OperationsIterator(self)

@ -0,0 +1,45 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from grpc._cython._cygrpc cimport grpc
from grpc._cython._cygrpc cimport completion_queue
cdef class Server:
cdef grpc.grpc_server *c_server
cdef bint is_started # start has been called
cdef bint is_shutting_down # shutdown has been called
cdef bint is_shutdown # notification of complete shutdown received
# used at dealloc when user forgets to shutdown
cdef completion_queue.CompletionQueue backup_shutdown_queue
cdef list references
cdef list registered_completion_queues
cdef notify_shutdown_complete(self)

@ -0,0 +1,158 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport cpython
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport completion_queue
from grpc._cython._cygrpc cimport credentials
from grpc._cython._cygrpc cimport records
import time
cdef class Server:
def __cinit__(self, records.ChannelArgs arguments=None):
cdef grpc.grpc_channel_args *c_arguments = NULL
self.references = []
self.registered_completion_queues = []
if arguments is not None:
c_arguments = &arguments.c_args
self.references.append(arguments)
self.c_server = grpc.grpc_server_create(c_arguments)
self.is_started = False
self.is_shutting_down = False
self.is_shutdown = False
def request_call(
self, completion_queue.CompletionQueue call_queue not None,
completion_queue.CompletionQueue server_queue not None, tag):
if not self.is_started or self.is_shutting_down:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
cdef records.OperationTag operation_tag = records.OperationTag(tag)
operation_tag.operation_call = call.Call()
operation_tag.request_call_details = records.CallDetails()
operation_tag.request_metadata = records.Metadata([])
operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True
operation_tag.batch_operations = records.Operations([])
cpython.Py_INCREF(operation_tag)
return grpc.grpc_server_request_call(
self.c_server, &operation_tag.operation_call.c_call,
&operation_tag.request_call_details.c_details,
&operation_tag.request_metadata.c_metadata_array,
call_queue.c_completion_queue, server_queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
def register_completion_queue(
self, completion_queue.CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
grpc.grpc_server_register_completion_queue(
self.c_server, queue.c_completion_queue)
self.registered_completion_queues.append(queue)
def start(self):
if self.is_started:
raise ValueError("the server has already started")
self.backup_shutdown_queue = completion_queue.CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
grpc.grpc_server_start(self.c_server)
def add_http2_port(self, address,
credentials.ServerCredentials server_credentials=None):
if isinstance(address, bytes):
pass
elif isinstance(address, basestring):
address = address.encode()
else:
raise TypeError("expected address to be a str or bytes")
self.references.append(address)
if server_credentials is not None:
self.references.append(server_credentials)
return grpc.grpc_server_add_secure_http2_port(
self.c_server, address, server_credentials.c_credentials)
else:
return grpc.grpc_server_add_http2_port(self.c_server, address)
def shutdown(self, completion_queue.CompletionQueue queue not None, tag):
cdef records.OperationTag operation_tag
if queue.is_shutting_down:
raise ValueError("queue must be live")
elif not self.is_started:
raise ValueError("the server hasn't started yet")
elif self.is_shutting_down:
return
elif queue not in self.registered_completion_queues:
raise ValueError("expected registered completion queue")
else:
self.is_shutting_down = True
operation_tag = records.OperationTag(tag)
operation_tag.shutting_down_server = self
operation_tag.references.extend([self, queue])
cpython.Py_INCREF(operation_tag)
grpc.grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
cdef notify_shutdown_complete(self):
# called only by a completion queue on receiving our shutdown operation tag
self.is_shutdown = True
def cancel_all_calls(self):
if not self.is_shutting_down:
raise ValueError("the server must be shutting down to cancel all calls")
elif self.is_shutdown:
return
else:
grpc.grpc_server_cancel_all_calls(self.c_server)
def __dealloc__(self):
if self.c_server != NULL:
if not self.is_started:
pass
elif self.is_shutdown:
pass
elif not self.is_shutting_down:
# the user didn't call shutdown - use our backup queue
self.shutdown(self.backup_shutdown_queue, None)
# and now we wait
while not self.is_shutdown:
self.backup_shutdown_queue.poll()
else:
# We're in the process of shutting down, but have not shutdown; can't do
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
grpc.grpc_server_destroy(self.c_server)

@ -0,0 +1,114 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Adapter from grpc._cython.types to the surface expected by
# grpc._adapter._intermediary_low.
#
# TODO(atash): Once this is plugged into grpc._adapter._intermediary_low, remove
# both grpc._adapter._intermediary_low and this file. The fore and rear links in
# grpc._adapter should be able to use grpc._cython.types directly.
from grpc._adapter import _types as type_interfaces
from grpc._cython import cygrpc
class ClientCredentials(object):
def __init__(self):
raise NotImplementedError()
@staticmethod
def google_default():
raise NotImplementedError()
@staticmethod
def ssl():
raise NotImplementedError()
@staticmethod
def composite():
raise NotImplementedError()
@staticmethod
def compute_engine():
raise NotImplementedError()
@staticmethod
def service_account():
raise NotImplementedError()
@staticmethod
def jwt():
raise NotImplementedError()
@staticmethod
def refresh_token():
raise NotImplementedError()
@staticmethod
def fake_transport_security():
raise NotImplementedError()
@staticmethod
def iam():
raise NotImplementedError()
class ServerCredentials(object):
def __init__(self):
raise NotImplementedError()
@staticmethod
def ssl():
raise NotImplementedError()
@staticmethod
def fake_transport_security():
raise NotImplementedError()
class CompletionQueue(type_interfaces.CompletionQueue):
def __init__(self):
raise NotImplementedError()
class Call(type_interfaces.Call):
def __init__(self):
raise NotImplementedError()
class Channel(type_interfaces.Channel):
def __init__(self):
raise NotImplementedError()
class Server(type_interfaces.Server):
def __init__(self):
raise NotImplementedError()

@ -0,0 +1,187 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Fork of grpc._adapter._low_test; the grpc._cython.types adapter in
# grpc._cython.low should transparently support the semantics expected of
# grpc._adapter._low.
import time
import unittest
from grpc._adapter import _types
from grpc._cython import adapter_low as _low
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue, [])
self.port = self.server.add_http2_port('[::]:0')
self.client_completion_queue = _low.CompletionQueue()
self.client_channel = _low.Channel('localhost:%d'%self.port, [])
self.server.start()
def tearDown(self):
self.server.shutdown()
del self.client_channel
self.client_completion_queue.shutdown()
while (self.client_completion_queue.next().type !=
_types.EventType.QUEUE_SHUTDOWN):
pass
self.server_completion_queue.shutdown()
while (self.server_completion_queue.next().type !=
_types.EventType.QUEUE_SHUTDOWN):
pass
del self.client_completion_queue
del self.server_completion_queue
del self.server
@unittest.skip('TODO(atash): implement grpc._cython.adapter_low')
def testEcho(self):
DEADLINE = time.time()+5
DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = 'key'
CLIENT_METADATA_ASCII_VALUE = 'val'
CLIENT_METADATA_BIN_KEY = 'key-bin'
CLIENT_METADATA_BIN_VALUE = b'\0'*1000
SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
SERVER_TRAILING_METADATA_KEY = 'California_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
SERVER_STATUS_CODE = _types.StatusCode.OK
SERVER_STATUS_DETAILS = 'our work is never over'
REQUEST = 'in death a member of project mayhem has a name'
RESPONSE = 'his name is robert paulson'
METHOD = 'twinkies'
HOST = 'hostess'
server_request_tag = object()
request_call_result = self.server.request_call(self.server_completion_queue,
server_request_tag)
self.assertEqual(_types.CallError.OK, request_call_result)
client_call_tag = object()
client_call = self.client_channel.create_call(self.client_completion_queue,
METHOD, HOST, DEADLINE)
client_initial_metadata = [
(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE),
(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
_types.OpArgs.send_message(REQUEST),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
_types.OpArgs.recv_status_on_client()
], client_call_tag)
self.assertEqual(_types.CallError.OK, client_start_batch_result)
request_event = self.server_completion_queue.next(DEADLINE)
self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEqual(1, len(request_event.results))
self.assertEqual(dict(client_initial_metadata),
dict(request_event.results[0].initial_metadata))
self.assertEqual(METHOD, request_event.call_details.method)
self.assertEqual(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline),
DEADLINE_TOLERANCE)
server_call_tag = object()
server_call = request_event.call
server_initial_metadata = [
(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
server_trailing_metadata = [
(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
_types.OpArgs.send_message(RESPONSE),
_types.OpArgs.recv_close_on_server(),
_types.OpArgs.send_status_from_server(
server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
], server_call_tag)
self.assertEqual(_types.CallError.OK, server_start_batch_result)
client_event = self.client_completion_queue.next(DEADLINE)
server_event = self.server_completion_queue.next(DEADLINE)
self.assertEqual(6, len(client_event.results))
found_client_op_types = set()
for client_result in client_event.results:
# we expect each op type to be unique
self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
self.assertEqual(dict(server_initial_metadata),
dict(client_result.initial_metadata))
elif client_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(RESPONSE, client_result.message)
elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
self.assertEqual(dict(server_trailing_metadata),
dict(client_result.trailing_metadata))
self.assertEqual(SERVER_STATUS_DETAILS, client_result.status.details)
self.assertEqual(SERVER_STATUS_CODE, client_result.status.code)
self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.SEND_MESSAGE,
_types.OpType.SEND_CLOSE_FROM_CLIENT,
_types.OpType.RECV_INITIAL_METADATA,
_types.OpType.RECV_MESSAGE,
_types.OpType.RECV_STATUS_ON_CLIENT
]), found_client_op_types)
self.assertEqual(5, len(server_event.results))
found_server_op_types = set()
for server_result in server_event.results:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == _types.OpType.RECV_MESSAGE:
self.assertEqual(REQUEST, server_result.message)
elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
self.assertFalse(server_result.cancelled)
self.assertEqual(set([
_types.OpType.SEND_INITIAL_METADATA,
_types.OpType.RECV_MESSAGE,
_types.OpType.SEND_MESSAGE,
_types.OpType.RECV_CLOSE_ON_SERVER,
_types.OpType.SEND_STATUS_FROM_SERVER
]), found_server_op_types)
del client_call
del server_call
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,111 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport cpython
from grpc._cython._cygrpc cimport grpc
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport channel
from grpc._cython._cygrpc cimport credentials
from grpc._cython._cygrpc cimport completion_queue
from grpc._cython._cygrpc cimport records
from grpc._cython._cygrpc cimport server
from grpc._cython._cygrpc import call
from grpc._cython._cygrpc import channel
from grpc._cython._cygrpc import credentials
from grpc._cython._cygrpc import completion_queue
from grpc._cython._cygrpc import records
from grpc._cython._cygrpc import server
StatusCode = records.StatusCode
CallError = records.CallError
CompletionType = records.CompletionType
OperationType = records.OperationType
Timespec = records.Timespec
CallDetails = records.CallDetails
Event = records.Event
ByteBuffer = records.ByteBuffer
SslPemKeyCertPair = records.SslPemKeyCertPair
ChannelArg = records.ChannelArg
ChannelArgs = records.ChannelArgs
Metadatum = records.Metadatum
Metadata = records.Metadata
Operation = records.Operation
operation_send_initial_metadata = records.operation_send_initial_metadata
operation_send_message = records.operation_send_message
operation_send_close_from_client = records.operation_send_close_from_client
operation_send_status_from_server = records.operation_send_status_from_server
operation_receive_initial_metadata = records.operation_receive_initial_metadata
operation_receive_message = records.operation_receive_message
operation_receive_status_on_client = records.operation_receive_status_on_client
operation_receive_close_on_server = records.operation_receive_close_on_server
Operations = records.Operations
ClientCredentials = credentials.ClientCredentials
ServerCredentials = credentials.ServerCredentials
client_credentials_google_default = (
credentials.client_credentials_google_default)
client_credentials_ssl = credentials.client_credentials_ssl
client_credentials_composite_credentials = (
credentials.client_credentials_composite_credentials)
client_credentials_compute_engine = (
credentials.client_credentials_compute_engine)
client_credentials_jwt = credentials.client_credentials_jwt
client_credentials_refresh_token = credentials.client_credentials_refresh_token
client_credentials_fake_transport_security = (
credentials.client_credentials_fake_transport_security)
client_credentials_iam = credentials.client_credentials_iam
server_credentials_ssl = credentials.server_credentials_ssl
server_credentials_fake_transport_security = (
credentials.server_credentials_fake_transport_security)
CompletionQueue = completion_queue.CompletionQueue
Channel = channel.Channel
Server = server.Server
Call = call.Call
#
# Global state
#
cdef class _ModuleState:
def __cinit__(self):
grpc.grpc_init()
def __dealloc__(self):
grpc.grpc_shutdown()
_module_state = _ModuleState()

@ -0,0 +1,276 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import time
import unittest
from grpc._cython import cygrpc
from grpc._cython import test_utilities
class TypeSmokeTest(unittest.TestCase):
def testStringsInUtilitiesUpDown(self):
self.assertEqual(0, cygrpc.StatusCode.ok)
metadatum = cygrpc.Metadatum('a', 'b')
self.assertEqual('a'.encode(), metadatum.key)
self.assertEqual('b'.encode(), metadatum.value)
metadata = cygrpc.Metadata([metadatum])
self.assertEqual(1, len(metadata))
self.assertEqual(metadatum.key, metadata[0].key)
def testMetadataIteration(self):
metadata = cygrpc.Metadata([
cygrpc.Metadatum('a', 'b'), cygrpc.Metadatum('c', 'd')])
iterator = iter(metadata)
metadatum = next(iterator)
self.assertIsInstance(metadatum, cygrpc.Metadatum)
self.assertEqual(metadatum.key, 'a'.encode())
self.assertEqual(metadatum.value, 'b'.encode())
metadatum = next(iterator)
self.assertIsInstance(metadatum, cygrpc.Metadatum)
self.assertEqual(metadatum.key, 'c'.encode())
self.assertEqual(metadatum.value, 'd'.encode())
with self.assertRaises(StopIteration):
next(iterator)
def testOperationsIteration(self):
operations = cygrpc.Operations([
cygrpc.operation_send_message('asdf')])
iterator = iter(operations)
operation = next(iterator)
self.assertIsInstance(operation, cygrpc.Operation)
# `Operation`s are write-only structures; can't directly debug anything out
# of them. Just check that we stop iterating.
with self.assertRaises(StopIteration):
next(iterator)
def testTimespec(self):
now = time.time()
timespec = cygrpc.Timespec(now)
self.assertAlmostEqual(now, float(timespec), places=8)
def testClientCredentialsUpDown(self):
credentials = cygrpc.client_credentials_fake_transport_security()
del credentials
def testServerCredentialsUpDown(self):
credentials = cygrpc.server_credentials_fake_transport_security()
del credentials
def testCompletionQueueUpDown(self):
completion_queue = cygrpc.CompletionQueue()
del completion_queue
def testServerUpDown(self):
server = cygrpc.Server(cygrpc.ChannelArgs([]))
del server
def testChannelUpDown(self):
channel = cygrpc.Channel('[::]:0', cygrpc.ChannelArgs([]))
del channel
def testSecureChannelUpDown(self):
channel = cygrpc.Channel(
'[::]:0', cygrpc.ChannelArgs([]),
cygrpc.client_credentials_fake_transport_security())
del channel
@unittest.skip('TODO(atash): undo skip after #2229 is merged')
def testServerStartNoExplicitShutdown(self):
server = cygrpc.Server()
completion_queue = cygrpc.CompletionQueue()
server.register_completion_queue(completion_queue)
port = server.add_http2_port('[::]:0')
self.assertIsInstance(port, int)
server.start()
del server
@unittest.skip('TODO(atash): undo skip after #2229 is merged')
def testServerStartShutdown(self):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server()
server.add_http2_port('[::]:0')
server.register_completion_queue(completion_queue)
server.start()
shutdown_tag = object()
server.shutdown(completion_queue, shutdown_tag)
event = completion_queue.poll()
self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
self.assertIs(shutdown_tag, event.tag)
del server
del completion_queue
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server()
self.server.register_completion_queue(self.server_completion_queue)
self.port = self.server.add_http2_port('[::]:0')
self.server.start()
self.client_completion_queue = cygrpc.CompletionQueue()
self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port))
def tearDown(self):
del self.server
del self.client_completion_queue
del self.server_completion_queue
def testEcho(self):
DEADLINE = time.time()+5
DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = b'key'
CLIENT_METADATA_ASCII_VALUE = b'val'
CLIENT_METADATA_BIN_KEY = b'key-bin'
CLIENT_METADATA_BIN_VALUE = b'\0'*1000
SERVER_INITIAL_METADATA_KEY = b'init_me_me_me'
SERVER_INITIAL_METADATA_VALUE = b'whodawha?'
SERVER_TRAILING_METADATA_KEY = b'California_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = b'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok
SERVER_STATUS_DETAILS = b'our work is never over'
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
HOST = b'hostess'
cygrpc_deadline = cygrpc.Timespec(DEADLINE)
server_request_tag = object()
request_call_result = self.server.request_call(
self.server_completion_queue, self.server_completion_queue,
server_request_tag)
self.assertEqual(cygrpc.CallError.ok, request_call_result)
client_call_tag = object()
client_call = self.client_channel.create_call(self.client_completion_queue,
METHOD, HOST, cygrpc_deadline)
client_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_initial_metadata(client_initial_metadata),
cygrpc.operation_send_message(REQUEST),
cygrpc.operation_send_close_from_client(),
cygrpc.operation_receive_initial_metadata(),
cygrpc.operation_receive_message(),
cygrpc.operation_receive_status_on_client()
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
self.client_completion_queue, cygrpc_deadline)
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
request_event.type)
self.assertIsInstance(request_event.operation_call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEqual(0, len(request_event.batch_operations))
self.assertEqual(dict(client_initial_metadata),
dict(request_event.request_metadata))
self.assertEqual(METHOD, request_event.request_call_details.method)
self.assertEqual(HOST, request_event.request_call_details.host)
self.assertLess(
abs(DEADLINE - float(request_event.request_call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
server_call = request_event.operation_call
server_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY,
SERVER_INITIAL_METADATA_VALUE)])
server_trailing_metadata = cygrpc.Metadata([
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
cygrpc.operation_send_initial_metadata(server_initial_metadata),
cygrpc.operation_receive_message(),
cygrpc.operation_send_message(RESPONSE),
cygrpc.operation_receive_close_on_server(),
cygrpc.operation_send_status_from_server(
server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
client_event = client_event_future.result()
server_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(6, len(client_event.batch_operations))
found_client_op_types = set()
for client_result in client_event.batch_operations:
# we expect each op type to be unique
self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == cygrpc.OperationType.receive_initial_metadata:
self.assertEqual(dict(server_initial_metadata),
dict(client_result.received_metadata))
elif client_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(RESPONSE, client_result.received_message.bytes())
elif client_result.type == cygrpc.OperationType.receive_status_on_client:
self.assertEqual(dict(server_trailing_metadata),
dict(client_result.received_metadata))
self.assertEqual(SERVER_STATUS_DETAILS,
client_result.received_status_details)
self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code)
self.assertEqual(set([
cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.send_message,
cygrpc.OperationType.send_close_from_client,
cygrpc.OperationType.receive_initial_metadata,
cygrpc.OperationType.receive_message,
cygrpc.OperationType.receive_status_on_client
]), found_client_op_types)
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.received_message.bytes())
elif server_result.type == cygrpc.OperationType.receive_close_on_server:
self.assertFalse(server_result.received_cancelled)
self.assertEqual(set([
cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.receive_message,
cygrpc.OperationType.send_message,
cygrpc.OperationType.receive_close_on_server,
cygrpc.OperationType.send_status_from_server
]), found_server_op_types)
del client_call
del server_call
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import threading
from grpc._cython._cygrpc import completion_queue
class CompletionQueuePollFuture:
def __init__(self, completion_queue, deadline):
def poller_function():
self._event_result = completion_queue.poll(deadline)
self._event_result = None
self._thread = threading.Thread(target=poller_function)
self._thread.start()
def result(self):
self._thread.join()
return self._event_result

@ -29,11 +29,20 @@
"""A setup module for the GRPC Python package."""
import os
import sys
from distutils import core as _core
from distutils import extension as _extension
import setuptools
import sys
_EXTENSION_SOURCES = (
# Use environment variables to determine whether or not the Cython extension
# should *use* Cython or use the generated C files. Note that this requires the
# C files to have been generated by building first *with* Cython support.
_BUILD_WITH_CYTHON = os.environ.get('GRPC_PYTHON_BUILD_WITH_CYTHON', False)
_C_EXTENSION_SOURCES = (
'grpc/_adapter/_c/module.c',
'grpc/_adapter/_c/types.c',
'grpc/_adapter/_c/utility.c',
@ -45,6 +54,19 @@ _EXTENSION_SOURCES = (
'grpc/_adapter/_c/types/server.c',
)
_CYTHON_EXTENSION_PACKAGE_NAMES = (
)
_CYTHON_EXTENSION_MODULE_NAMES = (
'grpc._cython.cygrpc',
'grpc._cython._cygrpc.call',
'grpc._cython._cygrpc.channel',
'grpc._cython._cygrpc.completion_queue',
'grpc._cython._cygrpc.credentials',
'grpc._cython._cygrpc.records',
'grpc._cython._cygrpc.server',
)
_EXTENSION_INCLUDE_DIRECTORIES = (
'.',
)
@ -56,15 +78,45 @@ _EXTENSION_LIBRARIES = (
if not "darwin" in sys.platform:
_EXTENSION_LIBRARIES += ('rt',)
_EXTENSION_MODULE = _core.Extension(
'grpc._adapter._c', sources=list(_EXTENSION_SOURCES),
_C_EXTENSION_MODULE = _core.Extension(
'grpc._adapter._c', sources=list(_C_EXTENSION_SOURCES),
include_dirs=list(_EXTENSION_INCLUDE_DIRECTORIES),
libraries=list(_EXTENSION_LIBRARIES),
)
)
_C_EXTENSION_MODULES = [_C_EXTENSION_MODULE]
def cython_extensions(package_names, module_names, include_dirs, libraries,
build_with_cython=False):
file_extension = 'pyx' if build_with_cython else 'c'
module_files = [name.replace('.', '/') + '.' + file_extension
for name in module_names]
extensions = [
_extension.Extension(
name=module_name, sources=[module_file],
include_dirs=include_dirs, libraries=libraries
) for (module_name, module_file) in zip(module_names, module_files)
]
if build_with_cython:
import Cython.Build
return Cython.Build.cythonize(extensions)
else:
return extensions
_CYTHON_EXTENSION_MODULES = cython_extensions(
list(_CYTHON_EXTENSION_PACKAGE_NAMES), list(_CYTHON_EXTENSION_MODULE_NAMES),
list(_EXTENSION_INCLUDE_DIRECTORIES), list(_EXTENSION_LIBRARIES),
bool(_BUILD_WITH_CYTHON))
_EXTENSION_MODULES = _C_EXTENSION_MODULES + _CYTHON_EXTENSION_MODULES
_PACKAGES = (
'grpc',
'grpc._adapter',
'grpc._cython',
'grpc._cython._cygrpc',
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
@ -79,6 +131,7 @@ _PACKAGES = (
_PACKAGE_DIRECTORIES = {
'grpc': 'grpc',
'grpc._adapter': 'grpc/_adapter',
'grpc._cython': 'grpc/_cython',
'grpc._junkdrawer': 'grpc/_junkdrawer',
'grpc.early_adopter': 'grpc/early_adopter',
'grpc.framework': 'grpc/framework',
@ -87,7 +140,7 @@ _PACKAGE_DIRECTORIES = {
setuptools.setup(
name='grpcio',
version='0.9.0a1',
ext_modules=[_EXTENSION_MODULE],
ext_modules=_EXTENSION_MODULES,
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=[

@ -66,6 +66,12 @@ try:
except:
pass
# Build the Cython C files
build_env = os.environ.copy()
build_env['GRPC_PYTHON_BUILD_WITH_CYTHON'] = "1"
cmd = ['python', 'setup.py', 'build_ext', '--inplace']
subprocess.call(cmd, cwd=pkgdir, env=build_env)
# Make the push.
cmd = ['python', 'setup.py', 'sdist']
subprocess.call(cmd, cwd=pkgdir)

@ -34,9 +34,21 @@ set -ex
cd $(dirname $0)/../..
root=`pwd`
rm -rf python2.7_virtual_environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install -r src/python/requirements.txt
CFLAGS="-I$root/include -std=c89 -Werror" LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
if [ ! -d 'python2.7_virtual_environment' ]
then
# Build the entire virtual environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install -r src/python/requirements.txt
else
source python2.7_virtual_environment/bin/activate
# Uninstall and re-install the packages we care about. Don't use
# --force-reinstall or --ignore-installed to avoid propagating this
# unnecessarily to dependencies. Don't use --no-deps to avoid missing
# dependency upgrades.
(yes | pip uninstall grpcio) || true
(yes | pip uninstall interop) || true
fi
CFLAGS="-I$root/include -std=c89" LDFLAGS=-L$root/libs/$CONFIG GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install src/python/src
pip install src/python/interop

@ -1,4 +1,10 @@
[
{
"module": "grpc._cython.cygrpc_test"
},
{
"module": "grpc._cython.adapter_low_test"
},
{
"module": "grpc._adapter._c_test"
},

Loading…
Cancel
Save