From 81edf5ff9af2d90813773acb9c2793e1a4cd1057 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 21 Dec 2017 06:25:16 +0000 Subject: [PATCH] Reform cygrpc.Operation It is now a family of classes conforming to an interface rather than a single class no single instance of which makes use of all behavior scoped to the class. It also now only uses gRPC Core memory for the time of a single batch rather than for the entire lifetime of the instance. --- src/python/grpcio/grpc/_channel.py | 66 +++-- .../grpcio/grpc/_cython/_cygrpc/grpc.pxi | 26 +- .../grpc/_cython/_cygrpc/operation.pxd.pxi | 109 ++++++++ .../grpc/_cython/_cygrpc/operation.pyx.pxi | 238 +++++++++++++++++ .../grpc/_cython/_cygrpc/records.pxd.pxi | 21 -- .../grpc/_cython/_cygrpc/records.pyx.pxi | 250 +----------------- src/python/grpcio/grpc/_cython/cygrpc.pxd | 1 + src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + src/python/grpcio/grpc/_server.py | 46 ++-- src/python/grpcio_tests/tests/tests.json | 1 + .../unit/_cython/_cancel_many_calls_test.py | 26 +- ...s_server_completion_queue_per_call_test.py | 22 +- ...ges_single_server_completion_queue_test.py | 20 +- .../_read_some_but_not_all_responses_test.py | 24 +- .../tests/unit/_cython/_server_test.py | 49 ++++ .../tests/unit/_cython/cygrpc_test.py | 94 +++---- 16 files changed, 571 insertions(+), 423 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi create mode 100644 src/python/grpcio_tests/tests/unit/_cython/_server_test.py diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d7456a3dd16..3572737c874 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -129,12 +129,12 @@ def _abort(state, code, details): def _handle_event(event, state, response_deserializer): callbacks = [] for batch_operation in event.batch_operations: - operation_type = batch_operation.type + operation_type = batch_operation.type() state.due.remove(operation_type) if operation_type == cygrpc.OperationType.receive_initial_metadata: - state.initial_metadata = batch_operation.received_metadata + state.initial_metadata = batch_operation.initial_metadata() elif operation_type == cygrpc.OperationType.receive_message: - serialized_response = batch_operation.received_message.bytes() + serialized_response = batch_operation.message() if serialized_response is not None: response = _common.deserialize(serialized_response, response_deserializer) @@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer): else: state.response = response elif operation_type == cygrpc.OperationType.receive_status_on_client: - state.trailing_metadata = batch_operation.received_metadata + state.trailing_metadata = batch_operation.trailing_metadata() if state.code is None: code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( - batch_operation.received_status_code) + batch_operation.code()) if code is None: state.code = grpc.StatusCode.UNKNOWN state.details = _unknown_code_details( - batch_operation.received_status_code, - batch_operation.received_status_details) + code, batch_operation.details()) else: state.code = code - state.details = batch_operation.received_status_details + state.details = batch_operation.details() callbacks.extend(state.callbacks) state.callbacks = None return callbacks @@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call, _abort(state, grpc.StatusCode.INTERNAL, details) return else: - operations = (cygrpc.operation_send_message( + operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_message) @@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call, with state.condition: if state.code is None: operations = ( - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) @@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): event_handler = _event_handler(self._state, self._call, self._response_deserializer) self._call.start_client_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) return state, operations, deadline, deadline_timespec, None def _blocking(self, request, timeout, metadata, credentials): @@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( + cygrpc.SendInitialMetadataOperation( + metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): call.set_credentials(credentials._credentials) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), - None) + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _consume_request_iterator(request_iterator, state, call, @@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 660263fc09d..7f919650073 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -17,6 +17,7 @@ cimport libc.time # Typedef types with approximately the same semantics to provide their names to # Cython +ctypedef unsigned char uint8_t ctypedef int int32_t ctypedef unsigned uint32_t ctypedef long int64_t @@ -25,6 +26,7 @@ ctypedef long int64_t cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil + void *gpr_zalloc(size_t size) nogil void gpr_free(void *ptr) nogil void *gpr_realloc(void *p, size_t size) nogil @@ -190,6 +192,18 @@ cdef extern from "grpc/grpc.h": size_t arguments_length "num_args" grpc_arg *arguments "args" + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + + ctypedef enum grpc_stream_compression_level: + GRPC_STREAM_COMPRESS_LEVEL_NONE + GRPC_STREAM_COMPRESS_LEVEL_LOW + GRPC_STREAM_COMPRESS_LEVEL_MED + GRPC_STREAM_COMPRESS_LEVEL_HIGH + ctypedef enum grpc_call_error: GRPC_CALL_OK GRPC_CALL_ERROR @@ -265,9 +279,19 @@ cdef extern from "grpc/grpc.h": GRPC_OP_RECV_STATUS_ON_CLIENT GRPC_OP_RECV_CLOSE_ON_SERVER + ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level: + uint8_t is_set + grpc_compression_level level + + ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level: + uint8_t is_set + grpc_stream_compression_level level + ctypedef struct grpc_op_data_send_initial_metadata: size_t count grpc_metadata *metadata + grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level + grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level ctypedef struct grpc_op_data_send_status_from_server: size_t trailing_metadata_count @@ -524,7 +548,7 @@ cdef extern from "grpc/grpc_security.h": grpc_auth_property_iterator grpc_auth_context_property_iterator( const grpc_auth_context *ctx) - + grpc_auth_property_iterator grpc_auth_context_peer_identity( const grpc_auth_context *ctx) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi new file mode 100644 index 00000000000..bfbe27785b7 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -0,0 +1,109 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) + cdef void un_c(self) + + # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this! + cdef grpc_op c_op + + +cdef class SendInitialMetadataOperation(Operation): + + cdef readonly object _initial_metadata; + cdef readonly int _flags + cdef grpc_metadata *_c_initial_metadata + cdef size_t _c_initial_metadata_count + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendMessageOperation(Operation): + + cdef readonly bytes _message + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendCloseFromClientOperation(Operation): + + cdef readonly int _flags + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendStatusFromServerOperation(Operation): + + cdef readonly object _trailing_metadata + cdef readonly object _code + cdef readonly object _details + cdef readonly int _flags + cdef grpc_metadata *_c_trailing_metadata + cdef size_t _c_trailing_metadata_count + cdef grpc_slice _c_details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + cdef readonly int _flags + cdef tuple _initial_metadata + cdef grpc_metadata_array _c_initial_metadata + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveMessageOperation(Operation): + + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + cdef bytes _message + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveStatusOnClientOperation(Operation): + + cdef readonly int _flags + cdef grpc_metadata_array _c_trailing_metadata + cdef grpc_status_code _c_code + cdef grpc_slice _c_details + cdef tuple _trailing_metadata + cdef object _code + cdef str _details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveCloseOnServerOperation(Operation): + + cdef readonly int _flags + cdef object _cancelled + cdef int _c_cancelled + + cdef void c(self) + cdef void un_c(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi new file mode 100644 index 00000000000..3c91abf7220 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -0,0 +1,238 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self): + raise NotImplementedError() + + cdef void un_c(self): + raise NotImplementedError() + + +cdef class SendInitialMetadataOperation(Operation): + + def __cinit__(self, initial_metadata, flags): + self._initial_metadata = initial_metadata + self._flags = flags + + def type(self): + return GRPC_OP_SEND_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + self.c_op.flags = self._flags + _store_c_metadata( + self._initial_metadata, &self._c_initial_metadata, + &self._c_initial_metadata_count) + self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata + self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count + self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0 + self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0 + + cdef void un_c(self): + _release_c_metadata( + self._c_initial_metadata, self._c_initial_metadata_count) + + +cdef class SendMessageOperation(Operation): + + def __cinit__(self, bytes message, int flags): + self._message = message + self._flags = flags + + def type(self): + return GRPC_OP_SEND_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_MESSAGE + self.c_op.flags = self._flags + cdef grpc_slice message_slice = grpc_slice_from_copied_buffer( + self._message, len(self._message)) + self._c_message_byte_buffer = grpc_raw_byte_buffer_create( + &message_slice, 1) + grpc_slice_unref(message_slice) + self.c_op.data.send_message.send_message = self._c_message_byte_buffer + + cdef void un_c(self): + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + + +cdef class SendCloseFromClientOperation(Operation): + + def __cinit__(self, int flags): + self._flags = flags + + def type(self): + return GRPC_OP_SEND_CLOSE_FROM_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + self.c_op.flags = self._flags + + cdef void un_c(self): + pass + + +cdef class SendStatusFromServerOperation(Operation): + + def __cinit__(self, trailing_metadata, code, object details, int flags): + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + self._flags = flags + + def type(self): + return GRPC_OP_SEND_STATUS_FROM_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + self.c_op.flags = self._flags + _store_c_metadata( + self._trailing_metadata, &self._c_trailing_metadata, + &self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.trailing_metadata = ( + self._c_trailing_metadata) + self.c_op.data.send_status_from_server.trailing_metadata_count = ( + self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.status = self._code + self._c_details = _slice_from_bytes(_encode(self._details)) + self.c_op.data.send_status_from_server.status_details = &self._c_details + + cdef void un_c(self): + grpc_slice_unref(self._c_details) + _release_c_metadata( + self._c_trailing_metadata, self._c_trailing_metadata_count) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_initial_metadata) + self.c_op.data.receive_initial_metadata.receive_initial_metadata = ( + &self._c_initial_metadata) + + cdef void un_c(self): + self._initial_metadata = _metadata(&self._c_initial_metadata) + grpc_metadata_array_destroy(&self._c_initial_metadata) + + def initial_metadata(self): + return self._initial_metadata + + +cdef class ReceiveMessageOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_MESSAGE + self.c_op.flags = self._flags + self.c_op.data.receive_message.receive_message = ( + &self._c_message_byte_buffer) + + cdef void un_c(self): + cdef grpc_byte_buffer_reader message_reader + cdef bint message_reader_status + cdef grpc_slice message_slice + cdef size_t message_slice_length + cdef void *message_slice_pointer + if self._c_message_byte_buffer != NULL: + message_reader_status = grpc_byte_buffer_reader_init( + &message_reader, self._c_message_byte_buffer) + if message_reader_status: + message = bytearray() + while grpc_byte_buffer_reader_next(&message_reader, &message_slice): + message_slice_pointer = grpc_slice_start_ptr(message_slice) + message_slice_length = grpc_slice_length(message_slice) + message += (message_slice_pointer)[:message_slice_length] + grpc_slice_unref(message_slice) + grpc_byte_buffer_reader_destroy(&message_reader) + self._message = bytes(message) + else: + self._message = None + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + else: + self._message = None + + def message(self): + return self._message + + +cdef class ReceiveStatusOnClientOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_STATUS_ON_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.trailing_metadata = ( + &self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.status = ( + &self._c_code) + self.c_op.data.receive_status_on_client.status_details = ( + &self._c_details) + + cdef void un_c(self): + self._trailing_metadata = _metadata(&self._c_trailing_metadata) + grpc_metadata_array_destroy(&self._c_trailing_metadata) + self._code = self._c_code + self._details = _decode(_slice_bytes(self._c_details)) + grpc_slice_unref(self._c_details) + + def trailing_metadata(self): + return self._trailing_metadata + + def code(self): + return self._code + + def details(self): + return self._details + + +cdef class ReceiveCloseOnServerOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_CLOSE_ON_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + self.c_op.flags = self._flags + self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled + + cdef void un_c(self): + self._cancelled = bool(self._c_cancelled) + + def cancelled(self): + return self._cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 594fdb1a8b2..537cf2b537f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -65,11 +65,6 @@ cdef class Event: cdef readonly object batch_operations -cdef class ByteBuffer: - - cdef grpc_byte_buffer *c_byte_buffer - - cdef class SslPemKeyCertPair: cdef grpc_ssl_pem_key_cert_pair c_pair @@ -89,22 +84,6 @@ cdef class ChannelArgs: cdef list args -cdef class Operation: - - cdef grpc_op c_op - cdef bint _c_metadata_needs_release - cdef size_t _c_metadata_count - cdef grpc_metadata *_c_metadata - cdef ByteBuffer _received_message - cdef bint _c_metadata_array_needs_destruction - cdef grpc_metadata_array _c_metadata_array - cdef grpc_status_code _received_status_code - cdef grpc_slice _status_details - cdef int _received_cancelled - cdef readonly bint is_valid - cdef object references - - cdef class CompressionOptions: cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 5877591f786..99f8ffa3e70 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -229,13 +229,15 @@ cdef class OperationTag: self.c_nops = 0 if self._operations is None else len(self._operations) if 0 < self.c_nops: self.c_ops = gpr_malloc(sizeof(grpc_op) * self.c_nops) - for index in range(self.c_nops): - self.c_ops[index] = ((self._operations[index])).c_op + for index, operation in enumerate(self._operations): + (operation).c() + self.c_ops[index] = (operation).c_op cdef object release_ops(self): if 0 < self.c_nops: for index, operation in enumerate(self._operations): (operation).c_op = self.c_ops[index] + (operation).un_c() gpr_free(self.c_ops) return self._operations else: @@ -260,69 +262,6 @@ cdef class Event: self.is_new_request = is_new_request -cdef class ByteBuffer: - - def __cinit__(self, bytes data): - grpc_init() - if data is None: - self.c_byte_buffer = NULL - return - - cdef char *c_data = data - cdef grpc_slice data_slice - cdef size_t data_length = len(data) - with nogil: - data_slice = grpc_slice_from_copied_buffer(c_data, data_length) - with nogil: - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - with nogil: - grpc_slice_unref(data_slice) - - def bytes(self): - cdef grpc_byte_buffer_reader reader - cdef grpc_slice data_slice - cdef size_t data_slice_length - cdef void *data_slice_pointer - cdef bint reader_status - if self.c_byte_buffer != NULL: - with nogil: - reader_status = grpc_byte_buffer_reader_init( - &reader, self.c_byte_buffer) - if not reader_status: - return None - result = bytearray() - with nogil: - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = grpc_slice_start_ptr(data_slice) - data_slice_length = grpc_slice_length(data_slice) - with gil: - result += (data_slice_pointer)[:data_slice_length] - grpc_slice_unref(data_slice) - with nogil: - grpc_byte_buffer_reader_destroy(&reader) - return bytes(result) - else: - return None - - def __len__(self): - cdef size_t result - if self.c_byte_buffer != NULL: - with nogil: - result = grpc_byte_buffer_length(self.c_byte_buffer) - return result - else: - return 0 - - def __str__(self): - return self.bytes() - - def __dealloc__(self): - if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) - grpc_shutdown() - - cdef class SslPemKeyCertPair: def __cinit__(self, bytes private_key, bytes certificate_chain): @@ -365,7 +304,7 @@ cdef class ChannelArg: elif hasattr(value, '__int__'): # Pointer objects must override __int__() to return # the underlying C address (Python ints are word size). The - # lifecycle of the pointer is fixed to the lifecycle of the + # lifecycle of the pointer is fixed to the lifecycle of the # python object wrapping it. self.ptr_vtable.copy = ©_ptr self.ptr_vtable.destroy = &destroy_ptr @@ -407,185 +346,6 @@ cdef class ChannelArgs: return self.args[i] -cdef class Operation: - - def __cinit__(self): - grpc_init() - self.references = [] - self._c_metadata_needs_release = False - self._c_metadata_array_needs_destruction = False - self._status_details = grpc_empty_slice() - self.is_valid = False - - @property - def type(self): - return self.c_op.type - - @property - def flags(self): - return self.c_op.flags - - @property - def has_status(self): - return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT - - @property - def received_message(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - raise TypeError("self must be an operation receiving a message") - return self._received_message - - @property - def received_message_or_none(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - return None - return self._received_message - - @property - def received_metadata(self): - if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and - self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): - raise TypeError("self must be an operation receiving metadata") - return _metadata(&self._c_metadata_array) - - @property - def received_status_code(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving a status code") - return self._received_status_code - - @property - def received_status_code_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return self._received_status_code - - @property - def received_status_details(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving status details") - return _slice_bytes(self._status_details) - - @property - def received_status_details_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return _slice_bytes(self._status_details) - - @property - def received_cancelled(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - raise TypeError("self must be an operation receiving cancellation " - "information") - return False if self._received_cancelled == 0 else True - - @property - def received_cancelled_or_none(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - return None - return False if self._received_cancelled == 0 else True - - def __dealloc__(self): - if self._c_metadata_needs_release: - _release_c_metadata(self._c_metadata, self._c_metadata_count) - if self._c_metadata_array_needs_destruction: - grpc_metadata_array_destroy(&self._c_metadata_array) - grpc_slice_unref(self._status_details) - grpc_shutdown() - -def operation_send_initial_metadata(metadata, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_initial_metadata.count = op._c_metadata_count - op.c_op.data.send_initial_metadata.metadata = op._c_metadata - op.is_valid = True - return op - -def operation_send_message(data, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_MESSAGE - op.c_op.flags = flags - byte_buffer = ByteBuffer(data) - op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer - op.references.append(byte_buffer) - op.is_valid = True - return op - -def operation_send_close_from_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT - op.c_op.flags = flags - op.is_valid = True - return op - -def operation_send_status_from_server( - metadata, grpc_status_code code, bytes details, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_status_from_server.trailing_metadata_count = ( - op._c_metadata_count) - op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata - op.c_op.data.send_status_from_server.status = code - grpc_slice_unref(op._status_details) - op._status_details = _slice_from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &op._status_details - op.is_valid = True - return op - -def operation_receive_initial_metadata(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.is_valid = True - return op - -def operation_receive_message(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_MESSAGE - op.c_op.flags = flags - op._received_message = ByteBuffer(None) - # n.b. the c_op.data.receive_message field needs to be deleted by us, - # anyway, so we just let that be handled by the ByteBuffer() we allocated - # the line before. - op.c_op.data.receive_message.receive_message = ( - &op._received_message.c_byte_buffer) - op.is_valid = True - return op - -def operation_receive_status_on_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.c_op.data.receive_status_on_client.status = ( - &op._received_status_code) - op.c_op.data.receive_status_on_client.status_details = ( - &op._status_details) - op.is_valid = True - return op - -def operation_receive_close_on_server(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER - op.c_op.flags = flags - op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled - op.is_valid = True - return op - - cdef class CompressionOptions: def __cinit__(self): diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 6fc5638d5d8..ad229de0ae6 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -19,6 +19,7 @@ include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" include "_cygrpc/metadata.pxd.pxi" +include "_cygrpc/operation.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index d6052298221..0964fb66ab7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -26,6 +26,7 @@ include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" include "_cygrpc/metadata.pyx.pxi" +include "_cygrpc/operation.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 02d3af8706d..eec31bdcf6b 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0 def _serialized_request(request_event): - return request_event.batch_operations[0].received_message.bytes() + return request_event.batch_operations[0].message() def _application_code(code): @@ -130,13 +130,13 @@ def _abort(state, call, code, details): effective_code = _abortion_code(state, code) effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendInitialMetadataOperation( + None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: - operations = (cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN @@ -150,8 +150,7 @@ def _receive_close_on_server(state): def receive_close_on_server(receive_close_on_server_event): with state.condition: - if receive_close_on_server_event.batch_operations[ - 0].received_cancelled: + if receive_close_on_server_event.batch_operations[0].cancelled(): state.client = _CANCELLED elif state.client is _OPEN: state.client = _CLOSED @@ -262,7 +261,7 @@ class _Context(grpc.ServicerContext): _raise_rpc_error(self._state) else: if self._state.initial_metadata_allowed: - operation = cygrpc.operation_send_initial_metadata( + operation = cygrpc.SendInitialMetadataOperation( initial_metadata, _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( (operation,), _send_initial_metadata(self._state)) @@ -305,7 +304,7 @@ class _RequestIterator(object): raise StopIteration() else: self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(self._state, self._call, self._request_deserializer)) self._state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -348,7 +347,7 @@ def _unary_request(rpc_event, state, request_deserializer): return None else: rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(state, rpc_event.operation_call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -424,14 +423,15 @@ def _send_response(rpc_event, state, serialized_response): return False else: if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_message( - serialized_response, _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: - operations = (cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + operations = (cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN rpc_event.operation_call.start_server_batch( operations, _send_message(state, token)) @@ -448,16 +448,16 @@ def _status(rpc_event, state, serialized_response): code = _completion_code(state) details = _details(state) operations = [ - cygrpc.operation_send_status_from_server( + cygrpc.SendStatusFromServerOperation( state.trailing_metadata, code, details, _EMPTY_FLAGS), ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS)) if serialized_response is not None: operations.append( - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS)) + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS)) rpc_event.operation_call.start_server_batch( operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) @@ -563,10 +563,10 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server((), status, details, - _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation(None, status, details, + _EMPTY_FLAGS),) rpc_state = _RPCState() rpc_event.operation_call.start_server_batch( operations, lambda ignored_event: (rpc_state, (),)) @@ -577,7 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) if method_handler.request_streaming: diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 3bf53087495..e033c1063f4 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -34,6 +34,7 @@ "unit._cython._no_messages_server_completion_queue_per_call_test.Test", "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", + "unit._cython._server_test.Test", "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index a8a7175cc71..458b4fe542a 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -65,10 +65,10 @@ class _Handler(object): with self._lock: self._call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _RECEIVE_CLOSE_ON_SERVER_TAG) self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _RECEIVE_MESSAGE_TAG) first_event = self._completion_queue.poll() if _is_cancellation_event(first_event): @@ -76,10 +76,10 @@ class _Handler(object): else: with self._lock: operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', _EMPTY_FLAGS),) self._call.start_server_batch(operations, @@ -169,13 +169,13 @@ class CancelManyCallsTest(unittest.TestCase): None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, _INFINITE_FUTURE) operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) tag = 'client_complete_call_{0:04d}_tag'.format(index) client_call.start_client_batch(operations, tag) client_due.add(tag) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index d08003af444..41291cc88f4 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) + self.assertEqual(cygrpc.CallError.ok, + client_receive_initial_metadata_start_batch_result) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) + self.assertEqual(cygrpc.CallError.ok, + client_complete_rpc_start_batch_result) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -72,7 +73,7 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_call_driver.add_due({ @@ -84,9 +85,8 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, b'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index d0166a2b29f..b429a20ed75 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) self.client_driver.add_due({ client_receive_initial_metadata_tag, @@ -67,7 +64,7 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) self.server_driver.add_due({ @@ -79,11 +76,10 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, - b'test details', _common.EMPTY_FLAGS), + 'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) self.server_driver.add_due({ server_complete_rpc_tag, diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 1deb15ba032..a6d92f22030 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -158,15 +158,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_due.add(client_receive_initial_metadata_tag) client_complete_rpc_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_complete_rpc_tag)) client_due.add(client_complete_rpc_tag) @@ -175,12 +175,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( server_send_initial_metadata_tag) @@ -189,12 +189,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_second_message_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( (), cygrpc.StatusCode.ok, b'test details', _EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -208,7 +208,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_receive_first_message_tag = 'client_receive_first_message_tag' client_receive_first_message_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], client_receive_first_message_tag)) client_due.add(client_receive_first_message_tag) client_receive_first_message_event = client_driver.event_with_tag( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py new file mode 100644 index 00000000000..12bf40be6b3 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test servers at the level of the Cython API.""" + +import threading +import time +import unittest + +from grpc._cython import cygrpc + + +class Test(unittest.TestCase): + + def test_lonely_server(self): + server_call_completion_queue = cygrpc.CompletionQueue() + server_shutdown_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server(cygrpc.ChannelArgs([])) + server.register_completion_queue(server_call_completion_queue) + server.register_completion_queue(server_shutdown_completion_queue) + port = server.add_http2_port(b'[::]:0') + server.start() + + server_request_call_tag = 'server_request_call_tag' + server_request_call_start_batch_result = server.request_call( + server_call_completion_queue, server_call_completion_queue, + server_request_call_tag) + + time.sleep(4) + + server_shutdown_tag = 'server_shutdown_tag' + server_shutdown_result = server.shutdown( + server_shutdown_completion_queue, server_shutdown_tag) + server_request_call_event = server_call_completion_queue.poll() + server_shutdown_event = server_shutdown_completion_queue.poll() + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 4eda6854865..002fb9b7f40 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -35,11 +35,6 @@ def _metadata_plugin(context, callback): class TypeSmokeTest(unittest.TestCase): - def testOperationFlags(self): - operation = cygrpc.operation_send_message(b'asdf', - cygrpc.WriteFlag.no_compress) - self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) - def testTimespec(self): now = time.time() now_timespec_a = cygrpc.Timespec(now) @@ -170,7 +165,7 @@ class ServerClientMixin(object): SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought' SERVER_TRAILING_METADATA_VALUE = 'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' + SERVER_STATUS_DETAILS = 'our work is never over' REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' @@ -192,13 +187,13 @@ class ServerClientMixin(object): (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),) client_start_batch_result = client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendInitialMetadataOperation(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -227,12 +222,12 @@ class ServerClientMixin(object): server_trailing_metadata = ( (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),) server_start_batch_result = server_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( server_initial_metadata, - _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) @@ -245,25 +240,24 @@ class ServerClientMixin(object): found_client_op_types = set() for client_result in client_event.batch_operations: # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: + self.assertNotIn(client_result.type(), found_client_op_types) + found_client_op_types.add(client_result.type()) + if client_result.type( + ) == cygrpc.OperationType.receive_initial_metadata: self.assertTrue( test_common.metadata_transmitted( server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, - client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: + client_result.initial_metadata())) + elif client_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(RESPONSE, client_result.message()) + elif client_result.type( + ) == cygrpc.OperationType.receive_status_on_client: self.assertTrue( test_common.metadata_transmitted( server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, - client_result.received_status_code) + client_result.trailing_metadata())) + self.assertEqual(SERVER_STATUS_DETAILS, client_result.details()) + self.assertEqual(SERVER_STATUS_CODE, client_result.code()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -277,13 +271,13 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, - server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) + self.assertNotIn(client_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -319,9 +313,8 @@ class ServerClientMixin(object): cygrpc_deadline, description) client_event_future = perform_client_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) @@ -333,8 +326,7 @@ class ServerClientMixin(object): cygrpc_deadline, description) server_event_future = perform_server_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") client_event_future.result() # force completion @@ -343,12 +335,12 @@ class ServerClientMixin(object): # Messaging for _ in range(10): client_event_future = perform_client_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") server_event_future = perform_server_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") client_event_future.result() # force completion @@ -356,13 +348,13 @@ class ServerClientMixin(object): # Epilogue client_event_future = perform_client_operations([ - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") server_event_future = perform_server_operations([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue")