Streamline metadata in gRPC Python

pull/13672/head
Nathaniel Manista 7 years ago
parent ae74a320dc
commit 80516e884a
  1. 27
      src/python/grpcio/grpc/_channel.py
  2. 17
      src/python/grpcio/grpc/_common.py
  3. 5
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  4. 8
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  5. 24
      src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
  6. 26
      src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi
  7. 62
      src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
  8. 25
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  9. 171
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  10. 17
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  11. 1
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  12. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  13. 4
      src/python/grpcio/grpc/_plugin_wrapping.py
  14. 37
      src/python/grpcio/grpc/_server.py
  15. 2
      src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
  16. 15
      src/python/grpcio_tests/tests/unit/_cython/_common.py
  17. 6
      src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
  18. 67
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py

@ -122,8 +122,8 @@ def _abort(state, code, details):
state.code = code state.code = code
state.details = details state.details = details
if state.initial_metadata is None: if state.initial_metadata is None:
state.initial_metadata = _common.EMPTY_METADATA state.initial_metadata = ()
state.trailing_metadata = _common.EMPTY_METADATA state.trailing_metadata = ()
def _handle_event(event, state, response_deserializer): def _handle_event(event, state, response_deserializer):
@ -372,14 +372,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
with self._state.condition: with self._state.condition:
while self._state.initial_metadata is None: while self._state.initial_metadata is None:
self._state.condition.wait() self._state.condition.wait()
return _common.to_application_metadata(self._state.initial_metadata) return self._state.initial_metadata
def trailing_metadata(self): def trailing_metadata(self):
with self._state.condition: with self._state.condition:
while self._state.trailing_metadata is None: while self._state.trailing_metadata is None:
self._state.condition.wait() self._state.condition.wait()
return _common.to_application_metadata( return self._state.trailing_metadata
self._state.trailing_metadata)
def code(self): def code(self):
with self._state.condition: with self._state.condition:
@ -420,8 +419,7 @@ def _start_unary_request(request, timeout, request_serializer):
deadline, deadline_timespec = _deadline(timeout) deadline, deadline_timespec = _deadline(timeout)
serialized_request = _common.serialize(request, request_serializer) serialized_request = _common.serialize(request, request_serializer)
if serialized_request is None: if serialized_request is None:
state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA, state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
grpc.StatusCode.INTERNAL,
'Exception serializing request!') 'Exception serializing request!')
rendezvous = _Rendezvous(state, None, None, deadline) rendezvous = _Rendezvous(state, None, None, deadline)
return deadline, deadline_timespec, None, rendezvous return deadline, deadline_timespec, None, rendezvous
@ -458,8 +456,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else: else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = ( operations = (
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
_common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
@ -549,8 +546,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
)), event_handler) )), event_handler)
operations = ( operations = (
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(
_common.to_cygrpc_metadata(metadata), metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(
_EMPTY_FLAGS), cygrpc.operation_send_message(
serialized_request, _EMPTY_FLAGS), serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
@ -588,8 +584,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)),
None) None)
operations = ( operations = (
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
_common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch( call_error = call.start_client_batch(
@ -642,8 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)),
event_handler) event_handler)
operations = ( operations = (
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
_common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch( call_error = call.start_client_batch(
@ -685,8 +679,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)),
event_handler) event_handler)
operations = ( operations = (
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
_common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
call_error = call.start_client_batch( call_error = call.start_client_batch(
cygrpc.Operations(operations), event_handler) cygrpc.Operations(operations), event_handler)

@ -22,8 +22,6 @@ import six
import grpc import grpc
from grpc._cython import cygrpc from grpc._cython import cygrpc
EMPTY_METADATA = cygrpc.Metadata(())
CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
cygrpc.ConnectivityState.idle: cygrpc.ConnectivityState.idle:
grpc.ChannelConnectivity.IDLE, grpc.ChannelConnectivity.IDLE,
@ -91,21 +89,6 @@ def channel_args(options):
return cygrpc.ChannelArgs(cygrpc_args) return cygrpc.ChannelArgs(cygrpc_args)
def to_cygrpc_metadata(application_metadata):
return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
cygrpc.Metadatum(encode(key), encode(value))
for key, value in application_metadata)
def to_application_metadata(cygrpc_metadata):
if cygrpc_metadata is None:
return ()
else:
return tuple((decode(key), value
if key[-4:] == b'-bin' else decode(value))
for key, value in cygrpc_metadata)
def _transform(message, transformer, exception_message): def _transform(message, transformer, exception_message):
if transformer is None: if transformer is None:
return message return message

@ -61,8 +61,9 @@ cdef class CompletionQueue:
user_tag = tag.user_tag user_tag = tag.user_tag
operation_call = tag.operation_call operation_call = tag.operation_call
request_call_details = tag.request_call_details request_call_details = tag.request_call_details
if tag.request_metadata is not None: if tag.is_new_request:
request_metadata = tuple(tag.request_metadata) request_metadata = _metadata(&tag._c_request_metadata)
grpc_metadata_array_destroy(&tag._c_request_metadata)
batch_operations = tag.batch_operations batch_operations = tag.batch_operations
if tag.is_new_request: if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through # Stuff in the tag not explicitly handled by us needs to live through

@ -30,9 +30,13 @@ cdef int _get_metadata(
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status, size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil: const char **error_details) with gil:
def callback(Metadata metadata, grpc_status_code status, bytes error_details): cdef size_t metadata_count
cdef grpc_metadata *c_metadata
def callback(metadata, grpc_status_code status, bytes error_details):
if status is StatusCode.ok: if status is StatusCode.ok:
cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL) _store_c_metadata(metadata, &c_metadata, &metadata_count)
cb(user_data, c_metadata, metadata_count, status, NULL)
_release_c_metadata(c_metadata, metadata_count)
else: else:
cb(user_data, NULL, 0, status, error_details) cb(user_data, NULL, 0, status, error_details)
args = context.service_url, context.method_name, callback, args = context.service_url, context.method_name, callback,

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
# This function will ascii encode unicode string inputs if neccesary. # This function will ascii encode unicode string inputs if neccesary.
# In Python3, unicode strings are the default str type. # In Python3, unicode strings are the default str type.
@ -22,3 +24,25 @@ cdef bytes str_to_bytes(object s):
return s.encode('ascii') return s.encode('ascii')
else: else:
raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s))) raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s)))
cdef bytes _encode(str native_string_or_none):
if native_string_or_none is None:
return b''
elif isinstance(native_string_or_none, (bytes,)):
return <bytes>native_string_or_none
elif isinstance(native_string_or_none, (unicode,)):
return native_string_or_none.encode('ascii')
else:
raise TypeError('Expected str, not {}'.format(type(native_string_or_none)))
cdef str _decode(bytes bytestring):
if isinstance(bytestring, (str,)):
return <str>bytestring
else:
try:
return bytestring.decode('utf8')
except UnicodeDecodeError:
logging.exception('Invalid encoding on %s', bytestring)
return bytestring.decode('latin1')

