Fix Python memory errors

... but for real this time.
reviewable/pr8842/r16
Masood Malekghassemi 8 years ago
parent 5a96770fe3
commit c4d10dfb1b
  1. 19
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  2. 10
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  3. 24
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  4. 144
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi

@ -60,24 +60,25 @@ cdef class Channel:
method, host, Timespec deadline not None):
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
cdef Slice method_slice = Slice.from_bytes(method)
cdef Slice host_slice
cdef grpc_slice *host_c_slice = NULL
cdef grpc_slice method_slice = _slice_from_bytes(method)
cdef grpc_slice host_slice
cdef grpc_slice *host_slice_ptr = NULL
if host is not None:
host_slice = Slice.from_bytes(host)
host_c_slice = &host_slice.c_slice
else:
host_slice = Slice()
host_slice = _slice_from_bytes(host)
host_slice_ptr = &host_slice
cdef Call operation_call = Call()
operation_call.references = [self, method_slice, host_slice, queue]
operation_call.references = [self, queue]
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
with nogil:
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
queue.c_completion_queue, method_slice.c_slice, host_c_slice,
queue.c_completion_queue, method_slice, host_slice_ptr,
deadline.c_time, NULL)
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
return operation_call
def check_connectivity_state(self, bint try_to_connect):

@ -51,6 +51,7 @@ cdef class CompletionQueue:
cdef CallDetails request_call_details = None
cdef Metadata request_metadata = None
cdef Operations batch_operations = None
cdef Operation batch_operation = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@ -69,10 +70,15 @@ cdef class CompletionQueue:
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
request_metadata = tag.request_metadata
if request_metadata is not None:
if tag.request_metadata is not None:
request_metadata = tag.request_metadata
request_metadata._claim_slice_ownership()
batch_operations = tag.batch_operations
if tag.batch_operations is not None:
for op in batch_operations.operations:
batch_operation = <Operation>op
if batch_operation._received_metadata is not None:
batch_operation._received_metadata._claim_slice_ownership()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call

@ -28,6 +28,11 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cdef bytes _slice_bytes(grpc_slice slice)
cdef grpc_slice _copy_slice(grpc_slice slice) nogil
cdef grpc_slice _slice_from_bytes(bytes value) nogil
cdef class Timespec:
cdef gpr_timespec c_time
@ -70,17 +75,6 @@ cdef class Event:
cdef readonly Operations batch_operations
cdef class Slice:
cdef grpc_slice c_slice
cdef void _assign_slice(self, grpc_slice new_slice) nogil
@staticmethod
cdef Slice from_slice(grpc_slice slice)
@staticmethod
cdef bytes bytes_from_slice(grpc_slice slice)
cdef class ByteBuffer:
cdef grpc_byte_buffer *c_byte_buffer
@ -108,17 +102,13 @@ cdef class ChannelArgs:
cdef class Metadatum:
cdef grpc_metadata c_metadata
cdef Slice _key,
cdef Slice _value
cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
cdef class Metadata:
cdef grpc_metadata_array c_metadata_array
cdef bint owns_metadata_slices
cdef object metadata
cdef void _claim_slice_ownership(self)
cdef void _drop_slice_ownership(self)
cdef class Operation:
@ -127,7 +117,7 @@ cdef class Operation:
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef grpc_status_code _received_status_code
cdef Slice _received_status_details
cdef grpc_slice _status_details
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references

