mirror of https://github.com/grpc/grpc.git
Merge pull request #21954 from gnossen/simple_stubs
Experimental: Implement Top-Level Invocation Functions Not Requiring an Explicit Channelpull/22066/head
commit
3ed237d4bb
12 changed files with 930 additions and 32 deletions
@ -0,0 +1,450 @@ |
||||
# Copyright 2020 The gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Functions that obviate explicit stubs and explicit channels.""" |
||||
|
||||
import collections |
||||
import datetime |
||||
import os |
||||
import logging |
||||
import threading |
||||
from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence, |
||||
Tuple, TypeVar, Union) |
||||
|
||||
import grpc |
||||
from grpc.experimental import experimental_api |
||||
|
||||
RequestType = TypeVar('RequestType') |
||||
ResponseType = TypeVar('ResponseType') |
||||
|
||||
OptionsType = Sequence[Tuple[str, str]] |
||||
CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials], Optional[ |
||||
grpc.Compression]] |
||||
|
||||
_LOGGER = logging.getLogger(__name__) |
||||
|
||||
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" |
||||
if _EVICTION_PERIOD_KEY in os.environ: |
||||
_EVICTION_PERIOD = datetime.timedelta( |
||||
seconds=float(os.environ[_EVICTION_PERIOD_KEY])) |
||||
_LOGGER.debug("Setting managed channel eviction period to %s", |
||||
_EVICTION_PERIOD) |
||||
else: |
||||
_EVICTION_PERIOD = datetime.timedelta(minutes=10) |
||||
|
||||
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" |
||||
if _MAXIMUM_CHANNELS_KEY in os.environ: |
||||
_MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY]) |
||||
_LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS) |
||||
else: |
||||
_MAXIMUM_CHANNELS = 2**8 |
||||
|
||||
|
||||
def _create_channel(target: str, options: Sequence[Tuple[str, str]], |
||||
channel_credentials: Optional[grpc.ChannelCredentials], |
||||
compression: Optional[grpc.Compression]) -> grpc.Channel: |
||||
channel_credentials = channel_credentials or grpc.local_channel_credentials( |
||||
) |
||||
if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials: |
||||
_LOGGER.debug(f"Creating insecure channel with options '{options}' " + |
||||
f"and compression '{compression}'") |
||||
return grpc.insecure_channel(target, |
||||
options=options, |
||||
compression=compression) |
||||
else: |
||||
_LOGGER.debug( |
||||
f"Creating secure channel with credentials '{channel_credentials}', " |
||||
+ f"options '{options}' and compression '{compression}'") |
||||
return grpc.secure_channel(target, |
||||
credentials=channel_credentials, |
||||
options=options, |
||||
compression=compression) |
||||
|
||||
|
||||
class ChannelCache: |
||||
# NOTE(rbellevi): Untyped due to reference cycle. |
||||
_singleton = None |
||||
_lock: threading.RLock = threading.RLock() |
||||
_condition: threading.Condition = threading.Condition(lock=_lock) |
||||
_eviction_ready: threading.Event = threading.Event() |
||||
|
||||
_mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]] |
||||
_eviction_thread: threading.Thread |
||||
|
||||
def __init__(self): |
||||
self._mapping = collections.OrderedDict() |
||||
self._eviction_thread = threading.Thread( |
||||
target=ChannelCache._perform_evictions, daemon=True) |
||||
self._eviction_thread.start() |
||||
|
||||
@staticmethod |
||||
def get(): |
||||
with ChannelCache._lock: |
||||
if ChannelCache._singleton is None: |
||||
ChannelCache._singleton = ChannelCache() |
||||
ChannelCache._eviction_ready.wait() |
||||
return ChannelCache._singleton |
||||
|
||||
def _evict_locked(self, key: CacheKey): |
||||
channel, _ = self._mapping.pop(key) |
||||
_LOGGER.debug("Evicting channel %s with configuration %s.", channel, |
||||
key) |
||||
channel.close() |
||||
del channel |
||||
|
||||
@staticmethod |
||||
def _perform_evictions(): |
||||
while True: |
||||
with ChannelCache._lock: |
||||
ChannelCache._eviction_ready.set() |
||||
if not ChannelCache._singleton._mapping: |
||||
ChannelCache._condition.wait() |
||||
elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS: |
||||
key = next(iter(ChannelCache._singleton._mapping.keys())) |
||||
ChannelCache._singleton._evict_locked(key) |
||||
# And immediately reevaluate. |
||||
else: |
||||
key, (_, eviction_time) = next( |
||||
iter(ChannelCache._singleton._mapping.items())) |
||||
now = datetime.datetime.now() |
||||
if eviction_time <= now: |
||||
ChannelCache._singleton._evict_locked(key) |
||||
continue |
||||
else: |
||||
time_to_eviction = (eviction_time - now).total_seconds() |
||||
# NOTE: We aim to *eventually* coalesce to a state in |
||||
# which no overdue channels are in the cache and the |
||||
# length of the cache is longer than _MAXIMUM_CHANNELS. |
||||
# We tolerate momentary states in which these two |
||||
# criteria are not met. |
||||
ChannelCache._condition.wait(timeout=time_to_eviction) |
||||
|
||||
def get_channel(self, target: str, options: Sequence[Tuple[str, str]], |
||||
channel_credentials: Optional[grpc.ChannelCredentials], |
||||
compression: Optional[grpc.Compression]) -> grpc.Channel: |
||||
key = (target, options, channel_credentials, compression) |
||||
with self._lock: |
||||
channel_data = self._mapping.get(key, None) |
||||
if channel_data is not None: |
||||
channel = channel_data[0] |
||||
self._mapping.pop(key) |
||||
self._mapping[key] = (channel, datetime.datetime.now() + |
||||
_EVICTION_PERIOD) |
||||
return channel |
||||
else: |
||||
channel = _create_channel(target, options, channel_credentials, |
||||
compression) |
||||
self._mapping[key] = (channel, datetime.datetime.now() + |
||||
_EVICTION_PERIOD) |
||||
if len(self._mapping) == 1 or len( |
||||
self._mapping) >= _MAXIMUM_CHANNELS: |
||||
self._condition.notify() |
||||
return channel |
||||
|
||||
def _test_only_channel_count(self) -> int: |
||||
with self._lock: |
||||
return len(self._mapping) |
||||
|
||||
|
||||
# TODO(rbellevi): Consider a credential type that has the |
||||
# following functionality matrix: |
||||
# |
||||
# +----------+-------+--------+ |
||||
# | | local | remote | |
||||
# |----------+-------+--------+ |
||||
# | secure | o | o | |
||||
# | insecure | o | x | |
||||
# +----------+-------+--------+ |
||||
# |
||||
# Make this the default option. |
||||
|
||||
|
||||
@experimental_api |
||||
def unary_unary( |
||||
request: RequestType, |
||||
target: str, |
||||
method: str, |
||||
request_serializer: Optional[Callable[[Any], bytes]] = None, |
||||
request_deserializer: Optional[Callable[[bytes], Any]] = None, |
||||
options: Sequence[Tuple[AnyStr, AnyStr]] = (), |
||||
channel_credentials: Optional[grpc.ChannelCredentials] = None, |
||||
call_credentials: Optional[grpc.CallCredentials] = None, |
||||
compression: Optional[grpc.Compression] = None, |
||||
wait_for_ready: Optional[bool] = None, |
||||
timeout: Optional[float] = None, |
||||
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None |
||||
) -> ResponseType: |
||||
"""Invokes a unary-unary RPC without an explicitly specified channel. |
||||
|
||||
THIS IS AN EXPERIMENTAL API. |
||||
|
||||
This is backed by a per-process cache of channels. Channels are evicted |
||||
from the cache after a fixed period by a background. Channels will also be |
||||
evicted if more than a configured maximum accumulate. |
||||
|
||||
The default eviction period is 10 minutes. One may set the environment |
||||
variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. |
||||
|
||||
The default maximum number of channels is 256. One may set the |
||||
environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure |
||||
this. |
||||
|
||||
Args: |
||||
request: An iterator that yields request values for the RPC. |
||||
target: The server address. |
||||
method: The name of the RPC method. |
||||
request_serializer: Optional behaviour for serializing the request |
||||
message. Request goes unserialized in case None is passed. |
||||
response_deserializer: Optional behaviour for deserializing the response |
||||
message. Response goes undeserialized in case None is passed. |
||||
options: An optional list of key-value pairs (channel args in gRPC Core |
||||
runtime) to configure the channel. |
||||
channel_credentials: A credential applied to the whole channel, e.g. the |
||||
return value of grpc.ssl_channel_credentials() or |
||||
grpc.insecure_channel_credentials(). |
||||
call_credentials: A call credential applied to each call individually, |
||||
e.g. the output of grpc.metadata_call_credentials() or |
||||
grpc.access_token_call_credentials(). |
||||
compression: An optional value indicating the compression method to be |
||||
used over the lifetime of the channel, e.g. grpc.Compression.Gzip. |
||||
wait_for_ready: An optional flag indicating whether the RPC should fail |
||||
immediately if the connection is not ready at the time the RPC is |
||||
invoked, or if it should wait until the connection to the server |
||||
becomes ready. When using this option, the user will likely also want |
||||
to set a timeout. Defaults to False. |
||||
timeout: An optional duration of time in seconds to allow for the RPC, |
||||
after which an exception will be raised. |
||||
metadata: Optional metadata to send to the server. |
||||
|
||||
Returns: |
||||
The response to the RPC. |
||||
""" |
||||
channel = ChannelCache.get().get_channel(target, options, |
||||
channel_credentials, compression) |
||||
multicallable = channel.unary_unary(method, request_serializer, |
||||
request_deserializer) |
||||
return multicallable(request, |
||||
metadata=metadata, |
||||
wait_for_ready=wait_for_ready, |
||||
credentials=call_credentials, |
||||
timeout=timeout) |
||||
|
||||
|
||||
@experimental_api |
||||
def unary_stream( |
||||
request: RequestType, |
||||
target: str, |
||||
method: str, |
||||
request_serializer: Optional[Callable[[Any], bytes]] = None, |
||||
request_deserializer: Optional[Callable[[bytes], Any]] = None, |
||||
options: Sequence[Tuple[AnyStr, AnyStr]] = (), |
||||
channel_credentials: Optional[grpc.ChannelCredentials] = None, |
||||
call_credentials: Optional[grpc.CallCredentials] = None, |
||||
compression: Optional[grpc.Compression] = None, |
||||
wait_for_ready: Optional[bool] = None, |
||||
timeout: Optional[float] = None, |
||||
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None |
||||
) -> Iterator[ResponseType]: |
||||
"""Invokes a unary-stream RPC without an explicitly specified channel. |
||||
|
||||
THIS IS AN EXPERIMENTAL API. |
||||
|
||||
This is backed by a per-process cache of channels. Channels are evicted |
||||
from the cache after a fixed period by a background. Channels will also be |
||||
evicted if more than a configured maximum accumulate. |
||||
|
||||
The default eviction period is 10 minutes. One may set the environment |
||||
variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. |
||||
|
||||
The default maximum number of channels is 256. One may set the |
||||
environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure |
||||
this. |
||||
|
||||
Args: |
||||
request: An iterator that yields request values for the RPC. |
||||
target: The server address. |
||||
method: The name of the RPC method. |
||||
request_serializer: Optional behaviour for serializing the request |
||||
message. Request goes unserialized in case None is passed. |
||||
response_deserializer: Optional behaviour for deserializing the response |
||||
message. Response goes undeserialized in case None is passed. |
||||
options: An optional list of key-value pairs (channel args in gRPC Core |
||||
runtime) to configure the channel. |
||||
channel_credentials: A credential applied to the whole channel, e.g. the |
||||
return value of grpc.ssl_channel_credentials(). |
||||
call_credentials: A call credential applied to each call individually, |
||||
e.g. the output of grpc.metadata_call_credentials() or |
||||
grpc.access_token_call_credentials(). |
||||
compression: An optional value indicating the compression method to be |
||||
used over the lifetime of the channel, e.g. grpc.Compression.Gzip. |
||||
wait_for_ready: An optional flag indicating whether the RPC should fail |
||||
immediately if the connection is not ready at the time the RPC is |
||||
invoked, or if it should wait until the connection to the server |
||||
becomes ready. When using this option, the user will likely also want |
||||
to set a timeout. Defaults to False. |
||||
timeout: An optional duration of time in seconds to allow for the RPC, |
||||
after which an exception will be raised. |
||||
metadata: Optional metadata to send to the server. |
||||
|
||||
Returns: |
||||
An iterator of responses. |
||||
""" |
||||
channel = ChannelCache.get().get_channel(target, options, |
||||
channel_credentials, compression) |
||||
multicallable = channel.unary_stream(method, request_serializer, |
||||
request_deserializer) |
||||
return multicallable(request, |
||||
metadata=metadata, |
||||
wait_for_ready=wait_for_ready, |
||||
credentials=call_credentials, |
||||
timeout=timeout) |
||||
|
||||
|
||||
@experimental_api |
||||
def stream_unary( |
||||
request_iterator: Iterator[RequestType], |
||||
target: str, |
||||
method: str, |
||||
request_serializer: Optional[Callable[[Any], bytes]] = None, |
||||
request_deserializer: Optional[Callable[[bytes], Any]] = None, |
||||
options: Sequence[Tuple[AnyStr, AnyStr]] = (), |
||||
channel_credentials: Optional[grpc.ChannelCredentials] = None, |
||||
call_credentials: Optional[grpc.CallCredentials] = None, |
||||
compression: Optional[grpc.Compression] = None, |
||||
wait_for_ready: Optional[bool] = None, |
||||
timeout: Optional[float] = None, |
||||
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None |
||||
) -> ResponseType: |
||||
"""Invokes a stream-unary RPC without an explicitly specified channel. |
||||
|
||||
THIS IS AN EXPERIMENTAL API. |
||||
|
||||
This is backed by a per-process cache of channels. Channels are evicted |
||||
from the cache after a fixed period by a background. Channels will also be |
||||
evicted if more than a configured maximum accumulate. |
||||
|
||||
The default eviction period is 10 minutes. One may set the environment |
||||
variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. |
||||
|
||||
The default maximum number of channels is 256. One may set the |
||||
environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure |
||||
this. |
||||
|
||||
Args: |
||||
request_iterator: An iterator that yields request values for the RPC. |
||||
target: The server address. |
||||
method: The name of the RPC method. |
||||
request_serializer: Optional behaviour for serializing the request |
||||
message. Request goes unserialized in case None is passed. |
||||
response_deserializer: Optional behaviour for deserializing the response |
||||
message. Response goes undeserialized in case None is passed. |
||||
options: An optional list of key-value pairs (channel args in gRPC Core |
||||
runtime) to configure the channel. |
||||
channel_credentials: A credential applied to the whole channel, e.g. the |
||||
return value of grpc.ssl_channel_credentials(). |
||||
call_credentials: A call credential applied to each call individually, |
||||
e.g. the output of grpc.metadata_call_credentials() or |
||||
grpc.access_token_call_credentials(). |
||||
compression: An optional value indicating the compression method to be |
||||
used over the lifetime of the channel, e.g. grpc.Compression.Gzip. |
||||
wait_for_ready: An optional flag indicating whether the RPC should fail |
||||
immediately if the connection is not ready at the time the RPC is |
||||
invoked, or if it should wait until the connection to the server |
||||
becomes ready. When using this option, the user will likely also want |
||||
to set a timeout. Defaults to False. |
||||
timeout: An optional duration of time in seconds to allow for the RPC, |
||||
after which an exception will be raised. |
||||
metadata: Optional metadata to send to the server. |
||||
|
||||
Returns: |
||||
The response to the RPC. |
||||
""" |
||||
channel = ChannelCache.get().get_channel(target, options, |
||||
channel_credentials, compression) |
||||
multicallable = channel.stream_unary(method, request_serializer, |
||||
request_deserializer) |
||||
return multicallable(request_iterator, |
||||
metadata=metadata, |
||||
wait_for_ready=wait_for_ready, |
||||
credentials=call_credentials, |
||||
timeout=timeout) |
||||
|
||||
|
||||
@experimental_api |
||||
def stream_stream( |
||||
request_iterator: Iterator[RequestType], |
||||
target: str, |
||||
method: str, |
||||
request_serializer: Optional[Callable[[Any], bytes]] = None, |
||||
request_deserializer: Optional[Callable[[bytes], Any]] = None, |
||||
options: Sequence[Tuple[AnyStr, AnyStr]] = (), |
||||
channel_credentials: Optional[grpc.ChannelCredentials] = None, |
||||
call_credentials: Optional[grpc.CallCredentials] = None, |
||||
compression: Optional[grpc.Compression] = None, |
||||
wait_for_ready: Optional[bool] = None, |
||||
timeout: Optional[float] = None, |
||||
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None |
||||
) -> Iterator[ResponseType]: |
||||
"""Invokes a stream-stream RPC without an explicitly specified channel. |
||||
|
||||
THIS IS AN EXPERIMENTAL API. |
||||
|
||||
This is backed by a per-process cache of channels. Channels are evicted |
||||
from the cache after a fixed period by a background. Channels will also be |
||||
evicted if more than a configured maximum accumulate. |
||||
|
||||
The default eviction period is 10 minutes. One may set the environment |
||||
variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this. |
||||
|
||||
The default maximum number of channels is 256. One may set the |
||||
environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure |
||||
this. |
||||
|
||||
Args: |
||||
request_iterator: An iterator that yields request values for the RPC. |
||||
target: The server address. |
||||
method: The name of the RPC method. |
||||
request_serializer: Optional behaviour for serializing the request |
||||
message. Request goes unserialized in case None is passed. |
||||
response_deserializer: Optional behaviour for deserializing the response |
||||
message. Response goes undeserialized in case None is passed. |
||||
options: An optional list of key-value pairs (channel args in gRPC Core |
||||
runtime) to configure the channel. |
||||
channel_credentials: A credential applied to the whole channel, e.g. the |
||||
return value of grpc.ssl_channel_credentials(). |
||||
call_credentials: A call credential applied to each call individually, |
||||
e.g. the output of grpc.metadata_call_credentials() or |
||||
grpc.access_token_call_credentials(). |
||||
compression: An optional value indicating the compression method to be |
||||
used over the lifetime of the channel, e.g. grpc.Compression.Gzip. |
||||
wait_for_ready: An optional flag indicating whether the RPC should fail |
||||
immediately if the connection is not ready at the time the RPC is |
||||
invoked, or if it should wait until the connection to the server |
||||
becomes ready. When using this option, the user will likely also want |
||||
to set a timeout. Defaults to False. |
||||
timeout: An optional duration of time in seconds to allow for the RPC, |
||||
after which an exception will be raised. |
||||
metadata: Optional metadata to send to the server. |
||||
|
||||
Returns: |
||||
An iterator of responses. |
||||
""" |
||||
channel = ChannelCache.get().get_channel(target, options, |
||||
channel_credentials, compression) |
||||
multicallable = channel.stream_stream(method, request_serializer, |
||||
request_deserializer) |
||||
return multicallable(request_iterator, |
||||
metadata=metadata, |
||||
wait_for_ready=wait_for_ready, |
||||
credentials=call_credentials, |
||||
timeout=timeout) |
@ -0,0 +1,21 @@ |
||||
# Copyright 2020 The gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
from __future__ import absolute_import |
||||
|
||||
from tests import _loader |
||||
from tests import _runner |
||||
|
||||
Loader = _loader.Loader |
||||
Runner = _runner.Runner |
@ -0,0 +1,41 @@ |
||||
# Copyright 2020 The gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
package( |
||||
default_testonly = True, |
||||
) |
||||
|
||||
GRPCIO_PY3_ONLY_TESTS_UNIT = glob([ |
||||
"*_test.py", |
||||
]) |
||||
|
||||
[ |
||||
py_test( |
||||
name = test_file_name[:-len(".py")], |
||||
size = "small", |
||||
srcs = [test_file_name], |
||||
main = test_file_name, |
||||
python_version = "PY3", |
||||
srcs_version = "PY3", |
||||
deps = [ |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
"//src/python/grpcio_tests/tests/testing", |
||||
"//src/python/grpcio_tests/tests/unit:resources", |
||||
"//src/python/grpcio_tests/tests/unit:test_common", |
||||
"//src/python/grpcio_tests/tests/unit/framework/common", |
||||
"@six", |
||||
], |
||||
) |
||||
for test_file_name in GRPCIO_PY3_ONLY_TESTS_UNIT |
||||
] |
@ -0,0 +1,13 @@ |
||||
# Copyright 2019 The gRPC Authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
@ -0,0 +1,276 @@ |
||||
# Copyright 2020 The gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Tests for Simple Stubs.""" |
||||
|
||||
# TODO(https://github.com/grpc/grpc/issues/21965): Run under setuptools. |
||||
|
||||
import os |
||||
|
||||
_MAXIMUM_CHANNELS = 10 |
||||
|
||||
os.environ["GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"] = "1" |
||||
os.environ["GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"] = str(_MAXIMUM_CHANNELS) |
||||
|
||||
import contextlib |
||||
import datetime |
||||
import inspect |
||||
import logging |
||||
import unittest |
||||
import sys |
||||
import time |
||||
from typing import Callable, Optional |
||||
|
||||
from tests.unit import test_common |
||||
import grpc |
||||
import grpc.experimental |
||||
|
||||
_REQUEST = b"0000" |
||||
|
||||
_CACHE_EPOCHS = 8 |
||||
_CACHE_TRIALS = 6 |
||||
|
||||
_SERVER_RESPONSE_COUNT = 10 |
||||
_CLIENT_REQUEST_COUNT = _SERVER_RESPONSE_COUNT |
||||
|
||||
_STRESS_EPOCHS = _MAXIMUM_CHANNELS * 10 |
||||
|
||||
_UNARY_UNARY = "/test/UnaryUnary" |
||||
_UNARY_STREAM = "/test/UnaryStream" |
||||
_STREAM_UNARY = "/test/StreamUnary" |
||||
_STREAM_STREAM = "/test/StreamStream" |
||||
|
||||
|
||||
def _unary_unary_handler(request, context): |
||||
return request |
||||
|
||||
|
||||
def _unary_stream_handler(request, context): |
||||
for _ in range(_SERVER_RESPONSE_COUNT): |
||||
yield request |
||||
|
||||
|
||||
def _stream_unary_handler(request_iterator, context): |
||||
request = None |
||||
for single_request in request_iterator: |
||||
request = single_request |
||||
return request |
||||
|
||||
|
||||
def _stream_stream_handler(request_iterator, context): |
||||
for request in request_iterator: |
||||
yield request |
||||
|
||||
|
||||
class _GenericHandler(grpc.GenericRpcHandler): |
||||
|
||||
def service(self, handler_call_details): |
||||
if handler_call_details.method == _UNARY_UNARY: |
||||
return grpc.unary_unary_rpc_method_handler(_unary_unary_handler) |
||||
elif handler_call_details.method == _UNARY_STREAM: |
||||
return grpc.unary_stream_rpc_method_handler(_unary_stream_handler) |
||||
elif handler_call_details.method == _STREAM_UNARY: |
||||
return grpc.stream_unary_rpc_method_handler(_stream_unary_handler) |
||||
elif handler_call_details.method == _STREAM_STREAM: |
||||
return grpc.stream_stream_rpc_method_handler(_stream_stream_handler) |
||||
else: |
||||
raise NotImplementedError() |
||||
|
||||
|
||||
def _time_invocation(to_time: Callable[[], None]) -> datetime.timedelta: |
||||
start = datetime.datetime.now() |
||||
to_time() |
||||
return datetime.datetime.now() - start |
||||
|
||||
|
||||
@contextlib.contextmanager |
||||
def _server(credentials: Optional[grpc.ServerCredentials]): |
||||
try: |
||||
server = test_common.test_server() |
||||
target = '[::]:0' |
||||
if credentials is None: |
||||
port = server.add_insecure_port(target) |
||||
else: |
||||
port = server.add_secure_port(target, credentials) |
||||
server.add_generic_rpc_handlers((_GenericHandler(),)) |
||||
server.start() |
||||
yield port |
||||
finally: |
||||
server.stop(None) |
||||
|
||||
|
||||
class SimpleStubsTest(unittest.TestCase): |
||||
|
||||
def assert_cached(self, to_check: Callable[[str], None]) -> None: |
||||
"""Asserts that a function caches intermediate data/state. |
||||
|
||||
To be specific, given a function whose caching behavior is |
||||
deterministic in the value of a supplied string, this function asserts |
||||
that, on average, subsequent invocations of the function for a specific |
||||
string are faster than first invocations with that same string. |
||||
|
||||
Args: |
||||
to_check: A function returning nothing, that caches values based on |
||||
an arbitrary supplied string. |
||||
""" |
||||
initial_runs = [] |
||||
cached_runs = [] |
||||
for epoch in range(_CACHE_EPOCHS): |
||||
runs = [] |
||||
text = str(epoch) |
||||
for trial in range(_CACHE_TRIALS): |
||||
runs.append(_time_invocation(lambda: to_check(text))) |
||||
initial_runs.append(runs[0]) |
||||
cached_runs.extend(runs[1:]) |
||||
average_cold = sum((run for run in initial_runs), |
||||
datetime.timedelta()) / len(initial_runs) |
||||
average_warm = sum((run for run in cached_runs), |
||||
datetime.timedelta()) / len(cached_runs) |
||||
self.assertLess(average_warm, average_cold) |
||||
|
||||
def assert_eventually(self, |
||||
predicate: Callable[[], bool], |
||||
*, |
||||
timeout: Optional[datetime.timedelta] = None, |
||||
message: Optional[Callable[[], str]] = None) -> None: |
||||
message = message or (lambda: "Proposition did not evaluate to true") |
||||
timeout = timeout or datetime.timedelta(seconds=10) |
||||
end = datetime.datetime.now() + timeout |
||||
while datetime.datetime.now() < end: |
||||
if predicate(): |
||||
break |
||||
time.sleep(0.5) |
||||
else: |
||||
self.fail(message() + " after " + str(timeout)) |
||||
|
||||
def test_unary_unary_insecure(self): |
||||
with _server(None) as port: |
||||
target = f'localhost:{port}' |
||||
response = grpc.experimental.unary_unary( |
||||
_REQUEST, |
||||
target, |
||||
_UNARY_UNARY, |
||||
channel_credentials=grpc.experimental. |
||||
insecure_channel_credentials()) |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
def test_unary_unary_secure(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
response = grpc.experimental.unary_unary( |
||||
_REQUEST, |
||||
target, |
||||
_UNARY_UNARY, |
||||
channel_credentials=grpc.local_channel_credentials()) |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
def test_channel_credentials_default(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
response = grpc.experimental.unary_unary(_REQUEST, target, |
||||
_UNARY_UNARY) |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
def test_channels_cached(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
test_name = inspect.stack()[0][3] |
||||
args = (_REQUEST, target, _UNARY_UNARY) |
||||
kwargs = {"channel_credentials": grpc.local_channel_credentials()} |
||||
|
||||
def _invoke(seed: str): |
||||
run_kwargs = dict(kwargs) |
||||
run_kwargs["options"] = ((test_name + seed, ""),) |
||||
grpc.experimental.unary_unary(*args, **run_kwargs) |
||||
|
||||
self.assert_cached(_invoke) |
||||
|
||||
def test_channels_evicted(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
response = grpc.experimental.unary_unary( |
||||
_REQUEST, |
||||
target, |
||||
_UNARY_UNARY, |
||||
channel_credentials=grpc.local_channel_credentials()) |
||||
self.assert_eventually( |
||||
lambda: grpc._simple_stubs.ChannelCache.get( |
||||
)._test_only_channel_count() == 0, |
||||
message=lambda: |
||||
f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} remain" |
||||
) |
||||
|
||||
def test_total_channels_enforced(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
for i in range(_STRESS_EPOCHS): |
||||
# Ensure we get a new channel each time. |
||||
options = (("foo", str(i)),) |
||||
# Send messages at full blast. |
||||
grpc.experimental.unary_unary( |
||||
_REQUEST, |
||||
target, |
||||
_UNARY_UNARY, |
||||
options=options, |
||||
channel_credentials=grpc.local_channel_credentials()) |
||||
self.assert_eventually( |
||||
lambda: grpc._simple_stubs.ChannelCache.get( |
||||
)._test_only_channel_count() <= _MAXIMUM_CHANNELS + 1, |
||||
message=lambda: |
||||
f"{grpc._simple_stubs.ChannelCache.get()._test_only_channel_count()} channels remain" |
||||
) |
||||
|
||||
def test_unary_stream(self): |
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
for response in grpc.experimental.unary_stream( |
||||
_REQUEST, |
||||
target, |
||||
_UNARY_STREAM, |
||||
channel_credentials=grpc.local_channel_credentials()): |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
def test_stream_unary(self): |
||||
|
||||
def request_iter(): |
||||
for _ in range(_CLIENT_REQUEST_COUNT): |
||||
yield _REQUEST |
||||
|
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
response = grpc.experimental.stream_unary( |
||||
request_iter(), |
||||
target, |
||||
_STREAM_UNARY, |
||||
channel_credentials=grpc.local_channel_credentials()) |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
def test_stream_stream(self): |
||||
|
||||
def request_iter(): |
||||
for _ in range(_CLIENT_REQUEST_COUNT): |
||||
yield _REQUEST |
||||
|
||||
with _server(grpc.local_server_credentials()) as port: |
||||
target = f'localhost:{port}' |
||||
for response in grpc.experimental.stream_stream( |
||||
request_iter(), |
||||
target, |
||||
_STREAM_STREAM, |
||||
channel_credentials=grpc.local_channel_credentials()): |
||||
self.assertEqual(_REQUEST, response) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
logging.basicConfig(level=logging.INFO) |
||||
unittest.main(verbosity=2) |
Loading…
Reference in new issue