Actually build CensusContext

pull/17484/head
Richard Belleville 6 years ago
parent 551a037073
commit bd142d6c46
  1. 40
      src/python/grpcio/grpc/_channel.py

@ -499,6 +499,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
def _prepare(self, request, timeout, metadata, wait_for_ready): def _prepare(self, request, timeout, metadata, wait_for_ready):
deadline, serialized_request, rendezvous = _start_unary_request( deadline, serialized_request, rendezvous = _start_unary_request(
@ -528,11 +529,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
raise rendezvous raise rendezvous
else: else:
call = self._channel.segregated_call( call = self._channel.segregated_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials, (( if credentials is None else credentials._credentials, ((
operations, operations,
None, None,
),)) ),), self._context)
event = call.next_event() event = call.next_event()
_handle_event(event, state, self._response_deserializer) _handle_event(event, state, self._response_deserializer)
return state, call, return state, call,
@ -570,9 +572,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else: else:
event_handler = _event_handler(state, self._response_deserializer) event_handler = _event_handler(state, self._response_deserializer)
call = self._managed_call( call = self._managed_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
(operations,), event_handler) (operations,), event_handler, self._context)
return _Rendezvous(state, call, self._response_deserializer, return _Rendezvous(state, call, self._response_deserializer,
deadline) deadline)
@ -587,6 +590,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
def __call__(self, def __call__(self,
request, request,
@ -615,9 +619,10 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
) )
event_handler = _event_handler(state, self._response_deserializer) event_handler = _event_handler(state, self._response_deserializer)
call = self._managed_call( call = self._managed_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
operationses, event_handler) operationses, event_handler, self._context)
return _Rendezvous(state, call, self._response_deserializer, return _Rendezvous(state, call, self._response_deserializer,
deadline) deadline)
@ -632,6 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
def _blocking(self, request_iterator, timeout, metadata, credentials, def _blocking(self, request_iterator, timeout, metadata, credentials,
wait_for_ready): wait_for_ready):
@ -640,10 +646,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready) wait_for_ready)
call = self._channel.segregated_call( call = self._channel.segregated_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
_stream_unary_invocation_operationses_and_tags( _stream_unary_invocation_operationses_and_tags(
metadata, initial_metadata_flags)) metadata, initial_metadata_flags), self._context)
_consume_request_iterator(request_iterator, state, call, _consume_request_iterator(request_iterator, state, call,
self._request_serializer, None) self._request_serializer, None)
while True: while True:
@ -687,10 +694,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
wait_for_ready) wait_for_ready)
call = self._managed_call( call = self._managed_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
None, deadline, metadata, None
if credentials is None else credentials._credentials, if credentials is None else credentials._credentials,
_stream_unary_invocation_operationses( _stream_unary_invocation_operationses(
metadata, initial_metadata_flags), event_handler) metadata, initial_metadata_flags), event_handler, self._context)
_consume_request_iterator(request_iterator, state, call, _consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler) self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -706,6 +714,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
self._method = method self._method = method
self._request_serializer = request_serializer self._request_serializer = request_serializer
self._response_deserializer = response_deserializer self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
def __call__(self, def __call__(self,
request_iterator, request_iterator,
@ -727,9 +736,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
) )
event_handler = _event_handler(state, self._response_deserializer) event_handler = _event_handler(state, self._response_deserializer)
call = self._managed_call( call = self._managed_call(
0, self._method, None, deadline, metadata, None cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
None, deadline, metadata, None
if credentials is None else credentials._credentials, operationses, if credentials is None else credentials._credentials, operationses,
event_handler) event_handler, self._context)
_consume_request_iterator(request_iterator, state, call, _consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler) self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline) return _Rendezvous(state, call, self._response_deserializer, deadline)
@ -789,7 +799,7 @@ def _channel_managed_call_management(state):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def create(flags, method, host, deadline, metadata, credentials, def create(flags, method, host, deadline, metadata, credentials,
operationses, event_handler): operationses, event_handler, context):
"""Creates a cygrpc.IntegratedCall. """Creates a cygrpc.IntegratedCall.
Args: Args:
@ -804,7 +814,7 @@ def _channel_managed_call_management(state):
started on the call. started on the call.
event_handler: A behavior to call to handle the events resultant from event_handler: A behavior to call to handle the events resultant from
the operations on the call. the operations on the call.
context: Context object for distributed tracing.
Returns: Returns:
A cygrpc.IntegratedCall with which to conduct an RPC. A cygrpc.IntegratedCall with which to conduct an RPC.
""" """
@ -815,7 +825,7 @@ def _channel_managed_call_management(state):
with state.lock: with state.lock:
call = state.channel.integrated_call(flags, method, host, deadline, call = state.channel.integrated_call(flags, method, host, deadline,
metadata, credentials, metadata, credentials,
operationses_and_tags) operationses_and_tags, context)
if state.managed_calls == 0: if state.managed_calls == 0:
state.managed_calls = 1 state.managed_calls = 1
_run_channel_spin_thread(state) _run_channel_spin_thread(state)

Loading…
Cancel
Save