Merge github.com:google/grpc into async-api

pull/357/head
Craig Tiller 10 years ago
commit dce18cbbb7
  1. 7
      .editorconfig
  2. 2
      README.md
  3. 86
      src/python/setup.py
  4. 0
      src/python/src/__init__.py
  5. 0
      src/python/src/_adapter/__init__.py
  6. 17
      src/python/src/_adapter/_blocking_invocation_inline_service_test.py
  7. 77
      src/python/src/_adapter/_c.c
  8. 141
      src/python/src/_adapter/_c_test.py
  9. 292
      src/python/src/_adapter/_call.c
  10. 46
      src/python/src/_adapter/_call.h
  11. 109
      src/python/src/_adapter/_channel.c
  12. 46
      src/python/src/_adapter/_channel.h
  13. 76
      src/python/src/_adapter/_common.py
  14. 541
      src/python/src/_adapter/_completion_queue.c
  15. 48
      src/python/src/_adapter/_completion_queue.h
  16. 86
      src/python/src/_adapter/_datatypes.py
  17. 79
      src/python/src/_adapter/_error.c
  18. 42
      src/python/src/_adapter/_error.h
  19. 46
      src/python/src/_adapter/_event_invocation_synchronous_event_service_test.py
  20. 124
      src/python/src/_adapter/_face_test_case.py
  21. 46
      src/python/src/_adapter/_future_invocation_asynchronous_event_service_test.py
  22. 245
      src/python/src/_adapter/_links_test.py
  23. 97
      src/python/src/_adapter/_lonely_rear_link_test.py
  24. 55
      src/python/src/_adapter/_low.py
  25. 371
      src/python/src/_adapter/_low_test.py
  26. 261
      src/python/src/_adapter/_proto_scenarios.py
  27. 167
      src/python/src/_adapter/_server.c
  28. 44
      src/python/src/_adapter/_server.h
  29. 80
      src/python/src/_adapter/_test_links.py
  30. 309
      src/python/src/_adapter/fore.py
  31. 344
      src/python/src/_adapter/rear.py
  32. 0
      src/python/src/_framework/__init__.py
  33. 0
      src/python/src/_framework/base/__init__.py
  34. 0
      src/python/src/_framework/base/exceptions.py
  35. 0
      src/python/src/_framework/base/interfaces.py
  36. 0
      src/python/src/_framework/base/interfaces_test.py
  37. 0
      src/python/src/_framework/base/packets/__init__.py
  38. 0
      src/python/src/_framework/base/packets/_cancellation.py
  39. 0
      src/python/src/_framework/base/packets/_constants.py
  40. 0
      src/python/src/_framework/base/packets/_context.py
  41. 0
      src/python/src/_framework/base/packets/_emission.py
  42. 0
      src/python/src/_framework/base/packets/_ends.py
  43. 0
      src/python/src/_framework/base/packets/_expiration.py
  44. 0
      src/python/src/_framework/base/packets/_ingestion.py
  45. 0
      src/python/src/_framework/base/packets/_interfaces.py
  46. 0
      src/python/src/_framework/base/packets/_reception.py
  47. 0
      src/python/src/_framework/base/packets/_termination.py
  48. 0
      src/python/src/_framework/base/packets/_transmission.py
  49. 0
      src/python/src/_framework/base/packets/implementations.py
  50. 0
      src/python/src/_framework/base/packets/implementations_test.py
  51. 0
      src/python/src/_framework/base/packets/in_memory.py
  52. 0
      src/python/src/_framework/base/packets/interfaces.py
  53. 0
      src/python/src/_framework/base/packets/null.py
  54. 0
      src/python/src/_framework/base/packets/packets.py
  55. 0
      src/python/src/_framework/base/util.py
  56. 0
      src/python/src/_framework/common/__init__.py
  57. 0
      src/python/src/_framework/common/cardinality.py
  58. 0
      src/python/src/_framework/face/__init__.py
  59. 0
      src/python/src/_framework/face/_calls.py
  60. 0
      src/python/src/_framework/face/_control.py
  61. 0
      src/python/src/_framework/face/_service.py
  62. 0
      src/python/src/_framework/face/_test_case.py
  63. 0
      src/python/src/_framework/face/blocking_invocation_inline_service_test.py
  64. 0
      src/python/src/_framework/face/demonstration.py
  65. 0
      src/python/src/_framework/face/event_invocation_synchronous_event_service_test.py
  66. 0
      src/python/src/_framework/face/exceptions.py
  67. 0
      src/python/src/_framework/face/future_invocation_asynchronous_event_service_test.py
  68. 0
      src/python/src/_framework/face/implementations.py
  69. 0
      src/python/src/_framework/face/interfaces.py
  70. 0
      src/python/src/_framework/face/testing/__init__.py
  71. 0
      src/python/src/_framework/face/testing/base_util.py
  72. 0
      src/python/src/_framework/face/testing/blocking_invocation_inline_service_test_case.py
  73. 0
      src/python/src/_framework/face/testing/callback.py
  74. 0
      src/python/src/_framework/face/testing/control.py
  75. 0
      src/python/src/_framework/face/testing/coverage.py
  76. 0
      src/python/src/_framework/face/testing/digest.py
  77. 0
      src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
  78. 0
      src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
  79. 0
      src/python/src/_framework/face/testing/interfaces.py
  80. 0
      src/python/src/_framework/face/testing/serial.py
  81. 0
      src/python/src/_framework/face/testing/service.py
  82. 0
      src/python/src/_framework/face/testing/stock_service.py
  83. 0
      src/python/src/_framework/face/testing/test_case.py
  84. 0
      src/python/src/_framework/foundation/__init__.py
  85. 0
      src/python/src/_framework/foundation/_later_test.py
  86. 0
      src/python/src/_framework/foundation/_logging_pool_test.py
  87. 0
      src/python/src/_framework/foundation/_timer_future.py
  88. 0
      src/python/src/_framework/foundation/abandonment.py
  89. 0
      src/python/src/_framework/foundation/callable_util.py
  90. 0
      src/python/src/_framework/foundation/future.py
  91. 0
      src/python/src/_framework/foundation/later.py
  92. 0
      src/python/src/_framework/foundation/logging_pool.py
  93. 0
      src/python/src/_framework/foundation/stream.py
  94. 0
      src/python/src/_framework/foundation/stream_testing.py
  95. 0
      src/python/src/_framework/foundation/stream_util.py
  96. 0
      src/python/src/_junkdrawer/__init__.py
  97. 266
      src/python/src/_junkdrawer/math_pb2.py
  98. 0
      src/python/src/_junkdrawer/stock_pb2.py
  99. 6
      test/cpp/qps/client.cc
  100. 26
      test/cpp/qps/server.cc
  101. Some files were not shown because too many files have changed in this diff Show More

@ -0,0 +1,7 @@
root = true
[**]
end_of_line = LF
indent_style = space
indent_size = 2
insert_final_newline = true
tab_width = 8

