Migrate Python to batch core API

pull/1382/head
Masood Malekghassemi 10 years ago
parent cac5f1d532
commit f579e1d176
  1. 5
      src/python/src/grpc/_adapter/_c_test.py
  2. 237
      src/python/src/grpc/_adapter/_call.c
  3. 28
      src/python/src/grpc/_adapter/_call.h
  4. 171
      src/python/src/grpc/_adapter/_completion_queue.c
  5. 8
      src/python/src/grpc/_adapter/_low_test.py
  6. 20
      src/python/src/grpc/_adapter/_server.c
  7. 4
      src/python/src/grpc/_adapter/_server.h
  8. 65
      src/python/src/grpc/_adapter/_tag.c
  9. 70
      src/python/src/grpc/_adapter/_tag.h
  10. 2
      src/python/src/grpc/_adapter/rear.py
  11. 1
      src/python/src/setup.py

@ -83,8 +83,11 @@ class _CTest(unittest.TestCase):
_c.init()
channel = _c.Channel('%s:%d' % (host, 12345), None)
call = _c.Call(channel, method, host, time.time() + _TIMEOUT)
completion_queue = _c.CompletionQueue()
call = _c.Call(channel, completion_queue, method, host,
time.time() + _TIMEOUT)
del call
del completion_queue
del channel
_c.shut_down()

