Fix metadata memory leak

The gRPC Core has two styles for passing metadata: as an integer count
along with a grpc_metadata* pointer, which is used for passing metadata
into the core, and as a grpc_metadata_array, which is used for passing
metadata out of the core. The Cython layer of gRPC Python was using a
single data structure wrapping grpc_metadata_array for both purposes,
but this was complex because the core manages the slices contained in
grpc_metadata_array objects (at least those of which it is aware), so
the Cython layer had to keep track of whether or not the core was aware
of the slices it was using (and it was also defective, leaking slices).

This is solved by realigning with the Cython layer’s intended design of
mirroring as closely as possible in Python the gRPC Core API: we use
one structure for passing metadata into the core (what is now called
cygrpc.Metadata) and second, different structure for receiving metadata
out of the core (what was called cygrpc.Metadata but is now
cygrpc.MetadataArray, reflecting that it wraps the core’s
grpc_metadata_array).

All bug fixes should contain added tests preventing regression but this
doesn't because I don't know at this time how to write a does-not-leak
test for Python that fits well into our existing body of tests. Phooey.

Thanks to Dominik Janků (djanku@email.cz) for investigation and an
earlier draft of a solution.
pull/12466/head
Nathaniel Manista 8 years ago
parent 8380090f95
commit eda5a4db1e
  1. 11
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  2. 9
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  3. 1
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  4. 13
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  5. 118
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  6. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi

@ -41,9 +41,8 @@ cdef class CompletionQueue:
cdef object user_tag = None
cdef Call operation_call = None
cdef CallDetails request_call_details = None
cdef Metadata request_metadata = None
cdef object request_metadata = None
cdef Operations batch_operations = None
cdef Operation batch_operation = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@ -63,14 +62,8 @@ cdef class CompletionQueue:
operation_call = tag.operation_call
request_call_details = tag.request_call_details
if tag.request_metadata is not None:
request_metadata = tag.request_metadata
request_metadata._claim_slice_ownership()
request_metadata = tuple(tag.request_metadata)
batch_operations = tag.batch_operations
if tag.batch_operations is not None:
for op in batch_operations.operations:
batch_operation = <Operation>op
if batch_operation._received_metadata is not None:
batch_operation._received_metadata._claim_slice_ownership()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call