@ -24,7 +24,7 @@ Developers using gRPC typically start with the description of an RPC service
(a collection of methods), and generate client and server side interfaces
which they use on the client-side and implement on the server side.
By default, gRPC uses [Protocol Buffers](github.com/google/protobuf) as the
By default, gRPC uses [Protocol Buffers](https://github.com/google/protobuf) as the
Interface Definition Language (IDL) for describing both the service interface
and the structure of the payload messages. It is possible to use other
alternatives if desired.

@ -0,0 +1,86 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A setup module for the GRPC Python package."""
from distutils import core as _core
_EXTENSION_SOURCES = (
'src/_adapter/_c.c',
'src/_adapter/_call.c',
'src/_adapter/_channel.c',
'src/_adapter/_completion_queue.c',
'src/_adapter/_error.c',
'src/_adapter/_server.c',
)
_EXTENSION_INCLUDE_DIRECTORIES = (
'src',
# TODO(nathaniel): Can this path specification be made to work?
#'../../include',
)
_EXTENSION_LIBRARIES = (
'gpr',
'grpc',
)
_EXTENSION_LIBRARY_DIRECTORIES = (
# TODO(nathaniel): Can this path specification be made to work?
#'../../libs/dbg',
)
_EXTENSION_MODULE = _core.Extension(
'_adapter._c', sources=list(_EXTENSION_SOURCES),
include_dirs=_EXTENSION_INCLUDE_DIRECTORIES,
libraries=_EXTENSION_LIBRARIES,
library_dirs=_EXTENSION_LIBRARY_DIRECTORIES)
_PACKAGES=(
'_adapter',
'_framework',
'_framework.base',
'_framework.base.packets',
'_framework.common',
'_framework.face',
'_framework.face.testing',
'_framework.foundation',
'_junkdrawer',
)
_PACKAGE_DIRECTORIES = {
'_adapter': 'src/_adapter',
'_framework': 'src/_framework',
'_junkdrawer': 'src/_junkdrawer',
}
_core.setup(
name='grpc', version='0.0.1',
ext_modules=[_EXTENSION_MODULE], packages=_PACKAGES,
package_dir=_PACKAGE_DIRECTORIES)

@ -0,0 +1,17 @@
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _adapter import _face_test_case
from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case
class BlockingInvocationInlineServiceTest(
_face_test_case.FaceTestCase,
test_case.BlockingInvocationInlineServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,77 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <Python.h>
#include <grpc/grpc.h>
#include "_adapter/_completion_queue.h"
#include "_adapter/_channel.h"
#include "_adapter/_call.h"
#include "_adapter/_server.h"
static PyObject *init(PyObject *self, PyObject *args) {
grpc_init();
Py_RETURN_NONE;
}
static PyObject *shutdown(PyObject *self, PyObject *args) {
grpc_shutdown();
Py_RETURN_NONE;
}
static PyMethodDef _c_methods[] = {
{"init", init, METH_VARARGS, "Initialize the module's static state."},
{"shut_down", shutdown, METH_VARARGS,
"Shut down the module's static state."},
{NULL},
};
PyMODINIT_FUNC init_c(void) {
PyObject *module;
module = Py_InitModule3("_c", _c_methods,
"Wrappings of C structures and functions.");
if (pygrpc_add_completion_queue(module) == -1) {
return;
}
if (pygrpc_add_channel(module) == -1) {
return;
}
if (pygrpc_add_call(module) == -1) {
return;
}
if (pygrpc_add_server(module) == -1) {
return;
}
}

@ -0,0 +1,141 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for _adapter._c."""
import threading
import time
import unittest
from _adapter import _c
from _adapter import _datatypes
_TIMEOUT = 3
_FUTURE = time.time() + 60 * 60 * 24
_IDEMPOTENCE_DEMONSTRATION = 7
class _CTest(unittest.TestCase):
def testUpAndDown(self):
_c.init()
_c.shut_down()
def testCompletionQueue(self):
_c.init()
completion_queue = _c.CompletionQueue()
event = completion_queue.get(0)
self.assertIsNone(event)
event = completion_queue.get(time.time())
self.assertIsNone(event)
event = completion_queue.get(time.time() + _TIMEOUT)
self.assertIsNone(event)
completion_queue.stop()
for _ in range(_IDEMPOTENCE_DEMONSTRATION):
event = completion_queue.get(time.time() + _TIMEOUT)
self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
del completion_queue
del event
_c.shut_down()
def testChannel(self):
_c.init()
channel = _c.Channel('test host:12345')
del channel
_c.shut_down()
def testCall(self):
method = 'test method'
host = 'test host'
_c.init()
channel = _c.Channel('%s:%d' % (host, 12345))
call = _c.Call(channel, method, host, time.time() + _TIMEOUT)
del call
del channel
_c.shut_down()
def testServer(self):
_c.init()
completion_queue = _c.CompletionQueue()
server = _c.Server(completion_queue)
server.add_http2_addr('[::]:0')
server.start()
server.stop()
completion_queue.stop()
del server
del completion_queue
service_tag = object()
completion_queue = _c.CompletionQueue()
server = _c.Server(completion_queue)
server.add_http2_addr('[::]:0')
server.start()
server.service(service_tag)
server.stop()
completion_queue.stop()
event = completion_queue.get(time.time() + _TIMEOUT)
self.assertIs(event.kind, _datatypes.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(event.tag, service_tag)
self.assertIsNone(event.service_acceptance)
for _ in range(_IDEMPOTENCE_DEMONSTRATION):
event = completion_queue.get(time.time() + _TIMEOUT)
self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
del server
del completion_queue
completion_queue = _c.CompletionQueue()
server = _c.Server(completion_queue)
server.add_http2_addr('[::]:0')
server.start()
thread = threading.Thread(target=completion_queue.get, args=(_FUTURE,))
thread.start()
time.sleep(1)
server.stop()
completion_queue.stop()
for _ in range(_IDEMPOTENCE_DEMONSTRATION):
event = completion_queue.get(time.time() + _TIMEOUT)
self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
thread.join()
del server
del completion_queue
_c.shut_down()
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,292 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "_adapter/_call.h"
#include <math.h>
#include <Python.h>
#include <grpc/grpc.h>
#include "_adapter/_channel.h"
#include "_adapter/_completion_queue.h"
#include "_adapter/_error.h"
static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) {
const PyObject *channel;
const char *method;
const char *host;
const double deadline;
if (!PyArg_ParseTuple(args, "O!ssd", &pygrpc_ChannelType, &channel, &method,
&host, &deadline)) {
self->c_call = NULL;
return -1;
}
/* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own
* function with its own test coverage.
*/
self->c_call =
grpc_channel_create_call(((Channel *)channel)->c_channel, method, host,
gpr_time_from_nanos(deadline * GPR_NS_PER_SEC));
return 0;
}
static void pygrpc_call_dealloc(Call *self) {
if (self->c_call != NULL) {
grpc_call_destroy(self->c_call);
}
self->ob_type->tp_free((PyObject *)self);
}
static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) {
const PyObject *completion_queue;
const PyObject *metadata_tag;
const PyObject *finish_tag;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "O!OO", &pygrpc_CompletionQueueType,
&completion_queue, &metadata_tag, &finish_tag))) {
return NULL;
}
call_error = grpc_call_invoke(
self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
(void *)metadata_tag, (void *)finish_tag, 0);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(metadata_tag);
Py_INCREF(finish_tag);
}
return result;
}
static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
const char *bytes;
int length;
const PyObject *tag;
gpr_slice slice;
grpc_byte_buffer *byte_buffer;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "s#O", &bytes, &length, &tag))) {
return NULL;
}
slice = gpr_slice_from_copied_buffer(bytes, length);
byte_buffer = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
call_error = grpc_call_start_write(self->c_call, byte_buffer, (void *)tag, 0);
grpc_byte_buffer_destroy(byte_buffer);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static const PyObject *pygrpc_call_complete(Call *self, PyObject *args) {
const PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "O", &tag))) {
return NULL;
}
call_error = grpc_call_writes_done(self->c_call, (void *)tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
const PyObject *completion_queue;
const PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "O!O", &pygrpc_CompletionQueueType,
&completion_queue, &tag))) {
return NULL;
}
call_error = grpc_call_server_accept(
self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
(void *)tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static const PyObject *pygrpc_call_premetadata(Call *self, PyObject *args) {
/* TODO(b/18702680): Actually support metadata. */
return pygrpc_translate_call_error(
grpc_call_server_end_initial_metadata(self->c_call, 0));
}
static const PyObject *pygrpc_call_read(Call *self, PyObject *args) {
const PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "O", &tag))) {
return NULL;
}
call_error = grpc_call_start_read(self->c_call, (void *)tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
PyObject *status;
PyObject *code;
PyObject *details;
const PyObject *tag;
grpc_status_code c_code;
char *c_message;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "OO", &status, &tag))) {
return NULL;
}
code = PyObject_GetAttrString(status, "code");
details = PyObject_GetAttrString(status, "details");
c_code = PyInt_AsLong(code);
c_message = PyBytes_AsString(details);
Py_DECREF(code);
Py_DECREF(details);
call_error = grpc_call_start_write_status(self->c_call, c_code, c_message,
(void *)tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static const PyObject *pygrpc_call_cancel(Call *self) {
return pygrpc_translate_call_error(grpc_call_cancel(self->c_call));
}
static PyMethodDef methods[] = {
{"invoke", (PyCFunction)pygrpc_call_invoke, METH_VARARGS,
"Invoke this call."},
{"write", (PyCFunction)pygrpc_call_write, METH_VARARGS,
"Write bytes to this call."},
{"complete", (PyCFunction)pygrpc_call_complete, METH_VARARGS,
"Complete writes to this call."},
{"accept", (PyCFunction)pygrpc_call_accept, METH_VARARGS, "Accept an RPC."},
{"premetadata", (PyCFunction)pygrpc_call_premetadata, METH_VARARGS,
"Indicate the end of leading metadata in the response."},
{"read", (PyCFunction)pygrpc_call_read, METH_VARARGS,
"Read bytes from this call."},
{"status", (PyCFunction)pygrpc_call_status, METH_VARARGS,
"Report this call's status."},
{"cancel", (PyCFunction)pygrpc_call_cancel, METH_NOARGS,
"Cancel this call."},
{NULL}};
PyTypeObject pygrpc_CallType = {
PyObject_HEAD_INIT(NULL)0, /*ob_size*/
"_grpc.Call", /*tp_name*/
sizeof(Call), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)pygrpc_call_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /*tp_flags*/
"Wrapping of grpc_call.", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)pygrpc_call_init, /* tp_init */
};
int pygrpc_add_call(PyObject *module) {
pygrpc_CallType.tp_new = PyType_GenericNew;
if (PyType_Ready(&pygrpc_CallType) < 0) {
PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_CallType!");
return -1;
}
if (PyModule_AddObject(module, "Call", (PyObject *)&pygrpc_CallType) == -1) {
PyErr_SetString(PyExc_ImportError, "Couldn't add Call type to module!");
}
return 0;
}

@ -0,0 +1,46 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__CALL_H_
#define _ADAPTER__CALL_H_
#include <Python.h>
#include <grpc/grpc.h>
typedef struct { PyObject_HEAD grpc_call *c_call; } Call;
PyTypeObject pygrpc_CallType;
int pygrpc_add_call(PyObject *module);
#endif /* _ADAPTER__CALL_H_ */

@ -0,0 +1,109 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "_adapter/_channel.h"
#include <Python.h>
#include <grpc/grpc.h>
static int pygrpc_channel_init(Channel *self, PyObject *args, PyObject *kwds) {
const char *hostport;
if (!(PyArg_ParseTuple(args, "s", &hostport))) {
self->c_channel = NULL;
return -1;
}
self->c_channel = grpc_channel_create(hostport, NULL);
return 0;
}
static void pygrpc_channel_dealloc(Channel *self) {
if (self->c_channel != NULL) {
grpc_channel_destroy(self->c_channel);
}
self->ob_type->tp_free((PyObject *)self);
}
PyTypeObject pygrpc_ChannelType = {
PyObject_HEAD_INIT(NULL)0, /*ob_size*/
"_grpc.Channel", /*tp_name*/
sizeof(Channel), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)pygrpc_channel_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /*tp_flags*/
"Wrapping of grpc_channel.", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
0, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)pygrpc_channel_init, /* tp_init */
};
int pygrpc_add_channel(PyObject *module) {
pygrpc_ChannelType.tp_new = PyType_GenericNew;
if (PyType_Ready(&pygrpc_ChannelType) < 0) {
PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_ChannelType!");
return -1;
}
if (PyModule_AddObject(module, "Channel", (PyObject *)&pygrpc_ChannelType) ==
-1) {
PyErr_SetString(PyExc_ImportError, "Couldn't add Channel type to module!");
return -1;
}
return 0;
}

@ -0,0 +1,46 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__CHANNEL_H_
#define _ADAPTER__CHANNEL_H_
#include <Python.h>
#include <grpc/grpc.h>
typedef struct { PyObject_HEAD grpc_channel *c_channel; } Channel;
PyTypeObject pygrpc_ChannelType;
int pygrpc_add_channel(PyObject *module);
#endif /* _ADAPTER__CHANNEL_H_ */