@ -0,0 +1,26 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef void _store_c_metadata(
metadata, grpc_metadata **c_metadata, size_t *c_count)
cdef void _release_c_metadata(grpc_metadata *c_metadata, int count)
cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice)
cdef tuple _metadata(grpc_metadata_array *c_metadata_array)

@ -0,0 +1,62 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',))
cdef void _store_c_metadata(
metadata, grpc_metadata **c_metadata, size_t *c_count):
if metadata is None:
c_count[0] = 0
c_metadata[0] = NULL
else:
metadatum_count = len(metadata)
if metadatum_count == 0:
c_count[0] = 0
c_metadata[0] = NULL
else:
c_count[0] = metadatum_count
c_metadata[0] = <grpc_metadata *>gpr_malloc(
metadatum_count * sizeof(grpc_metadata))
for index, (key, value) in enumerate(metadata):
encoded_key = _encode(key)
encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value)
c_metadata[0][index].key = _slice_from_bytes(encoded_key)
c_metadata[0][index].value = _slice_from_bytes(encoded_value)
cdef void _release_c_metadata(grpc_metadata *c_metadata, int count):
if 0 < count:
for index in range(count):
grpc_slice_unref(c_metadata[index].key)
grpc_slice_unref(c_metadata[index].value)
gpr_free(c_metadata)
cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice):
cdef bytes key = _slice_bytes(key_slice)
cdef bytes value = _slice_bytes(value_slice)
return <tuple>_Metadatum(
_decode(key), value if key[-4:] == b'-bin' else _decode(value))
cdef tuple _metadata(grpc_metadata_array *c_metadata_array):
return tuple(
_metadatum(
c_metadata_array.metadata[index].key,
c_metadata_array.metadata[index].value)
for index in range(c_metadata_array.count))

@ -37,7 +37,7 @@ cdef class OperationTag:
cdef Server shutting_down_server cdef Server shutting_down_server
cdef Call operation_call cdef Call operation_call
cdef CallDetails request_call_details cdef CallDetails request_call_details
cdef MetadataArray request_metadata cdef grpc_metadata_array _c_request_metadata
cdef Operations batch_operations cdef Operations batch_operations
cdef bint is_new_request cdef bint is_new_request
@ -84,28 +84,15 @@ cdef class ChannelArgs:
cdef list args cdef list args
cdef class Metadatum:
cdef grpc_metadata c_metadata
cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
cdef class Metadata:
cdef grpc_metadata *c_metadata
cdef readonly size_t c_count
cdef class MetadataArray:
cdef grpc_metadata_array c_metadata_array
cdef class Operation: cdef class Operation:
cdef grpc_op c_op cdef grpc_op c_op
cdef bint _c_metadata_needs_release
cdef size_t _c_metadata_count
cdef grpc_metadata *_c_metadata
cdef ByteBuffer _received_message cdef ByteBuffer _received_message
cdef MetadataArray _received_metadata cdef bint _c_metadata_array_needs_destruction
cdef grpc_metadata_array _c_metadata_array
cdef grpc_status_code _received_status_code cdef grpc_status_code _received_status_code
cdef grpc_slice _status_details cdef grpc_slice _status_details
cdef int _received_cancelled cdef int _received_cancelled

