Merge pull request #13891 from nathanielmanistaatgoogle/12531

Reform cygrpc.OperationTag and cygrpc.Event.
pull/13895/head
Nathaniel Manista 7 years ago committed by GitHub
commit e1d592a999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  2. 6
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  3. 42
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  4. 45
      src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
  5. 55
      src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
  6. 37
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
  7. 44
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  8. 29
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  9. 58
      src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
  10. 87
      src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
  11. 2
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  12. 2
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  13. 65
      src/python/grpcio/grpc/_server.py
  14. 2
      src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
  15. 35
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
  16. 35
      src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
  17. 13
      src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
  18. 23
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py

@ -26,16 +26,13 @@ cdef class Call:
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
cdef OperationTag operation_tag = OperationTag(tag, operations)
if retain_self:
operation_tag.operation_call = self
else:
operation_tag.operation_call = None
operation_tag.store_ops()
cpython.Py_INCREF(operation_tag)
cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
tag, operations, self if retain_self else None)
batch_operation_tag.prepare()
cpython.Py_INCREF(batch_operation_tag)
return grpc_call_start_batch(
self.c_call, operation_tag.c_ops, operation_tag.c_nops,
<cpython.PyObject *>operation_tag, NULL)
self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
<cpython.PyObject *>batch_operation_tag, NULL)
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because

@ -76,12 +76,12 @@ cdef class Channel:
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag = OperationTag(tag, None)
cpython.Py_INCREF(operation_tag)
cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
cpython.Py_INCREF(connectivity_tag)
with nogil:
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
queue.c_completion_queue, <cpython.PyObject *>operation_tag)
queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
def target(self):
cdef char *target = NULL

@ -37,42 +37,20 @@ cdef class CompletionQueue:
self.is_shutdown = False
cdef _interpret_event(self, grpc_event event):
cdef OperationTag tag = None
cdef object user_tag = None
cdef Call operation_call = None
cdef CallDetails request_call_details = None
cdef object request_metadata = None
cdef object batch_operations = None
cdef _Tag tag = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
# NOTE(nathaniel): For now we coopt ConnectivityEvent here.
return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif event.type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
return Event(
event.type, True, None, None, None, None, False, None)
# NOTE(nathaniel): For now we coopt ConnectivityEvent here.
return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
else:
if event.tag != NULL:
tag = <OperationTag>event.tag
# We receive event tags only after they've been inc-ref'd elsewhere in
# the code.
cpython.Py_DECREF(tag)
if tag.shutting_down_server is not None:
tag.shutting_down_server.notify_shutdown_complete()
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
if tag.is_new_request:
request_metadata = _metadata(&tag._c_request_metadata)
grpc_metadata_array_destroy(&tag._c_request_metadata)
batch_operations = tag.release_ops()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
operation_call.references.extend(tag.references)
return Event(
event.type, event.success, user_tag, operation_call,
request_call_details, request_metadata, tag.is_new_request,
batch_operations)
tag = <_Tag>event.tag
# We receive event tags only after they've been inc-ref'd elsewhere in
# the code.
cpython.Py_DECREF(tag)
return tag.event(event)
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for

@ -0,0 +1,45 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class ConnectivityEvent:
cdef readonly grpc_completion_type completion_type
cdef readonly bint success
cdef readonly object tag
cdef class RequestCallEvent:
cdef readonly grpc_completion_type completion_type
cdef readonly bint success
cdef readonly object tag
cdef readonly Call call
cdef readonly CallDetails call_details
cdef readonly tuple invocation_metadata
cdef class BatchOperationEvent:
cdef readonly grpc_completion_type completion_type
cdef readonly bint success
cdef readonly object tag
cdef readonly object batch_operations
cdef class ServerShutdownEvent:
cdef readonly grpc_completion_type completion_type
cdef readonly bint success
cdef readonly object tag

@ -0,0 +1,55 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class ConnectivityEvent:
def __cinit__(
self, grpc_completion_type completion_type, bint success, object tag):
self.completion_type = completion_type
self.success = success
self.tag = tag
cdef class RequestCallEvent:
def __cinit__(
self, grpc_completion_type completion_type, bint success, object tag,
Call call, CallDetails call_details, tuple invocation_metadata):
self.completion_type = completion_type
self.success = success
self.tag = tag
self.call = call
self.call_details = call_details
self.invocation_metadata = invocation_metadata
cdef class BatchOperationEvent:
def __cinit__(
self, grpc_completion_type completion_type, bint success, object tag,
object batch_operations):
self.completion_type = completion_type
self.success = success
self.tag = tag
self.batch_operations = batch_operations
cdef class ServerShutdownEvent:
def __cinit__(
self, grpc_completion_type completion_type, bint success, object tag):
self.completion_type = completion_type
self.success = success
self.tag = tag

@ -28,43 +28,6 @@ cdef class CallDetails:
cdef grpc_call_details c_details
cdef class OperationTag:
cdef object user_tag
cdef list references
# This allows CompletionQueue to notify the Python Server object that the
# underlying GRPC core server has shutdown
cdef Server shutting_down_server
cdef Call operation_call
cdef CallDetails request_call_details
cdef grpc_metadata_array _c_request_metadata
cdef grpc_op *c_ops
cdef size_t c_nops
cdef readonly object _operations
cdef bint is_new_request
cdef void store_ops(self)
cdef object release_ops(self)
cdef class Event:
cdef readonly grpc_completion_type type
cdef readonly bint success
cdef readonly object tag
# For Server.request_call
cdef readonly bint is_new_request
cdef readonly CallDetails request_call_details
cdef readonly object request_metadata
# For server calls
cdef readonly Call operation_call
# For Call.start_batch
cdef readonly object batch_operations
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair

@ -218,50 +218,6 @@ cdef class CallDetails:
return timespec
cdef class OperationTag:
def __cinit__(self, user_tag, operations):
self.user_tag = user_tag
self.references = []
self._operations = operations
cdef void store_ops(self):
self.c_nops = 0 if self._operations is None else len(self._operations)
if 0 < self.c_nops:
self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
for index, operation in enumerate(self._operations):
(<Operation>operation).c()
self.c_ops[index] = (<Operation>operation).c_op
cdef object release_ops(self):
if 0 < self.c_nops:
for index, operation in enumerate(self._operations):
(<Operation>operation).c_op = self.c_ops[index]
(<Operation>operation).un_c()
gpr_free(self.c_ops)
return self._operations
else:
return ()
cdef class Event:
def __cinit__(self, grpc_completion_type type, bint success,
object tag, Call operation_call,
CallDetails request_call_details,
object request_metadata,
bint is_new_request,
object batch_operations):
self.type = type
self.success = success
self.tag = tag
self.operation_call = operation_call
self.request_call_details = request_call_details
self.request_metadata = request_metadata
self.batch_operations = batch_operations
self.is_new_request = is_new_request
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):