@ -76,7 +76,7 @@ cdef class CredentialsMetadataPlugin:
"""
Args:
plugin_callback (callable): Callback accepting a service URL (str/bytes)
and callback object (accepting a Metadata,
and callback object (accepting a MetadataArray,
grpc_status_code, and a str/bytes error message). This argument
when called should be non-blocking and eventually call the callback
object with the appropriate status code/details and metadata (if
@ -129,8 +129,7 @@ cdef void plugin_get_metadata(
def python_callback(
Metadata metadata, grpc_status_code status,
bytes error_details):
cb(user_data, metadata.c_metadata_array.metadata,
metadata.c_metadata_array.count, status, error_details)
cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
called_flag[0] = True
cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
cdef AuthMetadataContext cy_context = AuthMetadataContext()
@ -139,8 +138,8 @@ cdef void plugin_get_metadata(
self.plugin_callback(cy_context, python_callback)
except Exception as error:
if not called_flag[0]:
cb(user_data, Metadata([]).c_metadata_array.metadata,
0, StatusCode.unknown, traceback.format_exc().encode())
cb(user_data, NULL, 0, StatusCode.unknown,
traceback.format_exc().encode())
cdef void plugin_destroy_c_plugin_state(void *state) with gil:
cpython.Py_DECREF(<CredentialsMetadataPlugin>state)

@ -59,6 +59,7 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_malloc(size_t length) nogil
grpc_slice grpc_slice_from_copied_string(const char *source) nogil
grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil
grpc_slice grpc_slice_copy(grpc_slice s) nogil
# Declare functions for function-like macros (because Cython)...
void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil

@ -37,7 +37,7 @@ cdef class OperationTag:
cdef Server shutting_down_server
cdef Call operation_call
cdef CallDetails request_call_details
cdef Metadata request_metadata
cdef MetadataArray request_metadata
cdef Operations batch_operations
cdef bint is_new_request
@ -51,7 +51,7 @@ cdef class Event:
# For Server.request_call
cdef readonly bint is_new_request
cdef readonly CallDetails request_call_details
cdef readonly Metadata request_metadata
cdef readonly object request_metadata
# For server calls
cdef readonly Call operation_call
@ -92,15 +92,20 @@ cdef class Metadatum:
cdef class Metadata:
cdef grpc_metadata *c_metadata
cdef readonly size_t c_count
cdef class MetadataArray:
cdef grpc_metadata_array c_metadata_array
cdef void _claim_slice_ownership(self)
cdef class Operation:
cdef grpc_op c_op
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef MetadataArray _received_metadata
cdef grpc_status_code _received_status_code
cdef grpc_slice _status_details
cdef int _received_cancelled

@ -238,7 +238,7 @@ cdef class Event:
def __cinit__(self, grpc_completion_type type, bint success,
object tag, Call operation_call,
CallDetails request_call_details,
Metadata request_metadata,
object request_metadata,
bint is_new_request,
Operations batch_operations):
self.type = type
@ -437,48 +437,79 @@ cdef class Metadatum:
cdef class _MetadataIterator:
cdef size_t i
cdef Metadata metadata
cdef size_t _length
cdef object _metadatum_indexable
def __cinit__(self, Metadata metadata not None):
def __cinit__(self, length, metadatum_indexable):
self._length = length
self._metadatum_indexable = metadatum_indexable
self.i = 0
self.metadata = metadata
def __iter__(self):
return self
def __next__(self):
if self.i < len(self.metadata):
result = self.metadata[self.i]
if self.i < self._length:
result = self._metadatum_indexable[self.i]
self.i = self.i + 1
return result
else:
raise StopIteration
# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an
# ordinary sequence of pairs of bytestrings all the way down to the
# grpc_call_start_batch call.
cdef class Metadata:
"""Metadata being passed from application to core."""
def __cinit__(self, metadata_iterable):
metadata_sequence = tuple(metadata_iterable)
cdef size_t count = len(metadata_sequence)
with nogil:
grpc_init()
grpc_metadata_array_init(&self.c_metadata_array)
metadata = list(metadata_iterable)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
self.c_metadata_array.count = len(metadata)
self.c_metadata_array.capacity = len(metadata)
self.c_metadata = <grpc_metadata *>gpr_malloc(
count * sizeof(grpc_metadata))
self.c_count = count
for index, metadatum in enumerate(metadata_sequence):
self.c_metadata[index].key = grpc_slice_copy(
(<Metadatum>metadatum).c_metadata.key)
self.c_metadata[index].value = grpc_slice_copy(
(<Metadatum>metadatum).c_metadata.value)
def __dealloc__(self):
with nogil:
for index in range(self.c_count):
grpc_slice_unref(self.c_metadata[index].key)
grpc_slice_unref(self.c_metadata[index].value)
gpr_free(self.c_metadata)
grpc_shutdown()
def __len__(self):
return self.c_count
def __getitem__(self, size_t index):
if index < self.c_count:
key = _slice_bytes(self.c_metadata[index].key)
value = _slice_bytes(self.c_metadata[index].value)
return Metadatum(key, value)
else:
raise IndexError()
def __iter__(self):
return _MetadataIterator(self.c_count, self)
cdef class MetadataArray:
"""Metadata being passed from core to application."""
def __cinit__(self):
with nogil:
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
for i in range(self.c_metadata_array.count):
(<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
grpc_init()
grpc_metadata_array_init(&self.c_metadata_array)
def __dealloc__(self):
with nogil:
# this frees the allocated memory for the grpc_metadata_array (although
# it'd be nice if that were documented somewhere...)
# TODO(atash): document this in the C core
grpc_metadata_array_destroy(&self.c_metadata_array)
grpc_shutdown()
@ -493,21 +524,7 @@ cdef class Metadata:
return Metadatum(key=key, value=value)
def __iter__(self):
return _MetadataIterator(self)
cdef void _claim_slice_ownership(self):
cdef grpc_metadata_array new_c_metadata_array
grpc_metadata_array_init(&new_c_metadata_array)
new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata))
new_c_metadata_array.count = self.c_metadata_array.count
for i in range(self.c_metadata_array.count):
new_c_metadata_array.metadata[i].key = _copy_slice(
self.c_metadata_array.metadata[i].key)
new_c_metadata_array.metadata[i].value = _copy_slice(
self.c_metadata_array.metadata[i].value)
grpc_metadata_array_destroy(&self.c_metadata_array)
self.c_metadata_array = new_c_metadata_array
return _MetadataIterator(self.c_metadata_array.count, self)
cdef class Operation:
@ -547,14 +564,13 @@ cdef class Operation:
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
return self._received_metadata
@property
def received_metadata_or_none(self):
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
return None
return self._received_metadata
# TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython
# objects must be legitimate for use from Python at any time" policy in
# place today, shift the policy toward "Operation objects are only usable
# while their calls are active", and move this making-a-copy-because-this-
# data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
# lowest Python layer.
return tuple(self._received_metadata)
@property
def received_status_code(self):
@ -601,9 +617,8 @@ def operation_send_initial_metadata(Metadata metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags
op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count
op.c_op.data.send_initial_metadata.metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_initial_metadata.count = metadata.c_count
op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata
op.references.append(metadata)
op.is_valid = True
return op
@ -631,9 +646,8 @@ def operation_send_status_from_server(
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags
op.c_op.data.send_status_from_server.trailing_metadata_count = (
metadata.c_metadata_array.count)
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
metadata.c_count)
op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata
op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
@ -646,7 +660,7 @@ def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
op._received_metadata = Metadata([])
op._received_metadata = MetadataArray()
op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
&op._received_metadata.c_metadata_array)
op.is_valid = True
@ -669,7 +683,7 @@ def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags
op._received_metadata = Metadata([])
op._received_metadata = MetadataArray()
op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._received_metadata.c_metadata_array)
op.c_op.data.receive_status_on_client.status = (

@ -44,7 +44,7 @@ cdef class Server:
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
operation_tag.request_metadata = Metadata([])
operation_tag.request_metadata = MetadataArray()
operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([])

Loading…
Cancel
Save