@ -36,90 +36,166 @@
#include <math.h>
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include "grpc/_adapter/_channel.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
#include "grpc/_adapter/_tag.h"
static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) {
const PyObject *channel;
static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
Call *self = (Call *)type->tp_alloc(type, 0);
Channel *channel;
CompletionQueue *completion_queue;
const char *method;
const char *host;
double deadline;
static char *kwlist[] = {"channel", "method", "host", "deadline", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!ssd:Call", kwlist,
&pygrpc_ChannelType, &channel, &method,
&host, &deadline)) {
return -1;
static char *kwlist[] = {"channel", "completion_queue",
"method", "host", "deadline", NULL};
if (!PyArg_ParseTupleAndKeywords(
args, kwds, "O!O!ssd:Call", kwlist,
&pygrpc_ChannelType, &channel,
&pygrpc_CompletionQueueType, &completion_queue,
&method, &host, &deadline)) {
return NULL;
}
/* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own
* function with its own test coverage.
*/
self->c_call = grpc_channel_create_call_old(
((Channel *)channel)->c_channel, method, host,
self->c_call = grpc_channel_create_call(
channel->c_channel, completion_queue->c_completion_queue, method, host,
gpr_time_from_nanos(deadline * GPR_NS_PER_SEC));
return 0;
self->completion_queue = completion_queue;
Py_INCREF(self->completion_queue);
self->channel = channel;
Py_INCREF(self->channel);
grpc_call_details_init(&self->call_details);
grpc_metadata_array_init(&self->recv_metadata);
grpc_metadata_array_init(&self->recv_trailing_metadata);
self->send_metadata = NULL;
self->send_metadata_count = 0;
self->send_trailing_metadata = NULL;
self->send_trailing_metadata_count = 0;
self->send_message = NULL;
self->recv_message = NULL;
self->adding_to_trailing = 0;
return (PyObject *)self;
}
static void pygrpc_call_dealloc(Call *self) {
if (self->c_call != NULL) {
grpc_call_destroy(self->c_call);
}
Py_XDECREF(self->completion_queue);
Py_XDECREF(self->channel);
Py_XDECREF(self->server);
grpc_call_details_destroy(&self->call_details);
grpc_metadata_array_destroy(&self->recv_metadata);
grpc_metadata_array_destroy(&self->recv_trailing_metadata);
if (self->send_message) {
grpc_byte_buffer_destroy(self->send_message);
}
if (self->recv_message) {
grpc_byte_buffer_destroy(self->recv_message);
}
gpr_free(self->status_details);
gpr_free(self->send_metadata);
gpr_free(self->send_trailing_metadata);
self->ob_type->tp_free((PyObject *)self);
}
static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) {
const PyObject *completion_queue;
const PyObject *metadata_tag;
const PyObject *finish_tag;
PyObject *completion_queue;
PyObject *metadata_tag;
PyObject *finish_tag;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_init_metadata_tag;
pygrpc_tag *c_metadata_tag;
pygrpc_tag *c_finish_tag;
grpc_op send_initial_metadata;
grpc_op recv_initial_metadata;
grpc_op recv_status_on_client;
if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType,
&completion_queue, &metadata_tag, &finish_tag))) {
return NULL;
}
call_error = grpc_call_invoke_old(
self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
(void *)metadata_tag, (void *)finish_tag, 0);
send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA;
send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata;
send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count;
recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA;
recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata;
recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata;
recv_status_on_client.data.recv_status_on_client.status = &self->status;
recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details;
recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity;
c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self);
c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self);
call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag);
result = pygrpc_translate_call_error(call_error);
if (result == NULL) {
pygrpc_tag_destroy(c_init_metadata_tag);
pygrpc_tag_destroy(c_metadata_tag);
pygrpc_tag_destroy(c_finish_tag);
return result;
}
call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag);
result = pygrpc_translate_call_error(call_error);
if (result == NULL) {
pygrpc_tag_destroy(c_metadata_tag);
pygrpc_tag_destroy(c_finish_tag);
return result;
}
call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(metadata_tag);
Py_INCREF(finish_tag);
if (result == NULL) {
pygrpc_tag_destroy(c_finish_tag);
return result;
}
return result;
}
static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
const char *bytes;
int length;
const PyObject *tag;
PyObject *tag;
gpr_slice slice;
grpc_byte_buffer *byte_buffer;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag;
grpc_op op;
if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) {
return NULL;
}
c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self);
slice = gpr_slice_from_copied_buffer(bytes, length);
byte_buffer = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
call_error =
grpc_call_start_write_old(self->c_call, byte_buffer, (void *)tag, 0);
if (self->send_message) {
grpc_byte_buffer_destroy(self->send_message);
}
self->send_message = byte_buffer;
op.op = GRPC_OP_SEND_MESSAGE;
op.data.send_message = self->send_message;
grpc_byte_buffer_destroy(byte_buffer);
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
@ -127,36 +203,42 @@ static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
grpc_op op;
call_error = grpc_call_writes_done_old(self->c_call, (void *)tag);
op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
const PyObject *completion_queue;
const PyObject *tag;
PyObject *completion_queue;
PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag;
grpc_op op;
if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType,
&completion_queue, &tag))) {
return NULL;
}
call_error = grpc_call_server_accept_old(
self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
(void *)tag);
result = pygrpc_translate_call_error(call_error);
op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op.data.recv_close_on_server.cancelled = &self->cancelled;
c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self);
if (result != NULL) {
Py_INCREF(tag);
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
@ -171,24 +253,52 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) {
metadata.key = key;
metadata.value = value;
metadata.value_length = value_length;
return pygrpc_translate_call_error(
grpc_call_add_metadata_old(self->c_call, &metadata, 0));
if (self->adding_to_trailing) {
self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata));
self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata;
self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1;
} else {
self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata));
self->send_metadata[self->send_metadata_count] = metadata;
self->send_metadata_count = self->send_metadata_count + 1;
}
return pygrpc_translate_call_error(GRPC_CALL_OK);
}
static const PyObject *pygrpc_call_premetadata(Call *self) {
return pygrpc_translate_call_error(
grpc_call_server_end_initial_metadata_old(self->c_call, 0));
grpc_op op;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.data.send_initial_metadata.metadata = self->send_metadata;
op.data.send_initial_metadata.count = self->send_metadata_count;
self->adding_to_trailing = 1;
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) {
grpc_op op;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self);
call_error = grpc_call_start_read_old(self->c_call, (void *)tag);
op.op = GRPC_OP_RECV_MESSAGE;
if (self->recv_message) {
grpc_byte_buffer_destroy(self->recv_message);
self->recv_message = NULL;
}
op.data.recv_message = &self->recv_message;
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
@ -197,15 +307,18 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
PyObject *status;
PyObject *code;
PyObject *details;
const PyObject *tag;
PyObject *tag;
grpc_status_code c_code;
char *c_message;
grpc_call_error call_error;
const PyObject *result;
pygrpc_tag *c_tag;
grpc_op op;
if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) {
return NULL;
}
c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
code = PyObject_GetAttrString(status, "code");
if (code == NULL) {
@ -227,13 +340,21 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
if (c_message == NULL) {
return NULL;
}
call_error = grpc_call_start_write_status_old(self->c_call, c_code, c_message,
(void *)tag);
if (self->status_details) {
gpr_free(self->status_details);
}
self->status_details = gpr_malloc(strlen(c_message)+1);
strcpy(self->status_details, c_message);
op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count;
op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata;
op.data.send_status_from_server.status = c_code;
op.data.send_status_from_server.status_details = self->status_details;
call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
Py_INCREF(tag);
if (result == NULL) {
pygrpc_tag_destroy(c_tag);
}
return result;
}
@ -301,9 +422,9 @@ PyTypeObject pygrpc_CallType = {
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)pygrpc_call_init, /* tp_init */
0, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
pygrpc_call_new, /* tp_new */
};
int pygrpc_add_call(PyObject *module) {

@ -37,8 +37,36 @@
#include <Python.h>
#include <grpc/grpc.h>
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_channel.h"
#include "grpc/_adapter/_server.h"
typedef struct {
PyObject_HEAD
CompletionQueue *completion_queue;
Channel *channel;
Server *server;
/* Legacy state. */
grpc_call_details call_details;
grpc_metadata_array recv_metadata;
grpc_metadata_array recv_trailing_metadata;
grpc_metadata *send_metadata;
size_t send_metadata_count;
grpc_metadata *send_trailing_metadata;
size_t send_trailing_metadata_count;
int adding_to_trailing;
grpc_byte_buffer *send_message;
grpc_byte_buffer *recv_message;
grpc_status_code status;
char *status_details;
size_t status_details_capacity;
int cancelled;
grpc_call *c_call;
} Call;

@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include "grpc/_adapter/_call.h"
#include "grpc/_adapter/_tag.h"
static PyObject *status_class;
static PyObject *service_acceptance_class;
@ -138,74 +139,70 @@ static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
PyObject *write_accepted =
c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False;
return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag,
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
PyObject *write_accepted = Py_True;
return PyTuple_Pack(8, write_event_kind, user_tag,
write_accepted, Py_None, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
PyObject *complete_accepted =
c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False;
return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag,
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
PyObject *complete_accepted = Py_True;
return PyTuple_Pack(8, complete_event_kind, user_tag,
Py_None, complete_accepted, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
if (c_event->data.server_rpc_new.method == NULL) {
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
if (tag->call->call_details.method == NULL) {
return PyTuple_Pack(
8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None,
8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None,
Py_None, Py_None);
} else {
PyObject *method = NULL;
PyObject *host = NULL;
PyObject *service_deadline = NULL;
Call *call = NULL;
PyObject *service_acceptance = NULL;
PyObject *metadata = NULL;
PyObject *event_args = NULL;
method = PyBytes_FromString(c_event->data.server_rpc_new.method);
method = PyBytes_FromString(tag->call->call_details.method);
if (method == NULL) {
goto error;
}
host = PyBytes_FromString(c_event->data.server_rpc_new.host);
host = PyBytes_FromString(tag->call->call_details.host);
if (host == NULL) {
goto error;
}
service_deadline =
pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline);
pygrpc_as_py_time(&tag->call->call_details.deadline);
if (service_deadline == NULL) {
goto error;
}
call = PyObject_New(Call, &pygrpc_CallType);
if (call == NULL) {
goto error;
}
call->c_call = c_event->call;
service_acceptance =
PyObject_CallFunctionObjArgs(service_acceptance_class, call, method,
host, service_deadline, NULL);
PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call,
method, host, service_deadline, NULL);
if (service_acceptance == NULL) {
goto error;
}
metadata = pygrpc_metadata_collection_get(
c_event->data.server_rpc_new.metadata_elements,
c_event->data.server_rpc_new.metadata_count);
tag->call->recv_metadata.metadata,
tag->call->recv_metadata.count);
event_args = PyTuple_Pack(8, service_event_kind,
(PyObject *)c_event->tag, Py_None, Py_None,
user_tag, Py_None, Py_None,
service_acceptance, Py_None, Py_None,
metadata);
Py_DECREF(service_acceptance);
Py_DECREF(metadata);
error:
Py_XDECREF(call);
Py_XDECREF(method);
Py_XDECREF(host);
Py_XDECREF(service_deadline);
@ -215,8 +212,10 @@ error:
}
static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (c_event->data.read == NULL) {
return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
if (tag->call->recv_message == NULL) {
return PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, Py_None, Py_None);
} else {
size_t length;
@ -227,8 +226,8 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
PyObject *bytes;
PyObject *event_args;
length = grpc_byte_buffer_length(c_event->data.read);
reader = grpc_byte_buffer_reader_create(c_event->data.read);
length = grpc_byte_buffer_length(tag->call->recv_message);
reader = grpc_byte_buffer_reader_create(tag->call->recv_message);
c_bytes = gpr_malloc(length);
offset = 0;
while (grpc_byte_buffer_reader_next(reader, &slice)) {
@ -242,7 +241,7 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (bytes == NULL) {
return NULL;
}
event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
event_args = PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, bytes, Py_None,
Py_None);
Py_DECREF(bytes);
@ -251,32 +250,65 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
PyObject *metadata = pygrpc_metadata_collection_get(
c_event->data.client_metadata_read.elements,
c_event->data.client_metadata_read.count);
tag->call->recv_metadata.metadata,
tag->call->recv_metadata.count);
PyObject* result = PyTuple_Pack(
8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None,
8, metadata_event_kind, user_tag, Py_None, Py_None,
Py_None, Py_None, Py_None, metadata);
Py_DECREF(metadata);
return result;
}
static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) {
PyObject *code;
PyObject *details;
PyObject *status;
PyObject *event_args;
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK);
if (code == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
return NULL;
}
details = PyBytes_FromString("");
if (details == NULL) {
return NULL;
}
status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL);
Py_DECREF(details);
if (status == NULL) {
return NULL;
}
event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, status,
Py_None);
Py_DECREF(status);
return event_args;
}
static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) {
PyObject *code;
PyObject *details;
PyObject *status;
PyObject *event_args;
PyObject *metadata;
pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
PyObject *user_tag = tag->user_tag;
code = pygrpc_status_code(c_event->data.finished.status);
code = pygrpc_status_code(tag->call->status);
if (code == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
return NULL;
}
if (c_event->data.finished.details == NULL) {
if (tag->call->status_details == NULL) {
details = PyBytes_FromString("");
} else {
details = PyBytes_FromString(c_event->data.finished.details);
details = PyBytes_FromString(tag->call->status_details);
}
if (details == NULL) {
return NULL;
@ -287,9 +319,9 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
return NULL;
}
metadata = pygrpc_metadata_collection_get(
c_event->data.finished.metadata_elements,
c_event->data.finished.metadata_count);
event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag,
tag->call->recv_trailing_metadata.metadata,
tag->call->recv_trailing_metadata.count);
event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, status,
metadata);
Py_DECREF(status);
@ -348,28 +380,51 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
Py_RETURN_NONE;
}
pygrpc_tag *tag = (pygrpc_tag *)c_event->tag;
switch (c_event->type) {
case GRPC_QUEUE_SHUTDOWN:
event_args = pygrpc_stop_event_args(c_event);
break;
case GRPC_WRITE_ACCEPTED:
event_args = pygrpc_write_event_args(c_event);
break;
case GRPC_FINISH_ACCEPTED:
event_args = pygrpc_complete_event_args(c_event);
break;
case GRPC_SERVER_RPC_NEW:
event_args = pygrpc_service_event_args(c_event);
break;
case GRPC_READ:
event_args = pygrpc_read_event_args(c_event);
break;
case GRPC_CLIENT_METADATA_READ:
event_args = pygrpc_metadata_event_args(c_event);
break;
case GRPC_FINISHED:
event_args = pygrpc_finished_event_args(c_event);
case GRPC_OP_COMPLETE: {
if (!tag) {
PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
return NULL;
}
switch (tag->type) {
case PYGRPC_INITIAL_METADATA:
if (tag) {
pygrpc_tag_destroy(tag);
}
grpc_event_finish(c_event);
return pygrpc_completion_queue_get(self, args);
case PYGRPC_WRITE_ACCEPTED:
event_args = pygrpc_write_event_args(c_event);
break;
case PYGRPC_FINISH_ACCEPTED:
event_args = pygrpc_complete_event_args(c_event);
break;
case PYGRPC_SERVER_RPC_NEW:
event_args = pygrpc_service_event_args(c_event);
break;
case PYGRPC_READ:
event_args = pygrpc_read_event_args(c_event);
break;
case PYGRPC_CLIENT_METADATA_READ:
event_args = pygrpc_metadata_event_args(c_event);
break;
case PYGRPC_FINISHED_CLIENT:
event_args = pygrpc_finished_client_event_args(c_event);
break;
case PYGRPC_FINISHED_SERVER:
event_args = pygrpc_finished_server_event_args(c_event);
break;
default:
PyErr_SetString(PyExc_Exception, "Unrecognized op event type!");
return NULL;
}
break;
}
default:
PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
return NULL;
@ -382,7 +437,9 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
event = PyObject_CallObject(event_class, event_args);
Py_DECREF(event_args);
Py_XDECREF((PyObject *)c_event->tag);
if (tag) {
pygrpc_tag_destroy(tag);
}
grpc_event_finish(c_event);
return event;

@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase):
completion_queue = _low.CompletionQueue()
channel = _low.Channel('%s:%d' % (host, port), None)
client_call = _low.Call(channel, method, host, deadline)
client_call = _low.Call(channel, completion_queue, method, host, deadline)
client_call.invoke(completion_queue, metadata_tag, finish_tag)
first_event = completion_queue.get(after_deadline)
@ -138,7 +138,8 @@ class EchoTest(unittest.TestCase):
server_data = []
client_data = []
client_call = _low.Call(self.channel, method, self.host, deadline)
client_call = _low.Call(self.channel, self.client_completion_queue,
method, self.host, deadline)
client_call.add_metadata(client_metadata_key, client_metadata_value)
client_call.add_metadata(client_binary_metadata_key,
client_binary_metadata_value)
@ -335,7 +336,8 @@ class CancellationTest(unittest.TestCase):
server_data = []
client_data = []
client_call = _low.Call(self.channel, method, self.host, deadline)
client_call = _low.Call(self.channel, self.client_completion_queue,
method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)

@ -36,12 +36,14 @@
#include <Python.h>
#include <grpc/grpc.h>
#include "grpc/_adapter/_call.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
#include "grpc/_adapter/_server_credentials.h"
#include "grpc/_adapter/_tag.h"
static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
const PyObject *completion_queue;
CompletionQueue *completion_queue;
static char *kwlist[] = {"completion_queue", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist,
@ -50,7 +52,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
return -1;
}
self->c_server = grpc_server_create(
((CompletionQueue *)completion_queue)->c_completion_queue, NULL);
completion_queue->c_completion_queue, NULL);
self->completion_queue = completion_queue;
Py_INCREF(completion_queue);
return 0;
}
@ -58,6 +62,7 @@ static void pygrpc_server_dealloc(Server *self) {
if (self->c_server != NULL) {
grpc_server_destroy(self->c_server);
}
Py_XDECREF(self->completion_queue);
self->ob_type->tp_free((PyObject *)self);
}
@ -109,8 +114,15 @@ static PyObject *pygrpc_server_start(Server *self) {
static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
call_error = grpc_server_request_call_old(self->c_server, (void *)tag);
pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag);
c_tag->call->completion_queue = self->completion_queue;
c_tag->call->server = self;
Py_INCREF(c_tag->call->completion_queue);
Py_INCREF(c_tag->call->server);
call_error = grpc_server_request_call(
self->c_server, &c_tag->call->c_call, &c_tag->call->call_details,
&c_tag->call->recv_metadata, self->completion_queue->c_completion_queue,
c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {

@ -37,8 +37,12 @@
#include <Python.h>
#include <grpc/grpc.h>
#include "grpc/_adapter/_completion_queue.h"
typedef struct {
PyObject_HEAD
CompletionQueue *completion_queue;
grpc_server *c_server;
} Server;

@ -0,0 +1,65 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "grpc/_adapter/_tag.h"
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
Call *call) {
pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag));
memset(self, 0, sizeof(pygrpc_tag));
if (user_tag == NULL) {
self->user_tag = Py_None;
} else {
self->user_tag = user_tag;
}
Py_INCREF(self->user_tag);
self->type = type;
self->call = call;
Py_INCREF(call);
return self;
}
pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) {
return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag,
(Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0));
}
void pygrpc_tag_destroy(pygrpc_tag *self) {
Py_XDECREF(self->user_tag);
Py_XDECREF(self->call);
gpr_free(self);
}