@ -78,19 +78,15 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
cdef OperationTag operation_tag = OperationTag(tag, None)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
grpc_metadata_array_init(&operation_tag._c_request_metadata)
operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True
cpython.Py_INCREF(operation_tag)
cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
request_call_tag.prepare()
cpython.Py_INCREF(request_call_tag)
return grpc_server_request_call(
self.c_server, &operation_tag.operation_call.c_call,
&operation_tag.request_call_details.c_details,
&operation_tag._c_request_metadata,
self.c_server, &request_call_tag.call.c_call,
&request_call_tag.call_details.c_details,
&request_call_tag.c_invocation_metadata,
call_queue.c_completion_queue, server_queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
<cpython.PyObject *>request_call_tag)
def register_completion_queue(
self, CompletionQueue queue not None):
@ -131,16 +127,14 @@ cdef class Server:
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
operation_tag = OperationTag(tag, None)
operation_tag.shutting_down_server = self
cpython.Py_INCREF(operation_tag)
cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
cpython.Py_INCREF(server_shutdown_tag)
with nogil:
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
<cpython.PyObject *>operation_tag)
<cpython.PyObject *>server_shutdown_tag)
def shutdown(self, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag
if queue.is_shutting_down:
raise ValueError("queue must be live")
elif not self.is_started:
@ -153,7 +147,8 @@ cdef class Server:
self._c_shutdown(queue, tag)
cdef notify_shutdown_complete(self):
# called only by a completion queue on receiving our shutdown operation tag
# called only after our server shutdown tag has emerged from a completion
# queue.
self.is_shutdown = True
def cancel_all_calls(self):

@ -0,0 +1,58 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _Tag:
cdef object event(self, grpc_event c_event)
cdef class _ConnectivityTag(_Tag):
cdef readonly object _user_tag
cdef ConnectivityEvent event(self, grpc_event c_event)
cdef class _RequestCallTag(_Tag):
cdef readonly object _user_tag
cdef Call call
cdef CallDetails call_details
cdef grpc_metadata_array c_invocation_metadata
cdef void prepare(self)
cdef RequestCallEvent event(self, grpc_event c_event)
cdef class _BatchOperationTag(_Tag):
cdef object _user_tag
cdef readonly object _operations
cdef readonly object _retained_call
cdef grpc_op *c_ops
cdef size_t c_nops
cdef void prepare(self)
cdef BatchOperationEvent event(self, grpc_event c_event)
cdef class _ServerShutdownTag(_Tag):
cdef readonly object _user_tag
# This allows CompletionQueue to notify the Python Server object that the
# underlying GRPC core server has shutdown
cdef readonly Server _shutting_down_server
cdef ServerShutdownEvent event(self, grpc_event c_event)

@ -0,0 +1,87 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _Tag:
cdef object event(self, grpc_event c_event):
raise NotImplementedError()
cdef class _ConnectivityTag(_Tag):
def __cinit__(self, user_tag):
self._user_tag = user_tag
cdef ConnectivityEvent event(self, grpc_event c_event):
return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
cdef class _RequestCallTag(_Tag):
def __cinit__(self, user_tag):
self._user_tag = user_tag
self.call = None
self.call_details = None
cdef void prepare(self):
self.call = Call()
self.call_details = CallDetails()
grpc_metadata_array_init(&self.c_invocation_metadata)
cdef RequestCallEvent event(self, grpc_event c_event):
cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
grpc_metadata_array_destroy(&self.c_invocation_metadata)
return RequestCallEvent(
c_event.type, c_event.success, self._user_tag, self.call,
self.call_details, invocation_metadata)
cdef class _BatchOperationTag:
def __cinit__(self, user_tag, operations, call):
self._user_tag = user_tag
self._operations = operations
self._retained_call = call
cdef void prepare(self):
self.c_nops = 0 if self._operations is None else len(self._operations)
if 0 < self.c_nops:
self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
for index, operation in enumerate(self._operations):
(<Operation>operation).c()
self.c_ops[index] = (<Operation>operation).c_op
cdef BatchOperationEvent event(self, grpc_event c_event):
if 0 < self.c_nops:
for index, operation in enumerate(self._operations):
(<Operation>operation).c_op = self.c_ops[index]
(<Operation>operation).un_c()
gpr_free(self.c_ops)
return BatchOperationEvent(
c_event.type, c_event.success, self._user_tag, self._operations)
else:
return BatchOperationEvent(
c_event.type, c_event.success, self._user_tag, ())
cdef class _ServerShutdownTag(_Tag):
def __cinit__(self, user_tag, shutting_down_server):
self._user_tag = user_tag
self._shutting_down_server = shutting_down_server
cdef ServerShutdownEvent event(self, grpc_event c_event):
self._shutting_down_server.notify_shutdown_complete()
return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag)

@ -18,8 +18,10 @@ include "_cygrpc/call.pxd.pxi"
include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
include "_cygrpc/event.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"

@ -25,11 +25,13 @@ include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
include "_cygrpc/event.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
include "_cygrpc/tag.pyx.pxi"
#
# initialize gRPC

