Merge remote-tracking branch 'upstream/master' into xds_no_nack_on_missing_resource

pull/22293/head
Mark D. Roth 5 years ago
commit 3a58bb2e7c
  1. 2
      .github/ISSUE_TEMPLATE/bug_report.md
  2. 2
      .github/ISSUE_TEMPLATE/cleanup_request.md
  3. 2
      .github/ISSUE_TEMPLATE/feature_request.md
  4. 2
      .github/pull_request_template.md
  5. 104
      src/compiler/python_generator.cc
  6. 3
      src/compiler/python_private_generator.h
  7. 2
      src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi
  8. 25
      src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
  9. 10
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  10. 27
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi
  11. 56
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  12. 7
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi
  13. 24
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi
  14. 2
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  15. 5
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  16. 35
      src/python/grpcio/grpc/_simple_stubs.py
  17. 2
      src/python/grpcio/grpc/experimental/aio/_channel.py
  18. 8
      src/python/grpcio/grpc/experimental/aio/_interceptor.py
  19. 3
      src/python/grpcio/grpc/experimental/aio/_server.py
  20. 1
      src/python/grpcio_tests/commands.py
  21. 114
      src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
  22. 1
      src/python/grpcio_tests/tests/tests.json
  23. 7
      src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py
  24. 310
      test/cpp/end2end/xds_end2end_test.cc
  25. 5
      test/cpp/interop/xds_interop_client.cc
  26. 913
      tools/run_tests/run_xds_tests.py

@ -2,7 +2,7 @@
name: Report a bug
about: Create a report to help us improve
labels: kind/bug, priority/P2
assignees: veblush
assignees: karthikravis
---

@ -2,7 +2,7 @@
name: Request a cleanup
about: Suggest a cleanup in our repository
labels: kind/internal cleanup, priority/P2
assignees: veblush
assignees: karthikravis
---

@ -2,7 +2,7 @@
name: Request a feature
about: Suggest an idea for this project
labels: kind/enhancement, priority/P2
assignees: veblush
assignees: karthikravis
---

@ -8,4 +8,4 @@ If you know who should review your pull request, please remove the mentioning be
-->
@veblush
@karthikravis

@ -70,10 +70,16 @@ typedef set<StringPair> StringPairSet;
class IndentScope {
public:
explicit IndentScope(grpc_generator::Printer* printer) : printer_(printer) {
// NOTE(rbellevi): Two-space tabs are hard-coded in the protocol compiler.
// Doubling our indents and outdents guarantees compliance with PEP8.
printer_->Indent();
printer_->Indent();
}
~IndentScope() { printer_->Outdent(); }
~IndentScope() {
printer_->Outdent();
printer_->Outdent();
}
private:
grpc_generator::Printer* printer_;
@ -92,8 +98,9 @@ void PrivateGenerator::PrintAllComments(StringVector comments,
// smarter and more sophisticated, but at the moment, if there is
// no docstring to print, we simply emit "pass" to ensure validity
// of the generated code.
out->Print("# missing associated documentation comment in .proto file\n");
out->Print("pass\n");
out->Print(
"\"\"\"Missing associated documentation comment in .proto "
"file\"\"\"\n");
return;
}
out->Print("\"\"\"");
@ -570,6 +577,93 @@ bool PrivateGenerator::PrintAddServicerToServer(
return true;
}
/* Prints out a service class used as a container for static methods pertaining
* to a class. This class has the exact name of service written in the ".proto"
* file, with no suffixes. Since this class merely acts as a namespace, it
* should never be instantiated.
*/
bool PrivateGenerator::PrintServiceClass(
const grpc::string& package_qualified_service_name,
const grpc_generator::Service* service, grpc_generator::Printer* out) {
StringMap dict;
dict["Service"] = service->name();
out->Print("\n\n");
out->Print(" # This class is part of an EXPERIMENTAL API.\n");
out->Print(dict, "class $Service$(object):\n");
{
IndentScope class_indent(out);
StringVector service_comments = service->GetAllComments();
PrintAllComments(service_comments, out);
for (int i = 0; i < service->method_count(); ++i) {
const auto& method = service->method(i);
grpc::string request_module_and_class;
if (!method->get_module_and_message_path_input(
&request_module_and_class, generator_file_name,
generate_in_pb2_grpc, config.import_prefix,
config.prefixes_to_filter)) {
return false;
}
grpc::string response_module_and_class;
if (!method->get_module_and_message_path_output(
&response_module_and_class, generator_file_name,
generate_in_pb2_grpc, config.import_prefix,
config.prefixes_to_filter)) {
return false;
}
out->Print("\n");
StringMap method_dict;
method_dict["Method"] = method->name();
out->Print("@staticmethod\n");
out->Print(method_dict, "def $Method$(");
grpc::string request_parameter(
method->ClientStreaming() ? "request_iterator" : "request");
StringMap args_dict;
args_dict["RequestParameter"] = request_parameter;
{
IndentScope args_indent(out);
IndentScope args_double_indent(out);
out->Print(args_dict, "$RequestParameter$,\n");
out->Print("target,\n");
out->Print("options=(),\n");
out->Print("channel_credentials=None,\n");
out->Print("call_credentials=None,\n");
out->Print("compression=None,\n");
out->Print("wait_for_ready=None,\n");
out->Print("timeout=None,\n");
out->Print("metadata=None):\n");
}
{
IndentScope method_indent(out);
grpc::string arity_method_name =
grpc::string(method->ClientStreaming() ? "stream" : "unary") + "_" +
grpc::string(method->ServerStreaming() ? "stream" : "unary");
args_dict["ArityMethodName"] = arity_method_name;
args_dict["PackageQualifiedService"] = package_qualified_service_name;
args_dict["Method"] = method->name();
out->Print(args_dict,
"return "
"grpc.experimental.$ArityMethodName$($RequestParameter$, "
"target, '/$PackageQualifiedService$/$Method$',\n");
{
IndentScope continuation_indent(out);
StringMap serializer_dict;
serializer_dict["RequestModuleAndClass"] = request_module_and_class;
serializer_dict["ResponseModuleAndClass"] = response_module_and_class;
out->Print(serializer_dict,
"$RequestModuleAndClass$.SerializeToString,\n");
out->Print(serializer_dict, "$ResponseModuleAndClass$.FromString,\n");
out->Print("options, channel_credentials,\n");
out->Print(
"call_credentials, compression, wait_for_ready, timeout, "
"metadata)\n");
}
}
}
}
// TODO(rbellevi): Add methods pertinent to the server side as well.
return true;
}
bool PrivateGenerator::PrintBetaPreamble(grpc_generator::Printer* out) {
StringMap var;
var["Package"] = config.beta_package_root;
@ -646,7 +740,9 @@ bool PrivateGenerator::PrintGAServices(grpc_generator::Printer* out) {
if (!(PrintStub(package_qualified_service_name, service.get(), out) &&
PrintServicer(service.get(), out) &&
PrintAddServicerToServer(package_qualified_service_name,
service.get(), out))) {
service.get(), out) &&
PrintServiceClass(package_qualified_service_name, service.get(),
out))) {
return false;
}
}

