Reform cygrpc.Operation

It is now a family of classes conforming to an interface rather than a
single class no single instance of which makes use of all behavior
scoped to the class.

It also now only uses gRPC Core memory for the time of a single batch
rather than for the entire lifetime of the instance.
pull/13852/head
Nathaniel Manista 7 years ago
parent 6ec2642bab
commit 81edf5ff9a
  1. 66
      src/python/grpcio/grpc/_channel.py
  2. 26
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  3. 109
      src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
  4. 238
      src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
  5. 21
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  6. 250
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  7. 1
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  8. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  9. 46
      src/python/grpcio/grpc/_server.py
  10. 1
      src/python/grpcio_tests/tests/tests.json
  11. 26
      src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
  12. 22
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
  13. 20
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
  14. 24
      src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
  15. 49
      src/python/grpcio_tests/tests/unit/_cython/_server_test.py
  16. 94
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py

@ -129,12 +129,12 @@ def _abort(state, code, details):
def _handle_event(event, state, response_deserializer):
callbacks = []
for batch_operation in event.batch_operations:
operation_type = batch_operation.type
operation_type = batch_operation.type()
state.due.remove(operation_type)
if operation_type == cygrpc.OperationType.receive_initial_metadata:
state.initial_metadata = batch_operation.received_metadata
state.initial_metadata = batch_operation.initial_metadata()
elif operation_type == cygrpc.OperationType.receive_message:
serialized_response = batch_operation.received_message.bytes()
serialized_response = batch_operation.message()
if serialized_response is not None:
response = _common.deserialize(serialized_response,
response_deserializer)
@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer):
else:
state.response = response
elif operation_type == cygrpc.OperationType.receive_status_on_client:
state.trailing_metadata = batch_operation.received_metadata
state.trailing_metadata = batch_operation.trailing_metadata()
if state.code is None:
code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
batch_operation.received_status_code)
batch_operation.code())
if code is None:
state.code = grpc.StatusCode.UNKNOWN
state.details = _unknown_code_details(
batch_operation.received_status_code,
batch_operation.received_status_details)
code, batch_operation.details())
else:
state.code = code
state.details = batch_operation.received_status_details
state.details = batch_operation.details()
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call,
_abort(state, grpc.StatusCode.INTERNAL, details)
return
else:
operations = (cygrpc.operation_send_message(
operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_message)
@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call,
with state.condition:
if state.code is None:
operations = (
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),)
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
event_handler = _event_handler(self._state, self._call,
self._response_deserializer)
self._call.start_client_batch(
(cygrpc.operation_receive_message(_EMPTY_FLAGS),),
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
return state, operations, deadline, deadline_timespec, None
def _blocking(self, request, timeout, metadata, credentials):
@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer)
with state.condition:
call.start_client_batch(
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
cygrpc.operation_send_initial_metadata(
metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(
cygrpc.SendInitialMetadataOperation(
metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
call.set_credentials(credentials._credentials)
with state.condition:
call.start_client_batch(
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
None)
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
operations = (
cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, None)
_check_call_error(call_error, metadata)
_consume_request_iterator(request_iterator, state, call,
@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
(cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)

@ -17,6 +17,7 @@ cimport libc.time
# Typedef types with approximately the same semantics to provide their names to
# Cython
ctypedef unsigned char uint8_t
ctypedef int int32_t
ctypedef unsigned uint32_t
ctypedef long int64_t
@ -25,6 +26,7 @@ ctypedef long int64_t
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size) nogil
void *gpr_zalloc(size_t size) nogil
void gpr_free(void *ptr) nogil
void *gpr_realloc(void *p, size_t size) nogil
@ -190,6 +192,18 @@ cdef extern from "grpc/grpc.h":
size_t arguments_length "num_args"
grpc_arg *arguments "args"
ctypedef enum grpc_compression_level:
GRPC_COMPRESS_LEVEL_NONE
GRPC_COMPRESS_LEVEL_LOW
GRPC_COMPRESS_LEVEL_MED
GRPC_COMPRESS_LEVEL_HIGH
ctypedef enum grpc_stream_compression_level:
GRPC_STREAM_COMPRESS_LEVEL_NONE
GRPC_STREAM_COMPRESS_LEVEL_LOW
GRPC_STREAM_COMPRESS_LEVEL_MED
GRPC_STREAM_COMPRESS_LEVEL_HIGH
ctypedef enum grpc_call_error:
GRPC_CALL_OK
GRPC_CALL_ERROR
@ -265,9 +279,19 @@ cdef extern from "grpc/grpc.h":
GRPC_OP_RECV_STATUS_ON_CLIENT
GRPC_OP_RECV_CLOSE_ON_SERVER
ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level:
uint8_t is_set
grpc_compression_level level
ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level:
uint8_t is_set
grpc_stream_compression_level level
ctypedef struct grpc_op_data_send_initial_metadata:
size_t count
grpc_metadata *metadata
grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level
ctypedef struct grpc_op_data_send_status_from_server:
size_t trailing_metadata_count
@ -524,7 +548,7 @@ cdef extern from "grpc/grpc_security.h":
grpc_auth_property_iterator grpc_auth_context_property_iterator(
const grpc_auth_context *ctx)
grpc_auth_property_iterator grpc_auth_context_peer_identity(
const grpc_auth_context *ctx)

@ -0,0 +1,109 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class Operation:
cdef void c(self)
cdef void un_c(self)
# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this!
cdef grpc_op c_op
cdef class SendInitialMetadataOperation(Operation):
cdef readonly object _initial_metadata;
cdef readonly int _flags
cdef grpc_metadata *_c_initial_metadata
cdef size_t _c_initial_metadata_count
cdef void c(self)
cdef void un_c(self)
cdef class SendMessageOperation(Operation):
cdef readonly bytes _message
cdef readonly int _flags
cdef grpc_byte_buffer *_c_message_byte_buffer
cdef void c(self)
cdef void un_c(self)
cdef class SendCloseFromClientOperation(Operation):
cdef readonly int _flags
cdef void c(self)
cdef void un_c(self)
cdef class SendStatusFromServerOperation(Operation):
cdef readonly object _trailing_metadata
cdef readonly object _code
cdef readonly object _details
cdef readonly int _flags
cdef grpc_metadata *_c_trailing_metadata
cdef size_t _c_trailing_metadata_count
cdef grpc_slice _c_details
cdef void c(self)
cdef void un_c(self)
cdef class ReceiveInitialMetadataOperation(Operation):
cdef readonly int _flags
cdef tuple _initial_metadata
cdef grpc_metadata_array _c_initial_metadata
cdef void c(self)
cdef void un_c(self)
cdef class ReceiveMessageOperation(Operation):
cdef readonly int _flags
cdef grpc_byte_buffer *_c_message_byte_buffer
cdef bytes _message
cdef void c(self)
cdef void un_c(self)
cdef class ReceiveStatusOnClientOperation(Operation):
cdef readonly int _flags
cdef grpc_metadata_array _c_trailing_metadata
cdef grpc_status_code _c_code
cdef grpc_slice _c_details
cdef tuple _trailing_metadata
cdef object _code
cdef str _details
cdef void c(self)
cdef void un_c(self)
cdef class ReceiveCloseOnServerOperation(Operation):
cdef readonly int _flags
cdef object _cancelled
cdef int _c_cancelled
cdef void c(self)
cdef void un_c(self)

@ -0,0 +1,238 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class Operation:
cdef void c(self):
raise NotImplementedError()
cdef void un_c(self):
raise NotImplementedError()
cdef class SendInitialMetadataOperation(Operation):
def __cinit__(self, initial_metadata, flags):
self._initial_metadata = initial_metadata
self._flags = flags
def type(self):
return GRPC_OP_SEND_INITIAL_METADATA
cdef void c(self):
self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
self.c_op.flags = self._flags
_store_c_metadata(
self._initial_metadata, &self._c_initial_metadata,
&self._c_initial_metadata_count)
self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0
cdef void un_c(self):
_release_c_metadata(
self._c_initial_metadata, self._c_initial_metadata_count)
cdef class SendMessageOperation(Operation):
def __cinit__(self, bytes message, int flags):
self._message = message
self._flags = flags
def type(self):
return GRPC_OP_SEND_MESSAGE
cdef void c(self):
self.c_op.type = GRPC_OP_SEND_MESSAGE
self.c_op.flags = self._flags
cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
self._message, len(self._message))
self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
&message_slice, 1)
grpc_slice_unref(message_slice)
self.c_op.data.send_message.send_message = self._c_message_byte_buffer
cdef void un_c(self):
grpc_byte_buffer_destroy(self._c_message_byte_buffer)
cdef class SendCloseFromClientOperation(Operation):
def __cinit__(self, int flags):
self._flags = flags
def type(self):
return GRPC_OP_SEND_CLOSE_FROM_CLIENT
cdef void c(self):
self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
self.c_op.flags = self._flags
cdef void un_c(self):
pass
cdef class SendStatusFromServerOperation(Operation):
def __cinit__(self, trailing_metadata, code, object details, int flags):
self._trailing_metadata = trailing_metadata
self._code = code
self._details = details
self._flags = flags
def type(self):
return GRPC_OP_SEND_STATUS_FROM_SERVER
cdef void c(self):
self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
self.c_op.flags = self._flags
_store_c_metadata(
self._trailing_metadata, &self._c_trailing_metadata,
&self._c_trailing_metadata_count)
self.c_op.data.send_status_from_server.trailing_metadata = (
self._c_trailing_metadata)
self.c_op.data.send_status_from_server.trailing_metadata_count = (
self._c_trailing_metadata_count)
self.c_op.data.send_status_from_server.status = self._code
self._c_details = _slice_from_bytes(_encode(self._details))
self.c_op.data.send_status_from_server.status_details = &self._c_details
cdef void un_c(self):
grpc_slice_unref(self._c_details)
_release_c_metadata(
self._c_trailing_metadata, self._c_trailing_metadata_count)
cdef class ReceiveInitialMetadataOperation(Operation):
def __cinit__(self, flags):
self._flags = flags
def type(self):
return GRPC_OP_RECV_INITIAL_METADATA
cdef void c(self):
self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
self.c_op.flags = self._flags
grpc_metadata_array_init(&self._c_initial_metadata)
self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
&self._c_initial_metadata)
cdef void un_c(self):
self._initial_metadata = _metadata(&self._c_initial_metadata)
grpc_metadata_array_destroy(&self._c_initial_metadata)
def initial_metadata(self):
return self._initial_metadata
cdef class ReceiveMessageOperation(Operation):
def __cinit__(self, flags):
self._flags = flags
def type(self):
return GRPC_OP_RECV_MESSAGE
cdef void c(self):
self.c_op.type = GRPC_OP_RECV_MESSAGE
self.c_op.flags = self._flags
self.c_op.data.receive_message.receive_message = (
&self._c_message_byte_buffer)
cdef void un_c(self):
cdef grpc_byte_buffer_reader message_reader
cdef bint message_reader_status
cdef grpc_slice message_slice
cdef size_t message_slice_length
cdef void *message_slice_pointer
if self._c_message_byte_buffer != NULL:
message_reader_status = grpc_byte_buffer_reader_init(
&message_reader, self._c_message_byte_buffer)
if message_reader_status:
message = bytearray()
while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
message_slice_pointer = grpc_slice_start_ptr(message_slice)
message_slice_length = grpc_slice_length(message_slice)
message += (<char *>message_slice_pointer)[:message_slice_length]
grpc_slice_unref(message_slice)
grpc_byte_buffer_reader_destroy(&message_reader)
self._message = bytes(message)
else:
self._message = None
grpc_byte_buffer_destroy(self._c_message_byte_buffer)
else:
self._message = None
def message(self):
return self._message
cdef class ReceiveStatusOnClientOperation(Operation):
def __cinit__(self, flags):
self._flags = flags
def type(self):
return GRPC_OP_RECV_STATUS_ON_CLIENT
cdef void c(self):
self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
self.c_op.flags = self._flags
grpc_metadata_array_init(&self._c_trailing_metadata)
self.c_op.data.receive_status_on_client.trailing_metadata = (
&self._c_trailing_metadata)
self.c_op.data.receive_status_on_client.status = (
&self._c_code)
self.c_op.data.receive_status_on_client.status_details = (
&self._c_details)
cdef void un_c(self):
self._trailing_metadata = _metadata(&self._c_trailing_metadata)
grpc_metadata_array_destroy(&self._c_trailing_metadata)
self._code = self._c_code
self._details = _decode(_slice_bytes(self._c_details))
grpc_slice_unref(self._c_details)
def trailing_metadata(self):
return self._trailing_metadata
def code(self):
return self._code
def details(self):
return self._details
cdef class ReceiveCloseOnServerOperation(Operation):
def __cinit__(self, flags):
self._flags = flags
def type(self):
return GRPC_OP_RECV_CLOSE_ON_SERVER
cdef void c(self):
self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
self.c_op.flags = self._flags
self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
cdef void un_c(self):
self._cancelled = bool(self._c_cancelled)
def cancelled(self):
return self._cancelled

@ -65,11 +65,6 @@ cdef class Event:
cdef readonly object batch_operations
cdef class ByteBuffer:
cdef grpc_byte_buffer *c_byte_buffer
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair
@ -89,22 +84,6 @@ cdef class ChannelArgs:
cdef list args
cdef class Operation:
cdef grpc_op c_op
cdef bint _c_metadata_needs_release
cdef size_t _c_metadata_count
cdef grpc_metadata *_c_metadata
cdef ByteBuffer _received_message
cdef bint _c_metadata_array_needs_destruction
cdef grpc_metadata_array _c_metadata_array
cdef grpc_status_code _received_status_code
cdef grpc_slice _status_details
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references
cdef class CompressionOptions:
cdef grpc_compression_options c_options

@ -229,13 +229,15 @@ cdef class OperationTag:
self.c_nops = 0 if self._operations is None else len(self._operations)
if 0 < self.c_nops:
self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
for index in range(self.c_nops):
self.c_ops[index] = (<Operation>(self._operations[index])).c_op
for index, operation in enumerate(self._operations):
(<Operation>operation).c()
self.c_ops[index] = (<Operation>operation).c_op
cdef object release_ops(self):
if 0 < self.c_nops:
for index, operation in enumerate(self._operations):
(<Operation>operation).c_op = self.c_ops[index]
(<Operation>operation).un_c()
gpr_free(self.c_ops)
return self._operations
else:
@ -260,69 +262,6 @@ cdef class Event:
self.is_new_request = is_new_request
cdef class ByteBuffer:
def __cinit__(self, bytes data):
grpc_init()
if data is None:
self.c_byte_buffer = NULL
return
cdef char *c_data = data
cdef grpc_slice data_slice
cdef size_t data_length = len(data)
with nogil:
data_slice = grpc_slice_from_copied_buffer(c_data, data_length)
with nogil:
self.c_byte_buffer = grpc_raw_byte_buffer_create(
&data_slice, 1)
with nogil:
grpc_slice_unref(data_slice)
def bytes(self):
cdef grpc_byte_buffer_reader reader
cdef grpc_slice data_slice
cdef size_t data_slice_length
cdef void *data_slice_pointer
cdef bint reader_status
if self.c_byte_buffer != NULL:
with nogil:
reader_status = grpc_byte_buffer_reader_init(
&reader, self.c_byte_buffer)
if not reader_status:
return None
result = bytearray()
with nogil:
while grpc_byte_buffer_reader_next(&reader, &data_slice):
data_slice_pointer = grpc_slice_start_ptr(data_slice)
data_slice_length = grpc_slice_length(data_slice)
with gil:
result += (<char *>data_slice_pointer)[:data_slice_length]
grpc_slice_unref(data_slice)
with nogil:
grpc_byte_buffer_reader_destroy(&reader)
return bytes(result)
else:
return None
def __len__(self):
cdef size_t result
if self.c_byte_buffer != NULL:
with nogil:
result = grpc_byte_buffer_length(self.c_byte_buffer)
return result
else:
return 0
def __str__(self):
return self.bytes()
def __dealloc__(self):
if self.c_byte_buffer != NULL:
grpc_byte_buffer_destroy(self.c_byte_buffer)
grpc_shutdown()
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):
@ -365,7 +304,7 @@ cdef class ChannelArg:
elif hasattr(value, '__int__'):
# Pointer objects must override __int__() to return
# the underlying C address (Python ints are word size). The
# lifecycle of the pointer is fixed to the lifecycle of the
# lifecycle of the pointer is fixed to the lifecycle of the
# python object wrapping it.
self.ptr_vtable.copy = &copy_ptr
self.ptr_vtable.destroy = &destroy_ptr
@ -407,185 +346,6 @@ cdef class ChannelArgs:
return self.args[i]
cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
self._c_metadata_needs_release = False
self._c_metadata_array_needs_destruction = False
self._status_details = grpc_empty_slice()
self.is_valid = False
@property
def type(self):
return self.c_op.type
@property
def flags(self):
return self.c_op.flags
@property
def has_status(self):
return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT
@property
def received_message(self):
if self.c_op.type != GRPC_OP_RECV_MESSAGE:
raise TypeError("self must be an operation receiving a message")
return self._received_message
@property
def received_message_or_none(self):
if self.c_op.type != GRPC_OP_RECV_MESSAGE:
return None
return self._received_message
@property
def received_metadata(self):
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
return _metadata(&self._c_metadata_array)
@property
def received_status_code(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving a status code")
return self._received_status_code
@property
def received_status_code_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
return self._received_status_code
@property
def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
return _slice_bytes(self._status_details)
@property
def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
return _slice_bytes(self._status_details)
@property
def received_cancelled(self):
if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
raise TypeError("self must be an operation receiving cancellation "
"information")
return False if self._received_cancelled == 0 else True
@property
def received_cancelled_or_none(self):
if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
return None
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
if self._c_metadata_needs_release:
_release_c_metadata(self._c_metadata, self._c_metadata_count)
if self._c_metadata_array_needs_destruction:
grpc_metadata_array_destroy(&self._c_metadata_array)
grpc_slice_unref(self._status_details)
grpc_shutdown()
def operation_send_initial_metadata(metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags
_store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
op._c_metadata_needs_release = True
op.c_op.data.send_initial_metadata.count = op._c_metadata_count
op.c_op.data.send_initial_metadata.metadata = op._c_metadata
op.is_valid = True
return op
def operation_send_message(data, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_MESSAGE
op.c_op.flags = flags
byte_buffer = ByteBuffer(data)
op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer
op.references.append(byte_buffer)
op.is_valid = True
return op
def operation_send_close_from_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
op.c_op.flags = flags
op.is_valid = True
return op
def operation_send_status_from_server(
metadata, grpc_status_code code, bytes details, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags
_store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
op._c_metadata_needs_release = True
op.c_op.data.send_status_from_server.trailing_metadata_count = (
op._c_metadata_count)
op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &op._status_details
op.is_valid = True
return op
def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
&op._c_metadata_array)
op._c_metadata_array_needs_destruction = True
op.is_valid = True
return op
def operation_receive_message(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_MESSAGE
op.c_op.flags = flags
op._received_message = ByteBuffer(None)
# n.b. the c_op.data.receive_message field needs to be deleted by us,
# anyway, so we just let that be handled by the ByteBuffer() we allocated
# the line before.
op.c_op.data.receive_message.receive_message = (
&op._received_message.c_byte_buffer)
op.is_valid = True
return op
def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags
grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._c_metadata_array)
op._c_metadata_array_needs_destruction = True
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
&op._status_details)
op.is_valid = True
return op
def operation_receive_close_on_server(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
op.c_op.flags = flags
op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
op.is_valid = True
return op
cdef class CompressionOptions:
def __cinit__(self):

@ -19,6 +19,7 @@ include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"

@ -26,6 +26,7 @@ include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"

@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0
def _serialized_request(request_event):
return request_event.batch_operations[0].received_message.bytes()
return request_event.batch_operations[0].message()
def _application_code(code):
@ -130,13 +130,13 @@ def _abort(state, call, code, details):
effective_code = _abortion_code(state, code)
effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata(
(), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
operations = (cygrpc.SendInitialMetadataOperation(
None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else:
operations = (cygrpc.operation_send_status_from_server(
operations = (cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_STATUS_FROM_SERVER_TOKEN
@ -150,8 +150,7 @@ def _receive_close_on_server(state):
def receive_close_on_server(receive_close_on_server_event):
with state.condition:
if receive_close_on_server_event.batch_operations[
0].received_cancelled:
if receive_close_on_server_event.batch_operations[0].cancelled():
state.client = _CANCELLED
elif state.client is _OPEN:
state.client = _CLOSED
@ -262,7 +261,7 @@ class _Context(grpc.ServicerContext):
_raise_rpc_error(self._state)
else:
if self._state.initial_metadata_allowed:
operation = cygrpc.operation_send_initial_metadata(
operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
self._rpc_event.operation_call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
@ -305,7 +304,7 @@ class _RequestIterator(object):
raise StopIteration()
else:
self._call.start_server_batch(
(cygrpc.operation_receive_message(_EMPTY_FLAGS),),
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(self._state, self._call,
self._request_deserializer))
self._state.due.add(_RECEIVE_MESSAGE_TOKEN)
@ -348,7 +347,7 @@ def _unary_request(rpc_event, state, request_deserializer):
return None
else:
rpc_event.operation_call.start_server_batch(
(cygrpc.operation_receive_message(_EMPTY_FLAGS),),
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(state, rpc_event.operation_call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
@ -424,14 +423,15 @@ def _send_response(rpc_event, state, serialized_response):
return False
else:
if state.initial_metadata_allowed:
operations = (cygrpc.operation_send_initial_metadata(
(), _EMPTY_FLAGS), cygrpc.operation_send_message(
serialized_response, _EMPTY_FLAGS),)
operations = (cygrpc.SendInitialMetadataOperation(None,
_EMPTY_FLAGS),
cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS),)
state.initial_metadata_allowed = False
token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
else:
operations = (cygrpc.operation_send_message(serialized_response,
_EMPTY_FLAGS),)
operations = (cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
rpc_event.operation_call.start_server_batch(
operations, _send_message(state, token))
@ -448,16 +448,16 @@ def _status(rpc_event, state, serialized_response):
code = _completion_code(state)
details = _details(state)
operations = [
cygrpc.operation_send_status_from_server(
cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, code, details, _EMPTY_FLAGS),
]
if state.initial_metadata_allowed:
operations.append(
cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS))
cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS))
if serialized_response is not None:
operations.append(
cygrpc.operation_send_message(serialized_response,
_EMPTY_FLAGS))
cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS))
rpc_event.operation_call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
@ -563,10 +563,10 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
def _reject_rpc(rpc_event, status, details):
operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server((), status, details,
_EMPTY_FLAGS),)
operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS),
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(None, status, details,
_EMPTY_FLAGS),)
rpc_state = _RPCState()
rpc_event.operation_call.start_server_batch(
operations, lambda ignored_event: (rpc_state, (),))
@ -577,7 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
rpc_event.operation_call.start_server_batch(
(cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
if method_handler.request_streaming:

@ -34,6 +34,7 @@
"unit._cython._no_messages_server_completion_queue_per_call_test.Test",
"unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"unit._cython._server_test.Test",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",
"unit._cython.cygrpc_test.TypeSmokeTest",

@ -65,10 +65,10 @@ class _Handler(object):
with self._lock:
self._call.start_server_batch(
(cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_RECEIVE_CLOSE_ON_SERVER_TAG)
self._call.start_server_batch(
(cygrpc.operation_receive_message(_EMPTY_FLAGS),),
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_RECEIVE_MESSAGE_TAG)
first_event = self._completion_queue.poll()
if _is_cancellation_event(first_event):
@ -76,10 +76,10 @@ class _Handler(object):
else:
with self._lock:
operations = (
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
_EMPTY_FLAGS),)
self._call.start_server_batch(operations,
@ -169,13 +169,13 @@ class CancelManyCallsTest(unittest.TestCase):
None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies',
None, _INFINITE_FUTURE)
operations = (
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
tag = 'client_complete_call_{0:04d}_tag'.format(index)
client_call.start_client_batch(operations, tag)
client_due.add(tag)

@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
cygrpc.operation_receive_initial_metadata(
_common.EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
self.assertEqual(cygrpc.CallError.ok,
client_receive_initial_metadata_start_batch_result)
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
cygrpc.operation_send_initial_metadata(
cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(
_common.EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(
_common.EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
self.assertEqual(cygrpc.CallError.ok,
client_complete_rpc_start_batch_result)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
client_complete_rpc_tag,
@ -72,7 +73,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_send_initial_metadata(
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_call_driver.add_due({
@ -84,9 +85,8 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(
_common.EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))

@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
cygrpc.operation_receive_initial_metadata(
_common.EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
cygrpc.operation_send_initial_metadata(
cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(
_common.EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(
_common.EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
@ -67,7 +64,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_send_initial_metadata(
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
self.server_driver.add_due({
@ -79,11 +76,10 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(
_common.EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
self.server_driver.add_due({
server_complete_rpc_tag,

@ -158,15 +158,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_due.add(client_receive_initial_metadata_tag)
client_complete_rpc_start_batch_result = (
client_call.start_client_batch([
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_complete_rpc_tag))
client_due.add(client_complete_rpc_tag)
@ -175,12 +175,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
_EMPTY_FLAGS),
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
server_send_initial_metadata_tag)
@ -189,12 +189,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_second_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
_EMPTY_FLAGS),
], server_complete_rpc_tag))
@ -208,7 +208,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
client_receive_first_message_tag = 'client_receive_first_message_tag'
client_receive_first_message_start_batch_result = (
client_call.start_client_batch([
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], client_receive_first_message_tag))
client_due.add(client_receive_first_message_tag)
client_receive_first_message_event = client_driver.event_with_tag(

@ -0,0 +1,49 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test servers at the level of the Cython API."""
import threading
import time
import unittest
from grpc._cython import cygrpc
class Test(unittest.TestCase):
def test_lonely_server(self):
server_call_completion_queue = cygrpc.CompletionQueue()
server_shutdown_completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(cygrpc.ChannelArgs([]))
server.register_completion_queue(server_call_completion_queue)
server.register_completion_queue(server_shutdown_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
server_request_call_tag = 'server_request_call_tag'
server_request_call_start_batch_result = server.request_call(
server_call_completion_queue, server_call_completion_queue,
server_request_call_tag)
time.sleep(4)
server_shutdown_tag = 'server_shutdown_tag'
server_shutdown_result = server.shutdown(
server_shutdown_completion_queue, server_shutdown_tag)
server_request_call_event = server_call_completion_queue.poll()
server_shutdown_event = server_shutdown_completion_queue.poll()
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -35,11 +35,6 @@ def _metadata_plugin(context, callback):
class TypeSmokeTest(unittest.TestCase):
def testOperationFlags(self):
operation = cygrpc.operation_send_message(b'asdf',
cygrpc.WriteFlag.no_compress)
self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
def testTimespec(self):
now = time.time()
now_timespec_a = cygrpc.Timespec(now)
@ -170,7 +165,7 @@ class ServerClientMixin(object):
SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok
SERVER_STATUS_DETAILS = b'our work is never over'
SERVER_STATUS_DETAILS = 'our work is never over'
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
@ -192,13 +187,13 @@ class ServerClientMixin(object):
(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,),
(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),)
client_start_batch_result = client_call.start_client_batch([
cygrpc.operation_send_initial_metadata(client_initial_metadata,
_EMPTY_FLAGS),
cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
cygrpc.SendInitialMetadataOperation(client_initial_metadata,
_EMPTY_FLAGS),
cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@ -227,12 +222,12 @@ class ServerClientMixin(object):
server_trailing_metadata = (
(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),)
server_start_batch_result = server_call.start_server_batch([
cygrpc.operation_send_initial_metadata(
cygrpc.SendInitialMetadataOperation(
server_initial_metadata,
_EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
_EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
server_trailing_metadata, SERVER_STATUS_CODE,
SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
@ -245,25 +240,24 @@ class ServerClientMixin(object):
found_client_op_types = set()
for client_result in client_event.batch_operations:
# we expect each op type to be unique
self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == cygrpc.OperationType.receive_initial_metadata:
self.assertNotIn(client_result.type(), found_client_op_types)
found_client_op_types.add(client_result.type())
if client_result.type(
) == cygrpc.OperationType.receive_initial_metadata:
self.assertTrue(
test_common.metadata_transmitted(
server_initial_metadata,
client_result.received_metadata))
elif client_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(RESPONSE,
client_result.received_message.bytes())
elif client_result.type == cygrpc.OperationType.receive_status_on_client:
client_result.initial_metadata()))
elif client_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(RESPONSE, client_result.message())
elif client_result.type(
) == cygrpc.OperationType.receive_status_on_client:
self.assertTrue(
test_common.metadata_transmitted(
server_trailing_metadata,
client_result.received_metadata))
self.assertEqual(SERVER_STATUS_DETAILS,
client_result.received_status_details)
self.assertEqual(SERVER_STATUS_CODE,
client_result.received_status_code)
client_result.trailing_metadata()))
self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
self.assertEqual(SERVER_STATUS_CODE, client_result.code())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@ -277,13 +271,13 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST,
server_result.received_message.bytes())
elif server_result.type == cygrpc.OperationType.receive_close_on_server:
self.assertFalse(server_result.received_cancelled)
self.assertNotIn(client_result.type(), found_server_op_types)
found_server_op_types.add(server_result.type())
if server_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.message())
elif server_result.type(
) == cygrpc.OperationType.receive_close_on_server:
self.assertFalse(server_result.cancelled())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@ -319,9 +313,8 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
client_event_future = perform_client_operations([
cygrpc.operation_send_initial_metadata(empty_metadata,
_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
@ -333,8 +326,7 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
server_event_future = perform_server_operations([
cygrpc.operation_send_initial_metadata(empty_metadata,
_EMPTY_FLAGS),
cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
], "Server prologue")
client_event_future.result() # force completion
@ -343,12 +335,12 @@ class ServerClientMixin(object):
# Messaging
for _ in range(10):
client_event_future = perform_client_operations([
cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Client message")
server_event_future = perform_server_operations([
cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Server receive")
client_event_future.result() # force completion
@ -356,13 +348,13 @@ class ServerClientMixin(object):
# Epilogue
client_event_future = perform_client_operations([
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
], "Client epilogue")
server_event_future = perform_server_operations([
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")

Loading…
Cancel
Save