@ -217,11 +217,10 @@ class _Context(grpc.ServicerContext):
def time_remaining(self):
return max(
float(self._rpc_event.request_call_details.deadline) - time.time(),
0)
float(self._rpc_event.call_details.deadline) - time.time(), 0)
def cancel(self):
self._rpc_event.operation_call.cancel()
self._rpc_event.call.cancel()
def add_callback(self, callback):
with self._state.condition:
@ -236,23 +235,23 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True
def invocation_metadata(self):
return self._rpc_event.request_metadata
return self._rpc_event.invocation_metadata
def peer(self):
return _common.decode(self._rpc_event.operation_call.peer())
return _common.decode(self._rpc_event.call.peer())
def peer_identities(self):
return cygrpc.peer_identities(self._rpc_event.operation_call)
return cygrpc.peer_identities(self._rpc_event.call)
def peer_identity_key(self):
id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call)
id_key = cygrpc.peer_identity_key(self._rpc_event.call)
return id_key if id_key is None else _common.decode(id_key)
def auth_context(self):
return {
_common.decode(key): value
for key, value in six.iteritems(
cygrpc.auth_context(self._rpc_event.operation_call))
cygrpc.auth_context(self._rpc_event.call))
}
def send_initial_metadata(self, initial_metadata):
@ -263,7 +262,7 @@ class _Context(grpc.ServicerContext):
if self._state.initial_metadata_allowed:
operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
self._rpc_event.operation_call.start_server_batch(
self._rpc_event.call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
self._state.initial_metadata_allowed = False
self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
@ -346,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.client is _CANCELLED or state.statused:
return None
else:
rpc_event.operation_call.start_server_batch(
rpc_event.call.start_server_batch(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(state, rpc_event.operation_call,
_receive_message(state, rpc_event.call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
while True:
@ -356,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.request is None:
if state.client is _CLOSED:
details = '"{}" requires exactly one request message.'.format(
rpc_event.request_call_details.method)
_abort(state, rpc_event.operation_call,
rpc_event.call_details.method)
_abort(state, rpc_event.call,
cygrpc.StatusCode.unimplemented,
_common.encode(details))
return None
@ -378,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, b'RPC Aborted')
_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
_common.encode(details))
return None, False
@ -396,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, b'RPC Aborted')
_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
_common.encode(details))
return None, False
@ -410,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer):
serialized_response = _common.serialize(response, response_serializer)
if serialized_response is None:
with state.condition:
_abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
_abort(state, rpc_event.call, cygrpc.StatusCode.internal,
b'Failed to serialize response!')
return None
else:
@ -433,8 +432,8 @@ def _send_response(rpc_event, state, serialized_response):
operations = (cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
rpc_event.operation_call.start_server_batch(
operations, _send_message(state, token))
rpc_event.call.start_server_batch(operations,
_send_message(state, token))
state.due.add(token)
while True:
state.condition.wait()
@ -458,7 +457,7 @@ def _status(rpc_event, state, serialized_response):
operations.append(
cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS))
rpc_event.operation_call.start_server_batch(
rpc_event.call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
state.statused = True
@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool):
def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
request_iterator = _RequestIterator(state, rpc_event.operation_call,
request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_unary_response_in_pool, rpc_event, state, method_handler.stream_unary,
@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
request_iterator = _RequestIterator(state, rpc_event.operation_call,
request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_stream_response_in_pool, rpc_event, state,
@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
return None
handler_call_details = _HandlerCallDetails(
_common.decode(rpc_event.request_call_details.method),
rpc_event.request_metadata)
_common.decode(rpc_event.call_details.method),
rpc_event.invocation_metadata)
if interceptor_pipeline is not None:
return interceptor_pipeline.execute(query_handlers,
@ -568,15 +567,15 @@ def _reject_rpc(rpc_event, status, details):
cygrpc.SendStatusFromServerOperation(None, status, details,
_EMPTY_FLAGS),)
rpc_state = _RPCState()
rpc_event.operation_call.start_server_batch(
operations, lambda ignored_event: (rpc_state, (),))
rpc_event.call.start_server_batch(operations,
lambda ignored_event: (rpc_state, (),))
return rpc_state
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
rpc_event.operation_call.start_server_batch(
rpc_event.call.start_server_batch(
(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
concurrency_exceeded):
if not rpc_event.success:
return None, None
if rpc_event.request_call_details.method is not None:
if rpc_event.call_details.method is not None:
try:
method_handler = _find_method_handler(rpc_event, generic_handlers,
interceptor_pipeline)

@ -53,7 +53,7 @@ class _Handler(object):
self._state = state
self._lock = threading.Lock()
self._completion_queue = completion_queue
self._call = rpc_event.operation_call
self._call = rpc_event.call
def __call__(self):
with self._state.condition:

@ -72,7 +72,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@ -84,7 +84,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
return (_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.type,
server_request_call_event.success),
return (_common.OperationResult(
server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success), _common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
_common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.type,
client_receive_initial_metadata_event.success),
_common.OperationResult(client_complete_rpc_start_batch_result,
client_complete_rpc_event.type,
client_complete_rpc_event.success),
client_complete_rpc_start_batch_result,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success), _common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.completion_type,
server_send_initial_metadata_event.success),
_common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.type,
server_send_initial_metadata_event.success),
_common.OperationResult(server_complete_rpc_start_batch_result,
server_complete_rpc_event.type,
server_complete_rpc_event.success),)
server_complete_rpc_start_batch_result,
server_complete_rpc_event.completion_type,
server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *

@ -63,7 +63,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@ -75,7 +75,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@ -92,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
return (_common.OperationResult(server_request_call_start_batch_result,
server_request_call_event.type,
server_request_call_event.success),
return (_common.OperationResult(
server_request_call_start_batch_result,
server_request_call_event.completion_type,
server_request_call_event.success), _common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.completion_type,
client_receive_initial_metadata_event.success),
_common.OperationResult(
client_receive_initial_metadata_start_batch_result,
client_receive_initial_metadata_event.type,
client_receive_initial_metadata_event.success),
_common.OperationResult(client_complete_rpc_start_batch_result,
client_complete_rpc_event.type,
client_complete_rpc_event.success),
client_complete_rpc_start_batch_result,
client_complete_rpc_event.completion_type,
client_complete_rpc_event.success), _common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.completion_type,
server_send_initial_metadata_event.success),
_common.OperationResult(
server_send_initial_metadata_start_batch_result,
server_send_initial_metadata_event.type,
server_send_initial_metadata_event.success),
_common.OperationResult(server_complete_rpc_start_batch_result,
server_complete_rpc_event.type,
server_complete_rpc_event.success),)
server_complete_rpc_start_batch_result,
server_complete_rpc_event.completion_type,
server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *

@ -175,12 +175,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
server_rpc_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
@ -189,11 +189,11 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_send_first_message_tag)
with server_call_condition:
server_send_second_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
server_rpc_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
@ -232,9 +232,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
server_rpc_event.type)
self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
self.assertEqual(0, len(server_rpc_event.batch_operations))
server_rpc_event.completion_type)
self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
if __name__ == '__main__':

@ -87,7 +87,8 @@ class TypeSmokeTest(unittest.TestCase):
shutdown_tag = object()
server.shutdown(completion_queue, shutdown_tag)
event = completion_queue.poll()
self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
self.assertEqual(cygrpc.CompletionType.operation_complete,
event.completion_type)
self.assertIs(shutdown_tag, event.tag)
del server
del completion_queue
@ -147,7 +148,7 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, call_result)
event = queue.poll(deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
event.type)
event.completion_type)
self.assertTrue(event.success)
self.assertIs(tag, event.tag)
except Exception as error:
@ -205,22 +206,20 @@ class ServerClientMixin(object):
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
request_event.type)
self.assertIsInstance(request_event.operation_call, cygrpc.Call)
request_event.completion_type)
self.assertIsInstance(request_event.call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEqual(0, len(request_event.batch_operations))
self.assertTrue(
test_common.metadata_transmitted(client_initial_metadata,
request_event.request_metadata))
self.assertEqual(METHOD, request_event.request_call_details.method)
self.assertEqual(self.expected_host,
request_event.request_call_details.host)
request_event.invocation_metadata))
self.assertEqual(METHOD, request_event.call_details.method)
self.assertEqual(self.expected_host, request_event.call_details.host)
self.assertLess(
abs(DEADLINE - float(request_event.request_call_details.deadline)),
abs(DEADLINE - float(request_event.call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
server_call = request_event.operation_call
server_call = request_event.call
server_initial_metadata = (
(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
server_trailing_metadata = (
@ -322,7 +321,7 @@ class ServerClientMixin(object):
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
server_call = request_event.operation_call
server_call = request_event.call
def perform_server_operations(operations, description):
return self._perform_operations(operations, server_call,

Loading…
Cancel
Save