@ -29,6 +29,26 @@
from libc.stdint cimport intptr_t
cdef bytes _slice_bytes(grpc_slice slice):
cdef void *start = grpc_slice_start_ptr(slice)
cdef size_t length = grpc_slice_length(slice)
return (<const char *>start)[:length]
cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
cdef void *start = grpc_slice_start_ptr(slice)
cdef size_t length = grpc_slice_length(slice)
return grpc_slice_from_copied_buffer(<const char *>start, length)
cdef grpc_slice _slice_from_bytes(bytes value) nogil:
cdef const char *value_ptr
cdef size_t length
with gil:
value_ptr = <const char *>value
length = len(value)
return grpc_slice_from_copied_buffer(value_ptr, length)
class ConnectivityState:
idle = GRPC_CHANNEL_IDLE
connecting = GRPC_CHANNEL_CONNECTING
@ -189,11 +209,11 @@ cdef class CallDetails:
@property
def method(self):
return Slice.bytes_from_slice(self.c_details.method)
return _slice_bytes(self.c_details.method)
@property
def host(self):
return Slice.bytes_from_slice(self.c_details.host)
return _slice_bytes(self.c_details.host)
@property
def deadline(self):
@ -227,46 +247,6 @@ cdef class Event:
self.is_new_request = is_new_request
cdef class Slice:
def __cinit__(self):
with nogil:
grpc_init()
self.c_slice = grpc_empty_slice()
cdef void _assign_slice(self, grpc_slice new_slice) nogil:
grpc_slice_unref(self.c_slice)
self.c_slice = new_slice
@staticmethod
def from_bytes(bytes data):
cdef Slice self = Slice()
self._assign_slice(grpc_slice_from_copied_buffer(data, len(data)))
return self
@staticmethod
cdef Slice from_slice(grpc_slice slice):
cdef Slice self = Slice()
grpc_slice_ref(slice)
self._assign_slice(slice)
return self
@staticmethod
cdef bytes bytes_from_slice(grpc_slice slice):
with nogil:
pointer = grpc_slice_start_ptr(slice)
length = grpc_slice_length(slice)
return (<char *>pointer)[:length]
def bytes(self):
return Slice.bytes_from_slice(self.c_slice)
def __dealloc__(self):
with nogil:
grpc_slice_unref(self.c_slice)
grpc_shutdown()
cdef class ByteBuffer:
def __cinit__(self, bytes data):
@ -416,20 +396,21 @@ cdef class ChannelArgs:
cdef class Metadatum:
# TODO(atash) this should just accept Slice objects.
def __cinit__(self, bytes key, bytes value):
self._key = Slice.from_bytes(key)
self._value = Slice.from_bytes(value)
self.c_metadata.key = self._key.c_slice
self.c_metadata.value = self._value.c_slice
self.c_metadata.key = _slice_from_bytes(key)
self.c_metadata.value = _slice_from_bytes(value)
cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
destination[0].key = _copy_slice(self.c_metadata.key)
destination[0].value = _copy_slice(self.c_metadata.value)
@property
def key(self):
return self._key.bytes()
return _slice_bytes(self.c_metadata.key)
@property
def value(self):
return self._value.bytes()
return _slice_bytes(self.c_metadata.value)
def __len__(self):
return 2
@ -445,6 +426,9 @@ cdef class Metadatum:
def __iter__(self):
return iter((self.key, self.value))
def __dealloc__(self):
grpc_slice_unref(self.c_metadata.key)
grpc_slice_unref(self.c_metadata.value)
cdef class _MetadataIterator:
@ -466,34 +450,27 @@ cdef class _MetadataIterator:
else:
raise StopIteration
cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
cdef void *start = grpc_slice_start_ptr(slice)
cdef size_t length = grpc_slice_length(slice)
return grpc_slice_from_copied_buffer(<const char *>start, length)
cdef class Metadata:
def __cinit__(self, metadata):
def __cinit__(self, metadata_iterable):
with nogil:
grpc_init()
grpc_metadata_array_init(&self.c_metadata_array)
self.owns_metadata_slices = False
self.metadata = list(metadata)
for metadatum in self.metadata:
metadata = list(metadata_iterable)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
self.c_metadata_array.count = len(self.metadata)
self.c_metadata_array.capacity = len(self.metadata)
self.c_metadata_array.count = len(metadata)
self.c_metadata_array.capacity = len(metadata)
with nogil:
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i] = (
(<Metadatum>self.metadata[i]).c_metadata)
(<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
def __dealloc__(self):
self._drop_slice_ownership()
with nogil:
# this frees the allocated memory for the grpc_metadata_array (although
# it'd be nice if that were documented somewhere...)
@ -507,30 +484,26 @@ cdef class Metadata:
def __getitem__(self, size_t i):
if i >= self.c_metadata_array.count:
raise IndexError
return Metadatum(
key=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].key),
value=Slice.bytes_from_slice(self.c_metadata_array.metadata[i].value))
key = _slice_bytes(self.c_metadata_array.metadata[i].key)
value = _slice_bytes(self.c_metadata_array.metadata[i].value)
return Metadatum(key=key, value=value)
def __iter__(self):
return _MetadataIterator(self)
cdef void _claim_slice_ownership(self):
if self.owns_metadata_slices:
return
cdef grpc_metadata_array new_c_metadata_array
grpc_metadata_array_init(&new_c_metadata_array)
new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata))
new_c_metadata_array.count = self.c_metadata_array.count
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i].key = _copy_slice(
new_c_metadata_array.metadata[i].key = _copy_slice(
self.c_metadata_array.metadata[i].key)
self.c_metadata_array.metadata[i].value = _copy_slice(
new_c_metadata_array.metadata[i].value = _copy_slice(
self.c_metadata_array.metadata[i].value)
self.owns_metadata_slices = True
cdef void _drop_slice_ownership(self):
if not self.owns_metadata_slices:
return
for i in range(self.c_metadata_array.count):
grpc_slice_unref(self.c_metadata_array.metadata[i].key)
grpc_slice_unref(self.c_metadata_array.metadata[i].value)
self.owns_metadata_slices = False
grpc_metadata_array_destroy(&self.c_metadata_array)
self.c_metadata_array = new_c_metadata_array
cdef class Operation:
@ -538,7 +511,7 @@ cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
self._received_status_details = Slice()
self._status_details = grpc_empty_slice()
self.is_valid = False
@property
@ -595,13 +568,13 @@ cdef class Operation:
def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
return self._received_status_details.bytes()
return _slice_bytes(self._status_details)
@property
def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
return self._received_status_details.bytes()
return _slice_bytes(self._status_details)
@property
def received_cancelled(self):
@ -617,6 +590,7 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
grpc_slice_unref(self._status_details)
grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags):
@ -657,10 +631,10 @@ def operation_send_status_from_server(
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code
cdef Slice details_slice = Slice.from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &details_slice.c_slice
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &op._status_details
op.references.append(metadata)
op.references.append(details_slice)
op.is_valid = True
return op
@ -696,7 +670,7 @@ def operation_receive_status_on_client(int flags):
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
&op._received_status_details.c_slice)
&op._status_details)
op.is_valid = True
return op

Loading…
Cancel
Save