@ -0,0 +1,76 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""State used by both invocation-side and service-side code."""
import enum
@enum.unique
class HighWrite(enum.Enum):
"""The possible categories of high-level write state."""
OPEN = 'OPEN'
CLOSED = 'CLOSED'
class WriteState(object):
"""A description of the state of writing to an RPC.
Attributes:
low: A side-specific value describing the low-level state of writing.
high: A HighWrite value describing the high-level state of writing.
pending: A list of bytestrings for the RPC waiting to be written to the
other side of the RPC.
"""
def __init__(self, low, high, pending):
self.low = low
self.high = high
self.pending = pending
class CommonRPCState(object):
"""A description of an RPC's state.
Attributes:
write: A WriteState describing the state of writing to the RPC.
sequence_number: The lowest-unused sequence number for use in generating
tickets locally describing the progress of the RPC.
deserializer: The behavior to be used to deserialize payload bytestreams
taken off the wire.
serializer: The behavior to be used to serialize payloads to be sent on the
wire.
"""
def __init__(self, write, sequence_number, deserializer, serializer):
self.write = write
self.sequence_number = sequence_number
self.deserializer = deserializer
self.serializer = serializer

@ -0,0 +1,541 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "_adapter/_completion_queue.h"
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include "_adapter/_call.h"
static PyObject *status_class;
static PyObject *service_acceptance_class;
static PyObject *event_class;
static PyObject *ok_status_code;
static PyObject *cancelled_status_code;
static PyObject *unknown_status_code;
static PyObject *invalid_argument_status_code;
static PyObject *expired_status_code;
static PyObject *not_found_status_code;
static PyObject *already_exists_status_code;
static PyObject *permission_denied_status_code;
static PyObject *unauthenticated_status_code;
static PyObject *resource_exhausted_status_code;
static PyObject *failed_precondition_status_code;
static PyObject *aborted_status_code;
static PyObject *out_of_range_status_code;
static PyObject *unimplemented_status_code;
static PyObject *internal_error_status_code;
static PyObject *unavailable_status_code;
static PyObject *data_loss_status_code;
static PyObject *stop_event_kind;
static PyObject *write_event_kind;
static PyObject *complete_event_kind;
static PyObject *service_event_kind;
static PyObject *read_event_kind;
static PyObject *metadata_event_kind;
static PyObject *finish_event_kind;
static PyObject *pygrpc_as_py_time(gpr_timespec *timespec) {
return Py_BuildValue("f",
timespec->tv_sec + ((double)timespec->tv_nsec) / 1.0E9);
}
static PyObject *pygrpc_status_code(grpc_status_code c_status_code) {
switch (c_status_code) {
case GRPC_STATUS_OK:
return ok_status_code;
case GRPC_STATUS_CANCELLED:
return cancelled_status_code;
case GRPC_STATUS_UNKNOWN:
return unknown_status_code;
case GRPC_STATUS_INVALID_ARGUMENT:
return invalid_argument_status_code;
case GRPC_STATUS_DEADLINE_EXCEEDED:
return expired_status_code;
case GRPC_STATUS_NOT_FOUND:
return not_found_status_code;
case GRPC_STATUS_ALREADY_EXISTS:
return already_exists_status_code;
case GRPC_STATUS_PERMISSION_DENIED:
return permission_denied_status_code;
case GRPC_STATUS_UNAUTHENTICATED:
return unauthenticated_status_code;
case GRPC_STATUS_RESOURCE_EXHAUSTED:
return resource_exhausted_status_code;
case GRPC_STATUS_FAILED_PRECONDITION:
return failed_precondition_status_code;
case GRPC_STATUS_ABORTED:
return aborted_status_code;
case GRPC_STATUS_OUT_OF_RANGE:
return out_of_range_status_code;
case GRPC_STATUS_UNIMPLEMENTED:
return unimplemented_status_code;
case GRPC_STATUS_INTERNAL:
return internal_error_status_code;
case GRPC_STATUS_UNAVAILABLE:
return unavailable_status_code;
case GRPC_STATUS_DATA_LOSS:
return data_loss_status_code;
default:
return NULL;
}
}
static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
return Py_BuildValue("(OOOOOOO)", stop_event_kind, Py_None, Py_None, Py_None,
Py_None, Py_None, Py_None);
}
static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
PyObject *write_accepted =
c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False;
return Py_BuildValue("(OOOOOOO)", write_event_kind, (PyObject *)c_event->tag,
write_accepted, Py_None, Py_None, Py_None, Py_None);
}
static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
PyObject *complete_accepted =
c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False;
return Py_BuildValue("(OOOOOOO)", complete_event_kind,
(PyObject *)c_event->tag, Py_None, complete_accepted,
Py_None, Py_None, Py_None);
}
static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
if (c_event->data.server_rpc_new.method == NULL) {
return Py_BuildValue("(OOOOOOO)", service_event_kind, c_event->tag,
Py_None, Py_None, Py_None, Py_None, Py_None);
} else {
PyObject *method = PyBytes_FromString(c_event->data.server_rpc_new.method);
PyObject *host = PyBytes_FromString(c_event->data.server_rpc_new.host);
PyObject *service_deadline =
pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline);
Call *call;
PyObject *service_acceptance_args;
PyObject *service_acceptance;
PyObject *event_args;
call = PyObject_New(Call, &pygrpc_CallType);
call->c_call = c_event->call;
service_acceptance_args =
Py_BuildValue("(OOOO)", call, method, host, service_deadline);
Py_DECREF(call);
Py_DECREF(method);
Py_DECREF(host);
Py_DECREF(service_deadline);
service_acceptance =
PyObject_CallObject(service_acceptance_class, service_acceptance_args);
Py_DECREF(service_acceptance_args);
event_args = Py_BuildValue("(OOOOOOO)", service_event_kind,
(PyObject *)c_event->tag, Py_None, Py_None,
service_acceptance, Py_None, Py_None);
Py_DECREF(service_acceptance);
return event_args;
}
}
static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (c_event->data.read == NULL) {
return Py_BuildValue("(OOOOOOO)", read_event_kind,
(PyObject *)c_event->tag, Py_None, Py_None, Py_None,
Py_None, Py_None);
} else {
size_t length;
size_t offset;
grpc_byte_buffer_reader *reader;
gpr_slice slice;
char *c_bytes;
PyObject *bytes;
PyObject *event_args;
length = grpc_byte_buffer_length(c_event->data.read);
reader = grpc_byte_buffer_reader_create(c_event->data.read);
c_bytes = gpr_malloc(length);
offset = 0;
while (grpc_byte_buffer_reader_next(reader, &slice)) {
memcpy(c_bytes + offset, GPR_SLICE_START_PTR(slice),
GPR_SLICE_LENGTH(slice));
offset += GPR_SLICE_LENGTH(slice);
}
grpc_byte_buffer_reader_destroy(reader);
bytes = PyBytes_FromStringAndSize(c_bytes, length);
gpr_free(c_bytes);
event_args =
Py_BuildValue("(OOOOOOO)", read_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, bytes, Py_None);
Py_DECREF(bytes);
return event_args;
}
}
static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
/* TODO(nathaniel): Actual transmission of metadata. */
return Py_BuildValue("(OOOOOOO)", metadata_event_kind,
(PyObject *)c_event->tag, Py_None, Py_None, Py_None,
Py_None, Py_None);
}
static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
PyObject *code;
PyObject *details;
PyObject *status_args;
PyObject *status;
PyObject *event_args;
code = pygrpc_status_code(c_event->data.finished.status);
if (code == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
return NULL;
}
if (c_event->data.finished.details == NULL) {
details = PyBytes_FromString("");
} else {
details = PyBytes_FromString(c_event->data.finished.details);
}
status_args = Py_BuildValue("(OO)", code, details);
Py_DECREF(details);
status = PyObject_CallObject(status_class, status_args);
Py_DECREF(status_args);
event_args =
Py_BuildValue("(OOOOOOO)", finish_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, status);
Py_DECREF(status);
return event_args;
}
static int pygrpc_completion_queue_init(CompletionQueue *self, PyObject *args,
PyObject *kwds) {
self->c_completion_queue = grpc_completion_queue_create();
return 0;
}
static void pygrpc_completion_queue_dealloc(CompletionQueue *self) {
grpc_completion_queue_destroy(self->c_completion_queue);
self->ob_type->tp_free((PyObject *)self);
}
static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
PyObject *args) {
PyObject *deadline;
double double_deadline;
gpr_timespec deadline_timespec;
grpc_event *c_event;
PyObject *event_args;
PyObject *event;
if (!(PyArg_ParseTuple(args, "O", &deadline))) {
return NULL;
}
if (deadline == Py_None) {
deadline_timespec = gpr_inf_future;
} else {
double_deadline = PyFloat_AsDouble(deadline);
deadline_timespec = gpr_time_from_nanos((long)(double_deadline * 1.0E9));
}
/* TODO(nathaniel): Suppress clang-format in this block and remove the
unnecessary and unPythonic semicolons trailing the _ALLOW_THREADS macros.
(Right now clang-format only understands //-demarcated suppressions.) */
Py_BEGIN_ALLOW_THREADS;
c_event =
grpc_completion_queue_next(self->c_completion_queue, deadline_timespec);
Py_END_ALLOW_THREADS;
if (c_event == NULL) {
Py_RETURN_NONE;
}
switch (c_event->type) {
case GRPC_QUEUE_SHUTDOWN:
event_args = pygrpc_stop_event_args(c_event);
break;
case GRPC_WRITE_ACCEPTED:
event_args = pygrpc_write_event_args(c_event);
break;
case GRPC_FINISH_ACCEPTED:
event_args = pygrpc_complete_event_args(c_event);
break;
case GRPC_SERVER_RPC_NEW:
event_args = pygrpc_service_event_args(c_event);
break;
case GRPC_READ:
event_args = pygrpc_read_event_args(c_event);
break;
case GRPC_CLIENT_METADATA_READ:
event_args = pygrpc_metadata_event_args(c_event);
break;
case GRPC_FINISHED:
event_args = pygrpc_finished_event_args(c_event);
break;
default:
PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
return NULL;
}
if (event_args == NULL) {
return NULL;
}
event = PyObject_CallObject(event_class, event_args);
Py_DECREF(event_args);
Py_XDECREF((PyObject *)c_event->tag);
grpc_event_finish(c_event);
return event;
}
static PyObject *pygrpc_completion_queue_stop(CompletionQueue *self) {
grpc_completion_queue_shutdown(self->c_completion_queue);
Py_RETURN_NONE;
}
static PyMethodDef methods[] = {
{"get", (PyCFunction)pygrpc_completion_queue_get, METH_VARARGS,
"Get the next event."},
{"stop", (PyCFunction)pygrpc_completion_queue_stop, METH_NOARGS,
"Stop this completion queue."},
{NULL}};
PyTypeObject pygrpc_CompletionQueueType = {
PyObject_HEAD_INIT(NULL)0, /*ob_size*/
"_gprc.CompletionQueue", /*tp_name*/
sizeof(CompletionQueue), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)pygrpc_completion_queue_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /*tp_flags*/
"Wrapping of grpc_completion_queue.", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)pygrpc_completion_queue_init, /* tp_init */
};
static int pygrpc_get_status_codes(PyObject *datatypes_module) {
PyObject *code_class = PyObject_GetAttrString(datatypes_module, "Code");
if (code_class == NULL) {
return -1;
}
ok_status_code = PyObject_GetAttrString(code_class, "OK");
if (ok_status_code == NULL) {
return -1;
}
cancelled_status_code = PyObject_GetAttrString(code_class, "CANCELLED");
if (cancelled_status_code == NULL) {
return -1;
}
unknown_status_code = PyObject_GetAttrString(code_class, "UNKNOWN");
if (unknown_status_code == NULL) {
return -1;
}
invalid_argument_status_code =
PyObject_GetAttrString(code_class, "INVALID_ARGUMENT");
if (invalid_argument_status_code == NULL) {
return -1;
}
expired_status_code = PyObject_GetAttrString(code_class, "EXPIRED");
if (expired_status_code == NULL) {
return -1;
}
not_found_status_code = PyObject_GetAttrString(code_class, "NOT_FOUND");
if (not_found_status_code == NULL) {
return -1;
}
already_exists_status_code =
PyObject_GetAttrString(code_class, "ALREADY_EXISTS");
if (already_exists_status_code == NULL) {
return -1;
}
permission_denied_status_code =
PyObject_GetAttrString(code_class, "PERMISSION_DENIED");
if (permission_denied_status_code == NULL) {
return -1;
}
unauthenticated_status_code =
PyObject_GetAttrString(code_class, "UNAUTHENTICATED");
if (unauthenticated_status_code == NULL) {
return -1;
}
resource_exhausted_status_code =
PyObject_GetAttrString(code_class, "RESOURCE_EXHAUSTED");
if (resource_exhausted_status_code == NULL) {
return -1;
}
failed_precondition_status_code =
PyObject_GetAttrString(code_class, "FAILED_PRECONDITION");
if (failed_precondition_status_code == NULL) {
return -1;
}
aborted_status_code = PyObject_GetAttrString(code_class, "ABORTED");
if (aborted_status_code == NULL) {
return -1;
}
out_of_range_status_code = PyObject_GetAttrString(code_class, "OUT_OF_RANGE");
if (out_of_range_status_code == NULL) {
return -1;
}
unimplemented_status_code =
PyObject_GetAttrString(code_class, "UNIMPLEMENTED");
if (unimplemented_status_code == NULL) {
return -1;
}
internal_error_status_code =
PyObject_GetAttrString(code_class, "INTERNAL_ERROR");
if (internal_error_status_code == NULL) {
return -1;
}
unavailable_status_code = PyObject_GetAttrString(code_class, "UNAVAILABLE");
if (unavailable_status_code == NULL) {
return -1;
}
data_loss_status_code = PyObject_GetAttrString(code_class, "DATA_LOSS");
if (data_loss_status_code == NULL) {
return -1;
}
Py_DECREF(code_class);
return 0;
}
static int pygrpc_get_event_kinds(PyObject *event_class) {
PyObject *kind_class = PyObject_GetAttrString(event_class, "Kind");
if (kind_class == NULL) {
return -1;
}
stop_event_kind = PyObject_GetAttrString(kind_class, "STOP");
if (stop_event_kind == NULL) {
return -1;
}
write_event_kind = PyObject_GetAttrString(kind_class, "WRITE_ACCEPTED");
if (write_event_kind == NULL) {
return -1;
}
complete_event_kind = PyObject_GetAttrString(kind_class, "COMPLETE_ACCEPTED");
if (complete_event_kind == NULL) {
return -1;
}
service_event_kind = PyObject_GetAttrString(kind_class, "SERVICE_ACCEPTED");
if (service_event_kind == NULL) {
return -1;
}
read_event_kind = PyObject_GetAttrString(kind_class, "READ_ACCEPTED");
if (read_event_kind == NULL) {
return -1;
}
metadata_event_kind = PyObject_GetAttrString(kind_class, "METADATA_ACCEPTED");
if (metadata_event_kind == NULL) {
return -1;
}
finish_event_kind = PyObject_GetAttrString(kind_class, "FINISH");
if (finish_event_kind == NULL) {
return -1;
}
Py_DECREF(kind_class);
return 0;
}
int pygrpc_add_completion_queue(PyObject *module) {
char *datatypes_module_path = "_adapter._datatypes";
PyObject *datatypes_module = PyImport_ImportModule(datatypes_module_path);
if (datatypes_module == NULL) {
PyErr_SetString(PyExc_ImportError, datatypes_module_path);
return -1;
}
status_class = PyObject_GetAttrString(datatypes_module, "Status");
service_acceptance_class =
PyObject_GetAttrString(datatypes_module, "ServiceAcceptance");
event_class = PyObject_GetAttrString(datatypes_module, "Event");
if (status_class == NULL || service_acceptance_class == NULL ||
event_class == NULL) {
PyErr_SetString(PyExc_ImportError, "Missing classes in _datatypes module!");
return -1;
}
if (pygrpc_get_status_codes(datatypes_module) == -1) {
PyErr_SetString(PyExc_ImportError, "Status codes import broken!");
return -1;
}
if (pygrpc_get_event_kinds(event_class) == -1) {
PyErr_SetString(PyExc_ImportError, "Event kinds import broken!");
return -1;
}
Py_DECREF(datatypes_module);
pygrpc_CompletionQueueType.tp_new = PyType_GenericNew;
if (PyType_Ready(&pygrpc_CompletionQueueType) < 0) {
PyErr_SetString(PyExc_RuntimeError,
"Error defining pygrpc_CompletionQueueType!");
return -1;
}
if (PyModule_AddObject(module, "CompletionQueue",
(PyObject *)&pygrpc_CompletionQueueType) == -1) {
PyErr_SetString(PyExc_ImportError,
"Couldn't add CompletionQueue type to module!");
return -1;
}
return 0;
}

@ -0,0 +1,48 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__COMPLETION_QUEUE_H_
#define _ADAPTER__COMPLETION_QUEUE_H_
#include <Python.h>
#include <grpc/grpc.h>
typedef struct {
PyObject_HEAD grpc_completion_queue *c_completion_queue;
} CompletionQueue;
PyTypeObject pygrpc_CompletionQueueType;
int pygrpc_add_completion_queue(PyObject *module);
#endif /* _ADAPTER__COMPLETION_QUEUE_H_ */

@ -0,0 +1,86 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Datatypes passed between Python and C code."""
import collections
import enum
@enum.unique
class Code(enum.IntEnum):
"""One Platform error codes (see status.h and codes.proto)."""
OK = 0
CANCELLED = 1
UNKNOWN = 2
INVALID_ARGUMENT = 3
EXPIRED = 4
NOT_FOUND = 5
ALREADY_EXISTS = 6
PERMISSION_DENIED = 7
UNAUTHENTICATED = 16
RESOURCE_EXHAUSTED = 8
FAILED_PRECONDITION = 9
ABORTED = 10
OUT_OF_RANGE = 11
UNIMPLEMENTED = 12
INTERNAL_ERROR = 13
UNAVAILABLE = 14
DATA_LOSS = 15
class Status(collections.namedtuple('Status', ['code', 'details'])):
"""Describes an RPC's overall status."""
class ServiceAcceptance(
collections.namedtuple(
'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
"""Describes an RPC on the service side at the start of service."""
class Event(
collections.namedtuple(
'Event',
['kind', 'tag', 'write_accepted', 'complete_accepted',
'service_acceptance', 'bytes', 'status'])):
"""Describes an event emitted from a completion queue."""
@enum.unique
class Kind(enum.Enum):
"""Describes the kind of an event."""
STOP = object()
WRITE_ACCEPTED = object()
COMPLETE_ACCEPTED = object()
SERVICE_ACCEPTED = object()
READ_ACCEPTED = object()
METADATA_ACCEPTED = object()
FINISH = object()

@ -0,0 +1,79 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "_adapter/_error.h"
#include <Python.h>
#include <grpc/grpc.h>
const PyObject *pygrpc_translate_call_error(grpc_call_error call_error) {
switch (call_error) {
case GRPC_CALL_OK:
Py_RETURN_NONE;
case GRPC_CALL_ERROR:
PyErr_SetString(PyExc_Exception, "Defect: unknown defect!");
return NULL;
case GRPC_CALL_ERROR_NOT_ON_SERVER:
PyErr_SetString(PyExc_Exception,
"Defect: client-only method called on server!");
return NULL;
case GRPC_CALL_ERROR_NOT_ON_CLIENT:
PyErr_SetString(PyExc_Exception,
"Defect: server-only method called on client!");
return NULL;
case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
PyErr_SetString(PyExc_Exception,
"Defect: attempted to accept already-accepted call!");
return NULL;
case GRPC_CALL_ERROR_ALREADY_INVOKED:
PyErr_SetString(PyExc_Exception,
"Defect: attempted to invoke already-invoked call!");
return NULL;
case GRPC_CALL_ERROR_NOT_INVOKED:
PyErr_SetString(PyExc_Exception, "Defect: Call not yet invoked!");
return NULL;
case GRPC_CALL_ERROR_ALREADY_FINISHED:
PyErr_SetString(PyExc_Exception, "Defect: Call already finished!");
return NULL;
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
PyErr_SetString(PyExc_Exception,
"Defect: Attempted extra read or extra write on call!");
return NULL;
case GRPC_CALL_ERROR_INVALID_FLAGS:
PyErr_SetString(PyExc_Exception, "Defect: invalid flags!");
return NULL;
default:
PyErr_SetString(PyExc_Exception, "Defect: Unknown call error!");
return NULL;
}
}

@ -0,0 +1,42 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__ERROR_H_
#define _ADAPTER__ERROR_H_
#include <Python.h>
#include <grpc/grpc.h>
const PyObject *pygrpc_translate_call_error(grpc_call_error call_error);
#endif /* _ADAPTER__ERROR_H_ */

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _adapter import _face_test_case
from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
class EventInvocationSynchronousEventServiceTest(
_face_test_case.FaceTestCase,
test_case.EventInvocationSynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,124 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Common construction and destruction for GRPC-backed Face-layer tests."""
import unittest
from _adapter import fore
from _adapter import rear
from _framework.base import util
from _framework.base.packets import implementations as tickets_implementations
from _framework.face import implementations as face_implementations
from _framework.face.testing import coverage
from _framework.face.testing import serial
from _framework.face.testing import test_case
from _framework.foundation import logging_pool
_TIMEOUT = 3
_MAXIMUM_TIMEOUT = 90
_MAXIMUM_POOL_SIZE = 400
class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
"""Provides abstract Face-layer tests a GRPC-backed implementation."""
def set_up_implementation(
self,
name,
methods,
inline_value_in_value_out_methods,
inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods,
event_value_in_value_out_methods,
event_value_in_stream_out_methods,
event_stream_in_value_out_methods,
event_stream_in_stream_out_methods,
multi_method):
pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
servicer = face_implementations.servicer(
pool,
inline_value_in_value_out_methods=inline_value_in_value_out_methods,
inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
event_value_in_value_out_methods=event_value_in_value_out_methods,
event_value_in_stream_out_methods=event_value_in_stream_out_methods,
event_stream_in_value_out_methods=event_stream_in_value_out_methods,
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
serialization = serial.serialization(methods)
fore_link = fore.ForeLink(
pool, serialization.request_deserializers,
serialization.response_serializers)
port = fore_link.start()
rear_link = rear.RearLink(
'localhost', port, pool,
serialization.request_serializers, serialization.response_deserializers)
rear_link.start()
front = tickets_implementations.front(pool, pool, pool)
back = tickets_implementations.back(
servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT)
fore_link.join_rear_link(back)
back.join_fore_link(fore_link)
rear_link.join_fore_link(front)
front.join_rear_link(rear_link)
server = face_implementations.server()
stub = face_implementations.stub(front, pool)
return server, stub, (rear_link, fore_link, front, back)
def tear_down_implementation(self, memo):
rear_link, fore_link, front, back = memo
# TODO(nathaniel): Waiting for the front and back to idle possibly should
# not be necessary - investigate as part of graceful shutdown work.
util.wait_for_idle(front)
util.wait_for_idle(back)
rear_link.stop()
fore_link.stop()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedUnaryRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedUnaryRequestStreamResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedStreamRequestUnaryResponse(self):
raise NotImplementedError()
@unittest.skip('Service-side failure not transmitted by GRPC.')
def testFailedStreamRequestStreamResponse(self):
raise NotImplementedError()

@ -0,0 +1,46 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""One of the tests of the Face layer of RPC Framework."""
import unittest
from _adapter import _face_test_case
from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
class FutureInvocationAsynchronousEventServiceTest(
_face_test_case.FaceTestCase,
test_case.FutureInvocationAsynchronousEventServiceTestCase,
unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,245 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test of the GRPC-backed ForeLink and RearLink."""
import threading
import unittest
from _adapter import _proto_scenarios
from _adapter import _test_links
from _adapter import fore
from _adapter import rear
from _framework.base import interfaces
from _framework.base.packets import packets as tickets
from _framework.foundation import logging_pool
_IDENTITY = lambda x: x
_TIMEOUT = 2
class RoundTripTest(unittest.TestCase):
def setUp(self):
self.fore_link_pool = logging_pool.pool(80)
self.rear_link_pool = logging_pool.pool(80)
def tearDown(self):
self.rear_link_pool.shutdown(wait=True)
self.fore_link_pool.shutdown(wait=True)
def testZeroMessageRoundTrip(self):
test_operation_id = object()
test_method = 'test method'
test_fore_link = _test_links.ForeLink(None, None)
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE):
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, 0, tickets.Kind.COMPLETION, None)
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: None}, {test_method: None})
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: None},
{test_method: None})
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is tickets.Kind.CONTINUATION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_fore_link.condition:
self.assertIs(test_fore_link.tickets[-1].kind, tickets.Kind.COMPLETION)
def testEntireRoundTrip(self):
test_operation_id = object()
test_method = 'test method'
test_front_to_back_datum = b'\x07'
test_back_to_front_datum = b'\x08'
test_fore_link = _test_links.ForeLink(None, None)
rear_sequence_number = [0]
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.payload is None:
payload = None
else:
payload = test_back_to_front_datum
terminal = front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE)
if payload is not None or terminal:
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, rear_sequence_number[0],
tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION,
payload)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: _IDENTITY},
{test_method: _IDENTITY})
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool, {test_method: _IDENTITY},
{test_method: _IDENTITY})
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
None, test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_rear_link.condition:
front_to_back_payloads = tuple(
ticket.payload for ticket in test_rear_link.tickets
if ticket.payload is not None)
with test_fore_link.condition:
back_to_front_payloads = tuple(
ticket.payload for ticket in test_fore_link.tickets
if ticket.payload is not None)
self.assertTupleEqual((test_front_to_back_datum,), front_to_back_payloads)
self.assertTupleEqual((test_back_to_front_datum,), back_to_front_payloads)
def _perform_scenario_test(self, scenario):
test_operation_id = object()
test_method = scenario.method()
test_fore_link = _test_links.ForeLink(None, None)
rear_lock = threading.Lock()
rear_sequence_number = [0]
def rear_action(front_to_back_ticket, fore_link):
with rear_lock:
if front_to_back_ticket.payload is not None:
response = scenario.response_for_request(front_to_back_ticket.payload)
else:
response = None
terminal = front_to_back_ticket.kind in (
tickets.Kind.COMPLETION, tickets.Kind.ENTIRE)
if response is not None or terminal:
back_to_front_ticket = tickets.BackToFrontPacket(
front_to_back_ticket.operation_id, rear_sequence_number[0],
tickets.Kind.COMPLETION if terminal else tickets.Kind.CONTINUATION,
response)
rear_sequence_number[0] += 1
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
fore_link = fore.ForeLink(
self.fore_link_pool, {test_method: scenario.deserialize_request},
{test_method: scenario.serialize_response})
fore_link.join_rear_link(test_rear_link)
test_rear_link.join_fore_link(fore_link)
port = fore_link.start()
rear_link = rear.RearLink(
'localhost', port, self.rear_link_pool,
{test_method: scenario.serialize_request},
{test_method: scenario.deserialize_response})
rear_link.join_fore_link(test_fore_link)
test_fore_link.join_rear_link(rear_link)
rear_link.start()
commencement_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method,
interfaces.FULL, None, None, _TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
continuation_ticket = tickets.FrontToBackPacket(
test_operation_id, fore_sequence_number, tickets.Kind.CONTINUATION,
None, None, None, request, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(continuation_ticket)
completion_ticket = tickets.FrontToBackPacket(
test_operation_id, fore_sequence_number, tickets.Kind.COMPLETION, None,
None, None, None, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(completion_ticket)
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not tickets.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
fore_link.stop()
with test_rear_link.condition:
requests = tuple(
ticket.payload for ticket in test_rear_link.tickets
if ticket.payload is not None)
with test_fore_link.condition:
responses = tuple(
ticket.payload for ticket in test_fore_link.tickets
if ticket.payload is not None)
self.assertTrue(scenario.verify_requests(requests))
self.assertTrue(scenario.verify_responses(responses))
def testEmptyScenario(self):
self._perform_scenario_test(_proto_scenarios.EmptyScenario())
def testBidirectionallyUnaryScenario(self):
self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario())
def testBidirectionallyStreamingScenario(self):
self._perform_scenario_test(
_proto_scenarios.BidirectionallyStreamingScenario())
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,97 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A test of invocation-side code unconnected to an RPC server."""
import unittest
from _adapter import _test_links
from _adapter import rear
from _framework.base import interfaces
from _framework.base.packets import packets
from _framework.foundation import logging_pool
_IDENTITY = lambda x: x
_TIMEOUT = 2
class LonelyRearLinkTest(unittest.TestCase):
def setUp(self):
self.pool = logging_pool.pool(80)
def tearDown(self):
self.pool.shutdown(wait=True)
def testUpAndDown(self):
rear_link = rear.RearLink('nonexistent', 54321, self.pool, {}, {})
rear_link.start()
rear_link.stop()
def _perform_lonely_client_test_with_ticket_kind(
self, front_to_back_ticket_kind):
test_operation_id = object()
test_method = 'test method'
fore_link = _test_links.ForeLink(None, None)
rear_link = rear.RearLink(
'nonexistent', 54321, self.pool, {test_method: None},
{test_method: None})
rear_link.join_fore_link(fore_link)
rear_link.start()
front_to_back_ticket = packets.FrontToBackPacket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with fore_link.condition:
while True:
if (fore_link.tickets and
fore_link.tickets[-1].kind is not packets.Kind.CONTINUATION):
break
fore_link.condition.wait()
rear_link.stop()
with fore_link.condition:
self.assertIsNot(fore_link.tickets[-1].kind, packets.Kind.COMPLETION)
@unittest.skip('TODO(nathaniel): This seems to have broken in the last few weeks; fix it.')
def testLonelyClientCommencementPacket(self):
self._perform_lonely_client_test_with_ticket_kind(
packets.Kind.COMMENCEMENT)
def testLonelyClientEntirePacket(self):
self._perform_lonely_client_test_with_ticket_kind(packets.Kind.ENTIRE)
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,55 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A Python interface for GRPC C core structures and behaviors."""
import atexit
import gc
from _adapter import _c
from _adapter import _datatypes
def _shut_down():
# force garbage collection before shutting down grpc, to ensure all grpc
# objects are cleaned up
gc.collect()
_c.shut_down()
_c.init()
atexit.register(_shut_down)
# pylint: disable=invalid-name
Code = _datatypes.Code
Status = _datatypes.Status
Event = _datatypes.Event
Call = _c.Call
Channel = _c.Channel
CompletionQueue = _c.CompletionQueue
Server = _c.Server
# pylint: enable=invalid-name

@ -0,0 +1,371 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests for _adapter._low."""
import time
import unittest
from _adapter import _low
_STREAM_LENGTH = 300
_TIMEOUT = 5
_AFTER_DELAY = 2
_FUTURE = time.time() + 60 * 60 * 24
_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
_BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH))
class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self):
host = 'nosuchhostexists'
port = 54321
method = 'test method'
deadline = time.time() + _TIMEOUT
after_deadline = deadline + _AFTER_DELAY
metadata_tag = object()
finish_tag = object()
completion_queue = _low.CompletionQueue()
channel = _low.Channel('%s:%d' % (host, port))
client_call = _low.Call(channel, method, host, deadline)
client_call.invoke(completion_queue, metadata_tag, finish_tag)
first_event = completion_queue.get(after_deadline)
self.assertIsNotNone(first_event)
second_event = completion_queue.get(after_deadline)
self.assertIsNotNone(second_event)
kinds = [event.kind for event in (first_event, second_event)]
self.assertItemsEqual(
(_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
kinds)
self.assertIsNone(completion_queue.get(after_deadline))
completion_queue.stop()
stop_event = completion_queue.get(_FUTURE)
self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
class EchoTest(unittest.TestCase):
def setUp(self):
self.host = 'localhost'
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port))
def tearDown(self):
self.server.stop()
# NOTE(nathaniel): Yep, this is weird; it's a consequence of
# grpc_server_destroy's being what has the effect of telling the server's
# completion queue to pump out all pending events/tags immediately rather
# than gracefully completing all outstanding RPCs while accepting no new
# ones.
# TODO(nathaniel): Deallocation of a Python object shouldn't have this kind
# of observable side effect let alone such an important one.
del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
while True:
event = self.server_completion_queue.get(_FUTURE)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
while True:
event = self.client_completion_queue.get(_FUTURE)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
self.server_completion_queue = None
self.client_completion_queue = None
def _perform_echo_test(self, test_data):
method = 'test method'
details = 'test details'
deadline = _FUTURE
metadata_tag = object()
finish_tag = object()
write_tag = object()
complete_tag = object()
service_tag = object()
read_tag = object()
status_tag = object()
server_data = []
client_data = []
client_call = _low.Call(self.channel, method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
service_accepted = self.server_completion_queue.get(_FUTURE)
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
self.assertEqual(method, service_accepted.service_acceptance.method)
self.assertEqual(self.host, service_accepted.service_acceptance.host)
self.assertIsNotNone(service_accepted.service_acceptance.call)
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
metadata_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
# TODO(nathaniel): Test transmission and reception of metadata.
for datum in test_data:
client_call.write(datum, write_tag)
write_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE)
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNotNone(read_accepted.bytes)
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
write_accepted = self.server_completion_queue.get(_FUTURE)
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag)
read_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNotNone(read_accepted.bytes)
client_data.append(read_accepted.bytes)
client_call.complete(complete_tag)
complete_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE)
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNone(read_accepted.bytes)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
else:
status_accepted = server_terminal_event_two
rpc_accepted = server_terminal_event_one
self.assertIsNotNone(status_accepted)
self.assertIsNotNone(rpc_accepted)
self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
self.assertEqual(status_tag, status_accepted.tag)
self.assertTrue(status_accepted.complete_accepted)
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(finish_tag, rpc_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag)
client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two
else:
read_accepted = client_terminal_event_two
finish_accepted = client_terminal_event_one
self.assertIsNotNone(read_accepted)
self.assertIsNotNone(finish_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNone(read_accepted.bytes)
self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
self.assertEqual(finish_tag, finish_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
server_timeout_none_event = self.server_completion_queue.get(0)
self.assertIsNone(server_timeout_none_event)
client_timeout_none_event = self.client_completion_queue.get(0)
self.assertIsNone(client_timeout_none_event)
self.assertSequenceEqual(test_data, server_data)
self.assertSequenceEqual(test_data, client_data)
def testNoEcho(self):
self._perform_echo_test(())
def testOneByteEcho(self):
self._perform_echo_test([b'\x07'])
def testOneManyByteEcho(self):
self._perform_echo_test([_BYTE_SEQUENCE])
def testManyOneByteEchoes(self):
self._perform_echo_test(_BYTE_SEQUENCE)
def testManyManyByteEchoes(self):
self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
class CancellationTest(unittest.TestCase):
def setUp(self):
self.host = 'localhost'
self.server_completion_queue = _low.CompletionQueue()
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port))
def tearDown(self):
self.server.stop()
del self.server
self.server_completion_queue.stop()
self.client_completion_queue.stop()
while True:
event = self.server_completion_queue.get(0)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
while True:
event = self.client_completion_queue.get(0)
if event is not None and event.kind is _low.Event.Kind.STOP:
break
def testCancellation(self):
method = 'test method'
deadline = _FUTURE
metadata_tag = object()
finish_tag = object()
write_tag = object()
service_tag = object()
read_tag = object()
test_data = _BYTE_SEQUENCE_SEQUENCE
server_data = []
client_data = []
client_call = _low.Call(self.channel, method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
service_accepted = self.server_completion_queue.get(_FUTURE)
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
metadata_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
client_call.write(datum, write_tag)
write_accepted = self.client_completion_queue.get(_FUTURE)
server_call.read(read_tag)
read_accepted = self.server_completion_queue.get(_FUTURE)
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
write_accepted = self.server_completion_queue.get(_FUTURE)
self.assertIsNotNone(write_accepted)
client_call.read(read_tag)
read_accepted = self.client_completion_queue.get(_FUTURE)
client_data.append(read_accepted.bytes)
client_call.cancel()
# cancel() is idempotent.
client_call.cancel()
client_call.cancel()
client_call.cancel()
server_call.read(read_tag)
server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
else:
read_accepted = server_terminal_event_two
rpc_accepted = server_terminal_event_one
self.assertIsNotNone(read_accepted)
self.assertIsNotNone(rpc_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertIsNone(read_accepted.bytes)
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
finish_event = self.client_completion_queue.get(_FUTURE)
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), finish_event.status)
server_timeout_none_event = self.server_completion_queue.get(0)
self.assertIsNone(server_timeout_none_event)
client_timeout_none_event = self.client_completion_queue.get(0)
self.assertIsNone(client_timeout_none_event)
self.assertSequenceEqual(test_data, server_data)
self.assertSequenceEqual(test_data, client_data)
class ExpirationTest(unittest.TestCase):
@unittest.skip('TODO(nathaniel): Expiration test!')
def testExpiration(self):
pass
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,261 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test scenarios using protocol buffers."""
import abc
import threading
from _junkdrawer import math_pb2
class ProtoScenario(object):
"""An RPC test scenario using protocol buffers."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def method(self):
"""Access the test method name.
Returns:
The test method name.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_request(self, request):
"""Serialize a request protocol buffer.
Args:
request: A request protocol buffer.
Returns:
The bytestring serialization of the given request protocol buffer.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_request(self, request_bytestring):
"""Deserialize a request protocol buffer.
Args:
request_bytestring: The bytestring serialization of a request protocol
buffer.
Returns:
The request protocol buffer deserialized from the given byte string.
"""
raise NotImplementedError()
@abc.abstractmethod
def serialize_response(self, response):
"""Serialize a response protocol buffer.
Args:
response: A response protocol buffer.
Returns:
The bytestring serialization of the given response protocol buffer.
"""
raise NotImplementedError()
@abc.abstractmethod
def deserialize_response(self, response_bytestring):
"""Deserialize a response protocol buffer.
Args:
response_bytestring: The bytestring serialization of a response protocol
buffer.
Returns:
The response protocol buffer deserialized from the given byte string.
"""
raise NotImplementedError()
@abc.abstractmethod
def requests(self):
"""Access the sequence of requests for this scenario.
Returns:
A sequence of request protocol buffers.
"""
raise NotImplementedError()
@abc.abstractmethod
def response_for_request(self, request):
"""Access the response for a particular request.
Args:
request: A request protocol buffer.
Returns:
The response protocol buffer appropriate for the given request.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify_requests(self, experimental_requests):
"""Verify the requests transmitted through the system under test.
Args:
experimental_requests: The request protocol buffers transmitted through
the system under test.
Returns:
True if the requests satisfy this test scenario; False otherwise.
"""
raise NotImplementedError()
@abc.abstractmethod
def verify_responses(self, experimental_responses):
"""Verify the responses transmitted through the system under test.
Args:
experimental_responses: The response protocol buffers transmitted through
the system under test.
Returns:
True if the responses satisfy this test scenario; False otherwise.
"""
raise NotImplementedError()
class EmptyScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
def method(self):
return 'DivMany'
def serialize_request(self, request):
raise ValueError('This should not be necessary to call!')
def deserialize_request(self, request_bytestring):
raise ValueError('This should not be necessary to call!')
def serialize_response(self, response):
raise ValueError('This should not be necessary to call!')
def deserialize_response(self, response_bytestring):
raise ValueError('This should not be necessary to call!')
def requests(self):
return ()
def response_for_request(self, request):
raise ValueError('This should not be necessary to call!')
def verify_requests(self, experimental_requests):
return not experimental_requests
def verify_responses(self, experimental_responses):
return not experimental_responses
class BidirectionallyUnaryScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
_DIVIDEND = 59
_DIVISOR = 7
_QUOTIENT = 8
_REMAINDER = 3
_REQUEST = math_pb2.DivArgs(dividend=_DIVIDEND, divisor=_DIVISOR)
_RESPONSE = math_pb2.DivReply(quotient=_QUOTIENT, remainder=_REMAINDER)
def method(self):
return 'Div'
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, request_bytestring):
return math_pb2.DivArgs.FromString(request_bytestring)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, response_bytestring):
return math_pb2.DivReply.FromString(response_bytestring)
def requests(self):
return [self._REQUEST]
def response_for_request(self, request):
return self._RESPONSE
def verify_requests(self, experimental_requests):
return tuple(experimental_requests) == (self._REQUEST,)
def verify_responses(self, experimental_responses):
return tuple(experimental_responses) == (self._RESPONSE,)
class BidirectionallyStreamingScenario(ProtoScenario):
"""A scenario that transmits no protocol buffers in either direction."""
_STREAM_LENGTH = 200
_REQUESTS = tuple(
math_pb2.DivArgs(dividend=59 + index, divisor=7 + index)
for index in range(_STREAM_LENGTH))
def __init__(self):
self._lock = threading.Lock()
self._responses = []
def method(self):
return 'DivMany'
def serialize_request(self, request):
return request.SerializeToString()
def deserialize_request(self, request_bytestring):
return math_pb2.DivArgs.FromString(request_bytestring)
def serialize_response(self, response):
return response.SerializeToString()
def deserialize_response(self, response_bytestring):
return math_pb2.DivReply.FromString(response_bytestring)
def requests(self):
return self._REQUESTS
def response_for_request(self, request):
quotient, remainder = divmod(request.dividend, request.divisor)
response = math_pb2.DivReply(quotient=quotient, remainder=remainder)
with self._lock:
self._responses.append(response)
return response
def verify_requests(self, experimental_requests):
return tuple(experimental_requests) == self._REQUESTS
def verify_responses(self, experimental_responses):
with self._lock:
return tuple(experimental_responses) == tuple(self._responses)

@ -0,0 +1,167 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "_adapter/_server.h"
#include <Python.h>
#include <grpc/grpc.h>
#include "_adapter/_completion_queue.h"
#include "_adapter/_error.h"
static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
const PyObject *completion_queue;
if (!(PyArg_ParseTuple(args, "O!", &pygrpc_CompletionQueueType,
&completion_queue))) {
self->c_server = NULL;
return -1;
}
self->c_server = grpc_server_create(
((CompletionQueue *)completion_queue)->c_completion_queue, NULL);
return 0;
}
static void pygrpc_server_dealloc(Server *self) {
if (self->c_server != NULL) {
grpc_server_destroy(self->c_server);
}
self->ob_type->tp_free((PyObject *)self);
}
static PyObject *pygrpc_server_add_http2_addr(Server *self, PyObject *args) {
const char *addr;
int port;
PyArg_ParseTuple(args, "s", &addr);
port = grpc_server_add_http2_port(self->c_server, addr);
if (port == 0) {
PyErr_SetString(PyExc_RuntimeError, "Couldn't add port to server!");
return NULL;
}
return PyInt_FromLong(port);
}
static PyObject *pygrpc_server_start(Server *self) {
grpc_server_start(self->c_server);
Py_RETURN_NONE;
}
static const PyObject *pygrpc_server_service(Server *self, PyObject *args) {
const PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
if (!(PyArg_ParseTuple(args, "O", &tag))) {
return NULL;
}
call_error = grpc_server_request_call(self->c_server, (void *)tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
}
return result;
}
static PyObject *pygrpc_server_stop(Server *self) {
grpc_server_shutdown(self->c_server);
Py_RETURN_NONE;
}
static PyMethodDef methods[] = {
{"add_http2_addr", (PyCFunction)pygrpc_server_add_http2_addr, METH_VARARGS,
"Add an HTTP2 address."},
{"start", (PyCFunction)pygrpc_server_start, METH_NOARGS,
"Starts the server."},
{"service", (PyCFunction)pygrpc_server_service, METH_VARARGS,
"Services a call."},
{"stop", (PyCFunction)pygrpc_server_stop, METH_NOARGS, "Stops the server."},
{NULL}};
static PyTypeObject pygrpc_ServerType = {
PyObject_HEAD_INIT(NULL)0, /*ob_size*/
"_gprc.Server", /*tp_name*/
sizeof(Server), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)pygrpc_server_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
0, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT, /*tp_flags*/
"Wrapping of grpc_server.", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)pygrpc_server_init, /* tp_init */
};
int pygrpc_add_server(PyObject *module) {
pygrpc_ServerType.tp_new = PyType_GenericNew;
if (PyType_Ready(&pygrpc_ServerType) < 0) {
PyErr_SetString(PyExc_RuntimeError, "Error defining pygrpc_ServerType!");
return -1;
}
if (PyModule_AddObject(module, "Server", (PyObject *)&pygrpc_ServerType) ==
-1) {
PyErr_SetString(PyExc_ImportError, "Couldn't add Server type to module!");
return -1;
}
return 0;
}

@ -0,0 +1,44 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__SERVER_H_
#define _ADAPTER__SERVER_H_
#include <Python.h>
#include <grpc/grpc.h>
typedef struct { PyObject_HEAD grpc_server *c_server; } Server;
int pygrpc_add_server(PyObject *module);
#endif /* _ADAPTER__SERVER_H_ */

@ -0,0 +1,80 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Links suitable for use in tests."""
import threading
from _framework.base.packets import interfaces
class ForeLink(interfaces.ForeLink):
"""A ForeLink suitable for use in tests of RearLinks."""
def __init__(self, action, rear_link):
self.condition = threading.Condition()
self.tickets = []
self.action = action
self.rear_link = rear_link
def accept_back_to_front_ticket(self, ticket):
with self.condition:
self.tickets.append(ticket)
self.condition.notify_all()
action, rear_link = self.action, self.rear_link
if action is not None:
action(ticket, rear_link)
def join_rear_link(self, rear_link):
with self.condition:
self.rear_link = rear_link
class RearLink(interfaces.RearLink):
"""A RearLink suitable for use in tests of ForeLinks."""
def __init__(self, action, fore_link):
self.condition = threading.Condition()
self.tickets = []
self.action = action
self.fore_link = fore_link
def accept_front_to_back_ticket(self, ticket):
with self.condition:
self.tickets.append(ticket)
self.condition.notify_all()
action, fore_link = self.action, self.fore_link
if action is not None:
action(ticket, fore_link)
def join_fore_link(self, fore_link):
with self.condition:
self.fore_link = fore_link

