Merge branch 'master' into revert-20610-revert-20277-grpc-19871/unary_unary_client_aio-implement-timeout

pull/20617/head
Lidi Zheng 5 years ago committed by GitHub
commit 84fd0a1ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmake/cares.cmake
  2. 13
      src/csharp/Grpc.Core.Api/SerializationContext.cs
  3. 5
      src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs
  4. 2
      src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml
  5. 4
      src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml
  6. 17
      src/proto/grpc/testing/BUILD
  7. 2
      src/python/grpcio/grpc/_cython/BUILD.bazel
  8. 67
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi
  9. 24
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi
  10. 64
      src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi
  11. 44
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi
  12. 275
      src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
  13. 1
      src/python/grpcio/grpc/_cython/cygrpc.pxd
  14. 1
      src/python/grpcio/grpc/_cython/cygrpc.pyx
  15. 1
      src/python/grpcio/grpc/experimental/BUILD.bazel
  16. 1
      src/python/grpcio/grpc/experimental/aio/__init__.py
  17. 179
      src/python/grpcio/grpc/experimental/aio/_server.py
  18. 52
      src/python/grpcio_tests/tests_aio/benchmark/server.py
  19. 43
      src/python/grpcio_tests/tests_aio/unit/BUILD.bazel
  20. 62
      src/python/grpcio_tests/tests_aio/unit/server_test.py

@ -22,7 +22,7 @@ if("${gRPC_CARES_PROVIDER}" STREQUAL "module")
# See https://github.com/grpc/grpc/issues/17255
set(HAVE_LIBNSL OFF CACHE BOOL "avoid cares dependency on libnsl")
endif()
add_subdirectory(third_party/cares/cares)
add_subdirectory("${CARES_ROOT_DIR}" third_party/cares/cares)
if(TARGET c-ares)
set(_gRPC_CARES_LIBRARIES c-ares)

@ -46,13 +46,24 @@ namespace Grpc.Core
throw new NotImplementedException();
}
/// <summary>
/// Sets the payload length when writing serialized data into a buffer writer. If the serializer knows the full payload
/// length in advance, providing that information before obtaining the buffer writer using <c>GetBufferWriter()</c> can improve
/// serialization efficiency by avoiding copies. The provided payload length must be the same as the data written to the writer.
/// Calling this method is optional. If the payload length is not set then the length is calculated using the data written to
/// the buffer writer when <c>Complete()</c> is called.
/// </summary>
/// <param name="payloadLength">The total length of the payload in bytes.</param>
public virtual void SetPayloadLength(int payloadLength)
{
}
/// <summary>
/// Complete the payload written to the buffer writer. <c>Complete()</c> can only be called once.
/// </summary>
public virtual void Complete()
{
throw new NotImplementedException();
}
}
}

@ -56,6 +56,11 @@ namespace Grpc.Core.Internal
return sliceBuffer;
}
public override void SetPayloadLength(int payloadLength)
{
// Length is calculated using the buffer writer
}
/// <summary>
/// Complete the payload written so far.
/// </summary>

@ -22,7 +22,7 @@
<EnumValue Name="None" DisplayName="Do not generate" />
<EnumProperty.DataSource>
<DataSource ItemType="Protobuf" SourceOfDefaultValue="AfterContext"
PersistenceStyle="Attribute" />
PersistenceStyle="Attribute" HasConfigurationCondition="false" />
</EnumProperty.DataSource>
</EnumProperty>

@ -82,7 +82,7 @@
<EnumValue Name="Internal" DisplayName="Internal" />
<EnumProperty.DataSource>
<DataSource ItemType="Protobuf" SourceOfDefaultValue="AfterContext"
PersistenceStyle="Attribute" />
PersistenceStyle="Attribute" HasConfigurationCondition="false" />
</EnumProperty.DataSource>
</EnumProperty>
@ -91,7 +91,7 @@
Description="Specifies if this file is compiled or only imported by other files.">
<BoolProperty.DataSource>
<DataSource ItemType="Protobuf" SourceOfDefaultValue="AfterContext"
PersistenceStyle="Attribute" />
PersistenceStyle="Attribute" HasConfigurationCondition="false" />
</BoolProperty.DataSource>
</BoolProperty>

@ -125,6 +125,23 @@ grpc_proto_library(
],
)
proto_library(
name = "benchmark_service_descriptor",
srcs = ["benchmark_service.proto"],
deps = [":messages_proto_descriptor"],
)
py_proto_library(
name = "benchmark_service_py_pb2",
deps = [":benchmark_service_descriptor"],
)
py_grpc_library(
name = "benchmark_service_py_pb2_grpc",
srcs = [":benchmark_service_descriptor"],
deps = [":benchmark_service_py_pb2"],
)
grpc_proto_library(
name = "report_qps_scenario_service_proto",
srcs = ["report_qps_scenario_service.proto"],

@ -24,6 +24,8 @@ pyx_library(
"_cygrpc/aio/iomgr/socket.pyx.pxi",
"_cygrpc/aio/iomgr/timer.pxd.pxi",
"_cygrpc/aio/iomgr/timer.pyx.pxi",
"_cygrpc/aio/server.pxd.pxi",
"_cygrpc/aio/server.pyx.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pxd.pxi",

@ -14,9 +14,14 @@
from cpython cimport Py_INCREF, Py_DECREF
from libc cimport string
import socket as native_socket
try:
import ipaddress # CPython 3.3 and above
except ImportError:
pass
cdef grpc_socket_vtable asyncio_socket_vtable
cdef grpc_custom_resolver_vtable asyncio_resolver_vtable
cdef grpc_custom_timer_vtable asyncio_timer_vtable
@ -26,7 +31,7 @@ cdef grpc_custom_poller_vtable asyncio_pollset_vtable
cdef grpc_error* asyncio_socket_init(
grpc_custom_socket* grpc_socket,
int domain) with gil:
socket = _AsyncioSocket.create(grpc_socket)
socket = _AsyncioSocket.create(grpc_socket, None, None)
Py_INCREF(socket)
grpc_socket.impl = <void*>socket
return <grpc_error*>0
@ -81,39 +86,85 @@ cdef grpc_error* asyncio_socket_getpeername(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
peer = (<_AsyncioSocket>grpc_socket.impl).peername()
cdef grpc_resolved_address c_addr
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error* asyncio_socket_getsockname(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
int* length) with gil:
raise NotImplemented()
"""Supplies sock_addr in add_socket_to_server."""
cdef grpc_resolved_address c_addr
socket = (<_AsyncioSocket>grpc_socket.impl)
if socket is None:
peer = ('0.0.0.0', 0)
else:
peer = socket.sockname()
hostname = str_to_bytes(peer[0])
grpc_string_to_sockaddr(&c_addr, hostname, peer[1])
# TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy
string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len)
length[0] = c_addr.len
return grpc_error_none()
cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil:
raise NotImplemented()
(<_AsyncioSocket>grpc_socket.impl).listen()
return grpc_error_none()
def _asyncio_apply_socket_options(object s, so_reuse_port=False):
# TODO(https://github.com/grpc/grpc/issues/20667)
# Connects the so_reuse_port option to channel arguments
s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1)
s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
cdef grpc_error* asyncio_socket_bind(
grpc_custom_socket* grpc_socket,
const grpc_sockaddr* addr,
size_t len, int flags) with gil:
raise NotImplemented()
host, port = sockaddr_to_tuple(addr, len)
try:
ip = ipaddress.ip_address(host)
if isinstance(ip, ipaddress.IPv6Address):
family = native_socket.AF_INET6
else:
family = native_socket.AF_INET
socket = native_socket.socket(family=family)
_asyncio_apply_socket_options(socket)
socket.bind((host, port))
except IOError as io_error:
return socket_error("bind", str(io_error))
else:
aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket)
cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy
grpc_socket.impl = <void*>aio_socket
return grpc_error_none()
cdef void asyncio_socket_accept(
grpc_custom_socket* grpc_socket,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback accept_cb) with gil:
raise NotImplemented()
(<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb)
cdef grpc_error* asyncio_resolve(
char* host,
char* port,
grpc_resolved_addresses** res) with gil:
raise NotImplemented()
result = native_socket.getaddrinfo(host, port)
res[0] = tuples_to_resolvaddr(result)
cdef void asyncio_resolve_async(

@ -15,8 +15,8 @@
cdef class _AsyncioSocket:
cdef:
# Common attributes
grpc_custom_socket * _grpc_socket
grpc_custom_connect_callback _grpc_connect_cb
grpc_custom_read_callback _grpc_read_cb
object _reader
object _writer
@ -24,11 +24,31 @@ cdef class _AsyncioSocket:
object _task_connect
char * _read_buffer
# Client-side attributes
grpc_custom_connect_callback _grpc_connect_cb
# Server-side attributes
grpc_custom_accept_callback _grpc_accept_cb
grpc_custom_socket * _grpc_client_socket
object _server
object _py_socket
object _peername
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket)
cdef _AsyncioSocket create(
grpc_custom_socket * grpc_socket,
object reader,
object writer)
@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket)
cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb)
cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb)
cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb)
cdef bint is_connected(self)
cdef void close(self)
cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb)
cdef listen(self)
cdef tuple peername(self)
cdef tuple sockname(self)

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import socket as native_socket
from libc cimport string
@ -26,11 +26,27 @@ cdef class _AsyncioSocket:
self._task_connect = None
self._task_read = None
self._read_buffer = NULL
self._server = None
self._py_socket = None
self._peername = None
@staticmethod
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket):
cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket,
object reader,
object writer):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._reader = reader
socket._writer = writer
if writer is not None:
socket._peername = writer.get_extra_info('peername')
return socket
@staticmethod
cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket):
socket = _AsyncioSocket()
socket._grpc_socket = grpc_socket
socket._py_socket = py_socket
return socket
def __repr__(self):
@ -52,7 +68,7 @@ cdef class _AsyncioSocket:
# gRPC default posix implementation disables nagle
# algorithm.
sock = self._writer.transport.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True)
self._grpc_connect_cb(
<grpc_custom_socket*>self._grpc_socket,
@ -92,7 +108,11 @@ cdef class _AsyncioSocket:
grpc_socket_error("read {}".format(error_msg).encode())
)
cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb):
cdef void connect(self,
object host,
object port,
grpc_custom_connect_callback grpc_connect_cb):
assert not self._reader
assert not self._task_connect
self._task_connect = asyncio.ensure_future(
@ -132,3 +152,39 @@ cdef class _AsyncioSocket:
cdef void close(self):
if self.is_connected():
self._writer.close()
def _new_connection_callback(self, object reader, object writer):
client_socket = _AsyncioSocket.create(
self._grpc_client_socket,
reader,
writer,
)
self._grpc_client_socket.impl = <void*>client_socket
cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy
# Accept callback expects to be called with:
# grpc_custom_socket: A grpc custom socket for server
# grpc_custom_socket: A grpc custom socket for client (with new Socket instance)
# grpc_error: An error object
self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none())
cdef listen(self):
async def create_asyncio_server():
self._server = await asyncio.start_server(
self._new_connection_callback,
sock=self._py_socket,
)
asyncio.get_event_loop().create_task(create_asyncio_server())
cdef accept(self,
grpc_custom_socket* grpc_socket_client,
grpc_custom_accept_callback grpc_accept_cb):
self._grpc_client_socket = grpc_socket_client
self._grpc_accept_cb = grpc_accept_cb
cdef tuple peername(self):
return self._peername
cdef tuple sockname(self):
return self._py_socket.getsockname()

@ -0,0 +1,44 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _HandlerCallDetails:
cdef readonly str method
cdef readonly tuple invocation_metadata
cdef class RPCState:
cdef grpc_call* call,
cdef grpc_call_details details
cdef grpc_metadata_array request_metadata
cdef bytes method(self)
cdef enum AioServerStatus:
AIO_SERVER_STATUS_UNKNOWN
AIO_SERVER_STATUS_READY
AIO_SERVER_STATUS_RUNNING
AIO_SERVER_STATUS_STOPPED
cdef class _CallbackCompletionQueue:
cdef grpc_completion_queue *_cq
cdef grpc_completion_queue* c_ptr(self)
cdef class AioServer:
cdef Server _server
cdef _CallbackCompletionQueue _cq
cdef list _generic_handlers
cdef AioServerStatus _status

@ -0,0 +1,275 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cdef class _HandlerCallDetails:
def __cinit__(self, str method, tuple invocation_metadata):
self.method = method
self.invocation_metadata = invocation_metadata
class _ServicerContextPlaceHolder(object): pass
# TODO(https://github.com/grpc/grpc/issues/20669)
# Apply this to the client-side
cdef class CallbackWrapper:
cdef CallbackContext context
cdef object _reference
def __cinit__(self, object future):
self.context.functor.functor_run = self.functor_run
self.context.waiter = <cpython.PyObject*>(future)
self._reference = future
@staticmethod
cdef void functor_run(
grpc_experimental_completion_queue_functor* functor,
int succeed):
cdef CallbackContext *context = <CallbackContext *>functor
if succeed == 0:
(<object>context.waiter).set_exception(RuntimeError())
else:
(<object>context.waiter).set_result(None)
cdef grpc_experimental_completion_queue_functor *c_functor(self):
return &self.context.functor
cdef class RPCState:
def __cinit__(self):
grpc_metadata_array_init(&self.request_metadata)
grpc_call_details_init(&self.details)
cdef bytes method(self):
return _slice_bytes(self.details.method)
def __dealloc__(self):
"""Cleans the Core objects."""
grpc_call_details_destroy(&self.details)
grpc_metadata_array_destroy(&self.request_metadata)
if self.call:
grpc_call_unref(self.call)
cdef _find_method_handler(str method, list generic_handlers):
# TODO(lidiz) connects Metadata to call details
cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails(
method,
tuple()
)
for generic_handler in generic_handlers:
method_handler = generic_handler.service(handler_call_details)
if method_handler is not None:
return method_handler
return None
async def callback_start_batch(RPCState rpc_state,
tuple operations,
object loop):
"""The callback version of start batch operations."""
cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None)
batch_operation_tag.prepare()
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(future)
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
cdef grpc_call_error error = grpc_call_start_batch(
rpc_state.call,
batch_operation_tag.c_ops,
batch_operation_tag.c_nops,
wrapper.c_functor(), NULL)
if error != GRPC_CALL_OK:
raise RuntimeError("Error with callback_start_batch {}".format(error))
await future
cpython.Py_DECREF(wrapper)
cdef grpc_event c_event
# Tag.event must be called, otherwise messages won't be parsed from C
batch_operation_tag.event(c_event)
async def _handle_unary_unary_rpc(object method_handler,
RPCState rpc_state,
object loop):
# Receives request message
cdef tuple receive_ops = (
ReceiveMessageOperation(_EMPTY_FLAGS),
)
await callback_start_batch(rpc_state, receive_ops, loop)
# Deserializes the request message
cdef bytes request_raw = receive_ops[0].message()
cdef object request_message
if method_handler.request_deserializer:
request_message = method_handler.request_deserializer(request_raw)
else:
request_message = request_raw
# Executes application logic
cdef object response_message = await method_handler.unary_unary(request_message, _ServicerContextPlaceHolder())
# Serializes the response message
cdef bytes response_raw
if method_handler.response_serializer:
response_raw = method_handler.response_serializer(response_message)
else:
response_raw = response_message
# Sends response message
cdef tuple send_ops = (
SendStatusFromServerOperation(
tuple(), StatusCode.ok, b'', _EMPTY_FLAGS),
SendInitialMetadataOperation(tuple(), _EMPTY_FLAGS),
SendMessageOperation(response_raw, _EMPTY_FLAGS),
)
await callback_start_batch(rpc_state, send_ops, loop)
async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop):
# Finds the method handler (application logic)
cdef object method_handler = _find_method_handler(
rpc_state.method().decode(),
generic_handlers
)
if method_handler is None:
# TODO(lidiz) return unimplemented error to client side
raise NotImplementedError()
# TODO(lidiz) extend to all 4 types of RPC
if method_handler.request_streaming or method_handler.response_streaming:
raise NotImplementedError()
else:
await _handle_unary_unary_rpc(
method_handler,
rpc_state,
loop
)
async def _server_call_request_call(Server server,
_CallbackCompletionQueue cq,
object loop):
cdef grpc_call_error error
cdef RPCState rpc_state = RPCState()
cdef object future = loop.create_future()
cdef CallbackWrapper wrapper = CallbackWrapper(future)
# NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed
# when calling "await". This is an over-optimization by Cython.
cpython.Py_INCREF(wrapper)
error = grpc_server_request_call(
server.c_server, &rpc_state.call, &rpc_state.details,
&rpc_state.request_metadata,
cq.c_ptr(), cq.c_ptr(),
wrapper.c_functor()
)
if error != GRPC_CALL_OK:
raise RuntimeError("Error in _server_call_request_call: %s" % error)
await future
cpython.Py_DECREF(wrapper)
return rpc_state
async def _server_main_loop(Server server,
_CallbackCompletionQueue cq,
list generic_handlers):
cdef object loop = asyncio.get_event_loop()
cdef RPCState rpc_state
while True:
rpc_state = await _server_call_request_call(
server,
cq,
loop)
loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop))
async def _server_start(Server server,
_CallbackCompletionQueue cq,
list generic_handlers):
server.start()
await _server_main_loop(server, cq, generic_handlers)
cdef class _CallbackCompletionQueue:
def __cinit__(self):
self._cq = grpc_completion_queue_create_for_callback(
NULL,
NULL
)
cdef grpc_completion_queue* c_ptr(self):
return self._cq
cdef class AioServer:
def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs, compression):
self._server = Server(options)
self._cq = _CallbackCompletionQueue()
self._status = AIO_SERVER_STATUS_READY
self._generic_handlers = []
grpc_server_register_completion_queue(
self._server.c_server,
self._cq.c_ptr(),
NULL
)
self.add_generic_rpc_handlers(generic_handlers)
if interceptors:
raise NotImplementedError()
if maximum_concurrent_rpcs:
raise NotImplementedError()
if compression:
raise NotImplementedError()
if thread_pool:
raise NotImplementedError()
def add_generic_rpc_handlers(self, generic_rpc_handlers):
for h in generic_rpc_handlers:
self._generic_handlers.append(h)
def add_insecure_port(self, address):
return self._server.add_http2_port(address)
def add_secure_port(self, address, server_credentials):
return self._server.add_http2_port(address,
server_credentials._credentials)
async def start(self):
if self._status == AIO_SERVER_STATUS_RUNNING:
return
elif self._status != AIO_SERVER_STATUS_READY:
raise RuntimeError('Server not in ready state')
self._status = AIO_SERVER_STATUS_RUNNING
loop = asyncio.get_event_loop()
loop.create_task(_server_start(
self._server,
self._cq,
self._generic_handlers,
))
# TODO(https://github.com/grpc/grpc/issues/20668)
# Implement Destruction Methods for AsyncIO Server
def stop(self, unused_grace):
pass

@ -47,3 +47,4 @@ include "_cygrpc/aio/grpc_aio.pxd.pxi"
include "_cygrpc/aio/callbackcontext.pxd.pxi"
include "_cygrpc/aio/call.pxd.pxi"
include "_cygrpc/aio/channel.pxd.pxi"
include "_cygrpc/aio/server.pxd.pxi"

@ -64,6 +64,7 @@ include "_cygrpc/aio/grpc_aio.pyx.pxi"
include "_cygrpc/aio/call.pyx.pxi"
include "_cygrpc/aio/channel.pyx.pxi"
include "_cygrpc/aio/rpc_error.pyx.pxi"
include "_cygrpc/aio/server.pyx.pxi"
#

@ -5,6 +5,7 @@ py_library(
srcs = [
"aio/__init__.py",
"aio/_channel.py",
"aio/_server.py",
],
deps = [
"//src/python/grpcio/grpc/_cython:cygrpc",

@ -20,6 +20,7 @@ import six
import grpc
from grpc._cython import cygrpc
from grpc._cython.cygrpc import init_grpc_aio
from ._server import server
from ._channel import Channel
from ._channel import UnaryUnaryMultiCallable

@ -0,0 +1,179 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Server-side implementation of gRPC Asyncio Python."""
from typing import Text, Optional
import asyncio
import grpc
from grpc import _common
from grpc._cython import cygrpc
class Server:
"""Serves RPCs."""
def __init__(self, thread_pool, generic_handlers, interceptors, options,
maximum_concurrent_rpcs, compression):
self._server = cygrpc.AioServer(thread_pool, generic_handlers,
interceptors, options,
maximum_concurrent_rpcs, compression)
def add_generic_rpc_handlers(
self,
generic_rpc_handlers,
# generic_rpc_handlers: Iterable[grpc.GenericRpcHandlers]
) -> None:
"""Registers GenericRpcHandlers with this Server.
This method is only safe to call before the server is started.
Args:
generic_rpc_handlers: An iterable of GenericRpcHandlers that will be
used to service RPCs.
"""
self._server.add_generic_rpc_handlers(generic_rpc_handlers)
def add_insecure_port(self, address: Text) -> int:
"""Opens an insecure port for accepting RPCs.
This method may only be called before starting the server.
Args:
address: The address for which to open a port. If the port is 0,
or not specified in the address, then the gRPC runtime will choose a port.
Returns:
An integer port on which the server will accept RPC requests.
"""
return self._server.add_insecure_port(_common.encode(address))
def add_secure_port(self, address: Text,
server_credentials: grpc.ServerCredentials) -> int:
"""Opens a secure port for accepting RPCs.
This method may only be called before starting the server.
Args:
address: The address for which to open a port.
if the port is 0, or not specified in the address, then the gRPC
runtime will choose a port.
server_credentials: A ServerCredentials object.
Returns:
An integer port on which the server will accept RPC requests.
"""
return self._server.add_secure_port(
_common.encode(address), server_credentials)
async def start(self) -> None:
"""Starts this Server.
This method may only be called once. (i.e. it is not idempotent).
"""
await self._server.start()
def stop(self, grace: Optional[float]) -> asyncio.Event:
"""Stops this Server.
"This method immediately stops the server from servicing new RPCs in
all cases.
If a grace period is specified, this method returns immediately
and all RPCs active at the end of the grace period are aborted.
If a grace period is not specified (by passing None for `grace`),
all existing RPCs are aborted immediately and this method
blocks until the last RPC handler terminates.
This method is idempotent and may be called at any time.
Passing a smaller grace value in a subsequent call will have
the effect of stopping the Server sooner (passing None will
have the effect of stopping the server immediately). Passing
a larger grace value in a subsequent call *will not* have the
effect of stopping the server later (i.e. the most restrictive
grace value is used).
Args:
grace: A duration of time in seconds or None.
Returns:
A threading.Event that will be set when this Server has completely
stopped, i.e. when running RPCs either complete or are aborted and
all handlers have terminated.
"""
raise NotImplementedError()
async def wait_for_termination(self,
timeout: Optional[float] = None) -> bool:
"""Block current coroutine until the server stops.
This is an EXPERIMENTAL API.
The wait will not consume computational resources during blocking, and
it will block until one of the two following conditions are met:
1) The server is stopped or terminated;
2) A timeout occurs if timeout is not `None`.
The timeout argument works in the same way as `threading.Event.wait()`.
https://docs.python.org/3/library/threading.html#threading.Event.wait
Args:
timeout: A floating point number specifying a timeout for the
operation in seconds.
Returns:
A bool indicates if the operation times out.
"""
if timeout:
raise NotImplementedError()
# TODO(lidiz) replace this wait forever logic
future = asyncio.get_event_loop().create_future()
await future
def server(migration_thread_pool=None,
handlers=None,
interceptors=None,
options=None,
maximum_concurrent_rpcs=None,
compression=None):
"""Creates a Server with which RPCs can be serviced.
Args:
migration_thread_pool: A futures.ThreadPoolExecutor to be used by the
Server to execute non-AsyncIO RPC handlers for migration purpose.
handlers: An optional list of GenericRpcHandlers used for executing RPCs.
More handlers may be added by calling add_generic_rpc_handlers any time
before the server is started.
interceptors: An optional list of ServerInterceptor objects that observe
and optionally manipulate the incoming RPCs before handing them over to
handlers. The interceptors are given control in the order they are
specified. This is an EXPERIMENTAL API.
options: An optional list of key-value pairs (channel args in gRPC runtime)
to configure the channel.
maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
will service before returning RESOURCE_EXHAUSTED status, or None to
indicate no limit.
compression: An element of grpc.compression, e.g.
grpc.compression.Gzip. This compression algorithm will be used for the
lifetime of the server unless overridden. This is an EXPERIMENTAL option.
Returns:
A Server object.
"""
return Server(migration_thread_pool, ()
if handlers is None else handlers, ()
if interceptors is None else interceptors, ()
if options is None else options, maximum_concurrent_rpcs,
compression)

@ -0,0 +1,52 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import unittest
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import benchmark_service_pb2_grpc
class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer):
async def UnaryCall(self, request, context):
payload = messages_pb2.Payload(body=b'\0' * request.response_size)
return messages_pb2.SimpleResponse(payload=payload)
async def _start_async_server():
server = aio.server()
port = server.add_insecure_port(('localhost:%s' % 50051).encode('ASCII'))
servicer = BenchmarkServer()
benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server(
servicer, server)
await server.start()
await server.wait_for_termination()
def main():
aio.init_grpc_aio()
loop = asyncio.get_event_loop()
loop.create_task(_start_async_server())
loop.run_forever()
if __name__ == '__main__':
logging.basicConfig()
main()

@ -0,0 +1,43 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
package(
default_testonly = 1,
default_visibility = ["//visibility:public"],
)
GRPC_ASYNC_TESTS = [
"server_test.py",
]
[
py_test(
name=test_file_name[:-3],
size="small",
srcs=[test_file_name],
main=test_file_name,
python_version="PY3",
deps=[
"//src/python/grpcio/grpc:grpcio",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:benchmark_service_py_pb2_grpc",
"//src/proto/grpc/testing:benchmark_service_py_pb2",
"//external:six"
],
imports=["../../",],
data=[
"//src/python/grpcio_tests/tests/unit/credentials",
],
) for test_file_name in GRPC_ASYNC_TESTS
]

@ -0,0 +1,62 @@
# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import unittest
import grpc
from grpc.experimental import aio
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import benchmark_service_pb2_grpc
_TEST_METHOD_PATH = ''
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'
async def unary_unary(unused_request, unused_context):
return _RESPONSE
class GenericHandler(grpc.GenericRpcHandler):
def service(self, unused_handler_details):
return grpc.unary_unary_rpc_method_handler(unary_unary)
class TestServer(unittest.TestCase):
def test_unary_unary(self):
loop = asyncio.get_event_loop()
async def test_unary_unary_body():
server = aio.server()
port = server.add_insecure_port('[::]:0')
server.add_generic_rpc_handlers((GenericHandler(),))
await server.start()
async with aio.insecure_channel('localhost:%d' % port) as channel:
unary_call = channel.unary_unary(_TEST_METHOD_PATH)
response = await unary_call(_REQUEST)
self.assertEqual(response, _RESPONSE)
loop.run_until_complete(test_unary_unary_body())
if __name__ == '__main__':
aio.init_grpc_aio()
logging.basicConfig()
unittest.main(verbosity=2)
Loading…
Cancel
Save