@ -59,6 +59,9 @@ struct PrivateGenerator {
const grpc_generator::Service* service,
grpc_generator::Printer* out);
bool PrintServiceClass(const grpc::string& package_qualified_service_name,
const grpc_generator::Service* service,
grpc_generator::Printer* out);
bool PrintBetaServicer(const grpc_generator::Service* service,
grpc_generator::Printer* out);
bool PrintBetaServerFactory(

@ -72,7 +72,7 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler
cdef class CallbackCompletionQueue:
def __cinit__(self):
self._shutdown_completed = asyncio.get_event_loop().create_future()
self._shutdown_completed = grpc_aio_loop().create_future()
self._wrapper = CallbackWrapper(
self._shutdown_completed,
CQ_SHUTDOWN_FAILURE_HANDLER)

@ -13,16 +13,32 @@
# limitations under the License.
cdef bint _grpc_aio_initialized = 0
cdef bint _grpc_aio_initialized = False
# NOTE(lidiz) Theoretically, applications can run in multiple event loops as
# long as they are in the same thread with same magic. However, I don't think
# we should support this use case. So, the gRPC Python Async Stack should use
# a single event loop picked by "init_grpc_aio".
cdef object _grpc_aio_loop
def init_grpc_aio():
global _grpc_aio_initialized
global _grpc_aio_loop
if _grpc_aio_initialized:
return
else:
_grpc_aio_initialized = True
# Anchors the event loop that the gRPC library going to use.
_grpc_aio_loop = asyncio.get_event_loop()
# Activates asyncio IO manager
install_asyncio_iomgr()
# TODO(https://github.com/grpc/grpc/issues/22244) we need a the
# grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC
# library won't shutdown cleanly.
grpc_init()
# Timers are triggered by the Asyncio loop. We disable
@ -34,4 +50,9 @@ def init_grpc_aio():
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(False)
_grpc_aio_initialized = 1
_grpc_aio_initialized = False
def grpc_aio_loop():
"""Returns the one-and-only gRPC Aio event loop."""
return _grpc_aio_loop

@ -49,7 +49,6 @@ cdef void asyncio_socket_connect(
const grpc_sockaddr* addr,
size_t addr_len,
grpc_custom_connect_callback connect_cb) with gil:
host, port = sockaddr_to_tuple(addr, addr_len)
socket = <_AsyncioSocket>grpc_socket.impl
socket.connect(host, port, connect_cb)
@ -185,14 +184,15 @@ cdef void asyncio_resolve_async(
cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil:
timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0)
Py_INCREF(timer)
grpc_timer.timer = <void*>timer
cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil:
timer = <_AsyncioTimer>grpc_timer.timer
timer.stop()
Py_DECREF(timer)
if grpc_timer.timer == NULL:
return
else:
timer = <_AsyncioTimer>grpc_timer.timer
timer.stop()
cdef void asyncio_init_loop() with gil:

@ -29,34 +29,27 @@ cdef class _AsyncioResolver:
id_ = id(self)
return f"<{class_name} {id_}>"
def _resolve_cb(self, future):
error = False
async def _async_resolve(self, bytes host, bytes port):
self._task_resolve = None
try:
res = future.result()
resolved = await grpc_aio_loop().getaddrinfo(host, port)
except Exception as e:
error = True
error_msg = str(e)
finally:
self._task_resolve = None
if not error:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
tuples_to_resolvaddr(res),
<grpc_error*>0
NULL,
grpc_socket_error("Resolve address [{}:{}] failed: {}: {}".format(
host, port, type(e), str(e)).encode())
)
else:
grpc_custom_resolve_callback(
<grpc_custom_resolver*>self._grpc_resolver,
NULL,
grpc_socket_error("getaddrinfo {}".format(error_msg).encode())
tuples_to_resolvaddr(resolved),
<grpc_error*>0
)
cdef void resolve(self, char* host, char* port):
assert not self._task_resolve
loop = asyncio.get_event_loop()
self._task_resolve = asyncio.ensure_future(
loop.getaddrinfo(host, port)
self._task_resolve = grpc_aio_loop().create_task(
self._async_resolve(host, port)
)
self._task_resolve.add_done_callback(self._resolve_cb)

@ -35,7 +35,6 @@ cdef class _AsyncioSocket:
self._server = None
self._py_socket = None
self._peername = None
self._loop = asyncio.get_event_loop()
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
@ -62,27 +61,37 @@ cdef class _AsyncioSocket:
connected = self.is_connected()
return f"<{class_name} {id_} connected={connected}>"
def _connect_cb(self, future):
async def _async_connect(self, object host, object port,):
self._task_connect = None
try:
self._reader, self._writer = future.result()
self._reader, self._writer = await asyncio.open_connection(host, port)
except Exception as e:
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
grpc_socket_error("Socket connect failed: {}".format(e).encode())
grpc_socket_error("Socket connect failed: {}: {}".format(type(e), str(e)).encode())
)
return
finally:
self._task_connect = None
else:
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error*>0
)
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
<grpc_error*>0
cdef void connect(self,
object host,
object port,
grpc_custom_connect_callback grpc_connect_cb):
assert not self._reader
assert not self._task_connect
self._task_connect = grpc_aio_loop().create_task(
self._async_connect(host, port)
)
self._grpc_connect_cb = grpc_connect_cb
async def _async_read(self, size_t length):
self._task_read = None
@ -106,25 +115,12 @@ cdef class _AsyncioSocket:
<grpc_error*>0
)
cdef void connect(self,
object host,
object port,
grpc_custom_connect_callback grpc_connect_cb):
assert not self._reader
assert not self._task_connect
self._task_connect = asyncio.ensure_future(
asyncio.open_connection(host, port)
)
self._grpc_connect_cb = grpc_connect_cb
self._task_connect.add_done_callback(self._connect_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb):
assert not self._task_read
self._grpc_read_cb = grpc_read_cb
self._read_buffer = buffer_
self._task_read = self._loop.create_task(self._async_read(length))
self._task_read = grpc_aio_loop().create_task(self._async_read(length))
async def _async_write(self, bytearray outbound_buffer):
self._writer.write(outbound_buffer)
@ -157,7 +153,7 @@ cdef class _AsyncioSocket:
outbound_buffer.extend(<bytes>start[:length])
self._grpc_write_cb = grpc_write_cb
self._task_write = self._loop.create_task(self._async_write(outbound_buffer))
self._task_write = grpc_aio_loop().create_task(self._async_write(outbound_buffer))
cdef bint is_connected(self):
return self._reader and not self._reader._transport.is_closing()
@ -201,7 +197,7 @@ cdef class _AsyncioSocket:
sock=self._py_socket,
)
self._loop.create_task(create_asyncio_server())
grpc_aio_loop().create_task(create_asyncio_server())
cdef accept(self,
grpc_custom_socket* grpc_socket_client,

@ -15,11 +15,10 @@
cdef class _AsyncioTimer:
cdef:
grpc_custom_timer * _grpc_timer
object _deadline
object _timer_handler
int _active
object _timer_future
bint _active
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline)
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout)
cdef stop(self)