@ -0,0 +1,309 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire."""
import enum
import logging
import threading
import time
from _adapter import _common
from _adapter import _low
from _framework.base import interfaces
from _framework.base.packets import interfaces as ticket_interfaces
from _framework.base.packets import null
from _framework.base.packets import packets as tickets
@enum.unique
class _LowWrite(enum.Enum):
"""The possible categories of low-level write state."""
OPEN = 'OPEN'
ACTIVE = 'ACTIVE'
CLOSED = 'CLOSED'
def _write(call, rpc_state, payload):
serialized_payload = rpc_state.serializer(payload)
if rpc_state.write.low is _LowWrite.OPEN:
call.write(serialized_payload, call)
rpc_state.write.low = _LowWrite.ACTIVE
else:
rpc_state.write.pending.append(serialized_payload)
def _status(call, rpc_state):
call.status(_low.Status(_low.Code.OK, ''), call)
rpc_state.write.low = _LowWrite.CLOSED
class ForeLink(ticket_interfaces.ForeLink):
"""A service-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
self, pool, request_deserializers, response_serializers, port=None):
"""Constructor.
Args:
pool: A thread pool.
request_deserializers: A dict from RPC method names to request object
deserializer behaviors.
response_serializers: A dict from RPC method names to response object
serializer behaviors.
port: The port on which to serve, or None to have a port selected
automatically.
"""
self._condition = threading.Condition()
self._pool = pool
self._request_deserializers = request_deserializers
self._response_serializers = response_serializers
self._port = port
self._rear_link = null.NULL_REAR_LINK
self._completion_queue = None
self._server = None
self._rpc_states = {}
self._spinning = False
def _on_stop_event(self):
self._spinning = False
self._condition.notify_all()
def _on_service_acceptance_event(self, event, server):
"""Handle a service invocation event."""
service_acceptance = event.service_acceptance
if service_acceptance is None:
return
call = service_acceptance.call
call.accept(self._completion_queue, call)
# TODO(nathaniel): Metadata support.
call.premetadata()
call.read(call)
method = service_acceptance.method
self._rpc_states[call] = _common.CommonRPCState(
_common.WriteState(_LowWrite.OPEN, _common.HighWrite.OPEN, []), 1,
self._request_deserializers[method],
self._response_serializers[method])
ticket = tickets.FrontToBackPacket(
call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)
server.service(None)
def _on_read_event(self, event):
"""Handle data arriving during an RPC."""
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if event.bytes is None:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.COMPLETION, None, None, None,
None, None)
else:
call.read(call)
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.CONTINUATION, None, None, None,
rpc_state.deserializer(event.bytes), None)
self._rear_link.accept_front_to_back_ticket(ticket)
def _on_write_event(self, event):
call = event.tag
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
if rpc_state.write.pending:
serialized_payload = rpc_state.write.pending.pop(0)
call.write(serialized_payload, call)
elif rpc_state.write.high is _common.HighWrite.CLOSED:
_status(call, rpc_state)
else:
rpc_state.write.low = _LowWrite.OPEN
def _on_complete_event(self, event):
if not event.complete_accepted:
logging.error('Complete not accepted! %s', (event,))
call = event.tag
rpc_state = self._rpc_states.pop(call, None)
if rpc_state is None:
return
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None,
None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
def _on_finish_event(self, event):
"""Handle termination of an RPC."""
call = event.tag
rpc_state = self._rpc_states.pop(call, None)
if rpc_state is None:
return
code = event.status.code
if code is _low.Code.OK:
return
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if code is _low.Code.CANCELLED:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.CANCELLATION, None, None, None,
None, None)
elif code is _low.Code.EXPIRED:
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.EXPIRATION, None, None, None,
None, None)
else:
# TODO(nathaniel): Better mapping of codes to ticket-categories
ticket = tickets.FrontToBackPacket(
call, sequence_number, tickets.Kind.TRANSMISSION_FAILURE, None, None,
None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
def _spin(self, completion_queue, server):
while True:
event = completion_queue.get(None)
with self._condition:
if event.kind is _low.Event.Kind.STOP:
self._on_stop_event()
return
elif self._server is None:
continue
elif event.kind is _low.Event.Kind.SERVICE_ACCEPTED:
self._on_service_acceptance_event(event, server)
elif event.kind is _low.Event.Kind.READ_ACCEPTED:
self._on_read_event(event)
elif event.kind is _low.Event.Kind.WRITE_ACCEPTED:
self._on_write_event(event)
elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED:
self._on_complete_event(event)
elif event.kind is _low.Event.Kind.FINISH:
self._on_finish_event(event)
else:
logging.error('Illegal event! %s', (event,))
def _continue(self, call, payload):
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
_write(call, rpc_state, payload)
def _complete(self, call, payload):
"""Handle completion of the writes of an RPC."""
rpc_state = self._rpc_states.get(call, None)
if rpc_state is None:
return
if rpc_state.write.low is _LowWrite.OPEN:
if payload is None:
_status(call, rpc_state)
else:
_write(call, rpc_state, payload)
elif rpc_state.write.low is _LowWrite.ACTIVE:
if payload is not None:
rpc_state.write.pending.append(rpc_state.serializer(payload))
else:
raise ValueError('Called to complete after having already completed!')
rpc_state.write.high = _common.HighWrite.CLOSED
def _cancel(self, call):
call.cancel()
self._rpc_states.pop(call, None)
def join_rear_link(self, rear_link):
"""See ticket_interfaces.ForeLink.join_rear_link for specification."""
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
def start(self):
"""Starts this ForeLink.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._condition:
self._completion_queue = _low.CompletionQueue()
self._server = _low.Server(self._completion_queue)
port = self._server.add_http2_addr(
'[::]:%d' % (0 if self._port is None else self._port))
self._server.start()
self._server.service(None)
self._pool.submit(self._spin, self._completion_queue, self._server)
self._spinning = True
return port
# TODO(nathaniel): Expose graceful-shutdown semantics in which this object
# enters a state in which it finishes ongoing RPCs but refuses new ones.
def stop(self):
"""Stops this ForeLink.
This method must be called for proper termination of this object, and no
attempts to exchange tickets with this object may be made after this method
has been called.
"""
with self._condition:
self._server.stop()
# TODO(b/18904187): Yep, this is weird. Deleting a server shouldn't have a
# behaviorally significant side-effect.
self._server = None
self._completion_queue.stop()
while self._spinning:
self._condition.wait()
def accept_back_to_front_ticket(self, ticket):
"""See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
with self._condition:
if self._server is None:
return
if ticket.kind is tickets.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)