@ -0,0 +1,70 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef _ADAPTER__TAG_H_
#define _ADAPTER__TAG_H_
#include <Python.h>
#include <grpc/grpc.h>
#include "grpc/_adapter/_call.h"
#include "grpc/_adapter/_completion_queue.h"
/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial
replacement for its descriptive functionality until Python can move its whole
C and C adapter stack to more closely resemble the core batching API. */
typedef enum {
PYGRPC_SERVER_RPC_NEW = 0,
PYGRPC_INITIAL_METADATA = 1,
PYGRPC_READ = 2,
PYGRPC_WRITE_ACCEPTED = 3,
PYGRPC_FINISH_ACCEPTED = 4,
PYGRPC_CLIENT_METADATA_READ = 5,
PYGRPC_FINISHED_CLIENT = 6,
PYGRPC_FINISHED_SERVER = 7,
} pygrpc_tag_type;
typedef struct {
pygrpc_tag_type type;
PyObject *user_tag;
Call *call;
} pygrpc_tag;
pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
Call *call);
pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag);
void pygrpc_tag_destroy(pygrpc_tag *self);
#endif /* _ADAPTER__TAG_H_ */

@ -246,7 +246,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
timeout: A duration of time in seconds to allow for the RPC.
"""
request_serializer = self._request_serializers[name]
call = _low.Call(self._channel, name, self._host, time.time() + timeout)
call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout)
if self._metadata_transformer is not None:
metadata = self._metadata_transformer([])
for metadata_key, metadata_value in metadata:

@ -42,6 +42,7 @@ _EXTENSION_SOURCES = (
'grpc/_adapter/_server.c',
'grpc/_adapter/_client_credentials.c',
'grpc/_adapter/_server_credentials.c',
'grpc/_adapter/_tag.c'
)
_EXTENSION_INCLUDE_DIRECTORIES = (

Loading…
Cancel
Save