Merge pull request #17460 from grpc/enable-census

Add hooks for census context propagation
pull/17475/head
Richard Belleville 6 years ago committed by GitHub
commit e829a81118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      src/python/grpcio/grpc/_cython/BUILD.bazel
  2. 15
      src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
  3. 27
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  4. 20
      src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pxd.pxi
  5. 20
      src/python/grpcio/grpc/_cython/_cygrpc/propagation_bits.pyx.pxi
  6. 1
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  7. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  8. 72
      src/python/grpcio/grpc/_server.py

@ -6,46 +6,47 @@ pyx_library(
name = "cygrpc",
srcs = [
"__init__.py",
"cygrpc.pxd",
"cygrpc.pyx",
"_cygrpc/_hooks.pxd.pxi",
"_cygrpc/_hooks.pyx.pxi",
"_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pxd.pxi",
"_cygrpc/call.pyx.pxi",
"_cygrpc/channelz.pyx.pxi",
"_cygrpc/channel.pxd.pxi",
"_cygrpc/channel.pyx.pxi",
"_cygrpc/credentials.pyx.pxi",
"_cygrpc/channelz.pyx.pxi",
"_cygrpc/completion_queue.pxd.pxi",
"_cygrpc/completion_queue.pyx.pxi",
"_cygrpc/event.pyx.pxi",
"_cygrpc/fork_posix.pyx.pxi",
"_cygrpc/metadata.pyx.pxi",
"_cygrpc/operation.pyx.pxi",
"_cygrpc/records.pyx.pxi",
"_cygrpc/security.pyx.pxi",
"_cygrpc/server.pyx.pxi",
"_cygrpc/tag.pyx.pxi",
"_cygrpc/time.pyx.pxi",
"_cygrpc/grpc_gevent.pyx.pxi",
"_cygrpc/grpc.pxi",
"_cygrpc/_hooks.pxd.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/call.pxd.pxi",
"_cygrpc/channel.pxd.pxi",
"_cygrpc/credentials.pxd.pxi",
"_cygrpc/completion_queue.pxd.pxi",
"_cygrpc/credentials.pyx.pxi",
"_cygrpc/event.pxd.pxi",
"_cygrpc/event.pyx.pxi",
"_cygrpc/fork_posix.pxd.pxi",
"_cygrpc/fork_posix.pyx.pxi",
"_cygrpc/grpc.pxi",
"_cygrpc/grpc_gevent.pxd.pxi",
"_cygrpc/grpc_gevent.pyx.pxi",
"_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/metadata.pxd.pxi",
"_cygrpc/metadata.pyx.pxi",
"_cygrpc/operation.pxd.pxi",
"_cygrpc/operation.pyx.pxi",
"_cygrpc/propagation_bits.pxd.pxi",
"_cygrpc/propagation_bits.pyx.pxi",
"_cygrpc/records.pxd.pxi",
"_cygrpc/records.pyx.pxi",
"_cygrpc/security.pxd.pxi",
"_cygrpc/security.pyx.pxi",
"_cygrpc/server.pxd.pxi",
"_cygrpc/server.pyx.pxi",
"_cygrpc/tag.pxd.pxi",
"_cygrpc/tag.pyx.pxi",
"_cygrpc/time.pxd.pxi",
"_cygrpc/grpc_gevent.pxd.pxi",
"_cygrpc/time.pyx.pxi",
"cygrpc.pxd",
"cygrpc.pyx",
],
deps = [
"//:grpc",
],
)

@ -15,3 +15,18 @@
cdef object _custom_op_on_c_call(int op, grpc_call *call):
raise NotImplementedError("No custom hooks are implemented")
def install_census_context_from_call(Call call):
pass
def uninstall_context():
pass
def build_context():
pass
cdef class CensusContext:
pass
def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
pass

