Expose new core functionality to Python

pull/2878/head
Masood Malekghassemi 9 years ago
parent b5ea2f89fd
commit be55ca770d
  1. 9
      src/python/grpcio/grpc/_adapter/_c/types.h
  2. 8
      src/python/grpcio/grpc/_adapter/_c/types/call.c
  3. 54
      src/python/grpcio/grpc/_adapter/_c/types/channel.c
  4. 21
      src/python/grpcio/grpc/_adapter/_c/utility.c
  5. 2
      src/python/grpcio/grpc/_adapter/_intermediary_low.py
  6. 14
      src/python/grpcio/grpc/_adapter/_low.py
  7. 95
      src/python/grpcio/grpc/_adapter/_types.py
  8. 13
      src/python/grpcio_test/grpc_test/_adapter/_low_test.py

@ -113,6 +113,7 @@ Call *pygrpc_Call_new_empty(CompletionQueue *cq);
void pygrpc_Call_dealloc(Call *self);
PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Call_peer(Call *self);
extern PyTypeObject pygrpc_Call_type;
@ -129,6 +130,11 @@ Channel *pygrpc_Channel_new(
void pygrpc_Channel_dealloc(Channel *self);
Call *pygrpc_Channel_create_call(
Channel *self, PyObject *args, PyObject *kwargs);
PyObject *pygrpc_Channel_check_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_watch_connectivity_state(Channel *self, PyObject *args,
PyObject *kwargs);
PyObject *pygrpc_Channel_target(Channel *self);
extern PyTypeObject pygrpc_Channel_type;
@ -181,6 +187,9 @@ pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call);
/* Construct a tag associated with a server shutdown. */
pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag);
/* Construct a tag associated with a channel state change. */
pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag);
/* Frees all resources owned by the tag and the tag itself. */
void pygrpc_discard_tag(pygrpc_tag *tag);

@ -42,6 +42,7 @@
PyMethodDef pygrpc_Call_methods[] = {
{"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
{"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
{"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
@ -161,3 +162,10 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
}
return PyInt_FromLong(errcode);
}
PyObject *pygrpc_Call_peer(Call *self) {
char *peer = grpc_call_get_peer(self->c_call);
PyObject *py_peer = PyString_FromString(peer);
gpr_free(peer);
return py_peer;
}

@ -36,10 +36,14 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
PyMethodDef pygrpc_Channel_methods[] = {
{"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""},
{"check_connectivity_state", (PyCFunction)pygrpc_Channel_check_connectivity_state, METH_KEYWORDS, ""},
{"watch_connectivity_state", (PyCFunction)pygrpc_Channel_watch_connectivity_state, METH_KEYWORDS, ""},
{"target", (PyCFunction)pygrpc_Channel_target, METH_NOARGS, ""},
{NULL}
};
const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel.";
@ -122,7 +126,7 @@ Call *pygrpc_Channel_create_call(
const char *host;
double deadline;
char *keywords[] = {"cq", "method", "host", "deadline", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!szd:create_call", keywords,
&pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) {
return NULL;
}
@ -132,3 +136,51 @@ Call *pygrpc_Channel_create_call(
pygrpc_cast_double_to_gpr_timespec(deadline));
return call;
}
PyObject *pygrpc_Channel_check_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *py_try_to_connect;
int try_to_connect;
char *keywords[] = {"try_to_connect", NULL};
grpc_connectivity_state state;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:connectivity_state", keywords,
&py_try_to_connect)) {
return NULL;
}
if (!PyBool_Check(py_try_to_connect)) {
Py_XDECREF(py_try_to_connect);
return NULL;
}
try_to_connect = Py_True == py_try_to_connect;
Py_DECREF(py_try_to_connect);
state = grpc_channel_check_connectivity_state(self->c_chan, try_to_connect);
return PyInt_FromLong(state);
}
PyObject *pygrpc_Channel_watch_connectivity_state(
Channel *self, PyObject *args, PyObject *kwargs) {
PyObject *tag;
double deadline;
int last_observed_state;
CompletionQueue *completion_queue;
char *keywords[] = {"last_observed_state", "deadline",
"completion_queue", "tag"};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "idO!O:watch_connectivity_state", keywords,
&last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
&completion_queue, &tag)) {
return NULL;
}
grpc_channel_watch_connectivity_state(
self->c_chan, (grpc_connectivity_state)last_observed_state,
pygrpc_cast_double_to_gpr_timespec(deadline), completion_queue->c_cq,
pygrpc_produce_channel_state_change_tag(tag));
Py_RETURN_NONE;
}
PyObject *pygrpc_Channel_target(Channel *self) {
char *target = grpc_channel_get_target(self->c_chan);
PyObject *py_target = PyString_FromString(target);
gpr_free(target);
return py_target;
}

