Begin patching Cython

reviewable/pr8842/r2
Masood Malekghassemi 8 years ago
parent 3153e5af0c
commit 990983110f
  1. 14
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  2. 35
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  3. 15
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  4. 94
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi

@ -60,19 +60,23 @@ cdef class Channel:
method, host, Timespec deadline not None):
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
cdef char *method_c_string = method
cdef char *host_c_string = NULL
cdef Slice method_slice = Slice.from_bytes(method)
cdef Slice host_slice
cdef grpc_slice *host_c_slice = NULL
if host is not None:
host_c_string = host
host_slice = Slice.from_bytes(host)
host_c_slice = &host_slice.c_slice
else:
host_slice = Slice()
cdef Call operation_call = Call()
operation_call.references = [self, method, host, queue]
operation_call.references = [self, method_slice, host_slice, queue]
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
with nogil:
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
queue.c_completion_queue, method_c_string, host_c_string,
queue.c_completion_queue, method_slice.c_slice, host_c_slice,
deadline.c_time, NULL)
return operation_call

@ -51,6 +51,13 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass
cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h":
struct grpc_exec_ctx:
# We don't care about the internals
pass
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice:
@ -60,6 +67,7 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_ref(grpc_slice s) nogil
void grpc_slice_unref(grpc_slice s) nogil
grpc_slice grpc_empty_slice() nogil
grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
grpc_slice grpc_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t)) nogil
@ -175,7 +183,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_arg_pointer_vtable:
void *(*copy)(void *)
void (*destroy)(void *)
void (*destroy)(grpc_exec_ctx *, void *)
int (*cmp)(void *, void *)
ctypedef struct grpc_arg_value_pointer:
@ -217,9 +225,8 @@ cdef extern from "grpc/grpc.h":
GRPC_CHANNEL_SHUTDOWN
ctypedef struct grpc_metadata:
const char *key
const char *value
size_t value_length
grpc_slice key
grpc_slice value
# ignore the 'internal_data.obfuscated' fields.
ctypedef enum grpc_completion_type:
@ -241,10 +248,8 @@ cdef extern from "grpc/grpc.h":
void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
char *method
size_t method_capacity
char *host
size_t host_capacity
grpc_slice method
grpc_slice host
gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details) nogil
@ -268,13 +273,12 @@ cdef extern from "grpc/grpc.h":
size_t trailing_metadata_count
grpc_metadata *trailing_metadata
grpc_status_code status
const char *status_details
grpc_slice *status_details
ctypedef struct grpc_op_data_recv_status_on_client:
grpc_metadata_array *trailing_metadata
grpc_status_code *status
char **status_details
size_t *status_details_capacity
grpc_slice *status_details
ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled
@ -321,9 +325,9 @@ cdef extern from "grpc/grpc.h":
const grpc_channel_args *args,
void *reserved) nogil
grpc_call *grpc_channel_create_call(
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, const char *method,
const char *host, gpr_timespec deadline, void *reserved) nogil
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
grpc_completion_queue *completion_queue, grpc_slice method,
const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
@ -473,8 +477,7 @@ cdef extern from "grpc/compression.h":
grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse(
const char *name, size_t name_length,
grpc_compression_algorithm *algorithm) nogil
grpc_slice value, grpc_compression_algorithm *algorithm) nogil
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) nogil
grpc_compression_algorithm grpc_compression_algorithm_for_level(

@ -70,6 +70,15 @@ cdef class Event:
cdef readonly Operations batch_operations
cdef class Slice:
cdef grpc_slice c_slice
cdef void _assign_slice(self, grpc_slice new_slice) nogil
@staticmethod
cdef Slice from_slice(grpc_slice slice)
cdef class ByteBuffer:
cdef grpc_byte_buffer *c_byte_buffer
@ -97,7 +106,8 @@ cdef class ChannelArgs:
cdef class Metadatum:
cdef grpc_metadata c_metadata
cdef object _key, _value
cdef Slice _key,
cdef Slice _value
cdef class Metadata:
@ -112,8 +122,7 @@ cdef class Operation:
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef grpc_status_code _received_status_code
cdef char *_received_status_details
cdef size_t _received_status_details_capacity
cdef Slice _received_status_details
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references

@ -189,17 +189,11 @@ cdef class CallDetails:
@property
def method(self):
if self.c_details.method != NULL:
return <bytes>self.c_details.method
else:
return None
return Slice.from_slice(self.c_details.method).bytes()
@property
def host(self):
if self.c_details.host != NULL:
return <bytes>self.c_details.host
else:
return None
return Slice.from_slice(self.c_details.host).bytes()
@property
def deadline(self):
@ -233,6 +227,42 @@ cdef class Event:
self.is_new_request = is_new_request
cdef class Slice:
def __cinit__(self):
with nogil:
grpc_init()
self.c_slice = grpc_empty_slice()
cdef void _assign_slice(self, grpc_slice new_slice) nogil:
grpc_slice_unref(self.c_slice)
self.c_slice = new_slice
@staticmethod
def from_bytes(bytes data):
cdef Slice self = Slice()
self._assign_slice(grpc_slice_from_copied_buffer(data, len(data)))
return self
@staticmethod
cdef Slice from_slice(grpc_slice slice):
cdef Slice self = Slice()
grpc_slice_ref(slice)
self._assign_slice(slice)
return self
def bytes(self):
with nogil:
pointer = grpc_slice_start_ptr(self.c_slice)
length = grpc_slice_length(self.c_slice)
return (<char *>pointer)[:length]
def __dealloc__(self):
with nogil:
grpc_slice_unref(self.c_slice)
grpc_shutdown()
cdef class ByteBuffer:
def __cinit__(self, bytes data):
@ -310,7 +340,7 @@ cdef void* copy_ptr(void* ptr):
return ptr
cdef void destroy_ptr(void* ptr):
cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
pass
@ -382,20 +412,20 @@ cdef class ChannelArgs:
cdef class Metadatum:
# TODO(atash) this should just accept Slice objects.
def __cinit__(self, bytes key, bytes value):
self._key = key
self._value = value
self.c_metadata.key = self._key
self.c_metadata.value = self._value
self.c_metadata.value_length = len(self._value)
self._key = Slice.from_bytes(key)
self._value = Slice.from_bytes(value)
self.c_metadata.key = self._key.c_slice
self.c_metadata.value = self._value.c_slice
@property
def key(self):
return <bytes>self.c_metadata.key
return self._key.bytes()
@property
def value(self):
return <bytes>self.c_metadata.value[:self.c_metadata.value_length]
return self._value.bytes()
def __len__(self):
return 2
@ -465,9 +495,8 @@ cdef class Metadata:
def __getitem__(self, size_t i):
return Metadatum(
key=<bytes>self.c_metadata_array.metadata[i].key,
value=<bytes>self.c_metadata_array.metadata[i].value[
:self.c_metadata_array.metadata[i].value_length])
key=Slice.from_slice(self.c_metadata_array.metadata[i].key).bytes(),
value=Slice.from_slice(self.c_metadata_array.metadata[i].value).bytes())
def __iter__(self):
return _MetadataIterator(self)
@ -478,8 +507,7 @@ cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
self._received_status_details = NULL
self._received_status_details_capacity = 0
self._received_status_details = Slice()
self.is_valid = False
@property
@ -536,19 +564,13 @@ cdef class Operation:
def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
if self._received_status_details:
return self._received_status_details
else:
return None
return self._received_status_details.bytes()
@property
def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
if self._received_status_details:
return self._received_status_details
else:
return None
return self._received_status_details.bytes()
@property
def received_cancelled(self):
@ -564,11 +586,6 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
# We *almost* don't need to do anything; most of the objects are handled by
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
gpr_free(self._received_status_details)
grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags):
@ -609,9 +626,10 @@ def operation_send_status_from_server(
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code
op.c_op.data.send_status_from_server.status_details = details
cdef Slice details_slice = Slice.from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice
op.references.append(metadata)
op.references.append(details)
op.references.append(details_slice)
op.is_valid = True
return op
@ -647,9 +665,7 @@ def operation_receive_status_on_client(int flags):
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
&op._received_status_details)
op.c_op.data.receive_status_on_client.status_details_capacity = (
&op._received_status_details_capacity)
&op._received_status_details.c_slice)
op.is_valid = True
return op

Loading…
Cancel
Save