Merge pull request #3150 from nathanielmanistaatgoogle/metadata-and-serialization

Metadata and serialization for Python
pull/3171/head
Masood Malekghassemi 9 years ago
commit 504e379afc
  1. 43
      src/python/grpcio/grpc/_links/invocation.py
  2. 16
      src/python/grpcio/grpc/_links/service.py
  3. 24
      src/python/grpcio/grpc/beta/_stub.py
  4. 4
      src/python/grpcio/grpc/beta/beta.py
  5. 2
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  6. 2
      src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
  7. 6
      src/python/grpcio_test/grpc_test/_links/_lonely_invocation_link_test.py
  8. 6
      src/python/grpcio_test/grpc_test/_links/_transmission_test.py

@ -41,6 +41,8 @@ from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links from grpc.framework.interfaces.links import links
_IDENTITY = lambda x: x
_STOP = _intermediary_low.Event.Kind.STOP _STOP = _intermediary_low.Event.Kind.STOP
_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED _WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED _COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
@ -95,11 +97,12 @@ def _no_longer_due(kind, rpc_state, key, rpc_states):
class _Kernel(object): class _Kernel(object):
def __init__( def __init__(
self, channel, host, request_serializers, response_deserializers, self, channel, host, metadata_transformer, request_serializers,
ticket_relay): response_deserializers, ticket_relay):
self._lock = threading.Lock() self._lock = threading.Lock()
self._channel = channel self._channel = channel
self._host = host self._host = host
self._metadata_transformer = metadata_transformer
self._request_serializers = request_serializers self._request_serializers = request_serializers
self._response_deserializers = response_deserializers self._response_deserializers = response_deserializers
self._relay = ticket_relay self._relay = ticket_relay
@ -225,20 +228,17 @@ class _Kernel(object):
else: else:
return return
request_serializer = self._request_serializers.get((group, method)) transformed_initial_metadata = self._metadata_transformer(initial_metadata)
response_deserializer = self._response_deserializers.get((group, method)) request_serializer = self._request_serializers.get(
if request_serializer is None or response_deserializer is None: (group, method), _IDENTITY)
cancellation_ticket = links.Ticket( response_deserializer = self._response_deserializers.get(
operation_id, 0, None, None, None, None, None, None, None, None, None, (group, method), _IDENTITY)
None, links.Ticket.Termination.CANCELLATION)
self._relay.add_value(cancellation_ticket)
return
call = _intermediary_low.Call( call = _intermediary_low.Call(
self._channel, self._completion_queue, '/%s/%s' % (group, method), self._channel, self._completion_queue, '/%s/%s' % (group, method),
self._host, time.time() + timeout) self._host, time.time() + timeout)
if initial_metadata is not None: if transformed_initial_metadata is not None:
for metadata_key, metadata_value in initial_metadata: for metadata_key, metadata_value in transformed_initial_metadata:
call.add_metadata(metadata_key, metadata_value) call.add_metadata(metadata_key, metadata_value)
call.invoke(self._completion_queue, operation_id, operation_id) call.invoke(self._completion_queue, operation_id, operation_id)
if payload is None: if payload is None:
@ -336,10 +336,15 @@ class InvocationLink(links.Link, activated.Activated):
class _InvocationLink(InvocationLink): class _InvocationLink(InvocationLink):
def __init__( def __init__(
self, channel, host, request_serializers, response_deserializers): self, channel, host, metadata_transformer, request_serializers,
response_deserializers):
self._relay = relay.relay(None) self._relay = relay.relay(None)
self._kernel = _Kernel( self._kernel = _Kernel(
channel, host, request_serializers, response_deserializers, self._relay) channel, host,
_IDENTITY if metadata_transformer is None else metadata_transformer,
{} if request_serializers is None else request_serializers,
{} if response_deserializers is None else response_deserializers,
self._relay)
def _start(self): def _start(self):
self._relay.start() self._relay.start()
@ -376,12 +381,17 @@ class _InvocationLink(InvocationLink):
self._stop() self._stop()
def invocation_link(channel, host, request_serializers, response_deserializers): def invocation_link(
channel, host, metadata_transformer, request_serializers,
response_deserializers):
"""Creates an InvocationLink. """Creates an InvocationLink.
Args: Args:
channel: An _intermediary_low.Channel for use by the link. channel: An _intermediary_low.Channel for use by the link.
host: The host to specify when invoking RPCs. host: The host to specify when invoking RPCs.
metadata_transformer: A callable that takes an invocation-side initial
metadata value and returns another metadata value to send in its place.
May be None.
request_serializers: A dict from group-method pair to request object request_serializers: A dict from group-method pair to request object
serialization behavior. serialization behavior.
response_deserializers: A dict from group-method pair to response object response_deserializers: A dict from group-method pair to response object
@ -391,4 +401,5 @@ def invocation_link(channel, host, request_serializers, response_deserializers):
An InvocationLink. An InvocationLink.
""" """
return _InvocationLink( return _InvocationLink(
channel, host, request_serializers, response_deserializers) channel, host, metadata_transformer, request_serializers,
response_deserializers)

