diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d5a6e869145..02a3747ecba 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -118,8 +118,6 @@ def _abort(state, code, details): def _handle_event(event, state, response_deserializer): callbacks = [] - # print("************* Handling event with operations: {}".format(event.batch_operations)) - # import traceback; traceback.print_stack() for batch_operation in event.batch_operations: operation_type = batch_operation.type() state.due.remove(operation_type) @@ -127,8 +125,6 @@ def _handle_event(event, state, response_deserializer): state.initial_metadata = batch_operation.initial_metadata() elif operation_type == cygrpc.OperationType.receive_message: serialized_response = batch_operation.message() - # print("Batch operation message: {}".format(batch_operation.message())) - # print("Serialized response is '{}'".format(serialized_response)) if serialized_response is not None: response = _common.deserialize(serialized_response, response_deserializer) @@ -253,7 +249,7 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer, # TODO: Docstrings. -class _SingleThreadedRendezvous(grpc.Call): # pylint: disable=too-many-ancestors +class _SingleThreadedRendezvous(grpc.RpcError, grpc.Call): # pylint: disable=too-many-ancestors def __init__(self, state, call, response_deserializer, deadline): super(_SingleThreadedRendezvous, self).__init__() # TODO: Is this still needed? Or is it just for inter-thread @@ -263,43 +259,87 @@ class _SingleThreadedRendezvous(grpc.Call): # pylint: disable=too-many-ancestor self._response_deserializer = response_deserializer self._deadline = deadline + # TODO: Dedupe between here and the default Rendezvous. def is_active(self): """See grpc.RpcContext.is_active""" - raise NotImplementedError() + with self._state.condition: + return self._state.code is None def time_remaining(self): """See grpc.RpcContext.time_remaining""" - raise NotImplementedError() + with self._state.condition: + if self._deadline is None: + return None + else: + return max(self._deadline - time.time(), 0) def cancel(self): """See grpc.RpcContext.cancel""" - raise NotImplementedError() + with self._state.condition: + if self._state.code is None: + code = grpc.StatusCode.CANCELLED + details = 'Locally cancelled by application!' + self._call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) + self._state.cancelled = True + _abort(self._state, code, details) + return True + else: + return False def add_callback(self, callback): """See grpc.RpcContext.add_callback""" - raise NotImplementedError() + with self._state.condition: + if self._state.callbacks is None: + return False + else: + self._state.callbacks.append(callback) + return True def initial_metadata(self): """See grpc.Call.initial_metadata""" - raise NotImplementedError() + with self._state.condition: + + def _done(): + return self._state.initial_metadata is not None + + _common.wait(self._state.condition.wait, _done) + return self._state.initial_metadata def trailing_metadata(self): """See grpc.Call.trailing_metadata""" - raise NotImplementedError() + with self._state.condition: + + def _done(): + return self._state.trailing_metadata is not None + + _common.wait(self._state.condition.wait, _done) + return self._state.trailing_metadata def code(self): """See grpc.Call.code""" - raise NotImplementedError() + with self._state.condition: + + def _done(): + return self._state.code is not None + + _common.wait(self._state.condition.wait, _done) + return self._state.code def details(self): """See grpc.Call.details""" - raise NotImplementedError() + with self._state.condition: + + def _done(): + return self._state.details is not None + + _common.wait(self._state.condition.wait, _done) + return _common.decode(self._state.details) # TODO: How does this work when the server sends back zero messages? def _next(self): - # Since no other thread has access to self._state, we can access it - # without taking the lock. If we ever add a Future interface, we'll - # have to add synchronization. + # TODO(rbellevi): Take lock. + # TODO(rbellevi): This conditional block is very similar to the one + # below. Dedupe. if self._state.code is None: operating = self._call.operate((cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None) if operating: @@ -308,9 +348,7 @@ class _SingleThreadedRendezvous(grpc.Call): # pylint: disable=too-many-ancestor elif self._state.code is grpc.StatusCode.OK: raise StopIteration() else: - # TODO: Figure out what to raise instead. - # Alternatively, become a grpc.RPCError - raise ValueError("I should be a rendezvous! Or something...") + raise self while True: # TODO: Consider how this interacts with fork support. event = self._call.next_event() @@ -328,14 +366,10 @@ class _SingleThreadedRendezvous(grpc.Call): # pylint: disable=too-many-ancestor self._state.response = None return response elif cygrpc.OperationType.receive_message not in self._state.due: - # TODO: Justify this. When can this even happen? if self._state.code is grpc.StatusCode.OK: raise StopIteration() - else: - pass - # print(self._state.__dict__) - # TODO: Figure out what to raise instead. - # raise ValueError() + elif self._state.code is not None: + raise self def __next__(self): return self._next() @@ -346,6 +380,45 @@ class _SingleThreadedRendezvous(grpc.Call): # pylint: disable=too-many-ancestor def __iter__(self): return self + def exception(self, timeout=None): + """Return the exception raised by the computation. + + See grpc.Future.exception for the full API contract. + """ + with self._state.condition: + timed_out = _common.wait( + self._state.condition.wait, self._is_complete, timeout=timeout) + if timed_out: + raise grpc.FutureTimeoutError() + else: + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + return self + + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. + + See grpc.future.traceback for the full API contract. + """ + with self._state.condition: + timed_out = _common.wait( + self._state.condition.wait, self._is_complete, timeout=timeout) + if timed_out: + raise grpc.FutureTimeoutError() + else: + if self._state.code is grpc.StatusCode.OK: + return None + elif self._state.cancelled: + raise grpc.FutureCancelledError() + else: + try: + raise self + except grpc.RpcError: + return sys.exc_info()[2] + class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): # pylint: disable=too-many-ancestors diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index ebb1f0415ab..16a5a01f6ff 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -92,7 +92,6 @@ cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) tag.prepare() cpython.Py_INCREF(tag) - # print("****************** Starting batch with operations {}".format(operations)) with nogil: c_call_error = grpc_call_start_batch( c_call, tag.c_ops, tag.c_nops, tag, NULL) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi index ea1ff32a61a..c8a390106a8 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -172,7 +172,6 @@ cdef class ReceiveMessageOperation(Operation): grpc_byte_buffer_reader_destroy(&message_reader) self._message = bytes(message) else: - message = bytearray() self._message = None grpc_byte_buffer_destroy(self._c_message_byte_buffer) else: