Python style: reformat code using updated yapf 0.30.0

pull/25071/head
Sergii Tkachenko 4 years ago
parent 18af5b31a8
commit 8db79e2e71
56 changed files (number of changed lines in parentheses):

   1. examples/python/async_streaming/server.py (12)
   2. examples/python/auth/async_customized_auth_server.py (4)
   3. examples/python/debug/asyncio_debug_server.py (6)
   4. examples/python/helloworld/async_greeter_server.py (6)
   5. examples/python/helloworld/async_greeter_server_with_reflection.py (6)
   6. examples/python/route_guide/asyncio_route_guide_client.py (9)
   7. examples/python/route_guide/asyncio_route_guide_server.py (11)
   8. src/proto/gen_build_yaml.py (9)
   9. src/python/grpcio/grpc/_channel.py (12)
  10. src/python/grpcio/grpc/_simple_stubs.py (108)
  11. src/python/grpcio/grpc/aio/_base_channel.py (108)
  12. src/python/grpcio/grpc/aio/_base_server.py (9)
  13. src/python/grpcio/grpc/aio/_call.py (5)
  14. src/python/grpcio/grpc/aio/_channel.py (128)
  15. src/python/grpcio/grpc/aio/_interceptor.py (87)
  16. src/python/grpcio_channelz/grpc_channelz/v1/_async.py (45)
  17. src/python/grpcio_health_checking/grpc_health/v1/_async.py (12)
  18. src/python/grpcio_reflection/grpc_reflection/v1alpha/_async.py (4)
  19. src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py (20)
  20. src/python/grpcio_tests/tests_aio/interop/methods.py (20)
  21. src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py (46)
  22. src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py (15)
  23. test/cpp/qps/gen_build_yaml.py (3)
  24. tools/buildgen/build_cleaner.py (12)
  25. tools/buildgen/bunch.py (3)
  26. tools/buildgen/plugins/expand_filegroups.py (3)
  27. tools/codegen/core/gen_header_frame.py (6)
  28. tools/codegen/core/gen_settings_ids.py (12)
  29. tools/codegen/core/gen_stats_data.py (21)
  30. tools/distrib/c-ish/check_documentation.py (3)
  31. tools/distrib/check_copyright.py (3)
  32. tools/distrib/check_include_guards.py (12)
  33. tools/github/pr_latency.py (24)
  34. tools/line_count/yaml2csv.py (9)
  35. tools/mkowners/mkowners.py (18)
  36. tools/profiling/microbenchmarks/bm2bq.py (3)
  37. tools/profiling/microbenchmarks/bm_diff/bm_diff.py (12)
  38. tools/profiling/microbenchmarks/bm_diff/bm_speedup.py (24)
  39. tools/profiling/microbenchmarks/bm_json.py (9)
  40. tools/run_tests/artifacts/distribtest_targets.py (5)
  41. tools/run_tests/performance/scenario_config.py (3)
  42. tools/run_tests/python_utils/jobset.py (24)
  43. tools/run_tests/python_utils/port_server.py (6)
  44. tools/run_tests/python_utils/watch_dirs.py (3)
  45. tools/run_tests/run_microbenchmark.py (3)
  46. tools/run_tests/run_tests.py (12)
  47. tools/run_tests/run_tests_matrix.py (3)
  48. tools/run_tests/sanity/check_port_platform.py (3)
  49. tools/run_tests/sanity/check_tracer_sanity.py (3)
  50. tools/run_tests/sanity/core_banned_functions.py (9)
  51. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/gcp/compute.py (47)
  52. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/k8s.py (13)
  53. tools/run_tests/xds_k8s_test_driver/framework/infrastructure/traffic_director.py (42)
  54. tools/run_tests/xds_k8s_test_driver/framework/rpc/grpc_testing.py (8)
  55. tools/run_tests/xds_k8s_test_driver/framework/test_app/client_app.py (34)
  56. tools/run_tests/xds_k8s_test_driver/framework/xds_k8s_testcase.py (2)
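
The hunks below reflect two behavior changes in yapf 0.30.0: long def signatures now break immediately after the opening parenthesis instead of aligning continuation parameters, and one-line compound statements such as "if not imp: continue" are expanded onto two lines. A reformat like this can be reproduced with yapf's public API. The following is a minimal sketch, not the project's actual tooling (gRPC normally drives yapf through tools/distrib/yapf_code.sh), and style_config='google' is an assumption; the repository pins its own style settings.

# Sketch: rerun yapf 0.30.0 over a source tree and report changed files.
# Requires: pip install yapf==0.30.0. style_config='google' is an assumption.
import os

from yapf.yapflib.yapf_api import FormatFile


def reformat_tree(root):
    for dirpath, _, filenames in os.walk(root):
        for filename in filenames:
            if not filename.endswith('.py'):
                continue
            path = os.path.join(dirpath, filename)
            # With in_place=True the file is rewritten; `changed` reports
            # whether yapf disagreed with the existing formatting.
            _, _, changed = FormatFile(path,
                                       style_config='google',
                                       in_place=True)
            if changed:
                print('reformatted', path)


if __name__ == '__main__':
    reformat_tree('src/python')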

examples/python/async_streaming/server.py
@@ -25,8 +25,8 @@ import phone_pb2
import phone_pb2_grpc
def create_state_response(call_state: phone_pb2.CallState.State
) -> phone_pb2.StreamCallResponse:
def create_state_response(
call_state: phone_pb2.CallState.State) -> phone_pb2.StreamCallResponse:
response = phone_pb2.StreamCallResponse()
response.call_state.state = call_state
return response
@@ -50,10 +50,10 @@ class Phone(phone_pb2_grpc.PhoneServicer):
def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None:
logging.info("Call session cleaned [%s]", MessageToJson(call_info))
def StreamCall(self,
request_iterator: Iterable[phone_pb2.StreamCallRequest],
context: grpc.ServicerContext
) -> Iterable[phone_pb2.StreamCallResponse]:
def StreamCall(
self, request_iterator: Iterable[phone_pb2.StreamCallRequest],
context: grpc.ServicerContext
) -> Iterable[phone_pb2.StreamCallResponse]:
try:
request = next(request_iterator)
logging.info("Received a phone call request for number [%s]",

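To see the new splitting decision in isolation, a single snippet can be passed through yapf_api.FormatCode. A minimal sketch, again assuming a google-based style, so the result may not match the project's pinned configuration byte for byte:

# Sketch: run the pre-0.30.0 signature from the hunk above through yapf.
from yapf.yapflib.yapf_api import FormatCode

OLD = '''def create_state_response(call_state: phone_pb2.CallState.State
                          ) -> phone_pb2.StreamCallResponse:
    response = phone_pb2.StreamCallResponse()
    response.call_state.state = call_state
    return response
'''

formatted, changed = FormatCode(OLD, style_config='google')
print(changed)    # True when yapf disagrees with the old layout
print(formatted)  # expected: the parameter list now splits right after '('
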
examples/python/auth/async_customized_auth_server.py
@@ -42,8 +42,8 @@ class SignatureValidationInterceptor(grpc.aio.ServerInterceptor):
self._abort_handler = grpc.unary_unary_rpc_method_handler(abort)
async def intercept_service(
self, continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
self, continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
# Example HandlerCallDetails object:

examples/python/debug/asyncio_debug_server.py
@@ -37,9 +37,9 @@ class FaultInjectGreeter(helloworld_pb2_grpc.GreeterServicer):
def __init__(self, failure_rate):
self._failure_rate = failure_rate
async def SayHello(self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext
) -> helloworld_pb2.HelloReply:
async def SayHello(
self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
if random.random() < self._failure_rate:
context.abort(grpc.StatusCode.UNAVAILABLE,
'Randomly injected failure.')

examples/python/helloworld/async_greeter_server.py
@@ -23,9 +23,9 @@ import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext
) -> helloworld_pb2.HelloReply:
async def SayHello(
self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

examples/python/helloworld/async_greeter_server_with_reflection.py
@@ -25,9 +25,9 @@ import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext
) -> helloworld_pb2.HelloReply:
async def SayHello(
self, request: helloworld_pb2.HelloRequest,
context: grpc.aio.ServicerContext) -> helloworld_pb2.HelloReply:
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

examples/python/route_guide/asyncio_route_guide_client.py
@@ -54,8 +54,8 @@ async def guide_get_feature(stub: route_guide_pb2_grpc.RouteGuideStub) -> None:
# Performs a server-streaming call
async def guide_list_features(stub: route_guide_pb2_grpc.RouteGuideStub
) -> None:
async def guide_list_features(
stub: route_guide_pb2_grpc.RouteGuideStub) -> None:
rectangle = route_guide_pb2.Rectangle(
lo=route_guide_pb2.Point(latitude=400000000, longitude=-750000000),
hi=route_guide_pb2.Point(latitude=420000000, longitude=-730000000))
@@ -67,8 +67,9 @@ async def guide_list_features(stub: route_guide_pb2_grpc.RouteGuideStub
print(f"Feature called {feature.name} at {feature.location}")
def generate_route(feature_list: List[route_guide_pb2.Feature]
) -> Iterable[route_guide_pb2.Point]:
def generate_route(
feature_list: List[route_guide_pb2.Feature]
) -> Iterable[route_guide_pb2.Point]:
for _ in range(0, 10):
random_feature = random.choice(feature_list)
print(f"Visiting point {random_feature.location}")

examples/python/route_guide/asyncio_route_guide_server.py
@@ -72,9 +72,9 @@ class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
else:
return feature
async def ListFeatures(self, request: route_guide_pb2.Rectangle,
unused_context
) -> AsyncIterable[route_guide_pb2.Feature]:
async def ListFeatures(
self, request: route_guide_pb2.Rectangle,
unused_context) -> AsyncIterable[route_guide_pb2.Feature]:
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
@@ -86,9 +86,8 @@ class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
feature.location.latitude <= top):
yield feature
async def RecordRoute(
self, request_iterator: AsyncIterable[route_guide_pb2.Point],
unused_context) -> route_guide_pb2.RouteSummary:
async def RecordRoute(self, request_iterator: AsyncIterable[
route_guide_pb2.Point], unused_context) -> route_guide_pb2.RouteSummary:
point_count = 0
feature_count = 0
distance = 0.0

src/proto/gen_build_yaml.py
@@ -27,7 +27,8 @@ def update_deps(key, proto_filename, deps, deps_external, is_trans, visited):
with open(proto_filename) as inp:
for line in inp:
imp = re.search(r'import "([^"]*)"', line)
if not imp: continue
if not imp:
continue
imp_proto = imp.group(1)
# This indicates an external dependency, which we should handle
# differently and not traverse recursively
@@ -40,7 +41,8 @@ def update_deps(key, proto_filename, deps, deps_external, is_trans, visited):
# revert the change to avoid file error.
if imp_proto.startswith('third_party/grpc'):
imp_proto = imp_proto[17:]
if key not in deps: deps[key] = []
if key not in deps:
deps[key] = []
deps[key].append(imp_proto[:-6])
if is_trans:
update_deps(key, imp_proto, deps, deps_external, is_trans,
@@ -57,7 +59,8 @@ def main():
deps_external_trans = {}
for root, dirs, files in os.walk('src/proto'):
for f in files:
if f[-6:] != '.proto': continue
if f[-6:] != '.proto':
continue
look_at = os.path.join(root, f)
deps_for = look_at[:-6]
# First level deps

src/python/grpcio/grpc/_channel.py
@@ -230,8 +230,8 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer,
def _done():
return (state.code is not None or
cygrpc.OperationType.send_message not in
state.due)
cygrpc.OperationType.send_message
not in state.due)
_common.wait(state.condition.wait,
_done,
@@ -786,10 +786,10 @@ class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint:
raise self
def _response_ready():
return (
self._state.response is not None or
(cygrpc.OperationType.receive_message not in self._state.due
and self._state.code is not None))
return (self._state.response is not None or
(cygrpc.OperationType.receive_message
not in self._state.due and
self._state.code is not None))
_common.wait(self._state.condition.wait, _response_ready)
if self._state.response is not None:

src/python/grpcio/grpc/_simple_stubs.py
@@ -28,8 +28,8 @@ RequestType = TypeVar('RequestType')
ResponseType = TypeVar('ResponseType')
OptionsType = Sequence[Tuple[str, str]]
CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials], Optional[
grpc.Compression]]
CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials],
Optional[grpc.Compression]]
_LOGGER = logging.getLogger(__name__)
@@ -174,19 +174,19 @@ class ChannelCache:
@experimental_api
def unary_unary(
request: RequestType,
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
request: RequestType,
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
"""Invokes a unary-unary RPC without an explicitly specified channel.
@@ -255,19 +255,19 @@ def unary_unary(
@experimental_api
def unary_stream(
request: RequestType,
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
request: RequestType,
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
"""Invokes a unary-stream RPC without an explicitly specified channel.
@@ -335,19 +335,19 @@ def unary_stream(
@experimental_api
def stream_unary(
request_iterator: Iterator[RequestType],
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
request_iterator: Iterator[RequestType],
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
"""Invokes a stream-unary RPC without an explicitly specified channel.
@@ -415,19 +415,19 @@ def stream_unary(
@experimental_api
def stream_stream(
request_iterator: Iterator[RequestType],
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
request_iterator: Iterator[RequestType],
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
insecure: bool = False,
call_credentials: Optional[grpc.CallCredentials] = None,
compression: Optional[grpc.Compression] = None,
wait_for_ready: Optional[bool] = None,
timeout: Optional[float] = _DEFAULT_TIMEOUT,
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
"""Invokes a stream-stream RPC without an explicitly specified channel.

src/python/grpcio/grpc/aio/_base_channel.py
@@ -28,15 +28,16 @@ class UnaryUnaryMultiCallable(abc.ABC):
"""Enables asynchronous invocation of a unary-call RPC."""
@abc.abstractmethod
def __call__(self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryUnaryCall:
def __call__(
self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryUnaryCall:
"""Asynchronously invokes the underlying RPC.
Args:
@@ -66,15 +67,16 @@ class UnaryStreamMultiCallable(abc.ABC):
"""Enables asynchronous invocation of a server-streaming RPC."""
@abc.abstractmethod
def __call__(self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryStreamCall:
def __call__(
self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryStreamCall:
"""Asynchronously invokes the underlying RPC.
Args:
@@ -104,14 +106,15 @@ class StreamUnaryMultiCallable(abc.ABC):
"""Enables asynchronous invocation of a client-streaming RPC."""
@abc.abstractmethod
def __call__(self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamUnaryCall:
def __call__(
self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamUnaryCall:
"""Asynchronously invokes the underlying RPC.
Args:
@@ -142,14 +145,15 @@ class StreamStreamMultiCallable(abc.ABC):
"""Enables asynchronous invocation of a bidirectional-streaming RPC."""
@abc.abstractmethod
def __call__(self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamStreamCall:
def __call__(
self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamStreamCall:
"""Asynchronously invokes the underlying RPC.
Args:
@@ -234,8 +238,8 @@ class Channel(abc.ABC):
@abc.abstractmethod
async def wait_for_state_change(
self,
last_observed_state: grpc.ChannelConnectivity,
self,
last_observed_state: grpc.ChannelConnectivity,
) -> None:
"""Waits for a change in connectivity state.
@@ -264,10 +268,10 @@ class Channel(abc.ABC):
@abc.abstractmethod
def unary_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> UnaryUnaryMultiCallable:
"""Creates a UnaryUnaryMultiCallable for a unary-unary method.
@@ -285,10 +289,10 @@ class Channel(abc.ABC):
@abc.abstractmethod
def unary_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> UnaryStreamMultiCallable:
"""Creates a UnaryStreamMultiCallable for a unary-stream method.
@@ -306,10 +310,10 @@ class Channel(abc.ABC):
@abc.abstractmethod
def stream_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> StreamUnaryMultiCallable:
"""Creates a StreamUnaryMultiCallable for a stream-unary method.
@@ -327,10 +331,10 @@ class Channel(abc.ABC):
@abc.abstractmethod
def stream_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> StreamStreamMultiCallable:
"""Creates a StreamStreamMultiCallable for a stream-stream method.

src/python/grpcio/grpc/aio/_base_server.py
@@ -169,10 +169,11 @@ class ServicerContext(Generic[RequestType, ResponseType], abc.ABC):
"""
@abc.abstractmethod
async def abort(self,
code: grpc.StatusCode,
details: str = '',
trailing_metadata: Metadata = tuple()) -> None:
async def abort(
self,
code: grpc.StatusCode,
details: str = '',
trailing_metadata: Metadata = tuple()) -> None:
"""Raises an exception to terminate the RPC with a non-OK status.
The code and details passed as arguments will supersede any existing

src/python/grpcio/grpc/aio/_call.py
@@ -393,9 +393,8 @@ class _StreamRequestMixin(Call):
def _metadata_sent_observer(self):
self._metadata_sent.set()
async def _consume_request_iterator(self,
request_iterator: RequestIterableType
) -> None:
async def _consume_request_iterator(
self, request_iterator: RequestIterableType) -> None:
try:
if inspect.isasyncgen(request_iterator) or hasattr(
request_iterator, '__aiter__'):

src/python/grpcio/grpc/aio/_channel.py
@@ -73,13 +73,13 @@ class _BaseMultiCallable:
# pylint: disable=too-many-arguments
def __init__(
self,
channel: cygrpc.AioChannel,
method: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
interceptors: Optional[Sequence[ClientInterceptor]],
loop: asyncio.AbstractEventLoop,
self,
channel: cygrpc.AioChannel,
method: bytes,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction,
interceptors: Optional[Sequence[ClientInterceptor]],
loop: asyncio.AbstractEventLoop,
) -> None:
self._loop = loop
self._channel = channel
@@ -89,9 +89,9 @@ class _BaseMultiCallable:
self._interceptors = interceptors
@staticmethod
def _init_metadata(metadata: Optional[Metadata] = None,
compression: Optional[grpc.Compression] = None
) -> Metadata:
def _init_metadata(
metadata: Optional[Metadata] = None,
compression: Optional[grpc.Compression] = None) -> Metadata:
"""Based on the provided values for <metadata> or <compression> initialise the final
metadata, as it should be used for the current call.
"""
@@ -105,15 +105,16 @@ class _BaseMultiCallable:
class UnaryUnaryMultiCallable(_BaseMultiCallable,
_base_channel.UnaryUnaryMultiCallable):
def __call__(self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryUnaryCall:
def __call__(
self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryUnaryCall:
metadata = self._init_metadata(metadata, compression)
if not self._interceptors:
@@ -135,15 +136,16 @@ class UnaryUnaryMultiCallable(_BaseMultiCallable,
class UnaryStreamMultiCallable(_BaseMultiCallable,
_base_channel.UnaryStreamMultiCallable):
def __call__(self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryStreamCall:
def __call__(
self,
request: Any,
*,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.UnaryStreamCall:
metadata = self._init_metadata(metadata, compression)
deadline = _timeout_to_deadline(timeout)
@@ -166,14 +168,15 @@ class UnaryStreamMultiCallable(_BaseMultiCallable,
class StreamUnaryMultiCallable(_BaseMultiCallable,
_base_channel.StreamUnaryMultiCallable):
def __call__(self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamUnaryCall:
def __call__(
self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamUnaryCall:
metadata = self._init_metadata(metadata, compression)
deadline = _timeout_to_deadline(timeout)
@@ -196,14 +199,15 @@ class StreamStreamMultiCallable(_BaseMultiCallable,
class StreamStreamMultiCallable(_BaseMultiCallable,
_base_channel.StreamStreamMultiCallable):
def __call__(self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamStreamCall:
def __call__(
self,
request_iterator: Optional[RequestIterableType] = None,
timeout: Optional[float] = None,
metadata: Optional[Metadata] = None,
credentials: Optional[grpc.CallCredentials] = None,
wait_for_ready: Optional[bool] = None,
compression: Optional[grpc.Compression] = None
) -> _base_call.StreamStreamCall:
metadata = self._init_metadata(metadata, compression)
deadline = _timeout_to_deadline(timeout)
@@ -361,8 +365,8 @@ class Channel(_base_channel.Channel):
return _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[result]
async def wait_for_state_change(
self,
last_observed_state: grpc.ChannelConnectivity,
self,
last_observed_state: grpc.ChannelConnectivity,
) -> None:
assert await self._channel.watch_connectivity_state(
last_observed_state.value[0], None)
@@ -374,10 +378,10 @@ class Channel(_base_channel.Channel):
state = self.get_state(try_to_connect=True)
def unary_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> UnaryUnaryMultiCallable:
return UnaryUnaryMultiCallable(self._channel, _common.encode(method),
request_serializer,
@@ -386,10 +390,10 @@ class Channel(_base_channel.Channel):
self._loop)
def unary_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> UnaryStreamMultiCallable:
return UnaryStreamMultiCallable(self._channel, _common.encode(method),
request_serializer,
@@ -398,10 +402,10 @@ class Channel(_base_channel.Channel):
self._loop)
def stream_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> StreamUnaryMultiCallable:
return StreamUnaryMultiCallable(self._channel, _common.encode(method),
request_serializer,
@@ -410,10 +414,10 @@ class Channel(_base_channel.Channel):
self._loop)
def stream_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None
) -> StreamStreamMultiCallable:
return StreamStreamMultiCallable(self._channel, _common.encode(method),
request_serializer,

src/python/grpcio/grpc/aio/_interceptor.py
@@ -42,8 +42,8 @@ class ServerInterceptor(metaclass=ABCMeta):
@abstractmethod
async def intercept_service(
self, continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
self, continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
"""Intercepts incoming RPCs before handing them over to a handler.
@@ -167,11 +167,11 @@ class StreamUnaryClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
@abstractmethod
async def intercept_stream_unary(
self,
continuation: Callable[[ClientCallDetails, RequestType],
UnaryStreamCall],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType,
self,
continuation: Callable[[ClientCallDetails, RequestType],
UnaryStreamCall],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType,
) -> StreamUnaryCall:
"""Intercepts a stream-unary invocation asynchronously.
@@ -208,11 +208,11 @@ class StreamStreamClientInterceptor(ClientInterceptor, metaclass=ABCMeta):
@abstractmethod
async def intercept_stream_stream(
self,
continuation: Callable[[ClientCallDetails, RequestType],
UnaryStreamCall],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType,
self,
continuation: Callable[[ClientCallDetails, RequestType],
UnaryStreamCall],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType,
) -> Union[ResponseIterableType, StreamStreamCall]:
"""Intercepts a stream-stream invocation asynchronously.
@@ -280,9 +280,8 @@ class InterceptedCall:
def __del__(self):
self.cancel()
def _fire_or_add_pending_done_callbacks(self,
interceptors_task: asyncio.Task
) -> None:
def _fire_or_add_pending_done_callbacks(
self, interceptors_task: asyncio.Task) -> None:
if not self._pending_add_done_callbacks:
return
@@ -442,8 +441,8 @@ class _InterceptedStreamResponseMixin:
# consumed a logging warning is emitted by Asyncio.
self._response_aiter = None
async def _wait_for_interceptor_task_response_iterator(self
) -> ResponseType:
async def _wait_for_interceptor_task_response_iterator(
self) -> ResponseType:
call = await self._interceptors_task
async for response in call:
yield response
@@ -572,14 +571,14 @@ class InterceptedUnaryUnaryCall(_InterceptedUnaryResponseMixin, InterceptedCall,
super().__init__(interceptors_task)
# pylint: disable=too-many-arguments
async def _invoke(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
method: bytes, timeout: Optional[float],
metadata: Optional[Metadata],
credentials: Optional[grpc.CallCredentials],
wait_for_ready: Optional[bool], request: RequestType,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction
) -> UnaryUnaryCall:
async def _invoke(
self, interceptors: Sequence[UnaryUnaryClientInterceptor],
method: bytes, timeout: Optional[float],
metadata: Optional[Metadata],
credentials: Optional[grpc.CallCredentials],
wait_for_ready: Optional[bool], request: RequestType,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction) -> UnaryUnaryCall:
"""Run the RPC call wrapped in interceptors"""
async def _run_interceptor(
@@ -646,20 +645,20 @@ class InterceptedUnaryStreamCall(_InterceptedStreamResponseMixin,
super().__init__(interceptors_task)
# pylint: disable=too-many-arguments
async def _invoke(self, interceptors: Sequence[UnaryUnaryClientInterceptor],
method: bytes, timeout: Optional[float],
metadata: Optional[Metadata],
credentials: Optional[grpc.CallCredentials],
wait_for_ready: Optional[bool], request: RequestType,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction
) -> UnaryStreamCall:
async def _invoke(
self, interceptors: Sequence[UnaryUnaryClientInterceptor],
method: bytes, timeout: Optional[float],
metadata: Optional[Metadata],
credentials: Optional[grpc.CallCredentials],
wait_for_ready: Optional[bool], request: RequestType,
request_serializer: SerializingFunction,
response_deserializer: DeserializingFunction) -> UnaryStreamCall:
"""Run the RPC call wrapped in interceptors"""
async def _run_interceptor(
interceptors: Iterator[UnaryStreamClientInterceptor],
client_call_details: ClientCallDetails,
request: RequestType,
interceptors: Iterator[UnaryStreamClientInterceptor],
client_call_details: ClientCallDetails,
request: RequestType,
) -> _base_call.UnaryUnaryCall:
interceptor = next(interceptors, None)
@@ -741,9 +740,9 @@ class InterceptedStreamUnaryCall(_InterceptedUnaryResponseMixin,
"""Run the RPC call wrapped in interceptors"""
async def _run_interceptor(
interceptors: Iterator[UnaryUnaryClientInterceptor],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType
interceptors: Iterator[UnaryUnaryClientInterceptor],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType
) -> _base_call.StreamUnaryCall:
interceptor = next(interceptors, None)
@@ -814,9 +813,9 @@ class InterceptedStreamStreamCall(_InterceptedStreamResponseMixin,
"""Run the RPC call wrapped in interceptors"""
async def _run_interceptor(
interceptors: Iterator[StreamStreamClientInterceptor],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType
interceptors: Iterator[StreamStreamClientInterceptor],
client_call_details: ClientCallDetails,
request_iterator: RequestIterableType
) -> _base_call.StreamStreamCall:
interceptor = next(interceptors, None)
@@ -908,8 +907,8 @@ class _StreamCallResponseIterator:
_call: Union[_base_call.UnaryStreamCall, _base_call.StreamStreamCall]
_response_iterator: AsyncIterable[ResponseType]
def __init__(self, call: Union[_base_call.UnaryStreamCall, _base_call.
StreamStreamCall],
def __init__(self, call: Union[_base_call.UnaryStreamCall,
_base_call.StreamStreamCall],
response_iterator: AsyncIterable[ResponseType]) -> None:
self._response_iterator = response_iterator
self._call = call

src/python/grpcio_channelz/grpc_channelz/v1/_async.py
@@ -24,43 +24,46 @@ class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer):
"""AsyncIO servicer for handling RPCs for service statuses."""
@staticmethod
async def GetTopChannels(request: _channelz_pb2.GetTopChannelsRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetTopChannelsResponse:
async def GetTopChannels(
request: _channelz_pb2.GetTopChannelsRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetTopChannelsResponse:
return _SyncChannelzServicer.GetTopChannels(request, context)
@staticmethod
async def GetServers(request: _channelz_pb2.GetServersRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetServersResponse:
async def GetServers(
request: _channelz_pb2.GetServersRequest,
context: aio.ServicerContext) -> _channelz_pb2.GetServersResponse:
return _SyncChannelzServicer.GetServers(request, context)
@staticmethod
async def GetServer(request: _channelz_pb2.GetServerRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetServerResponse:
async def GetServer(
request: _channelz_pb2.GetServerRequest,
context: aio.ServicerContext) -> _channelz_pb2.GetServerResponse:
return _SyncChannelzServicer.GetServer(request, context)
@staticmethod
async def GetServerSockets(request: _channelz_pb2.GetServerSocketsRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetServerSocketsResponse:
async def GetServerSockets(
request: _channelz_pb2.GetServerSocketsRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetServerSocketsResponse:
return _SyncChannelzServicer.GetServerSockets(request, context)
@staticmethod
async def GetChannel(request: _channelz_pb2.GetChannelRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetChannelResponse:
async def GetChannel(
request: _channelz_pb2.GetChannelRequest,
context: aio.ServicerContext) -> _channelz_pb2.GetChannelResponse:
return _SyncChannelzServicer.GetChannel(request, context)
@staticmethod
async def GetSubchannel(request: _channelz_pb2.GetSubchannelRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetSubchannelResponse:
async def GetSubchannel(
request: _channelz_pb2.GetSubchannelRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetSubchannelResponse:
return _SyncChannelzServicer.GetSubchannel(request, context)
@staticmethod
async def GetSocket(request: _channelz_pb2.GetSocketRequest,
context: aio.ServicerContext
) -> _channelz_pb2.GetSocketResponse:
async def GetSocket(
request: _channelz_pb2.GetSocketRequest,
context: aio.ServicerContext) -> _channelz_pb2.GetSocketResponse:
return _SyncChannelzServicer.GetSocket(request, context)

src/python/grpcio_health_checking/grpc_health/v1/_async.py
@@ -71,9 +71,9 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
if request.service in self._server_watchers:
del self._server_watchers[request.service]
async def _set(self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus
) -> None:
async def _set(
self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
if service in self._server_watchers:
condition = self._server_watchers.get(service)
async with condition:
@@ -82,9 +82,9 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
else:
self._server_status[service] = status
async def set(self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus
) -> None:
async def set(
self, service: str,
status: _health_pb2.HealthCheckResponse.ServingStatus) -> None:
"""Sets the status of a service.
Args:

src/python/grpcio_reflection/grpc_reflection/v1alpha/_async.py
@@ -25,8 +25,8 @@ class ReflectionServicer(BaseReflectionServicer):
"""Servicer handling RPCs for service statuses."""
async def ServerReflectionInfo(
self, request_iterator: AsyncIterable[
_reflection_pb2.ServerReflectionRequest], unused_context
self, request_iterator: AsyncIterable[
_reflection_pb2.ServerReflectionRequest], unused_context
) -> AsyncIterable[_reflection_pb2.ServerReflectionResponse]:
async for request in request_iterator:
if request.HasField('file_by_filename'):

src/python/grpcio_tests/tests_aio/benchmark/worker_servicer.py
@@ -105,9 +105,9 @@ def _create_server(config: control_pb2.ServerConfig) -> Tuple[aio.Server, int]:
return server, port
def _get_client_status(start_time: float, end_time: float,
qps_data: histogram.Histogram
) -> control_pb2.ClientStatus:
def _get_client_status(
start_time: float, end_time: float,
qps_data: histogram.Histogram) -> control_pb2.ClientStatus:
"""Creates ClientStatus proto message."""
latencies = qps_data.get_data()
end_time = time.monotonic()
@@ -120,9 +120,9 @@ def _get_client_status(start_time: float, end_time: float,
return control_pb2.ClientStatus(stats=stats)
def _create_client(server: str, config: control_pb2.ClientConfig,
qps_data: histogram.Histogram
) -> benchmark_client.BenchmarkClient:
def _create_client(
server: str, config: control_pb2.ClientConfig,
qps_data: histogram.Histogram) -> benchmark_client.BenchmarkClient:
"""Creates a client object according to the ClientConfig."""
if config.load_params.WhichOneof('load') != 'closed_loop':
raise NotImplementedError(
@@ -215,8 +215,8 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
await self._run_single_server(config, request_iterator, context)
else:
# If server_processes > 1, offload to other processes.
sub_workers = await asyncio.gather(*(
_create_sub_worker() for _ in range(config.server_processes)))
sub_workers = await asyncio.gather(
*(_create_sub_worker() for _ in range(config.server_processes)))
calls = [worker.stub.RunServer() for worker in sub_workers]
@@ -308,8 +308,8 @@ class WorkerServicer(worker_service_pb2_grpc.WorkerServiceServicer):
await self._run_single_client(config, request_iterator, context)
else:
# If client_processes > 1, offload the work to other processes.
sub_workers = await asyncio.gather(*(
_create_sub_worker() for _ in range(config.client_processes)))
sub_workers = await asyncio.gather(
*(_create_sub_worker() for _ in range(config.client_processes)))
calls = [worker.stub.RunClient() for worker in sub_workers]

src/python/grpcio_tests/tests_aio/interop/methods.py
@@ -60,10 +60,10 @@ async def _validate_status_code_and_details(call: aio.Call,
await _expect_status_details(call, expected_details)
def _validate_payload_type_and_length(
response: Union[messages_pb2.SimpleResponse, messages_pb2.
StreamingOutputCallResponse], expected_type: Any,
expected_length: int) -> None:
def _validate_payload_type_and_length(response: Union[
messages_pb2.SimpleResponse, messages_pb2.StreamingOutputCallResponse],
expected_type: Any,
expected_length: int) -> None:
if response.payload.type is not expected_type:
raise ValueError('expected payload type %s, got %s' %
(expected_type, type(response.payload.type)))
@@ -73,8 +73,8 @@ def _validate_payload_type_and_length(
async def _large_unary_common_behavior(
stub: test_pb2_grpc.TestServiceStub, fill_username: bool,
fill_oauth_scope: bool, call_credentials: Optional[grpc.CallCredentials]
stub: test_pb2_grpc.TestServiceStub, fill_username: bool,
fill_oauth_scope: bool, call_credentials: Optional[grpc.CallCredentials]
) -> messages_pb2.SimpleResponse:
size = 314159
request = messages_pb2.SimpleRequest(
@@ -436,10 +436,10 @@ _TEST_CASE_IMPLEMENTATION_MAPPING = {
}
async def test_interoperability(case: TestCase,
stub: test_pb2_grpc.TestServiceStub,
args: Optional[argparse.Namespace] = None
) -> None:
async def test_interoperability(
case: TestCase,
stub: test_pb2_grpc.TestServiceStub,
args: Optional[argparse.Namespace] = None) -> None:
method = _TEST_CASE_IMPLEMENTATION_MAPPING.get(case)
if method is None:
raise NotImplementedError(f'Test case "{case}" not implemented!')

src/python/grpcio_tests/tests_aio/unit/server_interceptor_test.py
@@ -38,8 +38,8 @@ class _LoggingInterceptor(aio.ServerInterceptor):
self.record = record
async def intercept_service(
self, continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
self, continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
self.record.append(self.tag + ':intercept_service')
@@ -48,28 +48,29 @@
class _GenericInterceptor(aio.ServerInterceptor):
def __init__(self, fn: Callable[[
Callable[[grpc.HandlerCallDetails], Awaitable[grpc.
RpcMethodHandler]],
grpc.HandlerCallDetails
], Any]) -> None:
def __init__(
self, fn: Callable[[
Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]], grpc.HandlerCallDetails
], Any]
) -> None:
self._fn = fn
async def intercept_service(
self, continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
self, continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
return await self._fn(continuation, handler_call_details)
def _filter_server_interceptor(condition: Callable,
interceptor: aio.ServerInterceptor
) -> aio.ServerInterceptor:
def _filter_server_interceptor(
condition: Callable,
interceptor: aio.ServerInterceptor) -> aio.ServerInterceptor:
async def intercept_service(
continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
if condition(handler_call_details):
@@ -87,8 +88,8 @@ class _CacheInterceptor(aio.ServerInterceptor):
self.cache_store = cache_store or {}
async def intercept_service(
self, continuation: Callable[[grpc.HandlerCallDetails], Awaitable[
grpc.RpcMethodHandler]],
self, continuation: Callable[[grpc.HandlerCallDetails],
Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails
) -> grpc.RpcMethodHandler:
# Get the actual handler
@@ -100,13 +101,14 @@ class _CacheInterceptor(aio.ServerInterceptor):
return handler
def wrapper(behavior: Callable[
[messages_pb2.SimpleRequest, aio.
ServicerContext], messages_pb2.SimpleResponse]):
[messages_pb2.SimpleRequest, aio.ServicerContext],
messages_pb2.SimpleResponse]):
@functools.wraps(behavior)
async def wrapper(request: messages_pb2.SimpleRequest,
context: aio.ServicerContext
) -> messages_pb2.SimpleResponse:
async def wrapper(
request: messages_pb2.SimpleRequest,
context: aio.ServicerContext
) -> messages_pb2.SimpleResponse:
if request.response_size not in self.cache_store:
self.cache_store[request.response_size] = await behavior(
request, context)
@@ -118,7 +120,7 @@ class _CacheInterceptor(aio.ServerInterceptor):
async def _create_server_stub_pair(
*interceptors: aio.ServerInterceptor
*interceptors: aio.ServerInterceptor
) -> Tuple[aio.Server, test_pb2_grpc.TestServiceStub]:
"""Creates a server-stub pair with given interceptors.

src/python/grpcio_tests/tests_py3_only/interop/xds_interop_client.py
@@ -77,8 +77,8 @@ class _StatsWatcher:
self._rpcs_needed -= 1
self._condition.notify()
def await_rpc_stats_response(self, timeout_sec: int
) -> messages_pb2.LoadBalancerStatsResponse:
def await_rpc_stats_response(
self, timeout_sec: int) -> messages_pb2.LoadBalancerStatsResponse:
"""Blocks until a full response has been collected."""
with self._condition:
self._condition.wait_for(lambda: not self._rpcs_needed,
@@ -111,9 +111,10 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer
def __init__(self):
super(_LoadBalancerStatsServicer).__init__()
def GetClientStats(self, request: messages_pb2.LoadBalancerStatsRequest,
context: grpc.ServicerContext
) -> messages_pb2.LoadBalancerStatsResponse:
def GetClientStats(
self, request: messages_pb2.LoadBalancerStatsRequest,
context: grpc.ServicerContext
) -> messages_pb2.LoadBalancerStatsResponse:
logger.info("Received stats request.")
start = None
end = None
@@ -132,8 +133,8 @@ class _LoadBalancerStatsServicer(test_pb2_grpc.LoadBalancerStatsServiceServicer
def _start_rpc(method: str, metadata: Sequence[Tuple[str, str]],
request_id: int, stub: test_pb2_grpc.TestServiceStub,
timeout: float,
futures: Mapping[int, Tuple[grpc.Future, str]]) -> None:
timeout: float, futures: Mapping[int, Tuple[grpc.Future,
str]]) -> None:
logger.info(f"Sending {method} request to backend: {request_id}")
if method == "UnaryCall":
future = stub.UnaryCall.future(messages_pb2.SimpleRequest(),

test/cpp/qps/gen_build_yaml.py
@@ -77,7 +77,8 @@ def guess_cpu(scenario_json, is_tsan):
server = threads_required(scenario_json, 'server', is_tsan)
# make an arbitrary guess if set to auto-detect
# about the size of the jenkins instances we have for unit tests
if client == 0 or server == 0: return 'capacity'
if client == 0 or server == 0:
return 'capacity'
return (scenario_json['num_clients'] * client +
scenario_json['num_servers'] * server)

tools/buildgen/build_cleaner.py
@@ -48,15 +48,18 @@ def _rebuild_as_ordered_dict(indict, special_keys):
if key in indict:
outdict[key] = indict[key]
for key in sorted(indict.keys()):
if key in special_keys: continue
if '#' in key: continue
if key in special_keys:
continue
if '#' in key:
continue
outdict[key] = indict[key]
return outdict
def _clean_elem(indict):
for name in ['public_headers', 'headers', 'src']:
if name not in indict: continue
if name not in indict:
continue
inlist = indict[name]
protos = list(x for x in inlist if os.path.splitext(x)[1] == '.proto')
others = set(x for x in inlist if x not in protos)
@@ -68,7 +71,8 @@ def cleaned_build_yaml_dict_as_string(indict):
"""Takes dictionary which represents yaml file and returns the cleaned-up yaml string"""
js = _rebuild_as_ordered_dict(indict, _TOP_LEVEL_KEYS)
for grp in ['filegroups', 'libs', 'targets']:
if grp not in js: continue
if grp not in js:
continue
js[grp] = sorted([_clean_elem(x) for x in js[grp]],
key=lambda x: (x.get('language', '_'), x['name']))
output = yaml.dump(js, indent=2, width=80, default_flow_style=False)

tools/buildgen/bunch.py
@@ -41,7 +41,8 @@ def merge_json(dst, add):
if isinstance(dst, dict) and isinstance(add, dict):
for k, v in add.items():
if k in dst:
if k == '#': continue
if k == '#':
continue
merge_json(dst[k], v)
else:
dst[k] = v

tools/buildgen/plugins/expand_filegroups.py
@@ -119,7 +119,8 @@ def mako_plugin(dictionary):
while todo:
cur = todo[0]
todo = todo[1:]
if cur in done: continue
if cur in done:
continue
things[cur]['used_by'].append(thing['name'])
todo.extend(thing_deps(things[cur]))
done.add(cur)

tools/codegen/core/gen_header_frame.py
@@ -73,8 +73,10 @@ args = argp.parse_args()
vals = []
for line in sys.stdin:
line = line.strip()
if line == '': continue
if line[0] == '#': continue
if line == '':
continue
if line[0] == '#':
continue
key_tail, value = line[1:].split(':')
key = (line[0] + key_tail).strip()
value = value.strip()

tools/codegen/core/gen_settings_ids.py
@@ -67,7 +67,8 @@ def put_banner(files, banner):
with open(sys.argv[0]) as my_source:
copyright = []
for line in my_source:
if line[0] != '#': break
if line[0] != '#':
break
for line in my_source:
if line[0] == '#':
copyright.append(line)
@@ -141,9 +142,12 @@ bool grpc_wire_id_to_setting_id(uint32_t wire_id, grpc_chttp2_setting_id *out) {
switch (y) {
""" % cgargs
for i, r in enumerate(p.r):
if not r: continue
if r < 0: print >> C, 'case %d: h -= %d; break;' % (i, -r)
else: print >> C, 'case %d: h += %d; break;' % (i, r)
if not r:
continue
if r < 0:
print >> C, 'case %d: h -= %d; break;' % (i, -r)
else:
print >> C, 'case %d: h += %d; break;' % (i, r)
print >> C, """
}
*out = (grpc_chttp2_setting_id)h;

tools/codegen/core/gen_stats_data.py
@@ -82,10 +82,13 @@ def find_ideal_shift(mapped_bounds, max_size):
best = None
for shift_bits in reversed(range(0, 64)):
n = shift_works_until(mapped_bounds, shift_bits)
if n == 0: continue
if n == 0:
continue
table_size = mapped_bounds[n - 1] >> shift_bits
if table_size > max_size: continue
if table_size > 65535: continue
if table_size > max_size:
continue
if table_size > 65535:
continue
if best is None:
best = (shift_bits, n, table_size)
elif best[1] < n:
@@ -114,7 +117,8 @@ def decl_static_table(values, type):
global static_tables
v = (type, values)
for i, vp in enumerate(static_tables):
if v == vp: return i
if v == vp:
return i
print "ADD TABLE: %s %r" % (type, values)
r = len(static_tables)
static_tables.append(v)
@@ -212,7 +216,8 @@ with open('src/core/lib/debug/stats_data.h', 'w') as H:
with open(sys.argv[0]) as my_source:
copyright = []
for line in my_source:
if line[0] != '#': break
if line[0] != '#':
break
for line in my_source:
if line[0] == '#':
copyright.append(line)
@@ -304,7 +309,8 @@ with open('src/core/lib/debug/stats_data.cc', 'w') as C:
with open(sys.argv[0]) as my_source:
copyright = []
for line in my_source:
if line[0] != '#': break
if line[0] != '#':
break
for line in my_source:
if line[0] == '#':
copyright.append(line)
@@ -427,7 +433,8 @@ with open('tools/run_tests/performance/scenario_result_schema.json', 'w') as f:
with open('tools/run_tests/performance/massage_qps_stats.py', 'w') as P:
with open(sys.argv[0]) as my_source:
for line in my_source:
if line[0] != '#': break
if line[0] != '#':
break
for line in my_source:
if line[0] == '#':
print >> P, line.rstrip()

tools/distrib/c-ish/check_documentation.py
@@ -46,7 +46,8 @@ for target_dir in _TARGET_DIRS:
printed_banner = True
print root
errors += 1
if printed_banner: print
if printed_banner:
print
printed_banner = False
for target_dir in _TARGET_DIRS:
for root, dirs, filenames in os.walk(target_dir):

tools/distrib/check_copyright.py
@@ -130,7 +130,8 @@ assert (re.search(RE_LICENSE['Makefile'], load('Makefile')))
def log(cond, why, filename):
if not cond: return
if not cond:
return
if args.output == 'details':
print('%s: %s' % (why, filename))
else:

tools/distrib/check_include_guards.py
@@ -101,13 +101,15 @@ class GuardValidator(object):
if not running_guard.endswith('_H'):
fcontents = self.fail(fpath, match.re, match.string, match.group(1),
valid_guard, fix)
if fix: save(fpath, fcontents)
if fix:
save(fpath, fcontents)
# Is it the expected one based on the file path?
if running_guard != valid_guard:
fcontents = self.fail(fpath, match.re, match.string, match.group(1),
valid_guard, fix)
if fix: save(fpath, fcontents)
if fix:
save(fpath, fcontents)
# Is there a #define? Is it the same as the #ifndef one?
match = self.define_re.search(fcontents)
@@ -120,7 +122,8 @@ class GuardValidator(object):
if match.group(1) != running_guard:
fcontents = self.fail(fpath, match.re, match.string, match.group(1),
valid_guard, fix)
if fix: save(fpath, fcontents)
if fix:
save(fpath, fcontents)
# Is there a properly commented #endif?
flines = fcontents.rstrip().splitlines()
@@ -148,7 +151,8 @@ class GuardValidator(object):
# Is the #endif guard the same as the #ifndef and #define guards?
fcontents = self.fail(fpath, endif_re, fcontents, match.group(1),
valid_guard, fix)
if fix: save(fpath, fcontents)
if fix:
save(fpath, fcontents)
return not self.failed # Did the check succeed? (ie, not failed)

tools/github/pr_latency.py
@@ -118,17 +118,24 @@ def get_status_data(statuses_url, system):
failures = 0
errors = 0
latest_datetime = None
if not statuses: return None
if system == 'kokoro': string_in_target_url = 'kokoro'
elif system == 'jenkins': string_in_target_url = 'grpc-testing'
if not statuses:
return None
if system == 'kokoro':
string_in_target_url = 'kokoro'
elif system == 'jenkins':
string_in_target_url = 'grpc-testing'
for status in statuses['statuses']:
if not status['target_url'] or string_in_target_url not in status[
'target_url']:
continue # Ignore jenkins
if status['state'] == 'pending': return None
elif status['state'] == 'success': successes += 1
elif status['state'] == 'failure': failures += 1
elif status['state'] == 'error': errors += 1
if status['state'] == 'pending':
return None
elif status['state'] == 'success':
successes += 1
elif status['state'] == 'failure':
failures += 1
elif status['state'] == 'error':
errors += 1
if not latest_datetime:
latest_datetime = parse_timestamp(status['updated_at'])
else:
@@ -174,7 +181,8 @@ def main():
args_parser = build_args_parser()
args = args_parser.parse_args()
TOKEN = args.token
if args.format == 'csv': print_csv_header()
if args.format == 'csv':
print_csv_header()
for pr_data in get_pr_data():
commit_data = get_commits_data(pr_data['number'])
# PR with a single commit -> use the PRs creation time.

tools/line_count/yaml2csv.py
@@ -32,9 +32,12 @@ with open(args.output, 'w') as outf:
writer = csv.DictWriter(
outf, ['date', 'name', 'language', 'code', 'comment', 'blank'])
for key, value in data.iteritems():
if key == 'header': continue
if key == 'SUM': continue
if key.startswith('third_party/'): continue
if key == 'header':
continue
if key == 'SUM':
continue
if key.startswith('third_party/'):
continue
row = {'name': key, 'date': args.date}
row.update(value)
writer.writerow(row)

tools/mkowners/mkowners.py
@@ -67,8 +67,10 @@ def parse_owners(filename):
for line in src:
line = line.strip()
# line := directive | comment
if not line: continue
if line[0] == '#': continue
if not line:
continue
if line[0] == '#':
continue
# it's a directive
directive = None
if line == 'set noparent':
@@ -102,11 +104,13 @@ for owners in owners_data:
best_parent = None
best_parent_score = None
for possible_parent in owners_data:
if possible_parent is owners: continue
if possible_parent is owners:
continue
rel = os.path.relpath(owners.dir, possible_parent.dir)
# '..' ==> we had to walk up from possible_parent to get to owners
# ==> not a parent
if '..' in rel: continue
if '..' in rel:
continue
depth = len(rel.split(os.sep))
if not best_parent or depth < best_parent_score:
best_parent = possible_parent
@@ -134,7 +138,8 @@ gg_cache = {}
def git_glob(glob):
global gg_cache
if glob in gg_cache: return gg_cache[glob]
if glob in gg_cache:
return gg_cache[glob]
r = set(
subprocess.check_output([
'git', 'ls-files', os.path.join(git_root, glob)
@@ -176,7 +181,8 @@ def expand_directives(root, directives):
def add_parent_to_globs(parent, globs, globs_dir):
if not parent: return
if not parent:
return
for owners in owners_data:
if owners.dir == parent:
owners_globs = expand_directives(owners.dir, owners.directives)

tools/profiling/microbenchmarks/bm2bq.py
@@ -60,6 +60,7 @@ for row in bm_json.expand_json(js, js2):
sane_row = {}
for name, sql_type in columns:
if name in row:
if row[name] == '': continue
if row[name] == '':
continue
sane_row[name] = SANITIZE[sql_type](row[name])
writer.writerow(sane_row)

tools/profiling/microbenchmarks/bm_diff/bm_diff.py
@@ -82,14 +82,16 @@ def _args():
help='Print details of before/after')
args = argp.parse_args()
global verbose
if args.verbose: verbose = True
if args.verbose:
verbose = True
assert args.new
assert args.old
return args
def _maybe_print(str):
if verbose: print str
if verbose:
print str
class Benchmark:
@@ -110,7 +112,8 @@ class Benchmark:
for f in sorted(track):
new = self.samples[True][f]
old = self.samples[False][f]
if not new or not old: continue
if not new or not old:
continue
mdn_diff = abs(_median(new) - _median(old))
_maybe_print('%s: %s=%r %s=%r mdn_diff=%r' %
(f, new_name, new, old_name, old, mdn_diff))
@@ -204,7 +207,8 @@ def diff(bms, loops, regex, track, old, new, counters):
headers = ['Benchmark'] + fields
rows = []
for name in sorted(benchmarks.keys()):
if benchmarks[name].skip(): continue
if benchmarks[name].skip():
continue
rows.append([name] + benchmarks[name].row(fields))
note = None
if len(badjson_files):

@@ -29,25 +29,33 @@ def cmp(a, b):
def speedup(new, old, threshold=_DEFAULT_THRESHOLD):
if (len(set(new))) == 1 and new == old: return 0
if (len(set(new))) == 1 and new == old:
return 0
s0, p0 = cmp(new, old)
if math.isnan(p0): return 0
if s0 == 0: return 0
if p0 > threshold: return 0
if math.isnan(p0):
return 0
if s0 == 0:
return 0
if p0 > threshold:
return 0
if s0 < 0:
pct = 1
while pct < 100:
sp, pp = cmp(new, scale(old, 1 - pct / 100.0))
if sp > 0: break
if pp > threshold: break
if sp > 0:
break
if pp > threshold:
break
pct += 1
return -(pct - 1)
else:
pct = 1
while pct < 10000:
sp, pp = cmp(new, scale(old, 1 + pct / 100.0))
if sp < 0: break
if pp > threshold: break
if sp < 0:
break
if pp > threshold:
break
pct += 1
return pct - 1

@@ -120,7 +120,8 @@ _BM_SPECS = {
def numericalize(s):
"""Convert abbreviations like '100M' or '10k' to a number."""
if not s: return ''
if not s:
return ''
if s[-1] == 'k':
return float(s[:-1]) * 1024
if s[-1] == 'M':
@@ -177,8 +178,10 @@ def parse_name(name):
def expand_json(js, js2=None):
if not js and not js2: raise StopIteration()
if not js: js = js2
if not js and not js2:
raise StopIteration()
if not js:
js = js2
for bm in js['benchmarks']:
if bm['name'].endswith('_stddev') or bm['name'].endswith('_mean'):
continue

@@ -81,7 +81,10 @@ def create_jobspec(name,
class CSharpDistribTest(object):
"""Tests C# NuGet package"""
def __init__(self, platform, arch, docker_suffix=None,
def __init__(self,
platform,
arch,
docker_suffix=None,
use_dotnet_cli=False):
self.name = 'csharp_%s_%s' % (platform, arch)
self.platform = platform

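The CSharpDistribTest hunk above shows the second recurring change: once a signature overruns the column limit, yapf 0.30.0 gives each parameter its own line. A rough sketch under the same assumptions as the earlier one (yapf 0.30.0 installed, google base style):

from yapf.yapflib.yapf_api import FormatCode

# The 4-space method indent pushes this signature past the column
# limit, which is what triggers the one-parameter-per-line split.
src = ("class CSharpDistribTest(object):\n"
       "    def __init__(self, platform, arch, docker_suffix=None, "
       "use_dotnet_cli=False):\n"
       "        pass\n")
print(FormatCode(src, style_config='google')[0])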
@@ -596,7 +596,8 @@ class CXXLanguage:
1, 200000, math.sqrt(10)):
if synchronicity == 'sync' and outstanding > 1200:
continue
if outstanding < channels: continue
if outstanding < channels:
continue
yield _ping_pong_scenario(
'cpp_protobuf_%s_%s_qps_unconstrained_%s_%d_channels_%d_outstanding'
% (synchronicity, rpc_type, secstr, channels,

@@ -456,14 +456,17 @@ class Jobset(object):
message('SKIPPED', spec.shortname, do_newline=True)
self.resultset[spec.shortname] = [skipped_job_result]
return True
if self.cancelled(): return False
if self.cancelled():
return False
current_cpu_cost = self.cpu_cost()
if current_cpu_cost == 0: break
if current_cpu_cost == 0:
break
if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
if len(self._running) < self._maxjobs_cpu_agnostic:
break
self.reap(spec.shortname, spec.cpu_cost)
if self.cancelled(): return False
if self.cancelled():
return False
job = Job(spec, self._newline_on_success, self._travis, self._add_env,
self._quiet_success)
self._running.add(job)
@@ -477,7 +480,8 @@ class Jobset(object):
dead = set()
for job in self._running:
st = eintr_be_gone(lambda: job.state())
if st == _RUNNING: continue
if st == _RUNNING:
continue
if st == _FAILURE or st == _KILLED:
self._failures += 1
if self._stop_on_failure:
@@ -491,7 +495,8 @@ class Jobset(object):
if not self._quiet_success or job.result.state != 'PASSED':
self.resultset[job.GetSpec().shortname].append(job.result)
self._running.remove(job)
if dead: return
if dead:
return
if not self._travis and platform_string() != 'windows':
rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
if self._remaining is not None and self._completed > 0:
@@ -518,8 +523,10 @@ class Jobset(object):
def cancelled(self):
"""Poll for cancellation."""
if self._cancelled: return True
if not self._check_cancelled(): return False
if self._cancelled:
return True
if not self._check_cancelled():
return False
for job in self._running:
job.kill()
self._cancelled = True
@@ -527,7 +534,8 @@ class Jobset(object):
def finish(self):
while self._running:
if self.cancelled(): pass # poll cancellation
if self.cancelled():
pass # poll cancellation
self.reap()
if platform_string() != 'windows':
signal.alarm(0)

@@ -71,7 +71,8 @@ cronet_restricted_ports = [
def can_connect(port):
# this test is only really useful on unices where SO_REUSE_PORT is available
# so on Windows, where this test is expensive, skip it
if platform.system() == 'Windows': return False
if platform.system() == 'Windows':
return False
s = socket.socket()
try:
s.connect(('localhost', port))
@@ -102,7 +103,8 @@ def refill_pool(max_timeout, req):
]
random.shuffle(chk)
for i in chk:
if len(pool) > 100: break
if len(pool) > 100:
break
if i in in_use:
age = time.time() - in_use[i]
if age < max_timeout:

@@ -39,7 +39,8 @@ class DirWatcher(object):
continue
for root, _, files in os.walk(path):
for f in files:
if f and f[0] == '.': continue
if f and f[0] == '.':
continue
try:
st = os.stat(os.path.join(root, f))
except OSError as e:

@@ -41,7 +41,8 @@ def fnize(s):
out = ''
for c in s:
if c in '<>, /':
if len(out) and out[-1] == '_': continue
if len(out) and out[-1] == '_':
continue
out += '_'
else:
out += c

@@ -357,7 +357,8 @@ class CLanguage(object):
stderr=fnull)
for line in tests.split('\n'):
test = line.strip()
if not test: continue
if not test:
continue
cmdline = [binary,
'--benchmark_filter=%s$' % test
] + target['args']
@@ -382,8 +383,10 @@ class CLanguage(object):
base = None
for line in tests.split('\n'):
i = line.find('#')
if i >= 0: line = line[:i]
if not line: continue
if i >= 0:
line = line[:i]
if not line:
continue
if line[0] != ' ':
base = line.strip()
else:
@@ -1324,7 +1327,8 @@ def runs_per_test_type(arg_str):
return 0
try:
n = int(arg_str)
if n <= 0: raise ValueError
if n <= 0:
raise ValueError
return n
except:
msg = '\'{}\' is not a positive integer or \'inf\''.format(arg_str)

@@ -369,7 +369,8 @@ def _runs_per_test_type(arg_str):
"""Auxiliary function to parse the "runs_per_test" flag."""
try:
n = int(arg_str)
if n <= 0: raise ValueError
if n <= 0:
raise ValueError
return n
except:
msg = '\'{}\' is not a positive integer'.format(arg_str)

@@ -25,7 +25,8 @@ def check_port_platform_inclusion(directory_root):
for root, dirs, files in os.walk(directory_root):
for filename in files:
path = os.path.join(root, filename)
if os.path.splitext(path)[1] not in ['.c', '.cc', '.h']: continue
if os.path.splitext(path)[1] not in ['.c', '.cc', '.h']:
continue
if path in [
os.path.join('include', 'grpc', 'support',
'port_platform.h'),

@@ -26,7 +26,8 @@ pattern = re.compile("GRPC_TRACER_INITIALIZER\((true|false), \"(.*)\"\)")
for root, dirs, files in os.walk('src/core'):
for filename in files:
path = os.path.join(root, filename)
if os.path.splitext(path)[1] != '.c': continue
if os.path.splitext(path)[1] != '.c':
continue
with open(path) as f:
text = f.read()
for o in pattern.findall(text):

@@ -52,15 +52,18 @@ BANNED_EXCEPT = {
errors = 0
num_files = 0
for root, dirs, files in os.walk('src/core'):
if root.startswith('src/core/tsi'): continue
if root.startswith('src/core/tsi'):
continue
for filename in files:
num_files += 1
path = os.path.join(root, filename)
if os.path.splitext(path)[1] != '.cc': continue
if os.path.splitext(path)[1] != '.cc':
continue
with open(path) as f:
text = f.read()
for banned, exceptions in BANNED_EXCEPT.items():
if path in exceptions: continue
if path in exceptions:
continue
if banned in text:
print('Illegal use of "%s" in %s' % (banned, path))
errors += 1

@@ -50,7 +50,8 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
HTTP2 = enum.auto()
GRPC = enum.auto()
def create_health_check_tcp(self, name,
def create_health_check_tcp(self,
name,
use_serving_port=False) -> GcpResource:
health_check_settings = {}
if use_serving_port:
@@ -113,12 +114,12 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
name)
def create_url_map(
self,
name: str,
matcher_name: str,
src_hosts,
dst_default_backend_service: GcpResource,
dst_host_rule_match_backend_service: Optional[GcpResource] = None,
self,
name: str,
matcher_name: str,
src_hosts,
dst_default_backend_service: GcpResource,
dst_host_rule_match_backend_service: Optional[GcpResource] = None,
) -> GcpResource:
if dst_host_rule_match_backend_service is None:
dst_host_rule_match_backend_service = dst_default_backend_service
@@ -142,9 +143,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
self._delete_resource(self.api.urlMaps(), 'urlMap', name)
def create_target_grpc_proxy(
self,
name: str,
url_map: GcpResource,
self,
name: str,
url_map: GcpResource,
) -> GcpResource:
return self._insert_resource(self.api.targetGrpcProxies(), {
'name': name,
@@ -157,9 +158,9 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
name)
def create_target_http_proxy(
self,
name: str,
url_map: GcpResource,
self,
name: str,
url_map: GcpResource,
) -> GcpResource:
return self._insert_resource(self.api.targetHttpProxies(), {
'name': name,
@@ -171,11 +172,11 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
name)
def create_forwarding_rule(
self,
name: str,
src_port: int,
target_proxy: GcpResource,
network_url: str,
self,
name: str,
src_port: int,
target_proxy: GcpResource,
network_url: str,
) -> GcpResource:
return self._insert_resource(
self.api.globalForwardingRules(),
@@ -229,11 +230,11 @@ class ComputeV1(gcp.api.GcpProjectApiResource):
return neg
def wait_for_backends_healthy_status(
self,
backend_service,
backends,
timeout_sec=_WAIT_FOR_BACKEND_SEC,
wait_sec=4,
self,
backend_service,
backends,
timeout_sec=_WAIT_FOR_BACKEND_SEC,
wait_sec=4,
):
pending = set(backends)

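In the ComputeV1 hunks above only leading whitespace changes: signatures that were already split one parameter per line and end with a trailing comma are re-indented, and the trailing comma keeps them fully exploded rather than repacked onto one line. A small demonstration of that trailing-comma behavior, hedged the same way as the earlier sketches (the exact name of the heuristic inside yapf is not asserted here):

from yapf.yapflib.yapf_api import FormatCode

# The comma after the last parameter asks yapf to keep one item per
# line even though the whole signature would fit within the limit.
src = ("def create_target_grpc_proxy(self, name, url_map,) -> object:\n"
       "    pass\n")
print(FormatCode(src, style_config='google')[0])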
@@ -100,7 +100,8 @@ class KubernetesNamespace:
def get_service_account(self, name) -> V1Service:
return self.api.core.read_namespaced_service_account(name, self.name)
def delete_service(self, name,
def delete_service(self,
name,
grace_period_seconds=DELETE_GRACE_PERIOD_SEC):
self.api.core.delete_namespaced_service(
name=name,
@@ -278,11 +279,11 @@ class KubernetesNamespace:
_wait_for_pod_started()
def port_forward_pod(
self,
pod: V1Pod,
remote_port: int,
local_port: Optional[int] = None,
local_address: Optional[str] = None,
self,
pod: V1Pod,
remote_port: int,
local_port: Optional[int] = None,
local_address: Optional[str] = None,
) -> subprocess.Popen:
"""Experimental"""
local_address = local_address or self.PORT_FORWARD_LOCAL_ADDRESS

@@ -47,12 +47,12 @@ class TrafficDirectorManager:
FORWARDING_RULE_NAME = "forwarding-rule"
def __init__(
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
network: str = 'default',
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
network: str = 'default',
):
# API
self.compute = _ComputeV1(gcp_api_manager, project)
@@ -87,8 +87,10 @@ class TrafficDirectorManager:
self.setup_backend_for_grpc(protocol=backend_protocol)
self.setup_routing_rule_map_for_grpc(service_host, service_port)
def setup_backend_for_grpc(
self, *, protocol: Optional[BackendServiceProtocol] = _BackendGRPC):
def setup_backend_for_grpc(self,
*,
protocol: Optional[
BackendServiceProtocol] = _BackendGRPC):
self.create_health_check()
self.create_backend_service(protocol)
@@ -191,9 +193,9 @@ class TrafficDirectorManager:
self.backends)
def create_url_map(
self,
src_host: str,
src_port: int,
self,
src_host: str,
src_port: int,
) -> GcpResource:
src_address = f'{src_host}:{src_port}'
name = self._ns_name(self.URL_MAP_NAME)
@@ -290,12 +292,12 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
CERTIFICATE_PROVIDER_INSTANCE = "google_cloud_private_spiffe"
def __init__(
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
network: str = 'default',
self,
gcp_api_manager: gcp.api.GcpApiManager,
project: str,
*,
resource_prefix: str,
network: str = 'default',
):
super().__init__(gcp_api_manager,
project,
@@ -448,9 +450,9 @@ class TrafficDirectorSecureManager(TrafficDirectorManager):
self.client_tls_policy = None
def backend_service_apply_client_mtls_policy(
self,
server_namespace,
server_name,
self,
server_namespace,
server_name,
):
if not self.client_tls_policy:
logger.warning(

@@ -37,10 +37,10 @@ class LoadBalancerStatsServiceClient(framework.rpc.grpc.GrpcClientHelper):
super().__init__(channel, test_pb2_grpc.LoadBalancerStatsServiceStub)
def get_client_stats(
self,
*,
num_rpcs: int,
timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
self,
*,
num_rpcs: int,
timeout_sec: Optional[int] = STATS_PARTIAL_RESULTS_TIMEOUT_SEC,
) -> LoadBalancerStatsResponse:
if timeout_sec is None:
timeout_sec = self.STATS_PARTIAL_RESULTS_TIMEOUT_SEC

@@ -72,10 +72,10 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
def get_load_balancer_stats(
self,
*,
num_rpcs: int,
timeout_sec: Optional[int] = None,
self,
*,
num_rpcs: int,
timeout_sec: Optional[int] = None,
) -> grpc_testing.LoadBalancerStatsResponse:
"""
Shortcut to LoadBalancerStatsServiceClient.get_client_stats()
@@ -121,11 +121,11 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
logger.debug('Found client -> server socket: %s', socket.ref.name)
return socket
def wait_for_server_channel_state(self,
state: _ChannelzChannelState,
*,
timeout: Optional[_timedelta] = None
) -> _ChannelzChannel:
def wait_for_server_channel_state(
self,
state: _ChannelzChannelState,
*,
timeout: Optional[_timedelta] = None) -> _ChannelzChannel:
# Fine-tuned to wait for the channel to the server.
retryer = retryers.exponential_retryer_with_timeout(
wait_min=_timedelta(seconds=10),
@@ -141,11 +141,11 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
_ChannelzChannelState.Name(state), channel)
return channel
def find_server_channel_with_state(self,
state: _ChannelzChannelState,
*,
check_subchannel=True
) -> _ChannelzChannel:
def find_server_channel_with_state(
self,
state: _ChannelzChannelState,
*,
check_subchannel=True) -> _ChannelzChannel:
for channel in self.get_server_channels():
channel_state: _ChannelzChannelState = channel.data.state.state
logger.info('Server channel: %s, state: %s', channel.ref.name,
@@ -169,9 +169,9 @@ class XdsTestClient(framework.rpc.grpc.GrpcApp):
f'Client has no {_ChannelzChannelState.Name(state)} channel with '
'the server')
def find_subchannel_with_state(self, channel: _ChannelzChannel,
state: _ChannelzChannelState
) -> _ChannelzSubchannel:
def find_subchannel_with_state(
self, channel: _ChannelzChannel,
state: _ChannelzChannelState) -> _ChannelzSubchannel:
for subchannel in self.channelz.list_channel_subchannels(channel):
if subchannel.data.state.state is state:
return subchannel

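The client_app.py hunks above rewrite the same kind of signatures from aligned continuation lines to the exploded form. To preview what the formatter would do to one file without modifying it, yapf's FormatFile can emit a unified diff; the path below is illustrative, not a claim about where this file lives relative to the working directory:

from yapf.yapflib.yapf_api import FormatFile

# With print_diff=True the first element of the returned tuple is a
# diff instead of the reformatted source.
diff, _encoding, changed = FormatFile(
    'framework/test_app/client_app.py',
    style_config='google',
    print_diff=True)
if changed:
    print(diff)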
@@ -444,7 +444,7 @@ class SecurityXdsKubernetesTestCase(XdsKubernetesTestCase):
@staticmethod
def getConnectedSockets(
test_client: XdsTestClient, test_server: XdsTestServer
test_client: XdsTestClient, test_server: XdsTestServer
) -> Tuple[grpc_channelz.Socket, grpc_channelz.Socket]:
client_sock = test_client.get_active_server_channel_socket()
server_sock = test_server.get_server_socket_matching_client(client_sock)

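Taken together, every hunk in this commit is mechanical formatter output. A hypothetical way to regenerate the change from a checkout; the directory list and the pinned version are assumptions, and the repo may drive this through its own formatting script instead:

import subprocess

# Rewrites files in place, recursing into each tree. Results only
# match this commit when yapf==0.30.0 is the version on PATH.
for tree in ('examples/python', 'src/python', 'tools'):
    subprocess.run(['yapf', '--in-place', '--recursive', tree],
                   check=True)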