@ -16,21 +16,22 @@
cdef class _AsyncioTimer:
def __cinit__(self):
self._grpc_timer = NULL
self._timer_handler = None
self._active = 0
self._timer_future = None
self._active = False
cpython.Py_INCREF(self)
@staticmethod
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline):
cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout):
timer = _AsyncioTimer()
timer._grpc_timer = grpc_timer
timer._deadline = deadline
timer._timer_handler = asyncio.get_event_loop().call_later(deadline, timer._on_deadline)
timer._active = 1
timer._timer_future = grpc_aio_loop().call_later(timeout, timer.on_time_up)
timer._active = True
return timer
def _on_deadline(self):
self._active = 0
def on_time_up(self):
self._active = False
grpc_custom_timer_callback(self._grpc_timer, <grpc_error*>0)
cpython.Py_DECREF(self)
def __repr__(self):
class_name = self.__class__.__name__
@ -38,8 +39,9 @@ cdef class _AsyncioTimer:
return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>"
cdef stop(self):
if self._active == 0:
if not self._active:
return
self._timer_handler.cancel()
self._active = 0
self._timer_future.cancel()
self._active = False
cpython.Py_DECREF(self)

@ -256,6 +256,8 @@ cdef void _call(
on_success(started_tags)
else:
raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
cdef void _process_integrated_call_tag(
_ChannelState state, _BatchOperationTag tag) except *:
cdef _CallState call_state = state.integrated_call_states.pop(tag)

@ -148,8 +148,9 @@ cdef class Server:
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
grpc_server_destroy(self.c_server)
self.c_server = NULL
with nogil:
grpc_server_destroy(self.c_server)
self.c_server = NULL
def __dealloc__(self):
if self.c_server == NULL:

@ -53,8 +53,10 @@ else:
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
channel_credentials: Optional[grpc.ChannelCredentials],
compression: Optional[grpc.Compression]) -> grpc.Channel:
channel_credentials = channel_credentials or grpc.local_channel_credentials(
)
# TODO(rbellevi): Revisit the default value for this.
if channel_credentials is None:
raise NotImplementedError(
"channel_credentials must be supplied explicitly.")
if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials:
_LOGGER.debug(f"Creating insecure channel with options '{options}' " +
f"and compression '{compression}'")
@ -156,26 +158,13 @@ class ChannelCache:
return len(self._mapping)
# TODO(rbellevi): Consider a credential type that has the
# following functionality matrix:
#
# +----------+-------+--------+
# | | local | remote |
# |----------+-------+--------+
# | secure | o | o |
# | insecure | o | x |
# +----------+-------+--------+
#
# Make this the default option.
@experimental_api
def unary_unary(
request: RequestType,
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
request_deserializer: Optional[Callable[[bytes], Any]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
call_credentials: Optional[grpc.CallCredentials] = None,
@ -232,7 +221,7 @@ def unary_unary(
channel = ChannelCache.get().get_channel(target, options,
channel_credentials, compression)
multicallable = channel.unary_unary(method, request_serializer,
request_deserializer)
response_deserializer)
return multicallable(request,
metadata=metadata,
wait_for_ready=wait_for_ready,
@ -246,7 +235,7 @@ def unary_stream(
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
request_deserializer: Optional[Callable[[bytes], Any]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
call_credentials: Optional[grpc.CallCredentials] = None,
@ -302,7 +291,7 @@ def unary_stream(
channel = ChannelCache.get().get_channel(target, options,
channel_credentials, compression)
multicallable = channel.unary_stream(method, request_serializer,
request_deserializer)
response_deserializer)
return multicallable(request,
metadata=metadata,
wait_for_ready=wait_for_ready,
@ -316,7 +305,7 @@ def stream_unary(
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
request_deserializer: Optional[Callable[[bytes], Any]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
call_credentials: Optional[grpc.CallCredentials] = None,
@ -372,7 +361,7 @@ def stream_unary(
channel = ChannelCache.get().get_channel(target, options,
channel_credentials, compression)
multicallable = channel.stream_unary(method, request_serializer,
request_deserializer)
response_deserializer)
return multicallable(request_iterator,
metadata=metadata,
wait_for_ready=wait_for_ready,
@ -386,7 +375,7 @@ def stream_stream(
target: str,
method: str,
request_serializer: Optional[Callable[[Any], bytes]] = None,
request_deserializer: Optional[Callable[[bytes], Any]] = None,
response_deserializer: Optional[Callable[[bytes], Any]] = None,
options: Sequence[Tuple[AnyStr, AnyStr]] = (),
channel_credentials: Optional[grpc.ChannelCredentials] = None,
call_credentials: Optional[grpc.CallCredentials] = None,
@ -442,7 +431,7 @@ def stream_stream(
channel = ChannelCache.get().get_channel(target, options,
channel_credentials, compression)
multicallable = channel.stream_stream(method, request_serializer,
request_deserializer)
response_deserializer)
return multicallable(request_iterator,
metadata=metadata,
wait_for_ready=wait_for_ready,

@ -228,7 +228,7 @@ class Channel(_base_channel.Channel):
"UnaryUnaryClientInterceptors, the following are invalid: {}"\
.format(invalid_interceptors))
self._loop = asyncio.get_event_loop()
self._loop = cygrpc.grpc_aio_loop()
self._channel = cygrpc.AioChannel(
_common.encode(target),
_augment_channel_arguments(options, compression), credentials,

@ -160,10 +160,10 @@ class InterceptedUnaryUnaryCall(_base_call.UnaryUnaryCall):
loop: asyncio.AbstractEventLoop) -> None:
self._channel = channel
self._loop = loop
self._interceptors_task = asyncio.ensure_future(self._invoke(
interceptors, method, timeout, metadata, credentials,
wait_for_ready, request, request_serializer, response_deserializer),
loop=loop)
self._interceptors_task = loop.create_task(
self._invoke(interceptors, method, timeout, metadata, credentials,
wait_for_ready, request, request_serializer,
response_deserializer))
self._pending_add_done_callbacks = []
self._interceptors_task.add_done_callback(
self._fire_pending_add_done_callbacks)

@ -13,7 +13,6 @@
# limitations under the License.
"""Server-side implementation of gRPC Asyncio Python."""
import asyncio
from concurrent.futures import Executor
from typing import Any, Optional, Sequence
@ -41,7 +40,7 @@ class Server(_base_server.Server):
options: ChannelArgumentType,
maximum_concurrent_rpcs: Optional[int],
compression: Optional[grpc.Compression]):
self._loop = asyncio.get_event_loop()
self._loop = cygrpc.grpc_aio_loop()
if interceptors:
invalid_interceptors = [
interceptor for interceptor in interceptors

@ -193,6 +193,7 @@ class TestGevent(setuptools.Command):
'unit._server_ssl_cert_config_test',
# TODO(https://github.com/grpc/grpc/issues/14901) enable this test
'protoc_plugin._python_plugin_test.PythonPluginTest',
'protoc_plugin._python_plugin_test.SimpleStubsPluginTest',
# Beta API is unsupported for gevent
'protoc_plugin.beta_python_plugin_test',
'unit.beta._beta_features_test',

@ -27,6 +27,7 @@ import unittest
from six import moves
import grpc
import grpc.experimental
from tests.unit import test_common
from tests.unit.framework.common import test_constants
@ -503,5 +504,118 @@ class PythonPluginTest(unittest.TestCase):
service.server.stop(None)
@unittest.skipIf(sys.version_info[0] < 3, "Unsupported on Python 2.")
class SimpleStubsPluginTest(unittest.TestCase):
servicer_methods = _ServicerMethods()
class Servicer(service_pb2_grpc.TestServiceServicer):
def UnaryCall(self, request, context):
return SimpleStubsPluginTest.servicer_methods.UnaryCall(
request, context)
def StreamingOutputCall(self, request, context):
return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall(
request, context)
def StreamingInputCall(self, request_iterator, context):
return SimpleStubsPluginTest.servicer_methods.StreamingInputCall(
request_iterator, context)
def FullDuplexCall(self, request_iterator, context):
return SimpleStubsPluginTest.servicer_methods.FullDuplexCall(
request_iterator, context)
def HalfDuplexCall(self, request_iterator, context):
return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall(
request_iterator, context)
def setUp(self):
super(SimpleStubsPluginTest, self).setUp()
self._server = test_common.test_server()
service_pb2_grpc.add_TestServiceServicer_to_server(
self.Servicer(), self._server)
self._port = self._server.add_insecure_port('[::]:0')
self._server.start()
self._target = 'localhost:{}'.format(self._port)
def tearDown(self):
self._server.stop(None)
super(SimpleStubsPluginTest, self).tearDown()
def testUnaryCall(self):
request = request_pb2.SimpleRequest(response_size=13)
response = service_pb2_grpc.TestService.UnaryCall(
request,
self._target,
channel_credentials=grpc.experimental.insecure_channel_credentials(
),
wait_for_ready=True)
expected_response = self.servicer_methods.UnaryCall(
request, 'not a real context!')
self.assertEqual(expected_response, response)
def testStreamingOutputCall(self):
request = _streaming_output_request()
expected_responses = self.servicer_methods.StreamingOutputCall(
request, 'not a real RpcContext!')
responses = service_pb2_grpc.TestService.StreamingOutputCall(
request,
self._target,
channel_credentials=grpc.experimental.insecure_channel_credentials(
),
wait_for_ready=True)
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testStreamingInputCall(self):
response = service_pb2_grpc.TestService.StreamingInputCall(
_streaming_input_request_iterator(),
self._target,
channel_credentials=grpc.experimental.insecure_channel_credentials(
),
wait_for_ready=True)
expected_response = self.servicer_methods.StreamingInputCall(
_streaming_input_request_iterator(), 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testFullDuplexCall(self):
responses = service_pb2_grpc.TestService.FullDuplexCall(
_full_duplex_request_iterator(),
self._target,
channel_credentials=grpc.experimental.insecure_channel_credentials(
),
wait_for_ready=True)
expected_responses = self.servicer_methods.FullDuplexCall(
_full_duplex_request_iterator(), 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testHalfDuplexCall(self):
def half_duplex_request_iterator():
request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
responses = service_pb2_grpc.TestService.HalfDuplexCall(
half_duplex_request_iterator(),
self._target,
channel_credentials=grpc.experimental.insecure_channel_credentials(
),
wait_for_ready=True)
expected_responses = self.servicer_methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -7,6 +7,7 @@
"interop._insecure_intraop_test.InsecureIntraopTest",
"interop._secure_intraop_test.SecureIntraopTest",
"protoc_plugin._python_plugin_test.PythonPluginTest",
"protoc_plugin._python_plugin_test.SimpleStubsPluginTest",
"protoc_plugin._split_definitions_test.SameProtoGrpcBeforeProtoProtocStyleTest",
"protoc_plugin._split_definitions_test.SameProtoMid2016ProtocStyleTest",
"protoc_plugin._split_definitions_test.SameProtoProtoBeforeGrpcProtocStyleTest",

@ -174,13 +174,6 @@ class SimpleStubsTest(unittest.TestCase):
channel_credentials=grpc.local_channel_credentials())
self.assertEqual(_REQUEST, response)
def test_channel_credentials_default(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'
response = grpc.experimental.unary_unary(_REQUEST, target,
_UNARY_UNARY)
self.assertEqual(_REQUEST, response)
def test_channels_cached(self):
with _server(grpc.local_server_credentials()) as port:
target = f'localhost:{port}'

@ -400,7 +400,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
std::pair<std::string /* type url */, std::string /* resource name */>>;
// A struct representing a client's subscription to a particular resource.
struct SubscriberState {
struct SubscriptionState {
// Version that the client currently knows about.
int current_version = 0;
// The queue upon which to place updates when the resource is updated.
@ -408,23 +408,25 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
};
// A struct representing the a client's subscription to all the resources.
using SubscriptionNameMap =
std::map<std::string /* resource_name */, SubscriptionState>;
using SubscriptionMap =
std::map<std::string /* type_url */,
std::map<std::string /* resource_name */, SubscriberState>>;
std::map<std::string /* type_url */, SubscriptionNameMap>;
// A struct representing the current state for a resource:
// - the version of the resource that is set by the SetResource() methods.
// - a list of subscribers interested in this resource.
// - a list of subscriptions interested in this resource.
struct ResourceState {
int version = 0;
absl::optional<google::protobuf::Any> resource;
std::set<SubscriberState*> subscribers;
std::set<SubscriptionState*> subscriptions;
};
// A struct representing the current state for all resources:
// LDS, CDS, EDS, and RDS for the class as a whole.
using ResourcesMap =
std::map<std::string, std::map<std::string, ResourceState>>;
using ResourceNameMap =
std::map<std::string /* resource_name */, ResourceState>;
using ResourceMap = std::map<std::string /* type_url */, ResourceNameMap>;
AdsServiceImpl(bool enable_load_reporting) {
// Construct RDS response data.
@ -475,101 +477,61 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
}
// Checks whether the client needs to receive a newer version of
// the resource.
bool ClientNeedsResourceUpdate(const string& resource_type,
const string& name,
SubscriptionMap* subscription_map) {
auto subscriber_it = (*subscription_map)[resource_type].find(name);
if (subscriber_it == (*subscription_map)[resource_type].end()) {
gpr_log(GPR_INFO,
"ADS[%p]: Skipping an unsubscribed update for resource %s and "
"name %s",
this, resource_type.c_str(), name.c_str());
return false;
}
const auto& resource_state = resources_map_[resource_type][name];
if (subscriber_it->second.current_version < resource_state.version) {
subscriber_it->second.current_version = resource_state.version;
gpr_log(GPR_INFO,
"ADS[%p]: Need to process new %s update %s, bring current to %d",
this, resource_type.c_str(), name.c_str(),
subscriber_it->second.current_version);
// the resource. If so, updates subscription_state->current_version and
// returns true.
bool ClientNeedsResourceUpdate(const ResourceState& resource_state,
SubscriptionState* subscription_state) {
if (subscription_state->current_version < resource_state.version) {
subscription_state->current_version = resource_state.version;
return true;
} else {
gpr_log(GPR_INFO,
"ADS[%p]: Skipping an old %s update %s, current is at %d", this,
resource_type.c_str(), name.c_str(),
subscriber_it->second.current_version);
return false;
}
return false;
}
// Resource subscription:
// 1. inserting an entry into the subscription map indexed by resource
// type/name pair.
// 2. inserting or updating an entry into the resources map indexed
// by resource type/name pair about this subscription.
void ResourceSubscribe(const std::string& resource_type,
const std::string& name, UpdateQueue* update_queue,
SubscriptionMap* subscription_map) {
SubscriberState& subscriber_state =
(*subscription_map)[resource_type][name];
subscriber_state.update_queue = update_queue;
ResourceState& resource_state = resources_map_[resource_type][name];
resource_state.subscribers.emplace(&subscriber_state);
gpr_log(
GPR_INFO,
"ADS[%p]: subscribe to resource type %s name %s version %d state %p",
this, resource_type.c_str(), name.c_str(), resource_state.version,
&subscriber_state);
}
// Resource unsubscription:
// 1. update the entry in the resources map indexed
// by resource type/name pair to remove this subscription
// 2. remove this entry from the subscription map.
// 3. remove this resource type from the subscription map if there are no more
// resources subscribed for the resource type.
void ResourceUnsubscribe(const std::string& resource_type,
const std::string& name,
SubscriptionMap* subscription_map) {
auto subscription_by_type_it = subscription_map->find(resource_type);
if (subscription_by_type_it == subscription_map->end()) {
gpr_log(GPR_INFO, "ADS[%p]: resource type %s not subscribed", this,
resource_type.c_str());
return;
}
auto& subscription_by_type_map = subscription_by_type_it->second;
auto subscription_it = subscription_by_type_map.find(name);
if (subscription_it == subscription_by_type_map.end()) {
gpr_log(GPR_INFO, "ADS[%p]: resource name %s of type %s not subscribed",
this, name.c_str(), resource_type.c_str());
return;
}
gpr_log(GPR_INFO,
"ADS[%p]: Unsubscribe to resource type %s name %s state %p", this,
resource_type.c_str(), name.c_str(), &subscription_it->second);
auto resource_by_type_it = resources_map_.find(resource_type);
GPR_ASSERT(resource_by_type_it != resources_map_.end());
auto& resource_by_type_map = resource_by_type_it->second;
auto resource_it = resource_by_type_map.find(name);
GPR_ASSERT(resource_it != resource_by_type_map.end());
resource_it->second.subscribers.erase(&subscription_it->second);
if (resource_it->second.subscribers.empty() &&
!resource_it->second.resource.has_value()) {
gpr_log(GPR_INFO,
"ADS[%p]: Erasing resource type %s name %s from resource map "
"since there are no more subscribers for this unset resource",
this, resource_type.c_str(), name.c_str());
resource_by_type_map.erase(resource_it);
}
subscription_by_type_map.erase(subscription_it);
if (subscription_by_type_map.empty()) {
gpr_log(GPR_INFO,
"ADS[%p]: Erasing resource type %s from subscription_map", this,
resource_type.c_str());
subscription_map->erase(subscription_by_type_it);
// Subscribes to a resource if not already subscribed:
// 1. Sets the update_queue field in subscription_state.
// 2. Adds subscription_state to resource_state->subscriptions.
void MaybeSubscribe(const std::string& resource_type,
const std::string& resource_name,
SubscriptionState* subscription_state,
ResourceState* resource_state,
UpdateQueue* update_queue) {
if (subscription_state->update_queue != nullptr) return;
subscription_state->update_queue = update_queue;
resource_state->subscriptions.emplace(subscription_state);
gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
}
// Removes subscriptions for resources no longer present in the
// current request.
void ProcessUnsubscriptions(
const std::string& resource_type,
const std::set<std::string>& resources_in_current_request,
SubscriptionNameMap* subscription_name_map,
ResourceNameMap* resource_name_map) {
for (auto it = subscription_name_map->begin();
it != subscription_name_map->end();) {
const std::string& resource_name = it->first;
SubscriptionState& subscription_state = it->second;
if (resources_in_current_request.find(resource_name) !=
resources_in_current_request.end()) {
++it;
continue;
}
gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p",
this, resource_type.c_str(), resource_name.c_str(),
&subscription_state);
auto resource_it = resource_name_map->find(resource_name);
GPR_ASSERT(resource_it != resource_name_map->end());
auto& resource_state = resource_it->second;
resource_state.subscriptions.erase(&subscription_state);
if (resource_state.subscriptions.empty() &&
!resource_state.resource.has_value()) {
resource_name_map->erase(resource_it);
}
it = subscription_name_map->erase(it);
}
}
@ -577,7 +539,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// for all resources and by adding all subscribed resources for LDS and CDS.
void CompleteBuildingDiscoveryResponse(
const std::string& resource_type, const int version,
const SubscriptionMap& subscription_map,
const SubscriptionNameMap& subscription_name_map,
const std::set<std::string>& resources_added_to_response,
DiscoveryResponse* response) {
resource_type_response_state_[resource_type] = SENT;
@ -587,18 +549,15 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
// For LDS and CDS we must send back all subscribed resources
// (even the unchanged ones)
auto subscription_map_by_type_it = subscription_map.find(resource_type);
GPR_ASSERT(subscription_map_by_type_it != subscription_map.end());
for (const auto& subscription : subscription_map_by_type_it->second) {
if (resources_added_to_response.find(subscription.first) ==
for (const auto& p : subscription_name_map) {
const std::string& resource_name = p.first;
if (resources_added_to_response.find(resource_name) ==
resources_added_to_response.end()) {
absl::optional<google::protobuf::Any>& resource =
resources_map_[resource_type][subscription.first].resource;
if (resource.has_value()) {
response->add_resources()->CopyFrom(resource.value());
} else {
gpr_log(GPR_INFO, "ADS[%p]: Unknown resource type %s and name %s",
this, resource_type.c_str(), subscription.first.c_str());
const ResourceState& resource_state =
resource_map_[resource_type][resource_name];
if (resource_state.resource.has_value()) {
response->add_resources()->CopyFrom(
resource_state.resource.value());
}
}
}
@ -622,7 +581,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Resources that the client will be subscribed to keyed by resource type
// url.
SubscriptionMap subscription_map;
std::map<std::string, SubscriberState> subscriber_map;
// Current Version map keyed by resource type url.
std::map<std::string, int> resource_type_version;
// Creating blocking thread to read from stream.
@ -647,7 +605,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
DiscoveryRequest request = std::move(requests.front());
requests.pop_front();
did_work = true;
gpr_log(GPR_INFO, "ADS[%p]: Handling request %s with content %s",
gpr_log(GPR_INFO,
"ADS[%p]: Received request for type %s with content %s",
this, request.type_url().c_str(),
request.DebugString().c_str());
// Identify ACK and NACK by looking for version information and
@ -667,58 +626,51 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// 3. unsubscribe if necessary
if (resource_types_to_ignore_.find(request.type_url()) ==
resource_types_to_ignore_.end()) {
auto& subscription_name_map =
subscription_map[request.type_url()];
auto& resource_name_map = resource_map_[request.type_url()];
std::set<std::string> resources_in_current_request;
std::set<std::string> resources_added_to_response;
for (const std::string& resource_name :
request.resource_names()) {
resources_in_current_request.emplace(resource_name);
auto subscriber_it =
subscription_map[request.type_url()].find(resource_name);
if (subscriber_it ==
subscription_map[request.type_url()].end()) {
ResourceSubscribe(request.type_url(), resource_name,
&update_queue, &subscription_map);
}
if (ClientNeedsResourceUpdate(request.type_url(), resource_name,
&subscription_map)) {
auto& subscription_state = subscription_name_map[resource_name];
auto& resource_state = resource_name_map[resource_name];
MaybeSubscribe(request.type_url(), resource_name,
&subscription_state, &resource_state,
&update_queue);
if (ClientNeedsResourceUpdate(resource_state,
&subscription_state)) {
gpr_log(
GPR_INFO,
"ADS[%p]: Sending update for type=%s name=%s version=%d",
this, request.type_url().c_str(), resource_name.c_str(),
resource_state.version);
resources_added_to_response.emplace(resource_name);
gpr_log(GPR_INFO,
"ADS[%p]: Handling resource type %s and name %s",
this, request.type_url().c_str(),
resource_name.c_str());
auto resource =
resources_map_[request.type_url()][resource_name];
GPR_ASSERT(resource.resource.has_value());
response.add_resources()->CopyFrom(resource.resource.value());
}
}
// Remove subscriptions no longer requested: build a list of
// unsubscriber names first while iterating the subscription_map
// and then erase from the subscription_map in
// ResourceUnsubscribe.
std::set<std::string> unsubscriber_list;
for (const auto& subscription :
subscription_map[request.type_url()]) {
if (resources_in_current_request.find(subscription.first) ==
resources_in_current_request.end()) {
unsubscriber_list.emplace(subscription.first);
if (resource_state.resource.has_value()) {
response.add_resources()->CopyFrom(
resource_state.resource.value());
}
}
}
for (const auto& name : unsubscriber_list) {
ResourceUnsubscribe(request.type_url(), name,
&subscription_map);
}
if (!response.resources().empty()) {
// Process unsubscriptions for any resource no longer
// present in the request's resource list.
ProcessUnsubscriptions(
request.type_url(), resources_in_current_request,
&subscription_name_map, &resource_name_map);
// Send response if needed.
if (!resources_added_to_response.empty()) {
CompleteBuildingDiscoveryResponse(
request.type_url(),
++resource_type_version[request.type_url()],
subscription_map, resources_added_to_response, &response);
subscription_name_map, resources_added_to_response,
&response);
}
}
}
}
if (!response.resources().empty()) {
gpr_log(GPR_INFO, "ADS[%p]: sending request response '%s'", this,
gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
response.DebugString().c_str());
stream->Write(response);
}
@ -727,32 +679,40 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{
grpc_core::MutexLock lock(&ads_mu_);
if (!update_queue.empty()) {
std::pair<std::string, std::string> update =
std::move(update_queue.front());
const std::string resource_type =
std::move(update_queue.front().first);
const std::string resource_name =
std::move(update_queue.front().second);
update_queue.pop_front();
did_work = true;
gpr_log(GPR_INFO, "ADS[%p]: Handling update type %s name %s", this,
update.first.c_str(), update.second.c_str());
auto subscriber_it =
subscription_map[update.first].find(update.second);
if (subscriber_it != subscription_map[update.first].end()) {
if (ClientNeedsResourceUpdate(update.first, update.second,
&subscription_map)) {
gpr_log(GPR_INFO,
"ADS[%p]: Updating resource type %s and name %s", this,
update.first.c_str(), update.second.c_str());
auto resource = resources_map_[update.first][update.second];
GPR_ASSERT(resource.resource.has_value());
response.add_resources()->CopyFrom(resource.resource.value());
CompleteBuildingDiscoveryResponse(
update.first, ++resource_type_version[update.first],
subscription_map, {update.second}, &response);
gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s",
this, resource_type.c_str(), resource_name.c_str());
auto& subscription_name_map = subscription_map[resource_type];
auto& resource_name_map = resource_map_[resource_type];
auto it = subscription_name_map.find(resource_name);
if (it != subscription_name_map.end()) {
SubscriptionState& subscription_state = it->second;
ResourceState& resource_state = resource_name_map[resource_name];
if (ClientNeedsResourceUpdate(resource_state,
&subscription_state)) {
gpr_log(
GPR_INFO,
"ADS[%p]: Sending update for type=%s name=%s version=%d",
this, resource_type.c_str(), resource_name.c_str(),
resource_state.version);
if (resource_state.resource.has_value()) {
response.add_resources()->CopyFrom(
resource_state.resource.value());
CompleteBuildingDiscoveryResponse(
resource_type, ++resource_type_version[resource_type],
subscription_name_map, {resource_name}, &response);
}
}
}
}
}
if (!response.resources().empty()) {
gpr_log(GPR_INFO, "ADS[%p]: sending update response '%s'", this,
gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
response.DebugString().c_str());
stream->Write(response);
}
@ -808,13 +768,13 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
void SetResource(google::protobuf::Any resource, const std::string& type_url,
const std::string& name) {
grpc_core::MutexLock lock(&ads_mu_);
ResourceState& state = resources_map_[type_url][name];
ResourceState& state = resource_map_[type_url][name];
++state.version;
state.resource = std::move(resource);
gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this,
type_url.c_str(), name.c_str(), state.version);
for (SubscriberState* subscriber : state.subscribers) {
subscriber->update_queue->emplace_back(type_url, name);
for (SubscriptionState* subscription : state.subscriptions) {
subscription->update_queue->emplace_back(type_url, name);
}
}
@ -873,15 +833,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
{
grpc_core::MutexLock lock(&ads_mu_);
NotifyDoneWithAdsCallLocked();
resources_map_.clear();
resource_map_.clear();
resource_type_response_state_.clear();
}
gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
}
static ClusterLoadAssignment BuildEdsResource(const EdsResourceArgs& args) {
static ClusterLoadAssignment BuildEdsResource(
const EdsResourceArgs& args,
const char* cluster_name = kDefaultResourceName) {
ClusterLoadAssignment assignment;
assignment.set_cluster_name(kDefaultResourceName);
assignment.set_cluster_name(cluster_name);
for (const auto& locality : args.locality_list) {
auto* endpoints = assignment.add_endpoints();
endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight);
@ -946,8 +908,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service,
// Note that an entry will exist whenever either of the following is true:
// - The resource exists (i.e., has been created by SetResource() and has not
// yet been destroyed by UnsetResource()).
// - There is at least one subscriber for the resource.
ResourcesMap resources_map_;
// - There is at least one subscription for the resource.
ResourceMap resource_map_;
};
class LrsServiceImpl : public LrsService,

@ -124,8 +124,6 @@ class TestClient {
void AsyncUnaryCall() {
SimpleResponse response;
ClientContext context;
int saved_request_id;
{
std::lock_guard<std::mutex> lk(mu);
@ -134,9 +132,8 @@ class TestClient {
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(FLAGS_rpc_timeout_sec);
context.set_deadline(deadline);
AsyncClientCall* call = new AsyncClientCall;
call->context.set_deadline(deadline);
call->saved_request_id = saved_request_id;
call->response_reader = stub_->PrepareAsyncUnaryCall(
&call->context, SimpleRequest::default_instance(), &cq_);

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save