Add metadata support to low-level Python framework

pull/1163/head
Masood Malekghassemi 10 years ago
parent 7a6ecc2f91
commit 841f90f86f
  1. 21
      src/python/src/grpc/_adapter/_call.c
  2. 75
      src/python/src/grpc/_adapter/_completion_queue.c
  3. 2
      src/python/src/grpc/_adapter/_datatypes.py
  4. 45
      src/python/src/grpc/_adapter/_low_test.py

@ -160,8 +160,22 @@ static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
return result;
}
static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) {
const char* key = NULL;
const char* value = NULL;
int value_length = 0;
if (!PyArg_ParseTuple(args, "ss#", &key, &value, &value_length)) {
return NULL;
}
grpc_metadata metadata;
metadata.key = key;
metadata.value = value;
metadata.value_length = value_length;
return pygrpc_translate_call_error(
grpc_call_add_metadata_old(self->c_call, &metadata, 0));
}
static const PyObject *pygrpc_call_premetadata(Call *self) {
/* TODO(nathaniel): Metadata support. */
return pygrpc_translate_call_error(
grpc_call_server_end_initial_metadata_old(self->c_call, 0));
}
@ -236,6 +250,11 @@ static PyMethodDef methods[] = {
{"complete", (PyCFunction)pygrpc_call_complete, METH_O,
"Complete writes to this call."},
{"accept", (PyCFunction)pygrpc_call_accept, METH_VARARGS, "Accept an RPC."},
{"add_metadata", (PyCFunction)pygrpc_call_add_metadata, METH_VARARGS,
"Add metadata to the call. May not be called after invoke on the client "
"side. On the server side: when called before premetadata it provides "
"'leading' metadata, when called after premetadata but before status it "
"provides 'trailing metadata'; may not be called after status."},
{"premetadata", (PyCFunction)pygrpc_call_premetadata, METH_VARARGS,
"Indicate the end of leading metadata in the response."},
{"read", (PyCFunction)pygrpc_call_read, METH_O,

@ -115,35 +115,56 @@ static PyObject *pygrpc_status_code(grpc_status_code c_status_code) {
}
}
static PyObject *pygrpc_metadata_collection_get(
grpc_metadata *metadata_elements, size_t count) {
PyObject *metadata = PyList_New(count);
size_t i;
for (i = 0; i < count; ++i) {
grpc_metadata elem = metadata_elements[i];
PyObject *key = PyString_FromString(elem.key);
PyObject *value = PyString_FromStringAndSize(elem.value, elem.value_length);
PyObject* kvp = PyTuple_Pack(2, key, value);
// n.b. PyList_SetItem *steals* a reference to the set element.
PyList_SetItem(metadata, i, kvp);
Py_DECREF(key);
Py_DECREF(value);
}
return metadata;
}
static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
return PyTuple_Pack(7, stop_event_kind, Py_None, Py_None, Py_None,
Py_None, Py_None, Py_None);
return PyTuple_Pack(8, stop_event_kind, Py_None, Py_None, Py_None,
Py_None, Py_None, Py_None, Py_None);
}
static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
PyObject *write_accepted =
c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False;
return PyTuple_Pack(7, write_event_kind, (PyObject *)c_event->tag,
write_accepted, Py_None, Py_None, Py_None, Py_None);
return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag,
write_accepted, Py_None, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
PyObject *complete_accepted =
c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False;
return PyTuple_Pack(7, complete_event_kind, (PyObject *)c_event->tag,
Py_None, complete_accepted, Py_None, Py_None, Py_None);
return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag,
Py_None, complete_accepted, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
if (c_event->data.server_rpc_new.method == NULL) {
return PyTuple_Pack(7, service_event_kind, c_event->tag,
Py_None, Py_None, Py_None, Py_None, Py_None);
return PyTuple_Pack(
8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None,
Py_None, Py_None);
} else {
PyObject *method = NULL;
PyObject *host = NULL;
PyObject *service_deadline = NULL;
Call *call = NULL;
PyObject *service_acceptance = NULL;
PyObject *metadata = NULL;
PyObject *event_args = NULL;
method = PyBytes_FromString(c_event->data.server_rpc_new.method);
@ -173,11 +194,16 @@ static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
goto error;
}
event_args = PyTuple_Pack(7, service_event_kind,
metadata = pygrpc_metadata_collection_get(
c_event->data.server_rpc_new.metadata_elements,
c_event->data.server_rpc_new.metadata_count);
event_args = PyTuple_Pack(8, service_event_kind,
(PyObject *)c_event->tag, Py_None, Py_None,
service_acceptance, Py_None, Py_None);
service_acceptance, Py_None, Py_None,
metadata);
Py_DECREF(service_acceptance);
Py_DECREF(metadata);
error:
Py_XDECREF(call);
Py_XDECREF(method);
@ -190,8 +216,8 @@ error:
static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (c_event->data.read == NULL) {
return PyTuple_Pack(7, read_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, Py_None);
return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, Py_None, Py_None);
} else {
size_t length;
size_t offset;
@ -216,17 +242,23 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (bytes == NULL) {
return NULL;
}
event_args = PyTuple_Pack(7, read_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, bytes, Py_None);
event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, bytes, Py_None,
Py_None);
Py_DECREF(bytes);
return event_args;
}
}
static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
/* TODO(nathaniel): Actual transmission of metadata. */
return PyTuple_Pack(7, metadata_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, Py_None);
PyObject *metadata = pygrpc_metadata_collection_get(
c_event->data.client_metadata_read.elements,
c_event->data.client_metadata_read.count);
PyObject* result = PyTuple_Pack(
8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None,
Py_None, Py_None, Py_None, metadata);
Py_DECREF(metadata);
return result;
}
static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
@ -253,9 +285,14 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
if (status == NULL) {
return NULL;
}
event_args = PyTuple_Pack(7, finish_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, status);
PyObject* metadata = pygrpc_metadata_collection_get(
c_event->data.finished.metadata_elements,
c_event->data.finished.metadata_count);
event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag,
Py_None, Py_None, Py_None, Py_None, status,
metadata);
Py_DECREF(status);
Py_DECREF(metadata);
return event_args;
}

