Merge pull request #18314 from grpc/refcount-vtable

Refcount vtables
pull/18156/head^2
Richard Belleville 6 years ago committed by GitHub
commit 08ad670e55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/python/grpcio/grpc/_cython/BUILD.bazel
  2. 15
      src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi
  3. 34
      src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi
  4. 4
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  5. 2
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
  6. 18
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  7. 4
      src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi
  8. 15
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  9. 1
      src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
  10. 5
      src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
  11. 26
      src/python/grpcio/grpc/_cython/_cygrpc/vtable.pxd.pxi
  12. 39
      src/python/grpcio/grpc/_cython/_cygrpc/vtable.pyx.pxi
  13. 3
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  14. 5
      src/python/grpcio/grpc/_cython/cygrpc.pyx

@ -43,10 +43,12 @@ pyx_library(
"_cygrpc/tag.pyx.pxi",
"_cygrpc/time.pxd.pxi",
"_cygrpc/time.pyx.pxi",
"_cygrpc/vtable.pxd.pxi",
"_cygrpc/vtable.pyx.pxi",
"cygrpc.pxd",
"cygrpc.pyx",
],
deps = [
"//:grpc",
"//:grpc",
],
)

@ -13,15 +13,6 @@
# limitations under the License.
cdef void* _copy_pointer(void* pointer)
cdef void _destroy_pointer(void* pointer)
cdef int _compare_pointer(void* first_pointer, void* second_pointer)
cdef tuple _wrap_grpc_arg(grpc_arg arg)
@ -32,7 +23,7 @@ cdef class _ChannelArg:
cdef grpc_arg c_argument
cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except *
cdef void c(self, argument, _VTable vtable, references) except *
cdef class _ChannelArgs:
@ -42,8 +33,4 @@ cdef class _ChannelArgs:
cdef readonly list _references
cdef grpc_channel_args _c_arguments
cdef void _c(self, grpc_arg_pointer_vtable *vtable) except *
cdef grpc_channel_args *c_args(self) except *
@staticmethod
cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable)

@ -15,25 +15,6 @@
cimport cpython
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
cdef void* _copy_pointer(void* pointer):
return pointer
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
cdef void _destroy_pointer(void* pointer):
pass
cdef int _compare_pointer(void* first_pointer, void* second_pointer):
if first_pointer < second_pointer:
return -1
elif first_pointer > second_pointer:
return 1
else:
return 0
cdef class _GrpcArgWrapper:
cdef grpc_arg arg
@ -52,7 +33,7 @@ cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg):
cdef class _ChannelArg:
cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except *:
cdef void c(self, argument, _VTable vtable, references) except *:
key, value = argument
cdef bytes encoded_key = _encode(key)
if encoded_key is not key:
@ -75,7 +56,7 @@ cdef class _ChannelArg:
# lifecycle of the pointer is fixed to the lifecycle of the
# python object wrapping it.
self.c_argument.type = GRPC_ARG_POINTER
self.c_argument.value.pointer.vtable = vtable
self.c_argument.value.pointer.vtable = &vtable.c_vtable
self.c_argument.value.pointer.address = <void*>(<intptr_t>int(value))
else:
raise TypeError(
@ -84,13 +65,10 @@ cdef class _ChannelArg:
cdef class _ChannelArgs:
def __cinit__(self, arguments):
def __cinit__(self, arguments, _VTable vtable not None):
self._arguments = () if arguments is None else tuple(arguments)
self._channel_args = []
self._references = []
self._c_arguments.arguments = NULL
cdef void _c(self, grpc_arg_pointer_vtable *vtable) except *:
self._c_arguments.arguments_length = len(self._arguments)
if self._c_arguments.arguments_length != 0:
self._c_arguments.arguments = <grpc_arg *>gpr_malloc(
@ -107,9 +85,3 @@ cdef class _ChannelArgs:
def __dealloc__(self):
if self._c_arguments.arguments != NULL:
gpr_free(self._c_arguments.arguments)
@staticmethod
cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable):
cdef _ChannelArgs channel_args = _ChannelArgs(arguments)
channel_args._c(vtable)
return channel_args

@ -17,11 +17,11 @@ cimport cpython
cdef class Call:
def __cinit__(self):
def __cinit__(self, _VTable vtable not None):
# Create an *empty* call
fork_handlers_and_grpc_init()
self.c_call = NULL
self.references = []
self.references = [vtable]
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:

@ -68,8 +68,8 @@ cdef class SegregatedCall:
cdef class Channel:
cdef grpc_arg_pointer_vtable _vtable
cdef _ChannelState _state
cdef _VTable _vtable
# TODO(https://github.com/grpc/grpc/issues/15662): Eliminate this.
cdef tuple _arguments

@ -420,11 +420,14 @@ cdef class Channel:
arguments = () if arguments is None else tuple(arguments)
fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
self._vtable.cmp = &_compare_pointer
cdef _ChannelArgs channel_args = _ChannelArgs.from_args(
arguments, &self._vtable)
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._arguments = arguments
self._vtable = _VTable()
cdef _ChannelArgs channel_args = _ChannelArgs(
arguments, self._vtable)
if channel_credentials is None:
self._state.c_channel = grpc_insecure_channel_create(
<char *>target, channel_args.c_args(), NULL)
@ -433,11 +436,6 @@ cdef class Channel:
self._state.c_channel = grpc_secure_channel_create(
c_channel_credentials, <char *>target, channel_args.c_args(), NULL)
grpc_channel_credentials_release(c_channel_credentials)
self._state.c_call_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._state.c_connectivity_completion_queue = (
grpc_completion_queue_create_for_next(NULL))
self._arguments = arguments
def target(self):
cdef char *c_target

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class Server:
cdef grpc_arg_pointer_vtable _vtable
cdef grpc_server *c_server
cdef _VTable _vtable
cdef bint is_started # start has been called
cdef bint is_shutting_down # shutdown has been called
cdef bint is_shutdown # notification of complete shutdown received

@ -20,22 +20,21 @@ import grpc
_LOGGER = logging.getLogger(__name__)
cdef class Server:
def __cinit__(self, object arguments):
fork_handlers_and_grpc_init()
self.references = []
self.registered_completion_queues = []
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
self._vtable.cmp = &_compare_pointer
cdef _ChannelArgs channel_args = _ChannelArgs.from_args(
arguments, &self._vtable)
self.c_server = grpc_server_create(channel_args.c_args(), NULL)
self.references.append(arguments)
self.is_started = False
self.is_shutting_down = False
self.is_shutdown = False
self.c_server = NULL
self._vtable = _VTable()
cdef _ChannelArgs channel_args = _ChannelArgs(arguments, self._vtable)
self.c_server = grpc_server_create(channel_args.c_args(), NULL)
self.references.append(arguments)
def request_call(
self, CompletionQueue call_queue not None,
@ -44,7 +43,7 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
cdef _RequestCallTag request_call_tag = _RequestCallTag(tag, self._vtable)
request_call_tag.prepare()
cpython.Py_INCREF(request_call_tag)
return grpc_server_request_call(

@ -29,6 +29,7 @@ cdef class _RequestCallTag(_Tag):
cdef readonly object _user_tag
cdef Call call
cdef _VTable _vtable
cdef CallDetails call_details
cdef grpc_metadata_array c_invocation_metadata

@ -30,13 +30,14 @@ cdef class _ConnectivityTag(_Tag):
cdef class _RequestCallTag(_Tag):
def __cinit__(self, user_tag):
def __cinit__(self, user_tag, _VTable vtable not None):
self._user_tag = user_tag
self.call = None
self.call_details = None
self._vtable = vtable
cdef void prepare(self) except *:
self.call = Call()
self.call = Call(self._vtable)
self.call_details = CallDetails()
grpc_metadata_array_init(&self.c_invocation_metadata)

@ -0,0 +1,26 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef void* _copy_pointer(void* pointer)
cdef void _destroy_pointer(void* pointer)
cdef int _compare_pointer(void* first_pointer, void* second_pointer)
cdef class _VTable:
cdef grpc_arg_pointer_vtable c_vtable

@ -0,0 +1,39 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
cdef void* _copy_pointer(void* pointer):
return pointer
# TODO(https://github.com/grpc/grpc/issues/15662): Reform this.
cdef void _destroy_pointer(void* pointer):
pass
cdef int _compare_pointer(void* first_pointer, void* second_pointer):
if first_pointer < second_pointer:
return -1
elif first_pointer > second_pointer:
return 1
else:
return 0
cdef class _VTable:
def __cinit__(self):
self.c_vtable.copy = &_copy_pointer
self.c_vtable.destroy = &_destroy_pointer
self.c_vtable.cmp = &_compare_pointer

@ -23,13 +23,14 @@ include "_cygrpc/completion_queue.pxd.pxi"
include "_cygrpc/event.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/propagation_bits.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"
include "_cygrpc/time.pxd.pxi"
include "_cygrpc/vtable.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
include "_cygrpc/propagation_bits.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"

@ -24,19 +24,20 @@ include "_cygrpc/grpc_string.pyx.pxi"
include "_cygrpc/arguments.pyx.pxi"
include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/channelz.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
include "_cygrpc/event.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/propagation_bits.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/vtable.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/channelz.pyx.pxi"
include "_cygrpc/propagation_bits.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"

Loading…
Cancel
Save