@ -88,6 +88,19 @@ pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) {
return tag;
}
pygrpc_tag *pygrpc_produce_channel_state_change_tag(PyObject *user_tag) {
pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
tag->user_tag = user_tag;
Py_XINCREF(tag->user_tag);
tag->call = NULL;
tag->ops = NULL;
tag->nops = 0;
grpc_call_details_init(&tag->request_call_details);
grpc_metadata_array_init(&tag->request_metadata);
tag->is_new_call = 0;
return tag;
}
void pygrpc_discard_tag(pygrpc_tag *tag) {
if (!tag) {
return;
@ -139,7 +152,7 @@ PyObject *pygrpc_consume_event(grpc_event event) {
}
int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int OP_TUPLE_SIZE = 5;
static const int OP_TUPLE_SIZE = 6;
static const int STATUS_TUPLE_SIZE = 2;
static const int TYPE_INDEX = 0;
static const int INITIAL_METADATA_INDEX = 1;
@ -148,6 +161,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
static const int STATUS_INDEX = 4;
static const int STATUS_CODE_INDEX = 0;
static const int STATUS_DETAILS_INDEX = 1;
static const int WRITE_FLAGS_INDEX = 5;
int type;
Py_ssize_t message_size;
char *message;
@ -170,7 +184,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
return 0;
}
c_op.op = type;
c_op.flags = 0;
c_op.flags = PyInt_AsLong(PyTuple_GET_ITEM(op, WRITE_FLAGS_INDEX));
if (PyErr_Occurred()) {
return 0;
}
switch (type) {
case GRPC_OP_SEND_INITIAL_METADATA:
if (!pygrpc_cast_pyseq_to_send_metadata(

@ -127,7 +127,7 @@ class Call(object):
def write(self, message, tag):
return self._internal.start_batch([
_types.OpArgs.send_message(message)
_types.OpArgs.send_message(message, 0)
], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
def complete(self, tag):

@ -72,6 +72,9 @@ class Call(_types.Call):
else:
return self.call.cancel(code, details)
def peer(self):
return self.call.peer()
class Channel(_types.Channel):
@ -84,6 +87,17 @@ class Channel(_types.Channel):
def create_call(self, completion_queue, method, host, deadline=None):
return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline))
def check_connectivity_state(self, try_to_connect):
return self.channel.check_connectivity_state(try_to_connect)
def watch_connectivity_state(self, last_observed_state, deadline,
completion_queue, tag):
self.channel.watch_connectivity_state(
last_observed_state, deadline, completion_queue.completion_queue, tag)
def target(self):
return self.channel.target()
_NO_TAG = object()

@ -31,13 +31,12 @@ import abc
import collections
import enum
# TODO(atash): decide whether or not to move these enums to the _c module to
# force build errors with upstream changes.
class GrpcChannelArgumentKeys(enum.Enum):
"""Mirrors keys used in grpc_channel_args for GRPC-specific arguments."""
SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'
@enum.unique
class CallError(enum.IntEnum):
"""Mirrors grpc_call_error in the C core."""
@ -53,6 +52,7 @@ class CallError(enum.IntEnum):
ERROR_INVALID_FLAGS = 9
ERROR_INVALID_METADATA = 10
@enum.unique
class StatusCode(enum.IntEnum):
"""Mirrors grpc_status_code in the C core."""
@ -74,6 +74,14 @@ class StatusCode(enum.IntEnum):
DATA_LOSS = 15
UNAUTHENTICATED = 16
@enum.unique
class OpWriteFlags(enum.IntEnum):
"""Mirrors defined write-flag constants in the C core."""
WRITE_BUFFER_HINT = 1
WRITE_NO_COMPRESS = 2
@enum.unique
class OpType(enum.IntEnum):
"""Mirrors grpc_op_type in the C core."""
@ -86,12 +94,24 @@ class OpType(enum.IntEnum):
RECV_STATUS_ON_CLIENT = 6
RECV_CLOSE_ON_SERVER = 7
@enum.unique
class EventType(enum.IntEnum):
"""Mirrors grpc_completion_type in the C core."""
QUEUE_SHUTDOWN = 0
QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
OP_COMPLETE = 2
QUEUE_SHUTDOWN = 0
QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
OP_COMPLETE = 2
@enum.unique
class ConnectivityState(enum.IntEnum):
"""Mirrors grpc_connectivity_state in the C core."""
IDLE = 0
CONNECTING = 1
READY = 2
TRANSIENT_FAILURE = 3
FATAL_FAILURE = 4
class Status(collections.namedtuple(
'Status', [
@ -105,6 +125,7 @@ class Status(collections.namedtuple(
details (str): ...
"""
class CallDetails(collections.namedtuple(
'CallDetails', [
'method',
@ -119,6 +140,7 @@ class CallDetails(collections.namedtuple(
deadline (float): ...
"""
class OpArgs(collections.namedtuple(
'OpArgs', [
'type',
@ -126,6 +148,7 @@ class OpArgs(collections.namedtuple(
'trailing_metadata',
'message',
'status',
'write_flags',
])):
"""Arguments passed into a GRPC operation.
@ -138,39 +161,40 @@ class OpArgs(collections.namedtuple(
message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
is None.
write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values.
"""
@staticmethod
def send_initial_metadata(initial_metadata):
return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None)
return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0)
@staticmethod
def send_message(message):
return OpArgs(OpType.SEND_MESSAGE, None, None, message, None)
def send_message(message, flags):
return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags)
@staticmethod
def send_close_from_client():
return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None)
return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0)
@staticmethod
def send_status_from_server(trailing_metadata, status_code, status_details):
return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details))
return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0)
@staticmethod
def recv_initial_metadata():
return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None);
return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0);
@staticmethod
def recv_message():
return OpArgs(OpType.RECV_MESSAGE, None, None, None, None)
return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0)
@staticmethod
def recv_status_on_client():
return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None)
return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0)
@staticmethod
def recv_close_on_server():
return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None)
return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0)
class OpResult(collections.namedtuple(
@ -290,6 +314,15 @@ class Call:
"""
return CallError.ERROR
@abc.abstractmethod
def peer(self):
"""Get the peer of this call.
Returns:
str: the peer of this call.
"""
return None
class Channel:
__metaclass__ = abc.ABCMeta
@ -321,6 +354,40 @@ class Channel:
"""
return None
@abc.abstractmethod
def check_connectivity_state(self, try_to_connect):
"""Check and optionally repair the connectivity state of the channel.
Args:
try_to_connect (bool): whether or not to try to connect the channel if
disconnected.
Returns:
ConnectivityState: state of the channel at the time of this invocation.
"""
return None
@abc.abstractmethod
def watch_connectivity_state(self, last_observed_state, deadline,
completion_queue, tag):
"""Watch for connectivity state changes from the last_observed_state.
Args:
last_observed_state (ConnectivityState): ...
deadline (float): ...
completion_queue (CompletionQueue): ...
tag (object) ...
"""
@abc.abstractmethod
def target(self):
"""Get the target of this channel.
Returns:
str: the target of this channel.
"""
return None
class Server:
__metaclass__ = abc.ABCMeta

@ -115,7 +115,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
client_start_batch_result = client_call.start_batch([
_types.OpArgs.send_initial_metadata(client_initial_metadata),
_types.OpArgs.send_message(REQUEST),
_types.OpArgs.send_message(REQUEST, 0),
_types.OpArgs.send_close_from_client(),
_types.OpArgs.recv_initial_metadata(),
_types.OpArgs.recv_message(),
@ -137,6 +137,15 @@ class InsecureServerInsecureClient(unittest.TestCase):
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
# Check that the channel is connected, and that both it and the call have
# the proper target and peer; do this after the first flurry of messages to
# avoid the possibility that connection was delayed by the core until the
# first message was sent.
self.assertEqual(_types.ConnectivityState.READY,
self.client_channel.check_connectivity_state(False))
self.assertIsNotNone(self.client_channel.target())
self.assertIsNotNone(client_call.peer())
server_call_tag = object()
server_call = request_event.call
server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
@ -144,7 +153,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
server_start_batch_result = server_call.start_batch([
_types.OpArgs.send_initial_metadata(server_initial_metadata),
_types.OpArgs.recv_message(),
_types.OpArgs.send_message(RESPONSE),
_types.OpArgs.send_message(RESPONSE, 0),
_types.OpArgs.recv_close_on_server(),
_types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
], server_call_tag)

Loading…
Cancel
Save