@ -390,140 +390,13 @@ cdef class ChannelArgs:
return self.args[i] return self.args[i]
cdef class Metadatum:
def __cinit__(self, bytes key, bytes value):
self.c_metadata.key = _slice_from_bytes(key)
self.c_metadata.value = _slice_from_bytes(value)
cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
destination[0].key = _copy_slice(self.c_metadata.key)
destination[0].value = _copy_slice(self.c_metadata.value)
@property
def key(self):
return _slice_bytes(self.c_metadata.key)
@property
def value(self):
return _slice_bytes(self.c_metadata.value)
def __len__(self):
return 2
def __getitem__(self, size_t i):
if i == 0:
return self.key
elif i == 1:
return self.value
else:
raise IndexError("index must be 0 (key) or 1 (value)")
def __iter__(self):
return iter((self.key, self.value))
def __dealloc__(self):
grpc_slice_unref(self.c_metadata.key)
grpc_slice_unref(self.c_metadata.value)
cdef class _MetadataIterator:
cdef size_t i
cdef size_t _length
cdef object _metadatum_indexable
def __cinit__(self, length, metadatum_indexable):
self._length = length
self._metadatum_indexable = metadatum_indexable
self.i = 0
def __iter__(self):
return self
def __next__(self):
if self.i < self._length:
result = self._metadatum_indexable[self.i]
self.i = self.i + 1
return result
else:
raise StopIteration()
# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an
# ordinary sequence of pairs of bytestrings all the way down to the
# grpc_call_start_batch call.
cdef class Metadata:
"""Metadata being passed from application to core."""
def __cinit__(self, metadata_iterable):
metadata_sequence = tuple(metadata_iterable)
cdef size_t count = len(metadata_sequence)
with nogil:
grpc_init()
self.c_metadata = <grpc_metadata *>gpr_malloc(
count * sizeof(grpc_metadata))
self.c_count = count
for index, metadatum in enumerate(metadata_sequence):
self.c_metadata[index].key = grpc_slice_copy(
(<Metadatum>metadatum).c_metadata.key)
self.c_metadata[index].value = grpc_slice_copy(
(<Metadatum>metadatum).c_metadata.value)
def __dealloc__(self):
with nogil:
for index in range(self.c_count):
grpc_slice_unref(self.c_metadata[index].key)
grpc_slice_unref(self.c_metadata[index].value)
gpr_free(self.c_metadata)
grpc_shutdown()
def __len__(self):
return self.c_count
def __getitem__(self, size_t index):
if index < self.c_count:
key = _slice_bytes(self.c_metadata[index].key)
value = _slice_bytes(self.c_metadata[index].value)
return Metadatum(key, value)
else:
raise IndexError()
def __iter__(self):
return _MetadataIterator(self.c_count, self)
cdef class MetadataArray:
"""Metadata being passed from core to application."""
def __cinit__(self):
with nogil:
grpc_init()
grpc_metadata_array_init(&self.c_metadata_array)
def __dealloc__(self):
with nogil:
grpc_metadata_array_destroy(&self.c_metadata_array)
grpc_shutdown()
def __len__(self):
return self.c_metadata_array.count
def __getitem__(self, size_t i):
if i >= self.c_metadata_array.count:
raise IndexError()
key = _slice_bytes(self.c_metadata_array.metadata[i].key)
value = _slice_bytes(self.c_metadata_array.metadata[i].value)
return Metadatum(key=key, value=value)
def __iter__(self):
return _MetadataIterator(self.c_metadata_array.count, self)
cdef class Operation: cdef class Operation:
def __cinit__(self): def __cinit__(self):
grpc_init() grpc_init()
self.references = [] self.references = []
self._c_metadata_needs_release = False
self._c_metadata_array_needs_destruction = False
self._status_details = grpc_empty_slice() self._status_details = grpc_empty_slice()
self.is_valid = False self.is_valid = False
@ -556,13 +429,7 @@ cdef class Operation:
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata") raise TypeError("self must be an operation receiving metadata")
# TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython return _metadata(&self._c_metadata_array)
# objects must be legitimate for use from Python at any time" policy in
# place today, shift the policy toward "Operation objects are only usable
# while their calls are active", and move this making-a-copy-because-this-
# data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
# lowest Python layer.
return tuple(self._received_metadata)
@property @property
def received_status_code(self): def received_status_code(self):
@ -602,16 +469,21 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True return False if self._received_cancelled == 0 else True
def __dealloc__(self): def __dealloc__(self):
if self._c_metadata_needs_release:
_release_c_metadata(self._c_metadata, self._c_metadata_count)
if self._c_metadata_array_needs_destruction:
grpc_metadata_array_destroy(&self._c_metadata_array)
grpc_slice_unref(self._status_details) grpc_slice_unref(self._status_details)
grpc_shutdown() grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags): def operation_send_initial_metadata(metadata, int flags):
cdef Operation op = Operation() cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags op.c_op.flags = flags
op.c_op.data.send_initial_metadata.count = metadata.c_count _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata op._c_metadata_needs_release = True
op.references.append(metadata) op.c_op.data.send_initial_metadata.count = op._c_metadata_count
op.c_op.data.send_initial_metadata.metadata = op._c_metadata
op.is_valid = True op.is_valid = True
return op return op
@ -633,18 +505,19 @@ def operation_send_close_from_client(int flags):
return op return op
def operation_send_status_from_server( def operation_send_status_from_server(
Metadata metadata, grpc_status_code code, bytes details, int flags): metadata, grpc_status_code code, bytes details, int flags):
cdef Operation op = Operation() cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags op.c_op.flags = flags
_store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
op._c_metadata_needs_release = True
op.c_op.data.send_status_from_server.trailing_metadata_count = ( op.c_op.data.send_status_from_server.trailing_metadata_count = (
metadata.c_count) op._c_metadata_count)
op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
op.c_op.data.send_status_from_server.status = code op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details) grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details) op._status_details = _slice_from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &op._status_details op.c_op.data.send_status_from_server.status_details = &op._status_details
op.references.append(metadata)
op.is_valid = True op.is_valid = True
return op return op
@ -652,9 +525,10 @@ def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation() cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags op.c_op.flags = flags
op._received_metadata = MetadataArray() grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
&op._received_metadata.c_metadata_array) &op._c_metadata_array)
op._c_metadata_array_needs_destruction = True
op.is_valid = True op.is_valid = True
return op return op
@ -675,9 +549,10 @@ def operation_receive_status_on_client(int flags):
cdef Operation op = Operation() cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags op.c_op.flags = flags
op._received_metadata = MetadataArray() grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_status_on_client.trailing_metadata = ( op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._received_metadata.c_metadata_array) &op._c_metadata_array)
op._c_metadata_array_needs_destruction = True
op.c_op.data.receive_status_on_client.status = ( op.c_op.data.receive_status_on_client.status = (
&op._received_status_code) &op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = ( op.c_op.data.receive_status_on_client.status_details = (

@ -78,23 +78,20 @@ cdef class Server:
raise ValueError("server must be started and not shutting down") raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues: if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue") raise ValueError("server_queue must be a registered completion queue")
cdef grpc_call_error result
cdef OperationTag operation_tag = OperationTag(tag) cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call() operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails() operation_tag.request_call_details = CallDetails()
operation_tag.request_metadata = MetadataArray() grpc_metadata_array_init(&operation_tag._c_request_metadata)
operation_tag.references.extend([self, call_queue, server_queue]) operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([]) operation_tag.batch_operations = Operations([])
cpython.Py_INCREF(operation_tag) cpython.Py_INCREF(operation_tag)
with nogil: return grpc_server_request_call(
result = grpc_server_request_call( self.c_server, &operation_tag.operation_call.c_call,
self.c_server, &operation_tag.operation_call.c_call, &operation_tag.request_call_details.c_details,
&operation_tag.request_call_details.c_details, &operation_tag._c_request_metadata,
&operation_tag.request_metadata.c_metadata_array, call_queue.c_completion_queue, server_queue.c_completion_queue,
call_queue.c_completion_queue, server_queue.c_completion_queue, <cpython.PyObject *>operation_tag)
<cpython.PyObject *>operation_tag)
return result
def register_completion_queue( def register_completion_queue(
self, CompletionQueue queue not None): self, CompletionQueue queue not None):

@ -18,6 +18,7 @@ include "_cygrpc/call.pxd.pxi"
include "_cygrpc/channel.pxd.pxi" include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/records.pxd.pxi" include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi" include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi" include "_cygrpc/server.pxd.pxi"

@ -25,6 +25,7 @@ include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi" include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/records.pyx.pxi" include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi" include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi" include "_cygrpc/server.pyx.pxi"

@ -54,9 +54,7 @@ class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
'AuthMetadataPluginCallback raised exception "{}"!'.format( 'AuthMetadataPluginCallback raised exception "{}"!'.format(
self._state.exception)) self._state.exception))
if error is None: if error is None:
self._callback( self._callback(metadata, cygrpc.StatusCode.ok, None)
_common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok,
None)
else: else:
self._callback(None, cygrpc.StatusCode.internal, self._callback(None, cygrpc.StatusCode.internal,
_common.encode(str(error))) _common.encode(str(error)))

@ -129,15 +129,14 @@ def _abort(state, call, code, details):
effective_details = details if state.details is None else state.details effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed: if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata( operations = (cygrpc.operation_send_initial_metadata(
_common.EMPTY_METADATA, (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( state.trailing_metadata, effective_code, effective_details,
_common.to_cygrpc_metadata(state.trailing_metadata), _EMPTY_FLAGS),)
effective_code, effective_details, _EMPTY_FLAGS),)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else: else:
operations = (cygrpc.operation_send_status_from_server( operations = (cygrpc.operation_send_status_from_server(
_common.to_cygrpc_metadata(state.trailing_metadata), state.trailing_metadata, effective_code, effective_details,
effective_code, effective_details, _EMPTY_FLAGS),) _EMPTY_FLAGS),)
token = _SEND_STATUS_FROM_SERVER_TOKEN token = _SEND_STATUS_FROM_SERVER_TOKEN
call.start_server_batch( call.start_server_batch(
cygrpc.Operations(operations), cygrpc.Operations(operations),
@ -237,7 +236,7 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True self._state.disable_next_compression = True
def invocation_metadata(self): def invocation_metadata(self):
return _common.to_application_metadata(self._rpc_event.request_metadata) return self._rpc_event.request_metadata
def peer(self): def peer(self):
return _common.decode(self._rpc_event.operation_call.peer()) return _common.decode(self._rpc_event.operation_call.peer())
@ -263,8 +262,7 @@ class _Context(grpc.ServicerContext):
else: else:
if self._state.initial_metadata_allowed: if self._state.initial_metadata_allowed:
operation = cygrpc.operation_send_initial_metadata( operation = cygrpc.operation_send_initial_metadata(
_common.to_cygrpc_metadata(initial_metadata), initial_metadata, _EMPTY_FLAGS)
_EMPTY_FLAGS)
self._rpc_event.operation_call.start_server_batch( self._rpc_event.operation_call.start_server_batch(
cygrpc.Operations((operation,)), cygrpc.Operations((operation,)),
_send_initial_metadata(self._state)) _send_initial_metadata(self._state))
@ -275,8 +273,7 @@ class _Context(grpc.ServicerContext):
def set_trailing_metadata(self, trailing_metadata): def set_trailing_metadata(self, trailing_metadata):
with self._state.condition: with self._state.condition:
self._state.trailing_metadata = _common.to_cygrpc_metadata( self._state.trailing_metadata = trailing_metadata
trailing_metadata)
def set_code(self, code): def set_code(self, code):
with self._state.condition: with self._state.condition:
@ -417,9 +414,8 @@ def _send_response(rpc_event, state, serialized_response):
else: else:
if state.initial_metadata_allowed: if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata( operations = (cygrpc.operation_send_initial_metadata(
_common.EMPTY_METADATA, _EMPTY_FLAGS), (), _EMPTY_FLAGS), cygrpc.operation_send_message(
cygrpc.operation_send_message(serialized_response, serialized_response, _EMPTY_FLAGS),)
_EMPTY_FLAGS),)
state.initial_metadata_allowed = False state.initial_metadata_allowed = False
token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
else: else:
@ -438,8 +434,7 @@ def _send_response(rpc_event, state, serialized_response):
def _status(rpc_event, state, serialized_response): def _status(rpc_event, state, serialized_response):
with state.condition: with state.condition:
if state.client is not _CANCELLED: if state.client is not _CANCELLED:
trailing_metadata = _common.to_cygrpc_metadata( trailing_metadata = state.trailing_metadata
state.trailing_metadata)
code = _completion_code(state) code = _completion_code(state)
details = _details(state) details = _details(state)
operations = [ operations = [
@ -448,8 +443,7 @@ def _status(rpc_event, state, serialized_response):
] ]
if state.initial_metadata_allowed: if state.initial_metadata_allowed:
operations.append( operations.append(
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS))
_common.EMPTY_METADATA, _EMPTY_FLAGS))
if serialized_response is not None: if serialized_response is not None:
operations.append( operations.append(
cygrpc.operation_send_message(serialized_response, cygrpc.operation_send_message(serialized_response,
@ -551,11 +545,10 @@ def _find_method_handler(rpc_event, generic_handlers):
def _reject_rpc(rpc_event, status, details): def _reject_rpc(rpc_event, status, details):
operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA, operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
_EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server( cygrpc.operation_send_status_from_server((), status, details,
_common.EMPTY_METADATA, status, details, _EMPTY_FLAGS),) _EMPTY_FLAGS),)
rpc_state = _RPCState() rpc_state = _RPCState()
rpc_event.operation_call.start_server_batch( rpc_event.operation_call.start_server_batch(
operations, lambda ignored_event: (rpc_state, (),)) operations, lambda ignored_event: (rpc_state, (),))

@ -22,7 +22,7 @@ from tests.unit.framework.common import test_constants
_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
_EMPTY_FLAGS = 0 _EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(()) _EMPTY_METADATA = ()
_SERVER_SHUTDOWN_TAG = 'server_shutdown' _SERVER_SHUTDOWN_TAG = 'server_shutdown'
_REQUEST_CALL_TAG = 'request_call' _REQUEST_CALL_TAG = 'request_call'

@ -23,17 +23,14 @@ RPC_COUNT = 4000
INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
EMPTY_FLAGS = 0 EMPTY_FLAGS = 0
INVOCATION_METADATA = cygrpc.Metadata( INVOCATION_METADATA = (('client-md-key', 'client-md-key'),
(cygrpc.Metadatum(b'client-md-key', b'client-md-key'), ('client-md-key-bin', b'\x00\x01' * 3000),)
cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),))
INITIAL_METADATA = cygrpc.Metadata( INITIAL_METADATA = (('server-initial-md-key', 'server-initial-md-value'),
(cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'), ('server-initial-md-key-bin', b'\x00\x02' * 3000),)
cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),))
TRAILING_METADATA = cygrpc.Metadata( TRAILING_METADATA = (('server-trailing-md-key', 'server-trailing-md-value'),
(cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'), ('server-trailing-md-key-bin', b'\x00\x03' * 3000),)
cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),))
class QueueDriver(object): class QueueDriver(object):

@ -20,7 +20,7 @@ from grpc._cython import cygrpc
_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
_EMPTY_FLAGS = 0 _EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(()) _EMPTY_METADATA = ()
class _ServerDriver(object): class _ServerDriver(object):
@ -197,8 +197,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_rpc_event.operation_call.start_server_batch([ server_rpc_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server( cygrpc.operation_send_status_from_server(
cygrpc.Metadata(()), cygrpc.StatusCode.ok, (), cygrpc.StatusCode.ok, b'test details',
b'test details', _EMPTY_FLAGS), _EMPTY_FLAGS),
], server_complete_rpc_tag)) ], server_complete_rpc_tag))
server_send_second_message_event = server_call_driver.event_with_tag( server_send_second_message_event = server_call_driver.event_with_tag(
server_send_second_message_tag) server_send_second_message_tag)

