diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c index d8806e56805..f837267e9a5 100644 --- a/src/python/src/grpc/_adapter/_call.c +++ b/src/python/src/grpc/_adapter/_call.c @@ -160,8 +160,22 @@ static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) { return result; } +static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) { + const char* key = NULL; + const char* value = NULL; + int value_length = 0; + if (!PyArg_ParseTuple(args, "ss#", &key, &value, &value_length)) { + return NULL; + } + grpc_metadata metadata; + metadata.key = key; + metadata.value = value; + metadata.value_length = value_length; + return pygrpc_translate_call_error( + grpc_call_add_metadata_old(self->c_call, &metadata, 0)); +} + static const PyObject *pygrpc_call_premetadata(Call *self) { - /* TODO(nathaniel): Metadata support. */ return pygrpc_translate_call_error( grpc_call_server_end_initial_metadata_old(self->c_call, 0)); } @@ -236,6 +250,11 @@ static PyMethodDef methods[] = { {"complete", (PyCFunction)pygrpc_call_complete, METH_O, "Complete writes to this call."}, {"accept", (PyCFunction)pygrpc_call_accept, METH_VARARGS, "Accept an RPC."}, + {"add_metadata", (PyCFunction)pygrpc_call_add_metadata, METH_VARARGS, + "Add metadata to the call. May not be called after invoke on the client " + "side. On the server side: when called before premetadata it provides " + "'leading' metadata, when called after premetadata but before status it " + "provides 'trailing metadata'; may not be called after status."}, {"premetadata", (PyCFunction)pygrpc_call_premetadata, METH_VARARGS, "Indicate the end of leading metadata in the response."}, {"read", (PyCFunction)pygrpc_call_read, METH_O, diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c index b56ca1926e5..76d6b6cb449 100644 --- a/src/python/src/grpc/_adapter/_completion_queue.c +++ b/src/python/src/grpc/_adapter/_completion_queue.c @@ -115,35 +115,56 @@ static PyObject *pygrpc_status_code(grpc_status_code c_status_code) { } } +static PyObject *pygrpc_metadata_collection_get( + grpc_metadata *metadata_elements, size_t count) { + PyObject *metadata = PyList_New(count); + size_t i; + for (i = 0; i < count; ++i) { + grpc_metadata elem = metadata_elements[i]; + PyObject *key = PyString_FromString(elem.key); + PyObject *value = PyString_FromStringAndSize(elem.value, elem.value_length); + PyObject* kvp = PyTuple_Pack(2, key, value); + // n.b. PyList_SetItem *steals* a reference to the set element. + PyList_SetItem(metadata, i, kvp); + Py_DECREF(key); + Py_DECREF(value); + } + return metadata; +} + static PyObject *pygrpc_stop_event_args(grpc_event *c_event) { - return PyTuple_Pack(7, stop_event_kind, Py_None, Py_None, Py_None, - Py_None, Py_None, Py_None); + return PyTuple_Pack(8, stop_event_kind, Py_None, Py_None, Py_None, + Py_None, Py_None, Py_None, Py_None); } static PyObject *pygrpc_write_event_args(grpc_event *c_event) { PyObject *write_accepted = c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(7, write_event_kind, (PyObject *)c_event->tag, - write_accepted, Py_None, Py_None, Py_None, Py_None); + return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag, + write_accepted, Py_None, Py_None, Py_None, Py_None, + Py_None); } static PyObject *pygrpc_complete_event_args(grpc_event *c_event) { PyObject *complete_accepted = c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(7, complete_event_kind, (PyObject *)c_event->tag, - Py_None, complete_accepted, Py_None, Py_None, Py_None); + return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag, + Py_None, complete_accepted, Py_None, Py_None, Py_None, + Py_None); } static PyObject *pygrpc_service_event_args(grpc_event *c_event) { if (c_event->data.server_rpc_new.method == NULL) { - return PyTuple_Pack(7, service_event_kind, c_event->tag, - Py_None, Py_None, Py_None, Py_None, Py_None); + return PyTuple_Pack( + 8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None, + Py_None, Py_None); } else { PyObject *method = NULL; PyObject *host = NULL; PyObject *service_deadline = NULL; Call *call = NULL; PyObject *service_acceptance = NULL; + PyObject *metadata = NULL; PyObject *event_args = NULL; method = PyBytes_FromString(c_event->data.server_rpc_new.method); @@ -173,11 +194,16 @@ static PyObject *pygrpc_service_event_args(grpc_event *c_event) { goto error; } - event_args = PyTuple_Pack(7, service_event_kind, + metadata = pygrpc_metadata_collection_get( + c_event->data.server_rpc_new.metadata_elements, + c_event->data.server_rpc_new.metadata_count); + event_args = PyTuple_Pack(8, service_event_kind, (PyObject *)c_event->tag, Py_None, Py_None, - service_acceptance, Py_None, Py_None); + service_acceptance, Py_None, Py_None, + metadata); Py_DECREF(service_acceptance); + Py_DECREF(metadata); error: Py_XDECREF(call); Py_XDECREF(method); @@ -190,8 +216,8 @@ error: static PyObject *pygrpc_read_event_args(grpc_event *c_event) { if (c_event->data.read == NULL) { - return PyTuple_Pack(7, read_event_kind, (PyObject *)c_event->tag, - Py_None, Py_None, Py_None, Py_None, Py_None); + return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + Py_None, Py_None, Py_None, Py_None, Py_None, Py_None); } else { size_t length; size_t offset; @@ -216,17 +242,23 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { if (bytes == NULL) { return NULL; } - event_args = PyTuple_Pack(7, read_event_kind, (PyObject *)c_event->tag, - Py_None, Py_None, Py_None, bytes, Py_None); + event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + Py_None, Py_None, Py_None, bytes, Py_None, + Py_None); Py_DECREF(bytes); return event_args; } } static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) { - /* TODO(nathaniel): Actual transmission of metadata. */ - return PyTuple_Pack(7, metadata_event_kind, (PyObject *)c_event->tag, - Py_None, Py_None, Py_None, Py_None, Py_None); + PyObject *metadata = pygrpc_metadata_collection_get( + c_event->data.client_metadata_read.elements, + c_event->data.client_metadata_read.count); + PyObject* result = PyTuple_Pack( + 8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None, + Py_None, Py_None, Py_None, metadata); + Py_DECREF(metadata); + return result; } static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { @@ -253,9 +285,14 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { if (status == NULL) { return NULL; } - event_args = PyTuple_Pack(7, finish_event_kind, (PyObject *)c_event->tag, - Py_None, Py_None, Py_None, Py_None, status); + PyObject* metadata = pygrpc_metadata_collection_get( + c_event->data.finished.metadata_elements, + c_event->data.finished.metadata_count); + event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag, + Py_None, Py_None, Py_None, Py_None, status, + metadata); Py_DECREF(status); + Py_DECREF(metadata); return event_args; } diff --git a/src/python/src/grpc/_adapter/_datatypes.py b/src/python/src/grpc/_adapter/_datatypes.py index e271ec83b90..3b227842432 100644 --- a/src/python/src/grpc/_adapter/_datatypes.py +++ b/src/python/src/grpc/_adapter/_datatypes.py @@ -70,7 +70,7 @@ class Event( collections.namedtuple( 'Event', ['kind', 'tag', 'write_accepted', 'complete_accepted', - 'service_acceptance', 'bytes', 'status'])): + 'service_acceptance', 'bytes', 'status', 'metadata'])): """Describes an event emitted from a completion queue.""" @enum.unique diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index b04ac1c9509..e88b70969ca 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -115,6 +115,18 @@ class EchoTest(unittest.TestCase): def _perform_echo_test(self, test_data): method = 'test method' details = 'test details' + server_leading_metadata_key = 'my_server_leading_key' + server_leading_metadata_value = 'my_server_leading_value' + server_trailing_metadata_key = 'my_server_trailing_key' + server_trailing_metadata_value = 'my_server_trailing_value' + client_metadata_key = 'my_client_key' + client_metadata_value = 'my_client_value' + server_leading_binary_metadata_key = 'my_server_leading_key-bin' + server_leading_binary_metadata_value = b'\0'*2047 + server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' + server_trailing_binary_metadata_value = b'\0'*2047 + client_binary_metadata_key = 'my_client_key-bin' + client_binary_metadata_value = b'\0'*2047 deadline = _FUTURE metadata_tag = object() finish_tag = object() @@ -128,6 +140,9 @@ class EchoTest(unittest.TestCase): client_data = [] client_call = _low.Call(self.channel, method, self.host, deadline) + client_call.add_metadata(client_metadata_key, client_metadata_value) + client_call.add_metadata(client_binary_metadata_key, + client_binary_metadata_value) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) @@ -139,15 +154,31 @@ class EchoTest(unittest.TestCase): self.assertEqual(method, service_accepted.service_acceptance.method) self.assertEqual(self.host, service_accepted.service_acceptance.host) self.assertIsNotNone(service_accepted.service_acceptance.call) + metadata = dict(service_accepted.metadata) + self.assertIn(client_metadata_key, metadata) + self.assertEqual(client_metadata_value, metadata[client_metadata_key]) + self.assertIn(client_binary_metadata_key, metadata) + self.assertEqual(client_binary_metadata_value, + metadata[client_binary_metadata_key]) server_call = service_accepted.service_acceptance.call server_call.accept(self.server_completion_queue, finish_tag) + server_call.add_metadata(server_leading_metadata_key, + server_leading_metadata_value) + server_call.add_metadata(server_leading_binary_metadata_key, + server_leading_binary_metadata_value) server_call.premetadata() metadata_accepted = self.client_completion_queue.get(_FUTURE) self.assertIsNotNone(metadata_accepted) self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) self.assertEqual(metadata_tag, metadata_accepted.tag) - # TODO(nathaniel): Test transmission and reception of metadata. + metadata = dict(metadata_accepted.metadata) + self.assertIn(server_leading_metadata_key, metadata) + self.assertEqual(server_leading_metadata_value, + metadata[server_leading_metadata_key]) + self.assertIn(server_leading_binary_metadata_key, metadata) + self.assertEqual(server_leading_binary_metadata_value, + metadata[server_leading_binary_metadata_key]) for datum in test_data: client_call.write(datum, write_tag) @@ -194,6 +225,11 @@ class EchoTest(unittest.TestCase): self.assertEqual(read_tag, read_accepted.tag) self.assertIsNone(read_accepted.bytes) + server_call.add_metadata(server_trailing_metadata_key, + server_trailing_metadata_value) + server_call.add_metadata(server_trailing_binary_metadata_key, + server_trailing_binary_metadata_value) + server_call.status(_low.Status(_low.Code.OK, details), status_tag) server_terminal_event_one = self.server_completion_queue.get(_FUTURE) server_terminal_event_two = self.server_completion_queue.get(_FUTURE) @@ -229,6 +265,13 @@ class EchoTest(unittest.TestCase): self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) self.assertEqual(finish_tag, finish_accepted.tag) self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) + metadata = dict(finish_accepted.metadata) + self.assertIn(server_trailing_metadata_key, metadata) + self.assertEqual(server_trailing_metadata_value, + metadata[server_trailing_metadata_key]) + self.assertIn(server_trailing_binary_metadata_key, metadata) + self.assertEqual(server_trailing_binary_metadata_value, + metadata[server_trailing_binary_metadata_key]) server_timeout_none_event = self.server_completion_queue.get(0) self.assertIsNone(server_timeout_none_event)