Merge pull request #8917 from nathanielmanistaatgoogle/call-management-v1.0.x

Refactor channel call management.
pull/9041/head
Nathaniel Manista 8 years ago committed by GitHub
commit a433fed989
  1. 12
      include/grpc/grpc.h
  2. 68
      src/python/grpcio/grpc/_channel.py

@ -197,9 +197,15 @@ GRPCAPI grpc_call *grpc_channel_create_registered_call(
completion of type 'tag' to the completion queue bound to the call. completion of type 'tag' to the completion queue bound to the call.
The order of ops specified in the batch has no significance. The order of ops specified in the batch has no significance.
Only one operation of each type can be active at once in any given Only one operation of each type can be active at once in any given
batch. You must call grpc_completion_queue_next or batch.
grpc_completion_queue_pluck on the completion queue associated with 'call' If a call to grpc_call_start_batch returns GRPC_CALL_OK you must call
for work to be performed. grpc_completion_queue_next or grpc_completion_queue_pluck on the completion
queue associated with 'call' for work to be performed. If a call to
grpc_call_start_batch returns any value other than GRPC_CALL_OK it is
guaranteed that no state associated with 'call' is changed and it is not
appropriate to call grpc_completion_queue_next or
grpc_completion_queue_pluck consequent to the failed grpc_call_start_batch
call.
THREAD SAFETY: access to grpc_call_start_batch in multi-threaded environment THREAD SAFETY: access to grpc_call_start_batch in multi-threaded environment
needs to be synchronized. As an optimization, you may synchronize batches needs to be synchronized. As an optimization, you may synchronize batches
containing just send operations independently from batches containing just containing just send operations independently from batches containing just

@ -36,8 +36,8 @@ import time
import grpc import grpc
from grpc import _common from grpc import _common
from grpc import _grpcio_metadata from grpc import _grpcio_metadata
from grpc.framework.foundation import callable_util
from grpc._cython import cygrpc from grpc._cython import cygrpc
from grpc.framework.foundation import callable_util
_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) _USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
@ -358,7 +358,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
if self._state.callbacks is None: if self._state.callbacks is None:
return False return False
else: else:
self._state.callbacks.append(lambda: callback()) self._state.callbacks.append(callback)
return True return True
def initial_metadata(self): def initial_metadata(self):
@ -435,10 +435,10 @@ def _end_unary_response_blocking(state, with_call, deadline):
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
def __init__( def __init__(
self, channel, create_managed_call, method, request_serializer, self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
self._create_managed_call = create_managed_call self._managed_call = managed_call
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
@ -490,23 +490,24 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
if rendezvous: if rendezvous:
return rendezvous return rendezvous
else: else:
call = self._create_managed_call( call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec) None, 0, self._method, None, deadline_timespec)
if credentials is not None: if credentials is not None:
call.set_credentials(credentials._credentials) call.set_credentials(credentials._credentials)
event_handler = _event_handler(state, call, self._response_deserializer) event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition: with state.condition:
call.start_client_batch(cygrpc.Operations(operations), event_handler) call.start_client_batch(cygrpc.Operations(operations), event_handler)
drive_call()
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
def __init__( def __init__(
self, channel, create_managed_call, method, request_serializer, self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
self._create_managed_call = create_managed_call self._managed_call = managed_call
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
@ -518,7 +519,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
raise rendezvous raise rendezvous
else: else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call( call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec) None, 0, self._method, None, deadline_timespec)
if credentials is not None: if credentials is not None:
call.set_credentials(credentials._credentials) call.set_credentials(credentials._credentials)
@ -536,16 +537,17 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
) )
call.start_client_batch(cygrpc.Operations(operations), event_handler) call.start_client_batch(cygrpc.Operations(operations), event_handler)
drive_call()
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
def __init__( def __init__(
self, channel, create_managed_call, method, request_serializer, self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
self._create_managed_call = create_managed_call self._managed_call = managed_call
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
@ -597,7 +599,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self, request_iterator, timeout=None, metadata=None, credentials=None): self, request_iterator, timeout=None, metadata=None, credentials=None):
deadline, deadline_timespec = _deadline(timeout) deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call( call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec) None, 0, self._method, None, deadline_timespec)
if credentials is not None: if credentials is not None:
call.set_credentials(credentials._credentials) call.set_credentials(credentials._credentials)
@ -614,6 +616,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
) )
call.start_client_batch(cygrpc.Operations(operations), event_handler) call.start_client_batch(cygrpc.Operations(operations), event_handler)
drive_call()
_consume_request_iterator( _consume_request_iterator(
request_iterator, state, call, self._request_serializer) request_iterator, state, call, self._request_serializer)
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -622,10 +625,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
def __init__( def __init__(
self, channel, create_managed_call, method, request_serializer, self, channel, managed_call, method, request_serializer,
response_deserializer): response_deserializer):
self._channel = channel self._channel = channel
self._create_managed_call = create_managed_call self._managed_call = managed_call
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
@ -634,7 +637,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
self, request_iterator, timeout=None, metadata=None, credentials=None): self, request_iterator, timeout=None, metadata=None, credentials=None):
deadline, deadline_timespec = _deadline(timeout) deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
call = self._create_managed_call( call, drive_call = self._managed_call(
None, 0, self._method, None, deadline_timespec) None, 0, self._method, None, deadline_timespec)
if credentials is not None: if credentials is not None:
call.set_credentials(credentials._credentials) call.set_credentials(credentials._credentials)
@ -650,6 +653,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
) )
call.start_client_batch(cygrpc.Operations(operations), event_handler) call.start_client_batch(cygrpc.Operations(operations), event_handler)
drive_call()
_consume_request_iterator( _consume_request_iterator(
request_iterator, state, call, self._request_serializer) request_iterator, state, call, self._request_serializer)
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -687,16 +691,13 @@ def _run_channel_spin_thread(state):
channel_spin_thread.start() channel_spin_thread.start()
def _create_channel_managed_call(state): def _channel_managed_call_management(state):
def create_channel_managed_call(parent, flags, method, host, deadline): def create(parent, flags, method, host, deadline):
"""Creates a managed cygrpc.Call. """Creates a managed cygrpc.Call and a function to call to drive it.
Callers of this function must conduct at least one operation on the returned If operations are successfully added to the returned cygrpc.Call, the
call. The tags associated with operations conducted on the returned call returned function must be called. If operations are not successfully added
must be no-argument callables that return None to indicate that this channel to the returned cygrpc.Call, the returned function must not be called.
should continue polling for events associated with the call and return the
call itself to indicate that no more events associated with the call will be
generated.
Args: Args:
parent: A cygrpc.Call to be used as the parent of the created call. parent: A cygrpc.Call to be used as the parent of the created call.
@ -706,18 +707,22 @@ def _create_channel_managed_call(state):
deadline: A cygrpc.Timespec to be the deadline of the created call. deadline: A cygrpc.Timespec to be the deadline of the created call.
Returns: Returns:
A cygrpc.Call with which to conduct an RPC. A cygrpc.Call with which to conduct an RPC and a function to call if
operations are successfully started on the call.
""" """
with state.lock:
call = state.channel.create_call( call = state.channel.create_call(
parent, flags, state.completion_queue, method, host, deadline) parent, flags, state.completion_queue, method, host, deadline)
def drive():
with state.lock:
if state.managed_calls is None: if state.managed_calls is None:
state.managed_calls = set((call,)) state.managed_calls = set((call,))
_run_channel_spin_thread(state) _run_channel_spin_thread(state)
else: else:
state.managed_calls.add(call) state.managed_calls.add(call)
return call
return create_channel_managed_call return call, drive
return create
class _ChannelConnectivityState(object): class _ChannelConnectivityState(object):
@ -857,6 +862,7 @@ def _options(options):
class Channel(grpc.Channel): class Channel(grpc.Channel):
"""A cygrpc.Channel-backed implementation of grpc.Channel."""
def __init__(self, target, options, credentials): def __init__(self, target, options, credentials):
"""Constructor. """Constructor.
@ -880,25 +886,25 @@ class Channel(grpc.Channel):
def unary_unary( def unary_unary(
self, method, request_serializer=None, response_deserializer=None): self, method, request_serializer=None, response_deserializer=None):
return _UnaryUnaryMultiCallable( return _UnaryUnaryMultiCallable(
self._channel, _create_channel_managed_call(self._call_state), self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer) _common.encode(method), request_serializer, response_deserializer)
def unary_stream( def unary_stream(
self, method, request_serializer=None, response_deserializer=None): self, method, request_serializer=None, response_deserializer=None):
return _UnaryStreamMultiCallable( return _UnaryStreamMultiCallable(
self._channel, _create_channel_managed_call(self._call_state), self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer) _common.encode(method), request_serializer, response_deserializer)
def stream_unary( def stream_unary(
self, method, request_serializer=None, response_deserializer=None): self, method, request_serializer=None, response_deserializer=None):
return _StreamUnaryMultiCallable( return _StreamUnaryMultiCallable(
self._channel, _create_channel_managed_call(self._call_state), self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer) _common.encode(method), request_serializer, response_deserializer)
def stream_stream( def stream_stream(
self, method, request_serializer=None, response_deserializer=None): self, method, request_serializer=None, response_deserializer=None):
return _StreamStreamMultiCallable( return _StreamStreamMultiCallable(
self._channel, _create_channel_managed_call(self._call_state), self._channel, _channel_managed_call_management(self._call_state),
_common.encode(method), request_serializer, response_deserializer) _common.encode(method), request_serializer, response_deserializer)
def __del__(self): def __del__(self):

Loading…
Cancel
Save