@ -29,39 +29,12 @@ _EMPTY_FLAGS = 0
def _metadata_plugin(context, callback): def _metadata_plugin(context, callback):
callback( callback(((_CALL_CREDENTIALS_METADATA_KEY,
cygrpc.Metadata([ _CALL_CREDENTIALS_METADATA_VALUE,),), cygrpc.StatusCode.ok, b'')
cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY,
_CALL_CREDENTIALS_METADATA_VALUE)
]), cygrpc.StatusCode.ok, b'')
class TypeSmokeTest(unittest.TestCase): class TypeSmokeTest(unittest.TestCase):
def testStringsInUtilitiesUpDown(self):
self.assertEqual(0, cygrpc.StatusCode.ok)
metadatum = cygrpc.Metadatum(b'a', b'b')
self.assertEqual(b'a', metadatum.key)
self.assertEqual(b'b', metadatum.value)
metadata = cygrpc.Metadata([metadatum])
self.assertEqual(1, len(metadata))
self.assertEqual(metadatum.key, metadata[0].key)
def testMetadataIteration(self):
metadata = cygrpc.Metadata(
[cygrpc.Metadatum(b'a', b'b'), cygrpc.Metadatum(b'c', b'd')])
iterator = iter(metadata)
metadatum = next(iterator)
self.assertIsInstance(metadatum, cygrpc.Metadatum)
self.assertEqual(metadatum.key, b'a')
self.assertEqual(metadatum.value, b'b')
metadatum = next(iterator)
self.assertIsInstance(metadatum, cygrpc.Metadatum)
self.assertEqual(metadatum.key, b'c')
self.assertEqual(metadatum.value, b'd')
with self.assertRaises(StopIteration):
next(iterator)
def testOperationsIteration(self): def testOperationsIteration(self):
operations = cygrpc.Operations( operations = cygrpc.Operations(
[cygrpc.operation_send_message(b'asdf', _EMPTY_FLAGS)]) [cygrpc.operation_send_message(b'asdf', _EMPTY_FLAGS)])
@ -200,14 +173,14 @@ class ServerClientMixin(object):
def test_echo(self): def test_echo(self):
DEADLINE = time.time() + 5 DEADLINE = time.time() + 5
DEADLINE_TOLERANCE = 0.25 DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = b'key' CLIENT_METADATA_ASCII_KEY = 'key'
CLIENT_METADATA_ASCII_VALUE = b'val' CLIENT_METADATA_ASCII_VALUE = 'val'
CLIENT_METADATA_BIN_KEY = b'key-bin' CLIENT_METADATA_BIN_KEY = 'key-bin'
CLIENT_METADATA_BIN_VALUE = b'\0' * 1000 CLIENT_METADATA_BIN_VALUE = b'\0' * 1000
SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
SERVER_INITIAL_METADATA_VALUE = b'whodawha?' SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = b'zomg it is' SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok SERVER_STATUS_CODE = cygrpc.StatusCode.ok
SERVER_STATUS_DETAILS = b'our work is never over' SERVER_STATUS_DETAILS = b'our work is never over'
REQUEST = b'in death a member of project mayhem has a name' REQUEST = b'in death a member of project mayhem has a name'
@ -227,11 +200,9 @@ class ServerClientMixin(object):
client_call = self.client_channel.create_call( client_call = self.client_channel.create_call(
None, 0, self.client_completion_queue, METHOD, self.host_argument, None, 0, self.client_completion_queue, METHOD, self.host_argument,
cygrpc_deadline) cygrpc_deadline)
client_initial_metadata = cygrpc.Metadata([ client_initial_metadata = (
cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,),
CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),)
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)
])
client_start_batch_result = client_call.start_client_batch([ client_start_batch_result = client_call.start_client_batch([
cygrpc.operation_send_initial_metadata(client_initial_metadata, cygrpc.operation_send_initial_metadata(client_initial_metadata,
_EMPTY_FLAGS), _EMPTY_FLAGS),
@ -263,14 +234,10 @@ class ServerClientMixin(object):
server_call_tag = object() server_call_tag = object()
server_call = request_event.operation_call server_call = request_event.operation_call
server_initial_metadata = cygrpc.Metadata([ server_initial_metadata = (
cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY, (SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
SERVER_INITIAL_METADATA_VALUE) server_trailing_metadata = (
]) (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),)
server_trailing_metadata = cygrpc.Metadata([
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)
])
server_start_batch_result = server_call.start_server_batch([ server_start_batch_result = server_call.start_server_batch([
cygrpc.operation_send_initial_metadata( cygrpc.operation_send_initial_metadata(
server_initial_metadata, server_initial_metadata,
@ -347,7 +314,7 @@ class ServerClientMixin(object):
METHOD = b'twinkies' METHOD = b'twinkies'
cygrpc_deadline = cygrpc.Timespec(DEADLINE) cygrpc_deadline = cygrpc.Timespec(DEADLINE)
empty_metadata = cygrpc.Metadata([]) empty_metadata = ()
server_request_tag = object() server_request_tag = object()
self.server.request_call(self.server_completion_queue, self.server.request_call(self.server_completion_queue,

Loading…
Cancel
Save