@ -159,7 +159,8 @@ cdef void _call(
_ChannelState channel_state, _CallState call_state,
grpc_completion_queue *c_completion_queue, on_success, int flags, method,
host, object deadline, CallCredentials credentials,
object operationses_and_user_tags, object metadata) except *:
object operationses_and_user_tags, object metadata,
object context) except *:
"""Invokes an RPC.
Args:
@ -185,6 +186,7 @@ cdef void _call(
which is an object to be used as a tag. A SendInitialMetadataOperation
must be present in the first element of this value.
metadata: The metadata for this call.
context: Context object for distributed tracing.
"""
cdef grpc_slice method_slice
cdef grpc_slice host_slice
@ -208,6 +210,8 @@ cdef void _call(
grpc_slice_unref(method_slice)
if host_slice_ptr:
grpc_slice_unref(host_slice)
if context is not None:
set_census_context_on_call(call_state, context)
if credentials is not None:
c_call_credentials = credentials.c()
c_call_error = grpc_call_set_credentials(
@ -257,7 +261,8 @@ cdef class IntegratedCall:
cdef IntegratedCall _integrated_call(
_ChannelState state, int flags, method, host, object deadline,
object metadata, CallCredentials credentials, operationses_and_user_tags):
object metadata, CallCredentials credentials, operationses_and_user_tags,
object context):
call_state = _CallState()
def on_success(started_tags):
@ -266,7 +271,7 @@ cdef IntegratedCall _integrated_call(
_call(
state, call_state, state.c_call_completion_queue, on_success, flags,
method, host, deadline, credentials, operationses_and_user_tags, metadata)
method, host, deadline, credentials, operationses_and_user_tags, metadata, context)
return IntegratedCall(state, call_state)
@ -308,7 +313,8 @@ cdef class SegregatedCall:
cdef SegregatedCall _segregated_call(
_ChannelState state, int flags, method, host, object deadline,
object metadata, CallCredentials credentials, operationses_and_user_tags):
object metadata, CallCredentials credentials, operationses_and_user_tags,
object context):
cdef _CallState call_state = _CallState()
cdef SegregatedCall segregated_call
cdef grpc_completion_queue *c_completion_queue
@ -325,7 +331,8 @@ cdef SegregatedCall _segregated_call(
try:
_call(
state, call_state, c_completion_queue, on_success, flags, method, host,
deadline, credentials, operationses_and_user_tags, metadata)
deadline, credentials, operationses_and_user_tags, metadata,
context)
except:
_destroy_c_completion_queue(c_completion_queue)
raise
@ -443,10 +450,11 @@ cdef class Channel:
def integrated_call(
self, int flags, method, host, object deadline, object metadata,
CallCredentials credentials, operationses_and_tags):
CallCredentials credentials, operationses_and_tags,
object context = None):
return _integrated_call(
self._state, flags, method, host, deadline, metadata, credentials,
operationses_and_tags)
operationses_and_tags, context)
def next_call_event(self):
def on_success(tag):
@ -461,10 +469,11 @@ cdef class Channel:
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
CallCredentials credentials, operationses_and_tags):
CallCredentials credentials, operationses_and_tags,
object context = None):
return _segregated_call(
self._state, flags, method, host, deadline, metadata, credentials,
operationses_and_tags)
operationses_and_tags, context)
def check_connectivity_state(self, bint try_to_connect):
with self._state.condition:

@ -0,0 +1,20 @@
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef extern from "grpc/impl/codegen/propagation_bits.h":
cdef int _GRPC_PROPAGATE_DEADLINE "GRPC_PROPAGATE_DEADLINE"
cdef int _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT "GRPC_PROPAGATE_CENSUS_STATS_CONTEXT"
cdef int _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT "GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT"
cdef int _GRPC_PROPAGATE_CANCELLATION "GRPC_PROPAGATE_CANCELLATION"
cdef int _GRPC_PROPAGATE_DEFAULTS "GRPC_PROPAGATE_DEFAULTS"

@ -0,0 +1,20 @@
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class PropagationConstants:
GRPC_PROPAGATE_DEADLINE = _GRPC_PROPAGATE_DEADLINE
GRPC_PROPAGATE_CENSUS_STATS_CONTEXT = _GRPC_PROPAGATE_CENSUS_STATS_CONTEXT
GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT = _GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT
GRPC_PROPAGATE_CANCELLATION = _GRPC_PROPAGATE_CANCELLATION
GRPC_PROPAGATE_DEFAULTS = _GRPC_PROPAGATE_DEFAULTS

@ -29,6 +29,7 @@ include "_cygrpc/server.pxd.pxi"
include "_cygrpc/tag.pxd.pxi"
include "_cygrpc/time.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
include "_cygrpc/propagation_bits.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"

@ -36,6 +36,7 @@ include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/channelz.pyx.pxi"
include "_cygrpc/propagation_bits.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"

@ -480,43 +480,51 @@ def _status(rpc_event, state, serialized_response):
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
response, proceed = _call_behavior(rpc_event, state, behavior, argument,
request_deserializer)
if proceed:
serialized_response = _serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_status(rpc_event, state, serialized_response)
cygrpc.install_census_context_from_call(rpc_event.call)
try:
argument = argument_thunk()
if argument is not None:
response, proceed = _call_behavior(rpc_event, state, behavior,
argument, request_deserializer)
if proceed:
serialized_response = _serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_status(rpc_event, state, serialized_response)
finally:
cygrpc.uninstall_context()
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
response_iterator, proceed = _call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
while True:
response, proceed = _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
if response is None:
_status(rpc_event, state, None)
break
else:
serialized_response = _serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
proceed = _send_response(rpc_event, state,
serialized_response)
if not proceed:
break
else:
cygrpc.install_census_context_from_call(rpc_event.call)
try:
argument = argument_thunk()
if argument is not None:
response_iterator, proceed = _call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
while True:
response, proceed = _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
if response is None:
_status(rpc_event, state, None)
break
else:
break
else:
serialized_response = _serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
proceed = _send_response(
rpc_event, state, serialized_response)
if not proceed:
break
else:
break
else:
break
finally:
cygrpc.uninstall_context()
def _handle_unary_unary(rpc_event, state, method_handler, thread_pool):

Loading…
Cancel
Save