@ -70,7 +70,7 @@ class Event(
collections.namedtuple(
'Event',
['kind', 'tag', 'write_accepted', 'complete_accepted',
'service_acceptance', 'bytes', 'status'])):
'service_acceptance', 'bytes', 'status', 'metadata'])):
"""Describes an event emitted from a completion queue."""
@enum.unique

@ -115,6 +115,18 @@ class EchoTest(unittest.TestCase):
def _perform_echo_test(self, test_data):
method = 'test method'
details = 'test details'
server_leading_metadata_key = 'my_server_leading_key'
server_leading_metadata_value = 'my_server_leading_value'
server_trailing_metadata_key = 'my_server_trailing_key'
server_trailing_metadata_value = 'my_server_trailing_value'
client_metadata_key = 'my_client_key'
client_metadata_value = 'my_client_value'
server_leading_binary_metadata_key = 'my_server_leading_key-bin'
server_leading_binary_metadata_value = b'\0'*2047
server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
server_trailing_binary_metadata_value = b'\0'*2047
client_binary_metadata_key = 'my_client_key-bin'
client_binary_metadata_value = b'\0'*2047
deadline = _FUTURE
metadata_tag = object()
finish_tag = object()
@ -128,6 +140,9 @@ class EchoTest(unittest.TestCase):
client_data = []
client_call = _low.Call(self.channel, method, self.host, deadline)
client_call.add_metadata(client_metadata_key, client_metadata_value)
client_call.add_metadata(client_binary_metadata_key,
client_binary_metadata_value)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
@ -139,15 +154,31 @@ class EchoTest(unittest.TestCase):
self.assertEqual(method, service_accepted.service_acceptance.method)
self.assertEqual(self.host, service_accepted.service_acceptance.host)
self.assertIsNotNone(service_accepted.service_acceptance.call)
metadata = dict(service_accepted.metadata)
self.assertIn(client_metadata_key, metadata)
self.assertEqual(client_metadata_value, metadata[client_metadata_key])
self.assertIn(client_binary_metadata_key, metadata)
self.assertEqual(client_binary_metadata_value,
metadata[client_binary_metadata_key])
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.add_metadata(server_leading_metadata_key,
server_leading_metadata_value)
server_call.add_metadata(server_leading_binary_metadata_key,
server_leading_binary_metadata_value)
server_call.premetadata()
metadata_accepted = self.client_completion_queue.get(_FUTURE)
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
# TODO(nathaniel): Test transmission and reception of metadata.
metadata = dict(metadata_accepted.metadata)
self.assertIn(server_leading_metadata_key, metadata)
self.assertEqual(server_leading_metadata_value,
metadata[server_leading_metadata_key])
self.assertIn(server_leading_binary_metadata_key, metadata)
self.assertEqual(server_leading_binary_metadata_value,
metadata[server_leading_binary_metadata_key])
for datum in test_data:
client_call.write(datum, write_tag)
@ -194,6 +225,11 @@ class EchoTest(unittest.TestCase):
self.assertEqual(read_tag, read_accepted.tag)
self.assertIsNone(read_accepted.bytes)
server_call.add_metadata(server_trailing_metadata_key,
server_trailing_metadata_value)
server_call.add_metadata(server_trailing_binary_metadata_key,
server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
@ -229,6 +265,13 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
self.assertEqual(finish_tag, finish_accepted.tag)
self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
metadata = dict(finish_accepted.metadata)
self.assertIn(server_trailing_metadata_key, metadata)
self.assertEqual(server_trailing_metadata_value,
metadata[server_trailing_metadata_key])
self.assertIn(server_trailing_binary_metadata_key, metadata)
self.assertEqual(server_trailing_binary_metadata_value,
metadata[server_trailing_binary_metadata_key])
server_timeout_none_event = self.server_completion_queue.get(0)
self.assertIsNone(server_timeout_none_event)

Loading…
Cancel
Save