@ -0,0 +1,344 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire."""
import enum
import logging
import threading
import time
from _adapter import _common
from _adapter import _low
from _framework.base.packets import interfaces as ticket_interfaces
from _framework.base.packets import null
from _framework.base.packets import packets as tickets
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
_low.Event.Kind.FINISH
)
@enum.unique
class _LowWrite(enum.Enum):
"""The possible categories of low-level write state."""
OPEN = 'OPEN'
ACTIVE = 'ACTIVE'
CLOSED = 'CLOSED'
class _RPCState(object):
"""The full state of any tracked RPC.
Attributes:
call: The _low.Call object for the RPC.
outstanding: The set of Event.Kind values describing expected future events
for the RPC.
active: A boolean indicating whether or not the RPC is active.
common: An _common.RPCState describing additional state for the RPC.
"""
def __init__(self, call, outstanding, active, common):
self.call = call
self.outstanding = outstanding
self.active = active
self.common = common
def _write(operation_id, call, outstanding, write_state, serialized_payload):
if write_state.low is _LowWrite.OPEN:
call.write(serialized_payload, operation_id)
outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
write_state.low = _LowWrite.ACTIVE
elif write_state.low is _LowWrite.ACTIVE:
write_state.pending.append(serialized_payload)
else:
raise ValueError('Write attempted after writes completed!')
class RearLink(ticket_interfaces.RearLink):
"""An invocation-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
self, host, port, pool, request_serializers, response_deserializers):
"""Constructor.
Args:
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
pool: A thread pool.
request_serializers: A dict from RPC method names to request object
serializer behaviors.
response_deserializers: A dict from RPC method names to response object
deserializer behaviors.
"""
self._condition = threading.Condition()
self._host = host
self._port = port
self._pool = pool
self._request_serializers = request_serializers
self._response_deserializers = response_deserializers
self._fore_link = null.NULL_FORE_LINK
self._completion_queue = None
self._channel = None
self._rpc_states = {}
self._spinning = False
def _on_write_event(self, operation_id, event, rpc_state):
if event.write_accepted:
if rpc_state.common.write.pending:
rpc_state.call.write(
rpc_state.common.write.pending.pop(0), operation_id)
rpc_state.outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
elif rpc_state.common.write.high is _common.HighWrite.CLOSED:
rpc_state.call.complete(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
rpc_state.common.write.low = _LowWrite.CLOSED
else:
rpc_state.common.write.low = _LowWrite.OPEN
else:
logging.error('RPC write not accepted! Event: %s', (event,))
rpc_state.active = False
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
def _on_read_event(self, operation_id, event, rpc_state):
if event.bytes is not None:
rpc_state.call.read(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.CONTINUATION, rpc_state.common.deserializer(event.bytes))
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
def _on_complete_event(self, operation_id, event, rpc_state):
if not event.complete_accepted:
logging.error('RPC complete not accepted! Event: %s', (event,))
rpc_state.active = False
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number,
tickets.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
# TODO(nathaniel): Metadata support.
def _on_metadata_event(self, operation_id, event, rpc_state): # pylint: disable=unused-argument
rpc_state.call.read(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
def _on_finish_event(self, operation_id, event, rpc_state):
"""Handle termination of an RPC."""
# TODO(nathaniel): Cover all statuses.
if event.status.code is _low.Code.OK:
category = tickets.Kind.COMPLETION
elif event.status.code is _low.Code.CANCELLED:
category = tickets.Kind.CANCELLATION
elif event.status.code is _low.Code.EXPIRED:
category = tickets.Kind.EXPIRATION
else:
category = tickets.Kind.TRANSMISSION_FAILURE
ticket = tickets.BackToFrontPacket(
operation_id, rpc_state.common.sequence_number, category,
None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
def _spin(self, completion_queue):
while True:
event = completion_queue.get(None)
operation_id = event.tag
with self._condition:
rpc_state = self._rpc_states[operation_id]
rpc_state.outstanding.remove(event.kind)
if rpc_state.active and self._completion_queue is not None:
if event.kind is _low.Event.Kind.WRITE_ACCEPTED:
self._on_write_event(operation_id, event, rpc_state)
elif event.kind is _low.Event.Kind.METADATA_ACCEPTED:
self._on_metadata_event(operation_id, event, rpc_state)
elif event.kind is _low.Event.Kind.READ_ACCEPTED:
self._on_read_event(operation_id, event, rpc_state)
elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED:
self._on_complete_event(operation_id, event, rpc_state)
elif event.kind is _low.Event.Kind.FINISH:
self._on_finish_event(operation_id, event, rpc_state)
else:
logging.error('Illegal RPC event! %s', (event,))
if not rpc_state.outstanding:
self._rpc_states.pop(operation_id)
if not self._rpc_states:
self._spinning = False
self._condition.notify_all()
return
def _invoke(self, operation_id, name, high_state, payload, timeout):
"""Invoke an RPC.
Args:
operation_id: Any object to be used as an operation ID for the RPC.
name: The RPC method name.
high_state: A _common.HighWrite value representing the "high write state"
of the RPC.
payload: A payload object for the RPC or None if no payload was given at
invocation-time.
timeout: A duration of time in seconds to allow for the RPC.
"""
request_serializer = self._request_serializers[name]
call = _low.Call(self._channel, name, self._host, time.time() + timeout)
call.invoke(self._completion_queue, operation_id, operation_id)
outstanding = set(_INVOCATION_EVENT_KINDS)
if payload is None:
if high_state is _common.HighWrite.CLOSED:
call.complete(operation_id)
low_state = _LowWrite.CLOSED
outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
else:
low_state = _LowWrite.OPEN
else:
serialized_payload = request_serializer(payload)
call.write(serialized_payload, operation_id)
outstanding.add(_low.Event.Kind.WRITE_ACCEPTED)
low_state = _LowWrite.ACTIVE
write_state = _common.WriteState(low_state, high_state, [])
common_state = _common.CommonRPCState(
write_state, 0, self._response_deserializers[name], request_serializer)
self._rpc_states[operation_id] = _RPCState(
call, outstanding, True, common_state)
if not self._spinning:
self._pool.submit(self._spin, self._completion_queue)
self._spinning = True
def _commence(self, operation_id, name, payload, timeout):
self._invoke(operation_id, name, _common.HighWrite.OPEN, payload, timeout)
def _continue(self, operation_id, payload):
rpc_state = self._rpc_states.get(operation_id, None)
if rpc_state is None or not rpc_state.active:
return
_write(
operation_id, rpc_state.call, rpc_state.outstanding,
rpc_state.common.write, rpc_state.common.serializer(payload))
def _complete(self, operation_id, payload):
"""Close writes associated with an ongoing RPC.
Args:
operation_id: Any object being use as an operation ID for the RPC.
payload: A payload object for the RPC (and thus the last payload object
for the RPC) or None if no payload was given along with the instruction
to indicate the end of writes for the RPC.
"""
rpc_state = self._rpc_states.get(operation_id, None)
if rpc_state is None or not rpc_state.active:
return
write_state = rpc_state.common.write
if payload is None:
if write_state.low is _LowWrite.OPEN:
rpc_state.call.complete(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.COMPLETE_ACCEPTED)
write_state.low = _LowWrite.CLOSED
else:
_write(
operation_id, rpc_state.call, rpc_state.outstanding, write_state,
rpc_state.common.serializer(payload))
write_state.high = _common.HighWrite.CLOSED
def _entire(self, operation_id, name, payload, timeout):
self._invoke(operation_id, name, _common.HighWrite.CLOSED, payload, timeout)
def _cancel(self, operation_id):
rpc_state = self._rpc_states.get(operation_id, None)
if rpc_state is not None and rpc_state.active:
rpc_state.call.cancel()
rpc_state.active = False
def join_fore_link(self, fore_link):
"""See ticket_interfaces.RearLink.join_fore_link for specification."""
with self._condition:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
def start(self):
"""Starts this RearLink.
This method must be called before attempting to exchange tickets with this
object.
"""
with self._condition:
self._completion_queue = _low.CompletionQueue()
self._channel = _low.Channel('%s:%d' % (self._host, self._port))
def stop(self):
"""Stops this RearLink.
This method must be called for proper termination of this object, and no
attempts to exchange tickets with this object may be made after this method
has been called.
"""
with self._condition:
self._completion_queue.stop()
self._completion_queue = None
while self._spinning:
self._condition.wait()
def accept_front_to_back_ticket(self, ticket):
"""See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec."""
with self._condition:
if self._completion_queue is None:
return
if ticket.kind is tickets.Kind.COMMENCEMENT:
self._commence(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is tickets.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
elif ticket.kind is tickets.Kind.ENTIRE:
self._entire(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
elif ticket.kind is tickets.Kind.CANCELLATION:
self._cancel(ticket.operation_id)
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id)

@ -0,0 +1,266 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# TODO(nathaniel): Remove this from source control after having made
# generation from the math.proto source part of GRPC's build-and-test
# process.
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: math.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='math.proto',
package='math',
serialized_pb=_b('\n\nmath.proto\x12\x04math\",\n\x07\x44ivArgs\x12\x10\n\x08\x64ividend\x18\x01 \x02(\x03\x12\x0f\n\x07\x64ivisor\x18\x02 \x02(\x03\"/\n\x08\x44ivReply\x12\x10\n\x08quotient\x18\x01 \x02(\x03\x12\x11\n\tremainder\x18\x02 \x02(\x03\"\x18\n\x07\x46ibArgs\x12\r\n\x05limit\x18\x01 \x01(\x03\"\x12\n\x03Num\x12\x0b\n\x03num\x18\x01 \x02(\x03\"\x19\n\x08\x46ibReply\x12\r\n\x05\x63ount\x18\x01 \x02(\x03\x32\xa4\x01\n\x04Math\x12&\n\x03\x44iv\x12\r.math.DivArgs\x1a\x0e.math.DivReply\"\x00\x12.\n\x07\x44ivMany\x12\r.math.DivArgs\x1a\x0e.math.DivReply\"\x00(\x01\x30\x01\x12#\n\x03\x46ib\x12\r.math.FibArgs\x1a\t.math.Num\"\x00\x30\x01\x12\x1f\n\x03Sum\x12\t.math.Num\x1a\t.math.Num\"\x00(\x01')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_DIVARGS = _descriptor.Descriptor(
name='DivArgs',
full_name='math.DivArgs',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='dividend', full_name='math.DivArgs.dividend', index=0,
number=1, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='divisor', full_name='math.DivArgs.divisor', index=1,
number=2, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=20,
serialized_end=64,
)
_DIVREPLY = _descriptor.Descriptor(
name='DivReply',
full_name='math.DivReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='quotient', full_name='math.DivReply.quotient', index=0,
number=1, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='remainder', full_name='math.DivReply.remainder', index=1,
number=2, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=66,
serialized_end=113,
)
_FIBARGS = _descriptor.Descriptor(
name='FibArgs',
full_name='math.FibArgs',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='limit', full_name='math.FibArgs.limit', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=115,
serialized_end=139,
)
_NUM = _descriptor.Descriptor(
name='Num',
full_name='math.Num',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='num', full_name='math.Num.num', index=0,
number=1, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=141,
serialized_end=159,
)
_FIBREPLY = _descriptor.Descriptor(
name='FibReply',
full_name='math.FibReply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='count', full_name='math.FibReply.count', index=0,
number=1, type=3, cpp_type=2, label=2,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
extension_ranges=[],
oneofs=[
],
serialized_start=161,
serialized_end=186,
)
DESCRIPTOR.message_types_by_name['DivArgs'] = _DIVARGS
DESCRIPTOR.message_types_by_name['DivReply'] = _DIVREPLY
DESCRIPTOR.message_types_by_name['FibArgs'] = _FIBARGS
DESCRIPTOR.message_types_by_name['Num'] = _NUM
DESCRIPTOR.message_types_by_name['FibReply'] = _FIBREPLY
DivArgs = _reflection.GeneratedProtocolMessageType('DivArgs', (_message.Message,), dict(
DESCRIPTOR = _DIVARGS,
__module__ = 'math_pb2'
# @@protoc_insertion_point(class_scope:math.DivArgs)
))
_sym_db.RegisterMessage(DivArgs)
DivReply = _reflection.GeneratedProtocolMessageType('DivReply', (_message.Message,), dict(
DESCRIPTOR = _DIVREPLY,
__module__ = 'math_pb2'
# @@protoc_insertion_point(class_scope:math.DivReply)
))
_sym_db.RegisterMessage(DivReply)
FibArgs = _reflection.GeneratedProtocolMessageType('FibArgs', (_message.Message,), dict(
DESCRIPTOR = _FIBARGS,
__module__ = 'math_pb2'
# @@protoc_insertion_point(class_scope:math.FibArgs)
))
_sym_db.RegisterMessage(FibArgs)
Num = _reflection.GeneratedProtocolMessageType('Num', (_message.Message,), dict(
DESCRIPTOR = _NUM,
__module__ = 'math_pb2'
# @@protoc_insertion_point(class_scope:math.Num)
))
_sym_db.RegisterMessage(Num)
FibReply = _reflection.GeneratedProtocolMessageType('FibReply', (_message.Message,), dict(
DESCRIPTOR = _FIBREPLY,
__module__ = 'math_pb2'
# @@protoc_insertion_point(class_scope:math.FibReply)
))
_sym_db.RegisterMessage(FibReply)
# @@protoc_insertion_point(module_scope)

@ -44,6 +44,7 @@
#include <google/gflags.h>
#include <grpc++/client_context.h>
#include <grpc++/status.h>
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/qpstest.pb.h"
@ -129,6 +130,8 @@ void RunTest(const int client_threads, const int client_channels,
grpc::Status status_beg = stub_stats->CollectServerStats(
&context_stats_begin, stats_request, &server_stats_begin);
grpc_profiler_start("qps_client.prof");
for (int i = 0; i < client_threads; i++) {
gpr_histogram *hist = gpr_histogram_create(0.01, 60e9);
GPR_ASSERT(hist != NULL);
@ -172,6 +175,9 @@ void RunTest(const int client_threads, const int client_channels,
for (auto &t : threads) {
t.join();
}
grpc_profiler_stop();
for (int i = 0; i < client_threads; i++) {
gpr_histogram *h = thread_stats[i];
gpr_log(GPR_INFO, "latency at thread %d (50/90/95/99/99.9): %f/%f/%f/%f/%f",

@ -33,6 +33,7 @@
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/signal.h>
#include <thread>
#include <google/gflags.h>
@ -43,6 +44,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
#include <grpc/grpc.h>
@ -63,11 +65,15 @@ using grpc::testing::StatsRequest;
using grpc::testing::TestService;
using grpc::Status;
static bool got_sigint = false;
static void sigint_handler(int x) { got_sigint = 1; }
static double time_double(struct timeval* tv) {
return tv->tv_sec + 1e-6 * tv->tv_usec;
}
bool SetPayload(PayloadType type, int size, Payload* payload) {
static bool SetPayload(PayloadType type, int size, Payload* payload) {
PayloadType response_type = type;
// TODO(yangg): Support UNCOMPRESSABLE payload.
if (type != PayloadType::COMPRESSABLE) {
@ -79,7 +85,9 @@ bool SetPayload(PayloadType type, int size, Payload* payload) {
return true;
}
class TestServiceImpl : public TestService::Service {
namespace {
class TestServiceImpl final : public TestService::Service {
public:
Status CollectServerStats(ServerContext* context, const StatsRequest*,
ServerStats* response) {
@ -104,7 +112,9 @@ class TestServiceImpl : public TestService::Service {
}
};
void RunServer() {
} // namespace
static void RunServer() {
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", FLAGS_port);
@ -118,10 +128,15 @@ void RunServer() {
builder.RegisterService(service.service());
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
while (true) {
grpc_profiler_start("qps_server.prof");
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
grpc_profiler_stop();
gpr_free(server_address);
}
@ -129,6 +144,8 @@ int main(int argc, char** argv) {
grpc_init();
google::ParseCommandLineFlags(&argc, &argv, true);
signal(SIGINT, sigint_handler);
GPR_ASSERT(FLAGS_port != 0);
GPR_ASSERT(!FLAGS_enable_ssl);
RunServer();
@ -136,3 +153,4 @@ int main(int argc, char** argv) {
grpc_shutdown();
return 0;
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save