diff --git a/cmake/cares.cmake b/cmake/cares.cmake index 4ea0d8725d0..c09f6940c91 100644 --- a/cmake/cares.cmake +++ b/cmake/cares.cmake @@ -22,7 +22,7 @@ if("${gRPC_CARES_PROVIDER}" STREQUAL "module") # See https://github.com/grpc/grpc/issues/17255 set(HAVE_LIBNSL OFF CACHE BOOL "avoid cares dependency on libnsl") endif() - add_subdirectory(third_party/cares/cares) + add_subdirectory("${CARES_ROOT_DIR}" third_party/cares/cares) if(TARGET c-ares) set(_gRPC_CARES_LIBRARIES c-ares) diff --git a/src/csharp/Grpc.Core.Api/SerializationContext.cs b/src/csharp/Grpc.Core.Api/SerializationContext.cs index 59e14c12e3b..021ca29c3d5 100644 --- a/src/csharp/Grpc.Core.Api/SerializationContext.cs +++ b/src/csharp/Grpc.Core.Api/SerializationContext.cs @@ -46,13 +46,24 @@ namespace Grpc.Core throw new NotImplementedException(); } + /// + /// Sets the payload length when writing serialized data into a buffer writer. If the serializer knows the full payload + /// length in advance, providing that information before obtaining the buffer writer using GetBufferWriter() can improve + /// serialization efficiency by avoiding copies. The provided payload length must be the same as the data written to the writer. + /// Calling this method is optional. If the payload length is not set then the length is calculated using the data written to + /// the buffer writer when Complete() is called. + /// + /// The total length of the payload in bytes. + public virtual void SetPayloadLength(int payloadLength) + { + } + /// /// Complete the payload written to the buffer writer. Complete() can only be called once. /// public virtual void Complete() { throw new NotImplementedException(); - } } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs index 4d45b5c684f..981ff69dbdc 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs @@ -56,6 +56,11 @@ namespace Grpc.Core.Internal return sliceBuffer; } + public override void SetPayloadLength(int payloadLength) + { + // Length is calculated using the buffer writer + } + /// /// Complete the payload written so far. /// diff --git a/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml b/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml index 66862582dad..5d284e25c8d 100644 --- a/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml +++ b/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml @@ -22,7 +22,7 @@ + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml b/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml index 66b9f4bd5da..530ec52026f 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml +++ b/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml @@ -82,7 +82,7 @@ + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> @@ -91,7 +91,7 @@ Description="Specifies if this file is compiled or only imported by other files."> + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index bc481606500..308d4b6ed99 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -125,6 +125,23 @@ grpc_proto_library( ], ) +proto_library( + name = "benchmark_service_descriptor", + srcs = ["benchmark_service.proto"], + deps = [":messages_proto_descriptor"], +) + +py_proto_library( + name = "benchmark_service_py_pb2", + deps = [":benchmark_service_descriptor"], +) + +py_grpc_library( + name = "benchmark_service_py_pb2_grpc", + srcs = [":benchmark_service_descriptor"], + deps = [":benchmark_service_py_pb2"], +) + grpc_proto_library( name = "report_qps_scenario_service_proto", srcs = ["report_qps_scenario_service.proto"], diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 916086731e7..3a355527a00 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -24,6 +24,8 @@ pyx_library( "_cygrpc/aio/iomgr/socket.pyx.pxi", "_cygrpc/aio/iomgr/timer.pxd.pxi", "_cygrpc/aio/iomgr/timer.pyx.pxi", + "_cygrpc/aio/server.pxd.pxi", + "_cygrpc/aio/server.pyx.pxi", "_cygrpc/arguments.pxd.pxi", "_cygrpc/arguments.pyx.pxi", "_cygrpc/call.pxd.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 13e95ee1206..a5811f8a2fe 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -14,9 +14,14 @@ from cpython cimport Py_INCREF, Py_DECREF - from libc cimport string +import socket as native_socket +try: + import ipaddress # CPython 3.3 and above +except ImportError: + pass + cdef grpc_socket_vtable asyncio_socket_vtable cdef grpc_custom_resolver_vtable asyncio_resolver_vtable cdef grpc_custom_timer_vtable asyncio_timer_vtable @@ -26,7 +31,7 @@ cdef grpc_custom_poller_vtable asyncio_pollset_vtable cdef grpc_error* asyncio_socket_init( grpc_custom_socket* grpc_socket, int domain) with gil: - socket = _AsyncioSocket.create(grpc_socket) + socket = _AsyncioSocket.create(grpc_socket, None, None) Py_INCREF(socket) grpc_socket.impl = socket return 0 @@ -81,39 +86,85 @@ cdef grpc_error* asyncio_socket_getpeername( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, int* length) with gil: - raise NotImplemented() + peer = (<_AsyncioSocket>grpc_socket.impl).peername() + + cdef grpc_resolved_address c_addr + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() cdef grpc_error* asyncio_socket_getsockname( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, int* length) with gil: - raise NotImplemented() + """Supplies sock_addr in add_socket_to_server.""" + cdef grpc_resolved_address c_addr + socket = (<_AsyncioSocket>grpc_socket.impl) + if socket is None: + peer = ('0.0.0.0', 0) + else: + peer = socket.sockname() + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil: - raise NotImplemented() + (<_AsyncioSocket>grpc_socket.impl).listen() + return grpc_error_none() + + +def _asyncio_apply_socket_options(object s, so_reuse_port=False): + # TODO(https://github.com/grpc/grpc/issues/20667) + # Connects the so_reuse_port option to channel arguments + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) + s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) cdef grpc_error* asyncio_socket_bind( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, size_t len, int flags) with gil: - raise NotImplemented() + host, port = sockaddr_to_tuple(addr, len) + try: + ip = ipaddress.ip_address(host) + if isinstance(ip, ipaddress.IPv6Address): + family = native_socket.AF_INET6 + else: + family = native_socket.AF_INET + + socket = native_socket.socket(family=family) + _asyncio_apply_socket_options(socket) + socket.bind((host, port)) + except IOError as io_error: + return socket_error("bind", str(io_error)) + else: + aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket) + cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy + grpc_socket.impl = aio_socket + return grpc_error_none() cdef void asyncio_socket_accept( grpc_custom_socket* grpc_socket, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback accept_cb) with gil: - raise NotImplemented() + (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb) cdef grpc_error* asyncio_resolve( char* host, char* port, grpc_resolved_addresses** res) with gil: - raise NotImplemented() + result = native_socket.getaddrinfo(host, port) + res[0] = tuples_to_resolvaddr(result) cdef void asyncio_resolve_async( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi index aab5db2149a..285fbdcea09 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -15,8 +15,8 @@ cdef class _AsyncioSocket: cdef: + # Common attributes grpc_custom_socket * _grpc_socket - grpc_custom_connect_callback _grpc_connect_cb grpc_custom_read_callback _grpc_read_cb object _reader object _writer @@ -24,11 +24,31 @@ cdef class _AsyncioSocket: object _task_connect char * _read_buffer + # Client-side attributes + grpc_custom_connect_callback _grpc_connect_cb + + # Server-side attributes + grpc_custom_accept_callback _grpc_accept_cb + grpc_custom_socket * _grpc_client_socket + object _server + object _py_socket + object _peername + @staticmethod - cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket) + cdef _AsyncioSocket create( + grpc_custom_socket * grpc_socket, + object reader, + object writer) + @staticmethod + cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket) cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb) cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb) cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb) cdef bint is_connected(self) cdef void close(self) + + cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb) + cdef listen(self) + cdef tuple peername(self) + cdef tuple sockname(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 690c34c2da9..2d56a568348 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import socket +import socket as native_socket from libc cimport string @@ -26,11 +26,27 @@ cdef class _AsyncioSocket: self._task_connect = None self._task_read = None self._read_buffer = NULL + self._server = None + self._py_socket = None + self._peername = None @staticmethod - cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket): + cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, + object reader, + object writer): socket = _AsyncioSocket() socket._grpc_socket = grpc_socket + socket._reader = reader + socket._writer = writer + if writer is not None: + socket._peername = writer.get_extra_info('peername') + return socket + + @staticmethod + cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket): + socket = _AsyncioSocket() + socket._grpc_socket = grpc_socket + socket._py_socket = py_socket return socket def __repr__(self): @@ -52,7 +68,7 @@ cdef class _AsyncioSocket: # gRPC default posix implementation disables nagle # algorithm. sock = self._writer.transport.get_extra_info('socket') - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) + sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) self._grpc_connect_cb( self._grpc_socket, @@ -92,7 +108,11 @@ cdef class _AsyncioSocket: grpc_socket_error("read {}".format(error_msg).encode()) ) - cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb): + cdef void connect(self, + object host, + object port, + grpc_custom_connect_callback grpc_connect_cb): + assert not self._reader assert not self._task_connect self._task_connect = asyncio.ensure_future( @@ -132,3 +152,39 @@ cdef class _AsyncioSocket: cdef void close(self): if self.is_connected(): self._writer.close() + + def _new_connection_callback(self, object reader, object writer): + client_socket = _AsyncioSocket.create( + self._grpc_client_socket, + reader, + writer, + ) + + self._grpc_client_socket.impl = client_socket + cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy + # Accept callback expects to be called with: + # grpc_custom_socket: A grpc custom socket for server + # grpc_custom_socket: A grpc custom socket for client (with new Socket instance) + # grpc_error: An error object + self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none()) + + cdef listen(self): + async def create_asyncio_server(): + self._server = await asyncio.start_server( + self._new_connection_callback, + sock=self._py_socket, + ) + + asyncio.get_event_loop().create_task(create_asyncio_server()) + + cdef accept(self, + grpc_custom_socket* grpc_socket_client, + grpc_custom_accept_callback grpc_accept_cb): + self._grpc_client_socket = grpc_socket_client + self._grpc_accept_cb = grpc_accept_cb + + cdef tuple peername(self): + return self._peername + + cdef tuple sockname(self): + return self._py_socket.getsockname() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi new file mode 100644 index 00000000000..1906463d088 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -0,0 +1,44 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _HandlerCallDetails: + cdef readonly str method + cdef readonly tuple invocation_metadata + + +cdef class RPCState: + cdef grpc_call* call, + cdef grpc_call_details details + cdef grpc_metadata_array request_metadata + + cdef bytes method(self) + + +cdef enum AioServerStatus: + AIO_SERVER_STATUS_UNKNOWN + AIO_SERVER_STATUS_READY + AIO_SERVER_STATUS_RUNNING + AIO_SERVER_STATUS_STOPPED + + +cdef class _CallbackCompletionQueue: + cdef grpc_completion_queue *_cq + cdef grpc_completion_queue* c_ptr(self) + + +cdef class AioServer: + cdef Server _server + cdef _CallbackCompletionQueue _cq + cdef list _generic_handlers + cdef AioServerStatus _status diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi new file mode 100644 index 00000000000..5bab542f467 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -0,0 +1,275 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _HandlerCallDetails: + def __cinit__(self, str method, tuple invocation_metadata): + self.method = method + self.invocation_metadata = invocation_metadata + + +class _ServicerContextPlaceHolder(object): pass + + +# TODO(https://github.com/grpc/grpc/issues/20669) +# Apply this to the client-side +cdef class CallbackWrapper: + cdef CallbackContext context + cdef object _reference + + def __cinit__(self, object future): + self.context.functor.functor_run = self.functor_run + self.context.waiter = (future) + self._reference = future + + @staticmethod + cdef void functor_run( + grpc_experimental_completion_queue_functor* functor, + int succeed): + cdef CallbackContext *context = functor + if succeed == 0: + (context.waiter).set_exception(RuntimeError()) + else: + (context.waiter).set_result(None) + + cdef grpc_experimental_completion_queue_functor *c_functor(self): + return &self.context.functor + + +cdef class RPCState: + + def __cinit__(self): + grpc_metadata_array_init(&self.request_metadata) + grpc_call_details_init(&self.details) + + cdef bytes method(self): + return _slice_bytes(self.details.method) + + def __dealloc__(self): + """Cleans the Core objects.""" + grpc_call_details_destroy(&self.details) + grpc_metadata_array_destroy(&self.request_metadata) + if self.call: + grpc_call_unref(self.call) + + +cdef _find_method_handler(str method, list generic_handlers): + # TODO(lidiz) connects Metadata to call details + cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails( + method, + tuple() + ) + + for generic_handler in generic_handlers: + method_handler = generic_handler.service(handler_call_details) + if method_handler is not None: + return method_handler + return None + + +async def callback_start_batch(RPCState rpc_state, + tuple operations, + object loop): + """The callback version of start batch operations.""" + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) + batch_operation_tag.prepare() + + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(future) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + cdef grpc_call_error error = grpc_call_start_batch( + rpc_state.call, + batch_operation_tag.c_ops, + batch_operation_tag.c_nops, + wrapper.c_functor(), NULL) + + if error != GRPC_CALL_OK: + raise RuntimeError("Error with callback_start_batch {}".format(error)) + + await future + cpython.Py_DECREF(wrapper) + cdef grpc_event c_event + # Tag.event must be called, otherwise messages won't be parsed from C + batch_operation_tag.event(c_event) + + +async def _handle_unary_unary_rpc(object method_handler, + RPCState rpc_state, + object loop): + # Receives request message + cdef tuple receive_ops = ( + ReceiveMessageOperation(_EMPTY_FLAGS), + ) + await callback_start_batch(rpc_state, receive_ops, loop) + + # Deserializes the request message + cdef bytes request_raw = receive_ops[0].message() + cdef object request_message + if method_handler.request_deserializer: + request_message = method_handler.request_deserializer(request_raw) + else: + request_message = request_raw + + # Executes application logic + cdef object response_message = await method_handler.unary_unary(request_message, _ServicerContextPlaceHolder()) + + # Serializes the response message + cdef bytes response_raw + if method_handler.response_serializer: + response_raw = method_handler.response_serializer(response_message) + else: + response_raw = response_message + + # Sends response message + cdef tuple send_ops = ( + SendStatusFromServerOperation( + tuple(), StatusCode.ok, b'', _EMPTY_FLAGS), + SendInitialMetadataOperation(tuple(), _EMPTY_FLAGS), + SendMessageOperation(response_raw, _EMPTY_FLAGS), + ) + await callback_start_batch(rpc_state, send_ops, loop) + + +async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): + # Finds the method handler (application logic) + cdef object method_handler = _find_method_handler( + rpc_state.method().decode(), + generic_handlers + ) + if method_handler is None: + # TODO(lidiz) return unimplemented error to client side + raise NotImplementedError() + # TODO(lidiz) extend to all 4 types of RPC + if method_handler.request_streaming or method_handler.response_streaming: + raise NotImplementedError() + else: + await _handle_unary_unary_rpc( + method_handler, + rpc_state, + loop + ) + + +async def _server_call_request_call(Server server, + _CallbackCompletionQueue cq, + object loop): + cdef grpc_call_error error + cdef RPCState rpc_state = RPCState() + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(future) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + error = grpc_server_request_call( + server.c_server, &rpc_state.call, &rpc_state.details, + &rpc_state.request_metadata, + cq.c_ptr(), cq.c_ptr(), + wrapper.c_functor() + ) + if error != GRPC_CALL_OK: + raise RuntimeError("Error in _server_call_request_call: %s" % error) + + await future + cpython.Py_DECREF(wrapper) + return rpc_state + + +async def _server_main_loop(Server server, + _CallbackCompletionQueue cq, + list generic_handlers): + cdef object loop = asyncio.get_event_loop() + cdef RPCState rpc_state + + while True: + rpc_state = await _server_call_request_call( + server, + cq, + loop) + + loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) + + +async def _server_start(Server server, + _CallbackCompletionQueue cq, + list generic_handlers): + server.start() + await _server_main_loop(server, cq, generic_handlers) + + +cdef class _CallbackCompletionQueue: + + def __cinit__(self): + self._cq = grpc_completion_queue_create_for_callback( + NULL, + NULL + ) + + cdef grpc_completion_queue* c_ptr(self): + return self._cq + + +cdef class AioServer: + + def __init__(self, thread_pool, generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression): + self._server = Server(options) + self._cq = _CallbackCompletionQueue() + self._status = AIO_SERVER_STATUS_READY + self._generic_handlers = [] + grpc_server_register_completion_queue( + self._server.c_server, + self._cq.c_ptr(), + NULL + ) + self.add_generic_rpc_handlers(generic_handlers) + + if interceptors: + raise NotImplementedError() + if maximum_concurrent_rpcs: + raise NotImplementedError() + if compression: + raise NotImplementedError() + if thread_pool: + raise NotImplementedError() + + def add_generic_rpc_handlers(self, generic_rpc_handlers): + for h in generic_rpc_handlers: + self._generic_handlers.append(h) + + def add_insecure_port(self, address): + return self._server.add_http2_port(address) + + def add_secure_port(self, address, server_credentials): + return self._server.add_http2_port(address, + server_credentials._credentials) + + async def start(self): + if self._status == AIO_SERVER_STATUS_RUNNING: + return + elif self._status != AIO_SERVER_STATUS_READY: + raise RuntimeError('Server not in ready state') + + self._status = AIO_SERVER_STATUS_RUNNING + loop = asyncio.get_event_loop() + loop.create_task(_server_start( + self._server, + self._cq, + self._generic_handlers, + )) + + # TODO(https://github.com/grpc/grpc/issues/20668) + # Implement Destruction Methods for AsyncIO Server + def stop(self, unused_grace): + pass diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 4d081fb83e2..b0a2033cc40 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -47,3 +47,4 @@ include "_cygrpc/aio/grpc_aio.pxd.pxi" include "_cygrpc/aio/callbackcontext.pxd.pxi" include "_cygrpc/aio/call.pxd.pxi" include "_cygrpc/aio/channel.pxd.pxi" +include "_cygrpc/aio/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index c4635be72d3..5f980bb46f0 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -64,6 +64,7 @@ include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" include "_cygrpc/aio/rpc_error.pyx.pxi" +include "_cygrpc/aio/server.pyx.pxi" # diff --git a/src/python/grpcio/grpc/experimental/BUILD.bazel b/src/python/grpcio/grpc/experimental/BUILD.bazel index 00815c4e72e..c9f0484c886 100644 --- a/src/python/grpcio/grpc/experimental/BUILD.bazel +++ b/src/python/grpcio/grpc/experimental/BUILD.bazel @@ -5,6 +5,7 @@ py_library( srcs = [ "aio/__init__.py", "aio/_channel.py", + "aio/_server.py", ], deps = [ "//src/python/grpcio/grpc/_cython:cygrpc", diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index 997d7d3cad9..5e919f500bd 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -20,6 +20,7 @@ import six import grpc from grpc._cython import cygrpc from grpc._cython.cygrpc import init_grpc_aio +from ._server import server from ._channel import Channel from ._channel import UnaryUnaryMultiCallable diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py new file mode 100644 index 00000000000..6bc3d210aed --- /dev/null +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -0,0 +1,179 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Server-side implementation of gRPC Asyncio Python.""" + +from typing import Text, Optional +import asyncio +import grpc +from grpc import _common +from grpc._cython import cygrpc + + +class Server: + """Serves RPCs.""" + + def __init__(self, thread_pool, generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression): + self._server = cygrpc.AioServer(thread_pool, generic_handlers, + interceptors, options, + maximum_concurrent_rpcs, compression) + + def add_generic_rpc_handlers( + self, + generic_rpc_handlers, + # generic_rpc_handlers: Iterable[grpc.GenericRpcHandlers] + ) -> None: + """Registers GenericRpcHandlers with this Server. + + This method is only safe to call before the server is started. + + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be + used to service RPCs. + """ + self._server.add_generic_rpc_handlers(generic_rpc_handlers) + + def add_insecure_port(self, address: Text) -> int: + """Opens an insecure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. If the port is 0, + or not specified in the address, then the gRPC runtime will choose a port. + + Returns: + An integer port on which the server will accept RPC requests. + """ + return self._server.add_insecure_port(_common.encode(address)) + + def add_secure_port(self, address: Text, + server_credentials: grpc.ServerCredentials) -> int: + """Opens a secure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then the gRPC + runtime will choose a port. + server_credentials: A ServerCredentials object. + + Returns: + An integer port on which the server will accept RPC requests. + """ + return self._server.add_secure_port( + _common.encode(address), server_credentials) + + async def start(self) -> None: + """Starts this Server. + + This method may only be called once. (i.e. it is not idempotent). + """ + await self._server.start() + + def stop(self, grace: Optional[float]) -> asyncio.Event: + """Stops this Server. + + "This method immediately stops the server from servicing new RPCs in + all cases. + + If a grace period is specified, this method returns immediately + and all RPCs active at the end of the grace period are aborted. + If a grace period is not specified (by passing None for `grace`), + all existing RPCs are aborted immediately and this method + blocks until the last RPC handler terminates. + + This method is idempotent and may be called at any time. + Passing a smaller grace value in a subsequent call will have + the effect of stopping the Server sooner (passing None will + have the effect of stopping the server immediately). Passing + a larger grace value in a subsequent call *will not* have the + effect of stopping the server later (i.e. the most restrictive + grace value is used). + + Args: + grace: A duration of time in seconds or None. + + Returns: + A threading.Event that will be set when this Server has completely + stopped, i.e. when running RPCs either complete or are aborted and + all handlers have terminated. + """ + raise NotImplementedError() + + async def wait_for_termination(self, + timeout: Optional[float] = None) -> bool: + """Block current coroutine until the server stops. + + This is an EXPERIMENTAL API. + + The wait will not consume computational resources during blocking, and + it will block until one of the two following conditions are met: + + 1) The server is stopped or terminated; + 2) A timeout occurs if timeout is not `None`. + + The timeout argument works in the same way as `threading.Event.wait()`. + https://docs.python.org/3/library/threading.html#threading.Event.wait + + Args: + timeout: A floating point number specifying a timeout for the + operation in seconds. + + Returns: + A bool indicates if the operation times out. + """ + if timeout: + raise NotImplementedError() + # TODO(lidiz) replace this wait forever logic + future = asyncio.get_event_loop().create_future() + await future + + +def server(migration_thread_pool=None, + handlers=None, + interceptors=None, + options=None, + maximum_concurrent_rpcs=None, + compression=None): + """Creates a Server with which RPCs can be serviced. + + Args: + migration_thread_pool: A futures.ThreadPoolExecutor to be used by the + Server to execute non-AsyncIO RPC handlers for migration purpose. + handlers: An optional list of GenericRpcHandlers used for executing RPCs. + More handlers may be added by calling add_generic_rpc_handlers any time + before the server is started. + interceptors: An optional list of ServerInterceptor objects that observe + and optionally manipulate the incoming RPCs before handing them over to + handlers. The interceptors are given control in the order they are + specified. This is an EXPERIMENTAL API. + options: An optional list of key-value pairs (channel args in gRPC runtime) + to configure the channel. + maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server + will service before returning RESOURCE_EXHAUSTED status, or None to + indicate no limit. + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This compression algorithm will be used for the + lifetime of the server unless overridden. This is an EXPERIMENTAL option. + + Returns: + A Server object. + """ + return Server(migration_thread_pool, () + if handlers is None else handlers, () + if interceptors is None else interceptors, () + if options is None else options, maximum_concurrent_rpcs, + compression) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/server.py b/src/python/grpcio_tests/tests_aio/benchmark/server.py new file mode 100644 index 00000000000..ef0a3f7ff2c --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/server.py @@ -0,0 +1,52 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import unittest + +from grpc.experimental import aio +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import benchmark_service_pb2_grpc + + +class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): + + async def UnaryCall(self, request, context): + payload = messages_pb2.Payload(body=b'\0' * request.response_size) + return messages_pb2.SimpleResponse(payload=payload) + + +async def _start_async_server(): + server = aio.server() + + port = server.add_insecure_port(('localhost:%s' % 50051).encode('ASCII')) + servicer = BenchmarkServer() + benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( + servicer, server) + + await server.start() + await server.wait_for_termination() + + +def main(): + aio.init_grpc_aio() + loop = asyncio.get_event_loop() + loop.create_task(_start_async_server()) + loop.run_forever() + + +if __name__ == '__main__': + logging.basicConfig() + main() diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel new file mode 100644 index 00000000000..c6d0a9f7728 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -0,0 +1,43 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package( + default_testonly = 1, + default_visibility = ["//visibility:public"], +) + +GRPC_ASYNC_TESTS = [ + "server_test.py", +] + +[ + py_test( + name=test_file_name[:-3], + size="small", + srcs=[test_file_name], + main=test_file_name, + python_version="PY3", + deps=[ + "//src/python/grpcio/grpc:grpcio", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", + "//src/proto/grpc/testing:benchmark_service_py_pb2", + "//external:six" + ], + imports=["../../",], + data=[ + "//src/python/grpcio_tests/tests/unit/credentials", + ], + ) for test_file_name in GRPC_ASYNC_TESTS +] diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py new file mode 100644 index 00000000000..2d543893176 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -0,0 +1,62 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import unittest + +import grpc +from grpc.experimental import aio +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import benchmark_service_pb2_grpc + +_TEST_METHOD_PATH = '' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' + + +async def unary_unary(unused_request, unused_context): + return _RESPONSE + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, unused_handler_details): + return grpc.unary_unary_rpc_method_handler(unary_unary) + + +class TestServer(unittest.TestCase): + + def test_unary_unary(self): + loop = asyncio.get_event_loop() + + async def test_unary_unary_body(): + server = aio.server() + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((GenericHandler(),)) + await server.start() + + async with aio.insecure_channel('localhost:%d' % port) as channel: + unary_call = channel.unary_unary(_TEST_METHOD_PATH) + response = await unary_call(_REQUEST) + self.assertEqual(response, _RESPONSE) + + loop.run_until_complete(test_unary_unary_body()) + + +if __name__ == '__main__': + aio.init_grpc_aio() + logging.basicConfig() + unittest.main(verbosity=2)