@ -40,6 +40,8 @@ from grpc.framework.foundation import logging_pool
from grpc.framework.foundation import relay from grpc.framework.foundation import relay
from grpc.framework.interfaces.links import links from grpc.framework.interfaces.links import links
_IDENTITY = lambda x: x
_TERMINATION_KIND_TO_CODE = { _TERMINATION_KIND_TO_CODE = {
links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK, links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK,
links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED, links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED,
@ -154,12 +156,10 @@ class _Kernel(object):
except ValueError: except ValueError:
logging.info('Illegal path "%s"!', service_acceptance.method) logging.info('Illegal path "%s"!', service_acceptance.method)
return return
request_deserializer = self._request_deserializers.get((group, method)) request_deserializer = self._request_deserializers.get(
response_serializer = self._response_serializers.get((group, method)) (group, method), _IDENTITY)
if request_deserializer is None or response_serializer is None: response_serializer = self._response_serializers.get(
# TODO(nathaniel): Terminate the RPC with code NOT_FOUND. (group, method), _IDENTITY)
call.cancel()
return
call.read(call) call.read(call)
self._rpc_states[call] = _RPCState( self._rpc_states[call] = _RPCState(
@ -433,7 +433,9 @@ class _ServiceLink(ServiceLink):
def __init__(self, request_deserializers, response_serializers): def __init__(self, request_deserializers, response_serializers):
self._relay = relay.relay(None) self._relay = relay.relay(None)
self._kernel = _Kernel( self._kernel = _Kernel(
request_deserializers, response_serializers, self._relay) {} if request_deserializers is None else request_deserializers,
{} if response_serializers is None else response_serializers,
self._relay)
def accept_ticket(self, ticket): def accept_ticket(self, ticket):
self._kernel.add_ticket(ticket) self._kernel.add_ticket(ticket)

@ -54,11 +54,12 @@ class _AutoIntermediary(object):
def _assemble( def _assemble(
channel, host, request_serializers, response_deserializers, thread_pool, channel, host, metadata_transformer, request_serializers,
thread_pool_size): response_deserializers, thread_pool, thread_pool_size):
end_link = _core_implementations.invocation_end_link() end_link = _core_implementations.invocation_end_link()
grpc_link = invocation.invocation_link( grpc_link = invocation.invocation_link(
channel, host, request_serializers, response_deserializers) channel, host, metadata_transformer, request_serializers,
response_deserializers)
if thread_pool is None: if thread_pool is None:
invocation_pool = logging_pool.pool( invocation_pool = logging_pool.pool(
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
@ -89,21 +90,22 @@ def _wrap_assembly(stub, end_link, grpc_link, assembly_pool):
def generic_stub( def generic_stub(
channel, host, request_serializers, response_deserializers, thread_pool, channel, host, metadata_transformer, request_serializers,
thread_pool_size): response_deserializers, thread_pool, thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble( end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
channel, host, request_serializers, response_deserializers, thread_pool, channel, host, metadata_transformer, request_serializers,
thread_pool_size) response_deserializers, thread_pool, thread_pool_size)
stub = _crust_implementations.generic_stub(end_link, invocation_pool) stub = _crust_implementations.generic_stub(end_link, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)
def dynamic_stub( def dynamic_stub(
channel, host, service, cardinalities, request_serializers, channel, host, service, cardinalities, metadata_transformer,
response_deserializers, thread_pool, thread_pool_size): request_serializers, response_deserializers, thread_pool,
thread_pool_size):
end_link, grpc_link, invocation_pool, assembly_pool = _assemble( end_link, grpc_link, invocation_pool, assembly_pool = _assemble(
channel, host, request_serializers, response_deserializers, thread_pool, channel, host, metadata_transformer, request_serializers,
thread_pool_size) response_deserializers, thread_pool, thread_pool_size)
stub = _crust_implementations.dynamic_stub( stub = _crust_implementations.dynamic_stub(
end_link, service, cardinalities, invocation_pool) end_link, service, cardinalities, invocation_pool)
return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) return _wrap_assembly(stub, end_link, grpc_link, assembly_pool)

@ -238,6 +238,7 @@ def generic_stub(channel, options=None):
effective_options = _EMPTY_STUB_OPTIONS if options is None else options effective_options = _EMPTY_STUB_OPTIONS if options is None else options
return _stub.generic_stub( return _stub.generic_stub(
channel._intermediary_low_channel, effective_options.host, # pylint: disable=protected-access channel._intermediary_low_channel, effective_options.host, # pylint: disable=protected-access
effective_options.metadata_transformer,
effective_options.request_serializers, effective_options.request_serializers,
effective_options.response_deserializers, effective_options.thread_pool, effective_options.response_deserializers, effective_options.thread_pool,
effective_options.thread_pool_size) effective_options.thread_pool_size)
@ -260,7 +261,8 @@ def dynamic_stub(channel, service, cardinalities, options=None):
effective_options = StubOptions() if options is None else options effective_options = StubOptions() if options is None else options
return _stub.dynamic_stub( return _stub.dynamic_stub(
channel._intermediary_low_channel, effective_options.host, service, # pylint: disable=protected-access channel._intermediary_low_channel, effective_options.host, service, # pylint: disable=protected-access
cardinalities, effective_options.request_serializers, cardinalities, effective_options.metadata_transformer,
effective_options.request_serializers,
effective_options.response_deserializers, effective_options.thread_pool, effective_options.response_deserializers, effective_options.thread_pool,
effective_options.thread_pool_size) effective_options.thread_pool_size)

@ -94,7 +94,7 @@ class _Implementation(test_interfaces.Implementation):
port = service_grpc_link.add_port('[::]:0', None) port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None) channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link( invocation_grpc_link = invocation.invocation_link(
channel, b'localhost', channel, b'localhost', None,
serialization_behaviors.request_serializers, serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers) serialization_behaviors.response_deserializers)

@ -87,7 +87,7 @@ class _Implementation(test_interfaces.Implementation):
port = service_grpc_link.add_port('[::]:0', None) port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None) channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link( invocation_grpc_link = invocation.invocation_link(
channel, b'localhost', channel, b'localhost', None,
serialization_behaviors.request_serializers, serialization_behaviors.request_serializers,
serialization_behaviors.response_deserializers) serialization_behaviors.response_deserializers)

@ -45,7 +45,8 @@ class LonelyInvocationLinkTest(unittest.TestCase):
def testUpAndDown(self): def testUpAndDown(self):
channel = _intermediary_low.Channel('nonexistent:54321', None) channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link(channel, 'nonexistent', {}, {}) invocation_link = invocation.invocation_link(
channel, 'nonexistent', None, {}, {})
invocation_link.start() invocation_link.start()
invocation_link.stop() invocation_link.stop()
@ -58,8 +59,7 @@ class LonelyInvocationLinkTest(unittest.TestCase):
channel = _intermediary_low.Channel('nonexistent:54321', None) channel = _intermediary_low.Channel('nonexistent:54321', None)
invocation_link = invocation.invocation_link( invocation_link = invocation.invocation_link(
channel, 'nonexistent', {(test_group, test_method): _NULL_BEHAVIOR}, channel, 'nonexistent', None, {}, {})
{(test_group, test_method): _NULL_BEHAVIOR})
invocation_link.join_link(invocation_link_mate) invocation_link.join_link(invocation_link_mate)
invocation_link.start() invocation_link.start()

@ -54,7 +54,7 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
service_link.start() service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None) channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link( invocation_link = invocation.invocation_link(
channel, 'localhost', channel, 'localhost', None,
{self.group_and_method(): self.serialize_request}, {self.group_and_method(): self.serialize_request},
{self.group_and_method(): self.deserialize_response}) {self.group_and_method(): self.deserialize_response})
invocation_link.start() invocation_link.start()
@ -121,7 +121,7 @@ class RoundTripTest(unittest.TestCase):
service_link.start() service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None) channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link( invocation_link = invocation.invocation_link(
channel, 'localhost', identity_transformation, identity_transformation) channel, None, None, identity_transformation, identity_transformation)
invocation_mate = test_utilities.RecordingLink() invocation_mate = test_utilities.RecordingLink()
invocation_link.join_link(invocation_mate) invocation_link.join_link(invocation_mate)
invocation_link.start() invocation_link.start()
@ -166,7 +166,7 @@ class RoundTripTest(unittest.TestCase):
service_link.start() service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None) channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link( invocation_link = invocation.invocation_link(
channel, 'localhost', channel, 'localhost', None,
{(test_group, test_method): scenario.serialize_request}, {(test_group, test_method): scenario.serialize_request},
{(test_group, test_method): scenario.deserialize_response}) {(test_group, test_method): scenario.deserialize_response})
invocation_mate = test_utilities.RecordingLink() invocation_mate = test_utilities.RecordingLink()

Loading…
Cancel
Save