diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 6361669757e..0892215b6d4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -26,16 +26,13 @@ cdef class Call: def _start_batch(self, operations, tag, retain_self): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") - cdef OperationTag operation_tag = OperationTag(tag, operations) - if retain_self: - operation_tag.operation_call = self - else: - operation_tag.operation_call = None - operation_tag.store_ops() - cpython.Py_INCREF(operation_tag) + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag( + tag, operations, self if retain_self else None) + batch_operation_tag.prepare() + cpython.Py_INCREF(batch_operation_tag) return grpc_call_start_batch( - self.c_call, operation_tag.c_ops, operation_tag.c_nops, - operation_tag, NULL) + self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, + batch_operation_tag, NULL) def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 644df674cc7..443d534d7eb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -76,12 +76,12 @@ cdef class Channel: def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag = OperationTag(tag, None) - cpython.Py_INCREF(operation_tag) + cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) + cpython.Py_INCREF(connectivity_tag) with nogil: grpc_channel_watch_connectivity_state( self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, operation_tag) + queue.c_completion_queue, connectivity_tag) def target(self): cdef char *target = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 140fc357b9f..e259789b35f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -37,42 +37,20 @@ cdef class CompletionQueue: self.is_shutdown = False cdef _interpret_event(self, grpc_event event): - cdef OperationTag tag = None - cdef object user_tag = None - cdef Call operation_call = None - cdef CallDetails request_call_details = None - cdef object request_metadata = None - cdef object batch_operations = None + cdef _Tag tag = None if event.type == GRPC_QUEUE_TIMEOUT: - return Event( - event.type, False, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif event.type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - return Event( - event.type, True, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) else: - if event.tag != NULL: - tag = event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - if tag.shutting_down_server is not None: - tag.shutting_down_server.notify_shutdown_complete() - user_tag = tag.user_tag - operation_call = tag.operation_call - request_call_details = tag.request_call_details - if tag.is_new_request: - request_metadata = _metadata(&tag._c_request_metadata) - grpc_metadata_array_destroy(&tag._c_request_metadata) - batch_operations = tag.release_ops() - if tag.is_new_request: - # Stuff in the tag not explicitly handled by us needs to live through - # the life of the call - operation_call.references.extend(tag.references) - return Event( - event.type, event.success, user_tag, operation_call, - request_call_details, request_metadata, tag.is_new_request, - batch_operations) + tag = <_Tag>event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag.event(event) def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi new file mode 100644 index 00000000000..686199ecf4c --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi @@ -0,0 +1,45 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + + +cdef class RequestCallEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly Call call + cdef readonly CallDetails call_details + cdef readonly tuple invocation_metadata + + +cdef class BatchOperationEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly object batch_operations + + +cdef class ServerShutdownEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi new file mode 100644 index 00000000000..af26d273186 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi @@ -0,0 +1,55 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag + + +cdef class RequestCallEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + Call call, CallDetails call_details, tuple invocation_metadata): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.call = call + self.call_details = call_details + self.invocation_metadata = invocation_metadata + + +cdef class BatchOperationEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + object batch_operations): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.batch_operations = batch_operations + + +cdef class ServerShutdownEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 537cf2b537f..7b2482d9479 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -28,43 +28,6 @@ cdef class CallDetails: cdef grpc_call_details c_details -cdef class OperationTag: - - cdef object user_tag - cdef list references - # This allows CompletionQueue to notify the Python Server object that the - # underlying GRPC core server has shutdown - cdef Server shutting_down_server - cdef Call operation_call - cdef CallDetails request_call_details - cdef grpc_metadata_array _c_request_metadata - cdef grpc_op *c_ops - cdef size_t c_nops - cdef readonly object _operations - cdef bint is_new_request - - cdef void store_ops(self) - cdef object release_ops(self) - - -cdef class Event: - - cdef readonly grpc_completion_type type - cdef readonly bint success - cdef readonly object tag - - # For Server.request_call - cdef readonly bint is_new_request - cdef readonly CallDetails request_call_details - cdef readonly object request_metadata - - # For server calls - cdef readonly Call operation_call - - # For Call.start_batch - cdef readonly object batch_operations - - cdef class SslPemKeyCertPair: cdef grpc_ssl_pem_key_cert_pair c_pair diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 7e6272fe2a6..bc2cd0338e0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -218,50 +218,6 @@ cdef class CallDetails: return timespec -cdef class OperationTag: - - def __cinit__(self, user_tag, operations): - self.user_tag = user_tag - self.references = [] - self._operations = operations - - cdef void store_ops(self): - self.c_nops = 0 if self._operations is None else len(self._operations) - if 0 < self.c_nops: - self.c_ops = gpr_malloc(sizeof(grpc_op) * self.c_nops) - for index, operation in enumerate(self._operations): - (operation).c() - self.c_ops[index] = (operation).c_op - - cdef object release_ops(self): - if 0 < self.c_nops: - for index, operation in enumerate(self._operations): - (operation).c_op = self.c_ops[index] - (operation).un_c() - gpr_free(self.c_ops) - return self._operations - else: - return () - - -cdef class Event: - - def __cinit__(self, grpc_completion_type type, bint success, - object tag, Call operation_call, - CallDetails request_call_details, - object request_metadata, - bint is_new_request, - object batch_operations): - self.type = type - self.success = success - self.tag = tag - self.operation_call = operation_call - self.request_call_details = request_call_details - self.request_metadata = request_metadata - self.batch_operations = batch_operations - self.is_new_request = is_new_request - - cdef class SslPemKeyCertPair: def __cinit__(self, bytes private_key, bytes certificate_chain): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index f8d78928580..c19beccde63 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -78,19 +78,15 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") - cdef OperationTag operation_tag = OperationTag(tag, None) - operation_tag.operation_call = Call() - operation_tag.request_call_details = CallDetails() - grpc_metadata_array_init(&operation_tag._c_request_metadata) - operation_tag.references.extend([self, call_queue, server_queue]) - operation_tag.is_new_request = True - cpython.Py_INCREF(operation_tag) + cdef _RequestCallTag request_call_tag = _RequestCallTag(tag) + request_call_tag.prepare() + cpython.Py_INCREF(request_call_tag) return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag._c_request_metadata, + self.c_server, &request_call_tag.call.c_call, + &request_call_tag.call_details.c_details, + &request_call_tag.c_invocation_metadata, call_queue.c_completion_queue, server_queue.c_completion_queue, - operation_tag) + request_call_tag) def register_completion_queue( self, CompletionQueue queue not None): @@ -131,16 +127,14 @@ cdef class Server: cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True - operation_tag = OperationTag(tag, None) - operation_tag.shutting_down_server = self - cpython.Py_INCREF(operation_tag) + cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self) + cpython.Py_INCREF(server_shutdown_tag) with nogil: grpc_server_shutdown_and_notify( self.c_server, queue.c_completion_queue, - operation_tag) + server_shutdown_tag) def shutdown(self, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag if queue.is_shutting_down: raise ValueError("queue must be live") elif not self.is_started: @@ -153,7 +147,8 @@ cdef class Server: self._c_shutdown(queue, tag) cdef notify_shutdown_complete(self): - # called only by a completion queue on receiving our shutdown operation tag + # called only after our server shutdown tag has emerged from a completion + # queue. self.is_shutdown = True def cancel_all_calls(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi new file mode 100644 index 00000000000..f9a3b5e8f40 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi @@ -0,0 +1,58 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event) + + +cdef class _ConnectivityTag(_Tag): + + cdef readonly object _user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event) + + +cdef class _RequestCallTag(_Tag): + + cdef readonly object _user_tag + cdef Call call + cdef CallDetails call_details + cdef grpc_metadata_array c_invocation_metadata + + cdef void prepare(self) + cdef RequestCallEvent event(self, grpc_event c_event) + + +cdef class _BatchOperationTag(_Tag): + + cdef object _user_tag + cdef readonly object _operations + cdef readonly object _retained_call + cdef grpc_op *c_ops + cdef size_t c_nops + + cdef void prepare(self) + cdef BatchOperationEvent event(self, grpc_event c_event) + + +cdef class _ServerShutdownTag(_Tag): + + cdef readonly object _user_tag + # This allows CompletionQueue to notify the Python Server object that the + # underlying GRPC core server has shutdown + cdef readonly Server _shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi new file mode 100644 index 00000000000..aaca458442f --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi @@ -0,0 +1,87 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event): + raise NotImplementedError() + + +cdef class _ConnectivityTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event): + return ConnectivityEvent(c_event.type, c_event.success, self._user_tag) + + +cdef class _RequestCallTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + self.call = None + self.call_details = None + + cdef void prepare(self): + self.call = Call() + self.call_details = CallDetails() + grpc_metadata_array_init(&self.c_invocation_metadata) + + cdef RequestCallEvent event(self, grpc_event c_event): + cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata) + grpc_metadata_array_destroy(&self.c_invocation_metadata) + return RequestCallEvent( + c_event.type, c_event.success, self._user_tag, self.call, + self.call_details, invocation_metadata) + + +cdef class _BatchOperationTag: + + def __cinit__(self, user_tag, operations, call): + self._user_tag = user_tag + self._operations = operations + self._retained_call = call + + cdef void prepare(self): + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index, operation in enumerate(self._operations): + (operation).c() + self.c_ops[index] = (operation).c_op + + cdef BatchOperationEvent event(self, grpc_event c_event): + if 0 < self.c_nops: + for index, operation in enumerate(self._operations): + (operation).c_op = self.c_ops[index] + (operation).un_c() + gpr_free(self.c_ops) + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, self._operations) + else: + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, ()) + + +cdef class _ServerShutdownTag(_Tag): + + def __cinit__(self, user_tag, shutting_down_server): + self._user_tag = user_tag + self._shutting_down_server = shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event): + self._shutting_down_server.notify_shutdown_complete() + return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag) \ No newline at end of file diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index ad229de0ae6..b32fa518fc2 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -18,8 +18,10 @@ include "_cygrpc/call.pxd.pxi" include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/event.pxd.pxi" include "_cygrpc/metadata.pxd.pxi" include "_cygrpc/operation.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" +include "_cygrpc/tag.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 0964fb66ab7..5106394708b 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -25,11 +25,13 @@ include "_cygrpc/call.pyx.pxi" include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/event.pyx.pxi" include "_cygrpc/metadata.pyx.pxi" include "_cygrpc/operation.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" +include "_cygrpc/tag.pyx.pxi" # # initialize gRPC diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index eec31bdcf6b..22244b9cecf 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -217,11 +217,10 @@ class _Context(grpc.ServicerContext): def time_remaining(self): return max( - float(self._rpc_event.request_call_details.deadline) - time.time(), - 0) + float(self._rpc_event.call_details.deadline) - time.time(), 0) def cancel(self): - self._rpc_event.operation_call.cancel() + self._rpc_event.call.cancel() def add_callback(self, callback): with self._state.condition: @@ -236,23 +235,23 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return self._rpc_event.request_metadata + return self._rpc_event.invocation_metadata def peer(self): - return _common.decode(self._rpc_event.operation_call.peer()) + return _common.decode(self._rpc_event.call.peer()) def peer_identities(self): - return cygrpc.peer_identities(self._rpc_event.operation_call) + return cygrpc.peer_identities(self._rpc_event.call) def peer_identity_key(self): - id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call) + id_key = cygrpc.peer_identity_key(self._rpc_event.call) return id_key if id_key is None else _common.decode(id_key) def auth_context(self): return { _common.decode(key): value for key, value in six.iteritems( - cygrpc.auth_context(self._rpc_event.operation_call)) + cygrpc.auth_context(self._rpc_event.call)) } def send_initial_metadata(self, initial_metadata): @@ -263,7 +262,7 @@ class _Context(grpc.ServicerContext): if self._state.initial_metadata_allowed: operation = cygrpc.SendInitialMetadataOperation( initial_metadata, _EMPTY_FLAGS) - self._rpc_event.operation_call.start_server_batch( + self._rpc_event.call.start_server_batch( (operation,), _send_initial_metadata(self._state)) self._state.initial_metadata_allowed = False self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) @@ -346,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CANCELLED or state.statused: return None else: - rpc_event.operation_call.start_server_batch( + rpc_event.call.start_server_batch( (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), - _receive_message(state, rpc_event.operation_call, + _receive_message(state, rpc_event.call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) while True: @@ -356,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer): if state.request is None: if state.client is _CLOSED: details = '"{}" requires exactly one request message.'.format( - rpc_event.request_call_details.method) - _abort(state, rpc_event.operation_call, + rpc_event.call_details.method) + _abort(state, rpc_event.call, cygrpc.StatusCode.unimplemented, _common.encode(details)) return None @@ -378,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception calling application: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -396,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -410,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer): serialized_response = _common.serialize(response, response_serializer) if serialized_response is None: with state.condition: - _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal, + _abort(state, rpc_event.call, cygrpc.StatusCode.internal, b'Failed to serialize response!') return None else: @@ -433,8 +432,8 @@ def _send_response(rpc_event, state, serialized_response): operations = (cygrpc.SendMessageOperation(serialized_response, _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN - rpc_event.operation_call.start_server_batch( - operations, _send_message(state, token)) + rpc_event.call.start_server_batch(operations, + _send_message(state, token)) state.due.add(token) while True: state.condition.wait() @@ -458,7 +457,7 @@ def _status(rpc_event, state, serialized_response): operations.append( cygrpc.SendMessageOperation(serialized_response, _EMPTY_FLAGS)) - rpc_event.operation_call.start_server_batch( + rpc_event.call.start_server_batch( operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) state.statused = True @@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool): def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _unary_response_in_pool, rpc_event, state, method_handler.stream_unary, @@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _stream_response_in_pool, rpc_event, state, @@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): return None handler_call_details = _HandlerCallDetails( - _common.decode(rpc_event.request_call_details.method), - rpc_event.request_metadata) + _common.decode(rpc_event.call_details.method), + rpc_event.invocation_metadata) if interceptor_pipeline is not None: return interceptor_pipeline.execute(query_handlers, @@ -568,15 +567,15 @@ def _reject_rpc(rpc_event, status, details): cygrpc.SendStatusFromServerOperation(None, status, details, _EMPTY_FLAGS),) rpc_state = _RPCState() - rpc_event.operation_call.start_server_batch( - operations, lambda ignored_event: (rpc_state, (),)) + rpc_event.call.start_server_batch(operations, + lambda ignored_event: (rpc_state, (),)) return rpc_state def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: - rpc_event.operation_call.start_server_batch( + rpc_event.call.start_server_batch( (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) @@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded): if not rpc_event.success: return None, None - if rpc_event.request_call_details.method is not None: + if rpc_event.call_details.method is not None: try: method_handler = _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 75b6b9e928a..cdb35724535 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -53,7 +53,7 @@ class _Handler(object): self._state = state self._lock = threading.Lock() self._completion_queue = completion_queue - self._call = rpc_event.operation_call + self._call = rpc_event.call def __call__(self): with self._state.condition: diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 41291cc88f4..583136cf236 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -72,7 +72,7 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ + server_request_call_event.call.start_server_batch([ cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) @@ -84,7 +84,7 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ + server_request_call_event.call.start_server_batch([ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, @@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), + return (_common.OperationResult( + server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), + client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success),) def test_rpcs(self): expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index b429a20ed75..c5cf606c904 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -63,7 +63,7 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ + server_request_call_event.call.start_server_batch([ cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) @@ -75,7 +75,7 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ + server_request_call_event.call.start_server_batch([ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, @@ -92,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), + return (_common.OperationResult( + server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), + client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success),) def test_rpcs(self): expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 87d0dd7a853..a5ec54ee594 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -175,12 +175,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ + server_rpc_event.call.start_server_batch([ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ + server_rpc_event.call.start_server_batch([ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( @@ -189,11 +189,11 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag) with server_call_condition: server_send_second_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ + server_rpc_event.call.start_server_batch([ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ + server_rpc_event.call.start_server_batch([ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( (), cygrpc.StatusCode.ok, b'test details', @@ -232,9 +232,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, - server_rpc_event.type) - self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call) - self.assertEqual(0, len(server_rpc_event.batch_operations)) + server_rpc_event.completion_type) + self.assertIsInstance(server_rpc_event.call, cygrpc.Call) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index e34892c7792..5453735f11d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -87,7 +87,8 @@ class TypeSmokeTest(unittest.TestCase): shutdown_tag = object() server.shutdown(completion_queue, shutdown_tag) event = completion_queue.poll() - self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) + self.assertEqual(cygrpc.CompletionType.operation_complete, + event.completion_type) self.assertIs(shutdown_tag, event.tag) del server del completion_queue @@ -147,7 +148,7 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, call_result) event = queue.poll(deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - event.type) + event.completion_type) self.assertTrue(event.success) self.assertIs(tag, event.tag) except Exception as error: @@ -205,22 +206,20 @@ class ServerClientMixin(object): request_event = self.server_completion_queue.poll(cygrpc_deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - request_event.type) - self.assertIsInstance(request_event.operation_call, cygrpc.Call) + request_event.completion_type) + self.assertIsInstance(request_event.call, cygrpc.Call) self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(0, len(request_event.batch_operations)) self.assertTrue( test_common.metadata_transmitted(client_initial_metadata, - request_event.request_metadata)) - self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(self.expected_host, - request_event.request_call_details.host) + request_event.invocation_metadata)) + self.assertEqual(METHOD, request_event.call_details.method) + self.assertEqual(self.expected_host, request_event.call_details.host) self.assertLess( - abs(DEADLINE - float(request_event.request_call_details.deadline)), + abs(DEADLINE - float(request_event.call_details.deadline)), DEADLINE_TOLERANCE) server_call_tag = object() - server_call = request_event.operation_call + server_call = request_event.call server_initial_metadata = ( (SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),) server_trailing_metadata = ( @@ -322,7 +321,7 @@ class ServerClientMixin(object): ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) - server_call = request_event.operation_call + server_call = request_event.call def perform_server_operations(operations, description): return self._perform_operations(operations, server_call,