From d327e782717285d738c018805a0164a1cd560cf5 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 11 Oct 2019 14:24:35 +1300 Subject: [PATCH 01/14] Fix error when changing Protobuf item values in Visual Studio --- src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml | 2 +- src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml b/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml index 66862582dad..5d284e25c8d 100644 --- a/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml +++ b/src/csharp/Grpc.Tools/build/_grpc/Grpc.CSharp.xml @@ -22,7 +22,7 @@ + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> diff --git a/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml b/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml index 66b9f4bd5da..530ec52026f 100644 --- a/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml +++ b/src/csharp/Grpc.Tools/build/_protobuf/Protobuf.CSharp.xml @@ -82,7 +82,7 @@ + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> @@ -91,7 +91,7 @@ Description="Specifies if this file is compiled or only imported by other files."> + PersistenceStyle="Attribute" HasConfigurationCondition="false" /> From c6ae98d49a99d877c8a3bde01b221f70bb5babd9 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 14 Oct 2019 15:30:04 -0700 Subject: [PATCH 02/14] Minimal AsyncIO Server for gRPC * Extends AsyncIO IO manager to support server-side operations; * Adds more logic to AsyncSocket class; * Implements an AsyncIO server that can serve unary-unary handlers; * Adds a server test with grpc.aio.Channel; * Support both Bazel / setup.py build. --- src/proto/grpc/testing/BUILD | 17 ++ src/python/grpcio/grpc/_cython/BUILD.bazel | 2 + .../_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 59 ++++- .../_cython/_cygrpc/aio/iomgr/socket.pxd.pxi | 19 +- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 48 +++- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 35 +++ .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 234 ++++++++++++++++++ src/python/grpcio/grpc/_cython/cygrpc.pxd | 1 + src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + .../grpcio/grpc/experimental/BUILD.bazel | 1 + .../grpcio/grpc/experimental/aio/__init__.py | 1 + .../grpcio/grpc/experimental/aio/_server.py | 174 +++++++++++++ .../tests_aio/benchmark/server.py | 52 ++++ .../grpcio_tests/tests_aio/unit/BUILD.bazel | 42 ++++ .../tests_aio/unit/server_test.py | 61 +++++ 15 files changed, 737 insertions(+), 10 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi create mode 100644 src/python/grpcio/grpc/experimental/aio/_server.py create mode 100644 src/python/grpcio_tests/tests_aio/benchmark/server.py create mode 100644 src/python/grpcio_tests/tests_aio/unit/BUILD.bazel create mode 100644 src/python/grpcio_tests/tests_aio/unit/server_test.py diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index bc481606500..308d4b6ed99 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -125,6 +125,23 @@ grpc_proto_library( ], ) +proto_library( + name = "benchmark_service_descriptor", + srcs = ["benchmark_service.proto"], + deps = [":messages_proto_descriptor"], +) + +py_proto_library( + name = "benchmark_service_py_pb2", + deps = [":benchmark_service_descriptor"], +) + +py_grpc_library( + name = "benchmark_service_py_pb2_grpc", + srcs = [":benchmark_service_descriptor"], + deps = [":benchmark_service_py_pb2"], +) + grpc_proto_library( name = "report_qps_scenario_service_proto", srcs = ["report_qps_scenario_service.proto"], diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel index 916086731e7..3a355527a00 100644 --- a/src/python/grpcio/grpc/_cython/BUILD.bazel +++ b/src/python/grpcio/grpc/_cython/BUILD.bazel @@ -24,6 +24,8 @@ pyx_library( "_cygrpc/aio/iomgr/socket.pyx.pxi", "_cygrpc/aio/iomgr/timer.pxd.pxi", "_cygrpc/aio/iomgr/timer.pyx.pxi", + "_cygrpc/aio/server.pxd.pxi", + "_cygrpc/aio/server.pyx.pxi", "_cygrpc/arguments.pxd.pxi", "_cygrpc/arguments.pyx.pxi", "_cygrpc/call.pxd.pxi", diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 13e95ee1206..f0c33e4c7c0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -14,9 +14,10 @@ from cpython cimport Py_INCREF, Py_DECREF - from libc cimport string +import socket as native_socket + cdef grpc_socket_vtable asyncio_socket_vtable cdef grpc_custom_resolver_vtable asyncio_resolver_vtable cdef grpc_custom_timer_vtable asyncio_timer_vtable @@ -81,39 +82,83 @@ cdef grpc_error* asyncio_socket_getpeername( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, int* length) with gil: - raise NotImplemented() + peer = (<_AsyncioSocket>grpc_socket.impl).peername() + + cdef grpc_resolved_address c_addr + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() cdef grpc_error* asyncio_socket_getsockname( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, int* length) with gil: - raise NotImplemented() + """Supplies sock_addr in add_socket_to_server.""" + cdef grpc_resolved_address c_addr + socket = (<_AsyncioSocket>grpc_socket.impl) + if socket is None: + peer = ('0.0.0.0', 0) + else: + peer = socket.sockname() + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(addr, c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil: - raise NotImplemented() + (<_AsyncioSocket>grpc_socket.impl).listen() + return grpc_error_none() + + +# TODO(lidiz) connects the so_reuse_port option to channel arguments +def _asyncio_apply_socket_options(object s, so_reuse_port=False): + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) + if so_reuse_port: + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1) + s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) cdef grpc_error* asyncio_socket_bind( grpc_custom_socket* grpc_socket, const grpc_sockaddr* addr, size_t len, int flags) with gil: - raise NotImplemented() + host, port = sockaddr_to_tuple(addr, len) + try: + try: + socket = native_socket.socket(family=native_socket.AF_INET6) + _asyncio_apply_socket_options(socket) + socket.bind((host, port)) + except native_socket.gaierror: + socket = native_socket.socket(family=native_socket.AF_INET) + _asyncio_apply_socket_options(socket) + socket.bind((host, port)) + except IOError as io_error: + return socket_error("bind", str(io_error)) + else: + aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket) + cpython.Py_INCREF(aio_socket) + grpc_socket.impl = aio_socket + return grpc_error_none() cdef void asyncio_socket_accept( grpc_custom_socket* grpc_socket, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback accept_cb) with gil: - raise NotImplemented() + (<_AsyncioSocket>grpc_socket.impl).accept(grpc_socket_client, accept_cb) cdef grpc_error* asyncio_resolve( char* host, char* port, grpc_resolved_addresses** res) with gil: - raise NotImplemented() + result = native_socket.getaddrinfo(host, port) + res[0] = tuples_to_resolvaddr(result) cdef void asyncio_resolve_async( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi index aab5db2149a..6cbd8e90155 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -15,8 +15,8 @@ cdef class _AsyncioSocket: cdef: + # Common attributes grpc_custom_socket * _grpc_socket - grpc_custom_connect_callback _grpc_connect_cb grpc_custom_read_callback _grpc_read_cb object _reader object _writer @@ -24,11 +24,28 @@ cdef class _AsyncioSocket: object _task_connect char * _read_buffer + # Client-side attributes + grpc_custom_connect_callback _grpc_connect_cb + + # Server-side attributes + grpc_custom_accept_callback _grpc_accept_cb + grpc_custom_socket * _grpc_client_socket + object _server + object _py_socket + object _peername + @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket) + @staticmethod + cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket) cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb) cdef void write(self, grpc_slice_buffer * g_slice_buffer, grpc_custom_write_callback grpc_write_cb) cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb) cdef bint is_connected(self) cdef void close(self) + + cdef accept(self, grpc_custom_socket* grpc_socket_client, grpc_custom_accept_callback grpc_accept_cb) + cdef listen(self) + cdef tuple peername(self) + cdef tuple sockname(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 690c34c2da9..bb6f74645fc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import socket +import socket as native_socket from libc cimport string @@ -26,6 +26,8 @@ cdef class _AsyncioSocket: self._task_connect = None self._task_read = None self._read_buffer = NULL + self._server = None + self._py_socket = None @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket): @@ -33,6 +35,13 @@ cdef class _AsyncioSocket: socket._grpc_socket = grpc_socket return socket + @staticmethod + cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket): + socket = _AsyncioSocket() + socket._grpc_socket = grpc_socket + socket._py_socket = py_socket + return socket + def __repr__(self): class_name = self.__class__.__name__ id_ = id(self) @@ -52,7 +61,7 @@ cdef class _AsyncioSocket: # gRPC default posix implementation disables nagle # algorithm. sock = self._writer.transport.get_extra_info('socket') - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) + sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) self._grpc_connect_cb( self._grpc_socket, @@ -132,3 +141,38 @@ cdef class _AsyncioSocket: cdef void close(self): if self.is_connected(): self._writer.close() + + def _new_connection_callback(self, object reader, object writer): + client_socket = _AsyncioSocket.create(self._grpc_client_socket) + client_socket._reader = reader + client_socket._writer = writer + client_socket._peername = addr = writer.get_extra_info('peername') + + self._grpc_client_socket.impl = client_socket + cpython.Py_INCREF(client_socket) + # Accept callback expects to be called with: + # * An grpc custom socket for server + # * An grpc custom socket for client (with new Socket instance) + # * An error object + self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none()) + + cdef listen(self): + async def create_asyncio_server(): + self._server = await asyncio.start_server( + self._new_connection_callback, + sock=self._py_socket, + ) + + asyncio.get_event_loop().create_task(create_asyncio_server()) + + cdef accept(self, + grpc_custom_socket* grpc_socket_client, + grpc_custom_accept_callback grpc_accept_cb): + self._grpc_client_socket = grpc_socket_client + self._grpc_accept_cb = grpc_accept_cb + + cdef tuple peername(self): + return self._peername + + cdef tuple sockname(self): + return self._py_socket.getsockname() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi new file mode 100644 index 00000000000..ee5255213c8 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -0,0 +1,35 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _HandlerCallDetails: + cdef readonly str method + cdef readonly tuple invocation_metadata + + +cdef class RPCState: + cdef grpc_call* call, + cdef grpc_call_details details + cdef grpc_metadata_array request_metadata + + cdef bytes method(self) + + +cdef class _AioServerState: + cdef Server server + cdef grpc_completion_queue *cq + cdef list generic_handlers + + +cdef class AioServer: + cdef _AioServerState _state diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi new file mode 100644 index 00000000000..e64f746118a --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -0,0 +1,234 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cdef class _HandlerCallDetails: + def __cinit__(self, str method, tuple invocation_metadata): + self.method = method + self.invocation_metadata = invocation_metadata + + +class _ServicerContextPlaceHolder(object): pass + + +cdef class CallbackWrapper: + cdef CallbackContext context + cdef object future + + def __cinit__(self, object future): + self.context.functor.functor_run = self.functor_run + self.context.waiter = (future) + self.future = future + + @staticmethod + cdef void functor_run( + grpc_experimental_completion_queue_functor* functor, + int succeed): + cdef CallbackContext *context = functor + (context.waiter).set_result(None) + + cdef grpc_experimental_completion_queue_functor *c_functor(self): + return &self.context.functor + + +cdef class RPCState: + + def __cinit__(self): + grpc_metadata_array_init(&self.request_metadata) + grpc_call_details_init(&self.details) + + cdef bytes method(self): + return _slice_bytes(self.details.method) + + def __dealloc__(self): + """Cleans the Core objects.""" + grpc_call_details_destroy(&self.details) + grpc_metadata_array_destroy(&self.request_metadata) + if self.call: + grpc_call_unref(self.call) + + +cdef _find_method_handler(RPCState rpc_state, list generic_handlers): + # TODO(lidiz) connects Metadata to call details + cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails( + rpc_state.method().decode(), + tuple() + ) + + for generic_handler in generic_handlers: + method_handler = generic_handler.service(handler_call_details) + if method_handler is not None: + return method_handler + return None + + +async def callback_start_batch(RPCState rpc_state, tuple operations, object +loop): + """The callback version of start batch operations.""" + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) + batch_operation_tag.prepare() + + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(future) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + cdef grpc_call_error error = grpc_call_start_batch( + rpc_state.call, + batch_operation_tag.c_ops, + batch_operation_tag.c_nops, + wrapper.c_functor(), NULL) + + if error != GRPC_CALL_OK: + raise RuntimeError("Error with callback_start_batch {}".format(error)) + + await future + cpython.Py_DECREF(wrapper) + cdef grpc_event c_event + batch_operation_tag.event(c_event) + + +async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object loop): + # Finds the method handler (application logic) + cdef object method_handler = _find_method_handler( + rpc_state, + server_state.generic_handlers + ) + if method_handler.request_streaming or method_handler.response_streaming: + raise NotImplementedError() + + # Receives request message + cdef tuple receive_ops = ( + ReceiveMessageOperation(_EMPTY_FLAGS), + ) + await callback_start_batch(rpc_state, receive_ops, loop) + + # Parses the request + cdef bytes request_raw = receive_ops[0].message() + cdef object request_message + if method_handler.request_deserializer: + request_message = method_handler.request_deserializer(request_raw) + else: + request_message = request_raw + + # Executes application logic & encodes response message + cdef object response_message = await method_handler.unary_unary(request_message, _ServicerContextPlaceHolder()) + cdef bytes response_raw + if method_handler.response_serializer: + response_raw = method_handler.response_serializer(response_message) + else: + response_raw = response_message + + # Sends response message + cdef tuple send_ops = ( + SendStatusFromServerOperation( + tuple(), StatusCode.ok, b'', _EMPTY_FLAGS), + SendInitialMetadataOperation(tuple(), _EMPTY_FLAGS), + SendMessageOperation(response_raw, _EMPTY_FLAGS), + ) + await callback_start_batch(rpc_state, send_ops, loop) + + +async def _server_call_request_call(_AioServerState server_state, object loop): + cdef grpc_call_error error + cdef RPCState rpc_state = RPCState() + cdef object future = loop.create_future() + cdef CallbackWrapper wrapper = CallbackWrapper(future) + # NOTE(lidiz) Without Py_INCREF, the wrapper object will be destructed + # when calling "await". This is an over-optimization by Cython. + cpython.Py_INCREF(wrapper) + error = grpc_server_request_call( + server_state.server.c_server, &rpc_state.call, &rpc_state.details, + &rpc_state.request_metadata, + server_state.cq, server_state.cq, + wrapper.c_functor() + ) + if error != GRPC_CALL_OK: + raise RuntimeError("Error in _server_call_request_call: %s" % error) + + await future + cpython.Py_DECREF(wrapper) + return rpc_state + + +async def _server_main_loop(_AioServerState server_state): + cdef object loop = asyncio.get_event_loop() + cdef RPCState rpc_state + cdef object waiter + + while True: + rpc_state = await _server_call_request_call( + server_state, + loop) + # await waiter + + loop.create_task(_handle_rpc(server_state, rpc_state, loop)) + await asyncio.sleep(0) + + +async def _server_start(_AioServerState server_state): + server_state.server.start() + await _server_main_loop(server_state) + + +cdef class _AioServerState: + def __cinit__(self): + self.server = None + self.cq = NULL + self.generic_handlers = [] + + +cdef class AioServer: + + def __init__(self, thread_pool, generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression): + self._state = _AioServerState() + self._state.server = Server(options) + self._state.cq = grpc_completion_queue_create_for_callback( + NULL, + NULL + ) + grpc_server_register_completion_queue( + self._state.server.c_server, + self._state.cq, + NULL + ) + self.add_generic_rpc_handlers(generic_handlers) + + if interceptors: + raise NotImplementedError() + if maximum_concurrent_rpcs: + raise NotImplementedError() + if compression: + raise NotImplementedError() + if thread_pool: + raise NotImplementedError() + + def add_generic_rpc_handlers(self, generic_rpc_handlers): + for h in generic_rpc_handlers: + self._state.generic_handlers.append(h) + + def add_insecure_port(self, address): + return self._state.server.add_http2_port(address) + + def add_secure_port(self, address, server_credentials): + return self._state.server.add_http2_port(address, + server_credentials._credentials) + + async def start(self): + loop = asyncio.get_event_loop() + loop.create_task(_server_start(self._state)) + await asyncio.sleep(0) + + def stop(self, unused_grace): + pass diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 4d081fb83e2..b0a2033cc40 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -47,3 +47,4 @@ include "_cygrpc/aio/grpc_aio.pxd.pxi" include "_cygrpc/aio/callbackcontext.pxd.pxi" include "_cygrpc/aio/call.pxd.pxi" include "_cygrpc/aio/channel.pxd.pxi" +include "_cygrpc/aio/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index c4635be72d3..5f980bb46f0 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -64,6 +64,7 @@ include "_cygrpc/aio/grpc_aio.pyx.pxi" include "_cygrpc/aio/call.pyx.pxi" include "_cygrpc/aio/channel.pyx.pxi" include "_cygrpc/aio/rpc_error.pyx.pxi" +include "_cygrpc/aio/server.pyx.pxi" # diff --git a/src/python/grpcio/grpc/experimental/BUILD.bazel b/src/python/grpcio/grpc/experimental/BUILD.bazel index 00815c4e72e..c9f0484c886 100644 --- a/src/python/grpcio/grpc/experimental/BUILD.bazel +++ b/src/python/grpcio/grpc/experimental/BUILD.bazel @@ -5,6 +5,7 @@ py_library( srcs = [ "aio/__init__.py", "aio/_channel.py", + "aio/_server.py", ], deps = [ "//src/python/grpcio/grpc/_cython:cygrpc", diff --git a/src/python/grpcio/grpc/experimental/aio/__init__.py b/src/python/grpcio/grpc/experimental/aio/__init__.py index b94343b8703..6f681bd021b 100644 --- a/src/python/grpcio/grpc/experimental/aio/__init__.py +++ b/src/python/grpcio/grpc/experimental/aio/__init__.py @@ -20,6 +20,7 @@ import six import grpc from grpc._cython import cygrpc from grpc._cython.cygrpc import init_grpc_aio +from ._server import server class Channel(six.with_metaclass(abc.ABCMeta)): diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py new file mode 100644 index 00000000000..ae5bc900423 --- /dev/null +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -0,0 +1,174 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Server-side implementation of gRPC Asyncio Python.""" + +from typing import Text, Optional +import asyncio +import grpc +from grpc._cython import cygrpc + +class Server: + """Serves RPCs.""" + + def __init__(self, thread_pool, generic_handlers, interceptors, options, + maximum_concurrent_rpcs, compression): + self._server = cygrpc.AioServer(thread_pool, generic_handlers, + interceptors, options, + maximum_concurrent_rpcs, compression) + + def add_generic_rpc_handlers( + self, + generic_rpc_handlers, + # generic_rpc_handlers: Iterable[grpc.GenericRpcHandlers] + ) -> None: + """Registers GenericRpcHandlers with this Server. + + This method is only safe to call before the server is started. + + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be + used to service RPCs. + """ + self._server.add_generic_rpc_handlers(generic_rpc_handlers) + + def add_insecure_port(self, address: Text) -> int: + """Opens an insecure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. If the port is 0, + or not specified in the address, then gRPC runtime will choose a port. + + Returns: + An integer port on which server will accept RPC requests. + """ + return self._server.add_insecure_port(address) + + def add_secure_port(self, address: Text, + server_credentials: grpc.ServerCredentials) -> int: + """Opens a secure port for accepting RPCs. + + This method may only be called before starting the server. + + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC + runtime will choose a port. + server_credentials: A ServerCredentials object. + + Returns: + An integer port on which server will accept RPC requests. + """ + return self._server.add_secure_port(address, server_credentials) + + async def start(self) -> None: + """Starts this Server. + + This method may only be called once. (i.e. it is not idempotent). + """ + await self._server.start() + + def stop(self, grace: Optional[float]) -> asyncio.Event: + """Stops this Server. + + This method immediately stop service of new RPCs in all cases. + + If a grace period is specified, this method returns immediately + and all RPCs active at the end of the grace period are aborted. + If a grace period is not specified (by passing None for `grace`), + all existing RPCs are aborted immediately and this method + blocks until the last RPC handler terminates. + + This method is idempotent and may be called at any time. + Passing a smaller grace value in a subsequent call will have + the effect of stopping the Server sooner (passing None will + have the effect of stopping the server immediately). Passing + a larger grace value in a subsequent call *will not* have the + effect of stopping the server later (i.e. the most restrictive + grace value is used). + + Args: + grace: A duration of time in seconds or None. + + Returns: + A threading.Event that will be set when this Server has completely + stopped, i.e. when running RPCs either complete or are aborted and + all handlers have terminated. + """ + raise NotImplementedError() + + async def wait_for_termination(self, + timeout: Optional[float] = None) -> bool: + """Block current thread until the server stops. + + This is an EXPERIMENTAL API. + + The wait will not consume computational resources during blocking, and + it will block until one of the two following conditions are met: + + 1) The server is stopped or terminated; + 2) A timeout occurs if timeout is not `None`. + + The timeout argument works in the same way as `threading.Event.wait()`. + https://docs.python.org/3/library/threading.html#threading.Event.wait + + Args: + timeout: A floating point number specifying a timeout for the + operation in seconds. + + Returns: + A bool indicates if the operation times out. + """ + if timeout: + raise NotImplementedError() + # TODO(lidiz) replace this wait forever logic + future = asyncio.get_event_loop().create_future() + await future + + +def server(thread_pool=None, + handlers=None, + interceptors=None, + options=None, + maximum_concurrent_rpcs=None, + compression=None): + """Creates a Server with which RPCs can be serviced. + + Args: + thread_pool: A futures.ThreadPoolExecutor to be used by the Server + to execute RPC handlers. + handlers: An optional list of GenericRpcHandlers used for executing RPCs. + More handlers may be added by calling add_generic_rpc_handlers any time + before the server is started. + interceptors: An optional list of ServerInterceptor objects that observe + and optionally manipulate the incoming RPCs before handing them over to + handlers. The interceptors are given control in the order they are + specified. This is an EXPERIMENTAL API. + options: An optional list of key-value pairs (channel args in gRPC runtime) + to configure the channel. + maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server + will service before returning RESOURCE_EXHAUSTED status, or None to + indicate no limit. + compression: An element of grpc.compression, e.g. + grpc.compression.Gzip. This compression algorithm will be used for the + lifetime of the server unless overridden. This is an EXPERIMENTAL option. + + Returns: + A Server object. + """ + return Server(thread_pool, () if handlers is None else handlers, () + if interceptors is None else interceptors, () + if options is None else options, maximum_concurrent_rpcs, + compression) diff --git a/src/python/grpcio_tests/tests_aio/benchmark/server.py b/src/python/grpcio_tests/tests_aio/benchmark/server.py new file mode 100644 index 00000000000..ef0a3f7ff2c --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/benchmark/server.py @@ -0,0 +1,52 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import unittest + +from grpc.experimental import aio +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import benchmark_service_pb2_grpc + + +class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): + + async def UnaryCall(self, request, context): + payload = messages_pb2.Payload(body=b'\0' * request.response_size) + return messages_pb2.SimpleResponse(payload=payload) + + +async def _start_async_server(): + server = aio.server() + + port = server.add_insecure_port(('localhost:%s' % 50051).encode('ASCII')) + servicer = BenchmarkServer() + benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( + servicer, server) + + await server.start() + await server.wait_for_termination() + + +def main(): + aio.init_grpc_aio() + loop = asyncio.get_event_loop() + loop.create_task(_start_async_server()) + loop.run_forever() + + +if __name__ == '__main__': + logging.basicConfig() + main() diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel new file mode 100644 index 00000000000..80f578e6c03 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -0,0 +1,42 @@ +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package( + default_testonly = 1, + default_visibility = ["//visibility:public"], +) + +GRPC_ASYNC_TESTS = [ + "server_test.py", +] + +[ + py_test( + name=test_file_name[:-3], + size="small", + srcs=[test_file_name], + main=test_file_name, + deps=[ + "//src/python/grpcio/grpc:grpcio", + "//src/proto/grpc/testing:py_messages_proto", + "//src/proto/grpc/testing:benchmark_service_py_pb2_grpc", + "//src/proto/grpc/testing:benchmark_service_py_pb2", + "//external:six" + ], + imports=["../../",], + data=[ + "//src/python/grpcio_tests/tests/unit/credentials", + ], + ) for test_file_name in GRPC_ASYNC_TESTS +] diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py new file mode 100644 index 00000000000..86beb02461a --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -0,0 +1,61 @@ +# Copyright 2019 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import unittest + +from grpc.experimental import aio +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import benchmark_service_pb2_grpc + + +class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): + + async def UnaryCall(self, request, context): + payload = messages_pb2.Payload(body=b'\0' * request.response_size) + return messages_pb2.SimpleResponse(payload=payload) + + +class TestServer(unittest.TestCase): + + def test_unary_unary(self): + loop = asyncio.get_event_loop() + + async def test_unary_unary_body(): + server = aio.server() + port = server.add_insecure_port(('[::]:0').encode('ASCII')) + benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( + BenchmarkServer(), server) + await server.start() + + async with aio.insecure_channel(f'localhost:{port}') as channel: + unary_call = channel.unary_unary( + '/grpc.testing.BenchmarkService/UnaryCall', + request_serializer=messages_pb2.SimpleRequest. + SerializeToString, + response_deserializer=messages_pb2.SimpleResponse.FromString + ) + response = await unary_call( + messages_pb2.SimpleRequest(response_size=1)) + self.assertIsInstance(response, messages_pb2.SimpleResponse) + self.assertEqual(1, len(response.payload.body)) + + loop.run_until_complete(test_unary_unary_body()) + + +if __name__ == '__main__': + aio.init_grpc_aio() + logging.basicConfig() + unittest.main(verbosity=2) From 175c8e44b92dd01cb79153d60a8bd69857dea0ff Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 15 Oct 2019 17:22:56 -0700 Subject: [PATCH 03/14] Cosmetic changes to increase readability --- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index e64f746118a..1584d34cd8b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -98,22 +98,14 @@ loop): batch_operation_tag.event(c_event) -async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object loop): - # Finds the method handler (application logic) - cdef object method_handler = _find_method_handler( - rpc_state, - server_state.generic_handlers - ) - if method_handler.request_streaming or method_handler.response_streaming: - raise NotImplementedError() - +async def _handle_unary_unary_rpc(object method_handler, RPCState rpc_state, object loop): # Receives request message cdef tuple receive_ops = ( ReceiveMessageOperation(_EMPTY_FLAGS), ) await callback_start_batch(rpc_state, receive_ops, loop) - # Parses the request + # Deserializes the request message cdef bytes request_raw = receive_ops[0].message() cdef object request_message if method_handler.request_deserializer: @@ -121,8 +113,10 @@ async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object l else: request_message = request_raw - # Executes application logic & encodes response message + # Executes application logic cdef object response_message = await method_handler.unary_unary(request_message, _ServicerContextPlaceHolder()) + + # Serializes the response message cdef bytes response_raw if method_handler.response_serializer: response_raw = method_handler.response_serializer(response_message) @@ -139,6 +133,26 @@ async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object l await callback_start_batch(rpc_state, send_ops, loop) +async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object loop): + # Finds the method handler (application logic) + cdef object method_handler = _find_method_handler( + rpc_state, + server_state.generic_handlers + ) + if method_handler is None: + # TODO(lidiz) return unimplemented error to client side + raise NotImplementedError() + # TODO(lidiz) extend to all 4 types of RPC + if method_handler.request_streaming or method_handler.response_streaming: + raise NotImplementedError() + else: + await _handle_unary_unary_rpc( + method_handler, + rpc_state, + loop + ) + + async def _server_call_request_call(_AioServerState server_state, object loop): cdef grpc_call_error error cdef RPCState rpc_state = RPCState() From 995c4f4455c938946c88df307b1c94ce3681cacc Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 15 Oct 2019 17:27:35 -0700 Subject: [PATCH 04/14] Try to pin the python_version to PY3 --- src/python/grpcio_tests/tests_aio/unit/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel index 80f578e6c03..c6d0a9f7728 100644 --- a/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests_aio/unit/BUILD.bazel @@ -27,6 +27,7 @@ GRPC_ASYNC_TESTS = [ size="small", srcs=[test_file_name], main=test_file_name, + python_version="PY3", deps=[ "//src/python/grpcio/grpc:grpcio", "//src/proto/grpc/testing:py_messages_proto", From 3c7cd1543d216747401fc07c5bf4f0524aea015e Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 15 Oct 2019 18:09:31 -0700 Subject: [PATCH 05/14] Make YAPF happy --- src/python/grpcio/grpc/experimental/aio/_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index ae5bc900423..b15f543efb5 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -18,6 +18,7 @@ import asyncio import grpc from grpc._cython import cygrpc + class Server: """Serves RPCs.""" From 5f4b8c350d28b71b851b9c94bdbcc19594fc30d9 Mon Sep 17 00:00:00 2001 From: Kyle Edwards Date: Wed, 16 Oct 2019 16:01:51 -0400 Subject: [PATCH 06/14] Honor CARES_ROOT_DIR Honor CARES_ROOT_DIR in the add_subdirectory() call. Fixes: #17876 --- cmake/cares.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cares.cmake b/cmake/cares.cmake index 4ea0d8725d0..c09f6940c91 100644 --- a/cmake/cares.cmake +++ b/cmake/cares.cmake @@ -22,7 +22,7 @@ if("${gRPC_CARES_PROVIDER}" STREQUAL "module") # See https://github.com/grpc/grpc/issues/17255 set(HAVE_LIBNSL OFF CACHE BOOL "avoid cares dependency on libnsl") endif() - add_subdirectory(third_party/cares/cares) + add_subdirectory("${CARES_ROOT_DIR}" third_party/cares/cares) if(TARGET c-ares) set(_gRPC_CARES_LIBRARIES c-ares) From 49c7f1ddf6572fc74adf062488b4351a8d489b7f Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 17 Oct 2019 15:17:48 -0700 Subject: [PATCH 07/14] Adopt reviewer's advices --- .../_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 11 +++---- .../_cython/_cygrpc/aio/iomgr/socket.pxd.pxi | 5 ++- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 26 +++++++++++---- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 8 +++++ .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 22 ++++++++++--- .../grpcio/grpc/experimental/aio/_server.py | 2 +- .../tests_aio/unit/server_test.py | 33 ++++++++++--------- 7 files changed, 72 insertions(+), 35 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index f0c33e4c7c0..1260b9102aa 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -27,7 +27,7 @@ cdef grpc_custom_poller_vtable asyncio_pollset_vtable cdef grpc_error* asyncio_socket_init( grpc_custom_socket* grpc_socket, int domain) with gil: - socket = _AsyncioSocket.create(grpc_socket) + socket = _AsyncioSocket.create(grpc_socket, None, None) Py_INCREF(socket) grpc_socket.impl = socket return 0 @@ -115,12 +115,11 @@ cdef grpc_error* asyncio_socket_listen(grpc_custom_socket* grpc_socket) with gil return grpc_error_none() -# TODO(lidiz) connects the so_reuse_port option to channel arguments def _asyncio_apply_socket_options(object s, so_reuse_port=False): - s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) - if so_reuse_port: - s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEPORT, 1) - s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) + # TODO(https://github.com/grpc/grpc/issues/20667) + # Connects the so_reuse_port option to channel arguments + s.setsockopt(native_socket.SOL_SOCKET, native_socket.SO_REUSEADDR, 1) + s.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) cdef grpc_error* asyncio_socket_bind( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi index 6cbd8e90155..285fbdcea09 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pxd.pxi @@ -35,7 +35,10 @@ cdef class _AsyncioSocket: object _peername @staticmethod - cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket) + cdef _AsyncioSocket create( + grpc_custom_socket * grpc_socket, + object reader, + object writer) @staticmethod cdef _AsyncioSocket create_with_py_socket(grpc_custom_socket * grpc_socket, object py_socket) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index bb6f74645fc..4e9116bddd6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -28,11 +28,18 @@ cdef class _AsyncioSocket: self._read_buffer = NULL self._server = None self._py_socket = None + self._peername = None @staticmethod - cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket): + cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, + object reader, + object writer): socket = _AsyncioSocket() socket._grpc_socket = grpc_socket + socket._reader = reader + socket._writer = writer + if writer is not None: + socket._peername = writer.get_extra_info('peername') return socket @staticmethod @@ -101,7 +108,13 @@ cdef class _AsyncioSocket: grpc_socket_error("read {}".format(error_msg).encode()) ) - cdef void connect(self, object host, object port, grpc_custom_connect_callback grpc_connect_cb): + cdef void connect(self, + object host, + object port, + grpc_custom_connect_callback grpc_connect_cb): + if self._reader: + return + assert not self._task_connect self._task_connect = asyncio.ensure_future( @@ -143,10 +156,11 @@ cdef class _AsyncioSocket: self._writer.close() def _new_connection_callback(self, object reader, object writer): - client_socket = _AsyncioSocket.create(self._grpc_client_socket) - client_socket._reader = reader - client_socket._writer = writer - client_socket._peername = addr = writer.get_extra_info('peername') + client_socket = _AsyncioSocket.create( + self._grpc_client_socket, + reader, + writer, + ) self._grpc_client_socket.impl = client_socket cpython.Py_INCREF(client_socket) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index ee5255213c8..40513473170 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -25,10 +25,18 @@ cdef class RPCState: cdef bytes method(self) +cdef enum AioServerStatus: + AIO_SERVER_STATUS_UNKNOWN + AIO_SERVER_STATUS_READY + AIO_SERVER_STATUS_RUNNING + AIO_SERVER_STATUS_STOPPED + + cdef class _AioServerState: cdef Server server cdef grpc_completion_queue *cq cdef list generic_handlers + cdef AioServerStatus status cdef class AioServer: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 1584d34cd8b..1bb70bf4c41 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -21,21 +21,26 @@ cdef class _HandlerCallDetails: class _ServicerContextPlaceHolder(object): pass +# TODO(https://github.com/grpc/grpc/issues/20669) +# Apply this to the client-side cdef class CallbackWrapper: cdef CallbackContext context - cdef object future + cdef object _keep_reference def __cinit__(self, object future): self.context.functor.functor_run = self.functor_run self.context.waiter = (future) - self.future = future + self._keep_reference = future @staticmethod cdef void functor_run( grpc_experimental_completion_queue_functor* functor, int succeed): cdef CallbackContext *context = functor - (context.waiter).set_result(None) + if succeed == 0: + (context.waiter).set_exception(RuntimeError()) + else: + (context.waiter).set_result(None) cdef grpc_experimental_completion_queue_functor *c_functor(self): return &self.context.functor @@ -178,13 +183,11 @@ async def _server_call_request_call(_AioServerState server_state, object loop): async def _server_main_loop(_AioServerState server_state): cdef object loop = asyncio.get_event_loop() cdef RPCState rpc_state - cdef object waiter while True: rpc_state = await _server_call_request_call( server_state, loop) - # await waiter loop.create_task(_handle_rpc(server_state, rpc_state, loop)) await asyncio.sleep(0) @@ -212,6 +215,7 @@ cdef class AioServer: NULL, NULL ) + self._state.status = AIO_SERVER_STATUS_READY grpc_server_register_completion_queue( self._state.server.c_server, self._state.cq, @@ -240,9 +244,17 @@ cdef class AioServer: server_credentials._credentials) async def start(self): + if self._state.status == AIO_SERVER_STATUS_RUNNING: + return + elif self._state.status != AIO_SERVER_STATUS_READY: + raise RuntimeError('Server not in ready state') + + self._state.status = AIO_SERVER_STATUS_RUNNING loop = asyncio.get_event_loop() loop.create_task(_server_start(self._state)) await asyncio.sleep(0) + # TODO(https://github.com/grpc/grpc/issues/20668) + # Implement Destruction Methods for AsyncIO Server def stop(self, unused_grace): pass diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index b15f543efb5..5670d7c0282 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -112,7 +112,7 @@ class Server: async def wait_for_termination(self, timeout: Optional[float] = None) -> bool: - """Block current thread until the server stops. + """Block current coroutine until the server stops. This is an EXPERIMENTAL API. diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 86beb02461a..1a744fadc83 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -16,16 +16,25 @@ import asyncio import logging import unittest +import grpc from grpc.experimental import aio from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import benchmark_service_pb2_grpc +_TEST_METHOD_PATH = '' -class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' - async def UnaryCall(self, request, context): - payload = messages_pb2.Payload(body=b'\0' * request.response_size) - return messages_pb2.SimpleResponse(payload=payload) + +async def unary_unary(unused_request, unused_context): + return _RESPONSE + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, unused_handler_details): + return grpc.unary_unary_rpc_method_handler(unary_unary) class TestServer(unittest.TestCase): @@ -36,21 +45,13 @@ class TestServer(unittest.TestCase): async def test_unary_unary_body(): server = aio.server() port = server.add_insecure_port(('[::]:0').encode('ASCII')) - benchmark_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( - BenchmarkServer(), server) + server.add_generic_rpc_handlers((GenericHandler(),)) await server.start() async with aio.insecure_channel(f'localhost:{port}') as channel: - unary_call = channel.unary_unary( - '/grpc.testing.BenchmarkService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest. - SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString - ) - response = await unary_call( - messages_pb2.SimpleRequest(response_size=1)) - self.assertIsInstance(response, messages_pb2.SimpleResponse) - self.assertEqual(1, len(response.payload.body)) + unary_call = channel.unary_unary(_TEST_METHOD_PATH) + response = await unary_call(_REQUEST) + self.assertEqual(response, _RESPONSE) loop.run_until_complete(test_unary_unary_body()) From 2ced359d787c4002949fc4a62da8c51dce624c1d Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Fri, 18 Oct 2019 18:03:36 -0700 Subject: [PATCH 08/14] Adopt reviewer's suggestions --- .../_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 22 +++-- .../_cython/_cygrpc/aio/iomgr/socket.pyx.pxi | 12 +-- .../grpc/_cython/_cygrpc/aio/server.pxd.pxi | 13 +-- .../grpc/_cython/_cygrpc/aio/server.pyx.pxi | 95 +++++++++++-------- .../grpcio/grpc/experimental/aio/_server.py | 26 ++--- .../tests_aio/unit/server_test.py | 2 +- 6 files changed, 97 insertions(+), 73 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 1260b9102aa..1bb1f4a2ae4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -17,6 +17,7 @@ from cpython cimport Py_INCREF, Py_DECREF from libc cimport string import socket as native_socket +import ipaddress # CPython 3.3 and above cdef grpc_socket_vtable asyncio_socket_vtable cdef grpc_custom_resolver_vtable asyncio_resolver_vtable @@ -87,6 +88,7 @@ cdef grpc_error* asyncio_socket_getpeername( cdef grpc_resolved_address c_addr hostname = str_to_bytes(peer[0]) grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy string.memcpy(addr, c_addr.addr, c_addr.len) length[0] = c_addr.len return grpc_error_none() @@ -105,6 +107,7 @@ cdef grpc_error* asyncio_socket_getsockname( peer = socket.sockname() hostname = str_to_bytes(peer[0]) grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + # TODO(https://github.com/grpc/grpc/issues/20684) Remove the memcpy string.memcpy(addr, c_addr.addr, c_addr.len) length[0] = c_addr.len return grpc_error_none() @@ -128,19 +131,20 @@ cdef grpc_error* asyncio_socket_bind( size_t len, int flags) with gil: host, port = sockaddr_to_tuple(addr, len) try: - try: - socket = native_socket.socket(family=native_socket.AF_INET6) - _asyncio_apply_socket_options(socket) - socket.bind((host, port)) - except native_socket.gaierror: - socket = native_socket.socket(family=native_socket.AF_INET) - _asyncio_apply_socket_options(socket) - socket.bind((host, port)) + ip = ipaddress.ip_address(host) + if isinstance(ip, ipaddress.IPv6Address): + family = native_socket.AF_INET6 + else: + family = native_socket.AF_INET + + socket = native_socket.socket(family=family) + _asyncio_apply_socket_options(socket) + socket.bind((host, port)) except IOError as io_error: return socket_error("bind", str(io_error)) else: aio_socket = _AsyncioSocket.create_with_py_socket(grpc_socket, socket) - cpython.Py_INCREF(aio_socket) + cpython.Py_INCREF(aio_socket) # Py_DECREF in asyncio_socket_destroy grpc_socket.impl = aio_socket return grpc_error_none() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 4e9116bddd6..2d56a568348 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -112,9 +112,7 @@ cdef class _AsyncioSocket: object host, object port, grpc_custom_connect_callback grpc_connect_cb): - if self._reader: - return - + assert not self._reader assert not self._task_connect self._task_connect = asyncio.ensure_future( @@ -163,11 +161,11 @@ cdef class _AsyncioSocket: ) self._grpc_client_socket.impl = client_socket - cpython.Py_INCREF(client_socket) + cpython.Py_INCREF(client_socket) # Py_DECREF in asyncio_socket_destroy # Accept callback expects to be called with: - # * An grpc custom socket for server - # * An grpc custom socket for client (with new Socket instance) - # * An error object + # grpc_custom_socket: A grpc custom socket for server + # grpc_custom_socket: A grpc custom socket for client (with new Socket instance) + # grpc_error: An error object self._grpc_accept_cb(self._grpc_socket, self._grpc_client_socket, grpc_error_none()) cdef listen(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi index 40513473170..1906463d088 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pxd.pxi @@ -32,12 +32,13 @@ cdef enum AioServerStatus: AIO_SERVER_STATUS_STOPPED -cdef class _AioServerState: - cdef Server server - cdef grpc_completion_queue *cq - cdef list generic_handlers - cdef AioServerStatus status +cdef class _CallbackCompletionQueue: + cdef grpc_completion_queue *_cq + cdef grpc_completion_queue* c_ptr(self) cdef class AioServer: - cdef _AioServerState _state + cdef Server _server + cdef _CallbackCompletionQueue _cq + cdef list _generic_handlers + cdef AioServerStatus _status diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 1bb70bf4c41..2768517a9e2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -25,12 +25,12 @@ class _ServicerContextPlaceHolder(object): pass # Apply this to the client-side cdef class CallbackWrapper: cdef CallbackContext context - cdef object _keep_reference + cdef object _reference def __cinit__(self, object future): self.context.functor.functor_run = self.functor_run self.context.waiter = (future) - self._keep_reference = future + self._reference = future @staticmethod cdef void functor_run( @@ -63,10 +63,10 @@ cdef class RPCState: grpc_call_unref(self.call) -cdef _find_method_handler(RPCState rpc_state, list generic_handlers): +cdef _find_method_handler(str method, list generic_handlers): # TODO(lidiz) connects Metadata to call details cdef _HandlerCallDetails handler_call_details = _HandlerCallDetails( - rpc_state.method().decode(), + method, tuple() ) @@ -77,8 +77,9 @@ cdef _find_method_handler(RPCState rpc_state, list generic_handlers): return None -async def callback_start_batch(RPCState rpc_state, tuple operations, object -loop): +async def callback_start_batch(RPCState rpc_state, + tuple operations, + object loop): """The callback version of start batch operations.""" cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(None, operations, None) batch_operation_tag.prepare() @@ -100,10 +101,13 @@ loop): await future cpython.Py_DECREF(wrapper) cdef grpc_event c_event + # Tag.event must be called, otherwise messages won't be parsed from C batch_operation_tag.event(c_event) -async def _handle_unary_unary_rpc(object method_handler, RPCState rpc_state, object loop): +async def _handle_unary_unary_rpc(object method_handler, + RPCState rpc_state, + object loop): # Receives request message cdef tuple receive_ops = ( ReceiveMessageOperation(_EMPTY_FLAGS), @@ -138,11 +142,11 @@ async def _handle_unary_unary_rpc(object method_handler, RPCState rpc_state, obj await callback_start_batch(rpc_state, send_ops, loop) -async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object loop): +async def _handle_rpc(list generic_handlers, RPCState rpc_state, object loop): # Finds the method handler (application logic) cdef object method_handler = _find_method_handler( - rpc_state, - server_state.generic_handlers + rpc_state.method().decode(), + generic_handlers ) if method_handler is None: # TODO(lidiz) return unimplemented error to client side @@ -158,7 +162,9 @@ async def _handle_rpc(_AioServerState server_state, RPCState rpc_state, object l ) -async def _server_call_request_call(_AioServerState server_state, object loop): +async def _server_call_request_call(Server server, + _CallbackCompletionQueue cq, + object loop): cdef grpc_call_error error cdef RPCState rpc_state = RPCState() cdef object future = loop.create_future() @@ -167,9 +173,9 @@ async def _server_call_request_call(_AioServerState server_state, object loop): # when calling "await". This is an over-optimization by Cython. cpython.Py_INCREF(wrapper) error = grpc_server_request_call( - server_state.server.c_server, &rpc_state.call, &rpc_state.details, + server.c_server, &rpc_state.call, &rpc_state.details, &rpc_state.request_metadata, - server_state.cq, server_state.cq, + cq.c_ptr(), cq.c_ptr(), wrapper.c_functor() ) if error != GRPC_CALL_OK: @@ -180,45 +186,52 @@ async def _server_call_request_call(_AioServerState server_state, object loop): return rpc_state -async def _server_main_loop(_AioServerState server_state): +async def _server_main_loop(Server server, + _CallbackCompletionQueue cq, + list generic_handlers): cdef object loop = asyncio.get_event_loop() cdef RPCState rpc_state while True: rpc_state = await _server_call_request_call( - server_state, + server, + cq, loop) - loop.create_task(_handle_rpc(server_state, rpc_state, loop)) + loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) await asyncio.sleep(0) -async def _server_start(_AioServerState server_state): - server_state.server.start() - await _server_main_loop(server_state) +async def _server_start(Server server, + _CallbackCompletionQueue cq, + list generic_handlers): + server.start() + await _server_main_loop(server, cq, generic_handlers) + +cdef class _CallbackCompletionQueue: -cdef class _AioServerState: def __cinit__(self): - self.server = None - self.cq = NULL - self.generic_handlers = [] + self._cq = grpc_completion_queue_create_for_callback( + NULL, + NULL + ) + + cdef grpc_completion_queue* c_ptr(self): + return self._cq cdef class AioServer: def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs, compression): - self._state = _AioServerState() - self._state.server = Server(options) - self._state.cq = grpc_completion_queue_create_for_callback( - NULL, - NULL - ) - self._state.status = AIO_SERVER_STATUS_READY + self._server = Server(options) + self._cq = _CallbackCompletionQueue() + self._status = AIO_SERVER_STATUS_READY + self._generic_handlers = [] grpc_server_register_completion_queue( - self._state.server.c_server, - self._state.cq, + self._server.c_server, + self._cq.c_ptr(), NULL ) self.add_generic_rpc_handlers(generic_handlers) @@ -234,24 +247,28 @@ cdef class AioServer: def add_generic_rpc_handlers(self, generic_rpc_handlers): for h in generic_rpc_handlers: - self._state.generic_handlers.append(h) + self._generic_handlers.append(h) def add_insecure_port(self, address): - return self._state.server.add_http2_port(address) + return self._server.add_http2_port(address) def add_secure_port(self, address, server_credentials): - return self._state.server.add_http2_port(address, + return self._server.add_http2_port(address, server_credentials._credentials) async def start(self): - if self._state.status == AIO_SERVER_STATUS_RUNNING: + if self._status == AIO_SERVER_STATUS_RUNNING: return - elif self._state.status != AIO_SERVER_STATUS_READY: + elif self._status != AIO_SERVER_STATUS_READY: raise RuntimeError('Server not in ready state') - self._state.status = AIO_SERVER_STATUS_RUNNING + self._status = AIO_SERVER_STATUS_RUNNING loop = asyncio.get_event_loop() - loop.create_task(_server_start(self._state)) + loop.create_task(_server_start( + self._server, + self._cq, + self._generic_handlers, + )) await asyncio.sleep(0) # TODO(https://github.com/grpc/grpc/issues/20668) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 5670d7c0282..6bc3d210aed 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -16,6 +16,7 @@ from typing import Text, Optional import asyncio import grpc +from grpc import _common from grpc._cython import cygrpc @@ -50,12 +51,12 @@ class Server: Args: address: The address for which to open a port. If the port is 0, - or not specified in the address, then gRPC runtime will choose a port. + or not specified in the address, then the gRPC runtime will choose a port. Returns: - An integer port on which server will accept RPC requests. + An integer port on which the server will accept RPC requests. """ - return self._server.add_insecure_port(address) + return self._server.add_insecure_port(_common.encode(address)) def add_secure_port(self, address: Text, server_credentials: grpc.ServerCredentials) -> int: @@ -65,14 +66,15 @@ class Server: Args: address: The address for which to open a port. - if the port is 0, or not specified in the address, then gRPC + if the port is 0, or not specified in the address, then the gRPC runtime will choose a port. server_credentials: A ServerCredentials object. Returns: - An integer port on which server will accept RPC requests. + An integer port on which the server will accept RPC requests. """ - return self._server.add_secure_port(address, server_credentials) + return self._server.add_secure_port( + _common.encode(address), server_credentials) async def start(self) -> None: """Starts this Server. @@ -84,7 +86,8 @@ class Server: def stop(self, grace: Optional[float]) -> asyncio.Event: """Stops this Server. - This method immediately stop service of new RPCs in all cases. + "This method immediately stops the server from servicing new RPCs in + all cases. If a grace period is specified, this method returns immediately and all RPCs active at the end of the grace period are aborted. @@ -139,7 +142,7 @@ class Server: await future -def server(thread_pool=None, +def server(migration_thread_pool=None, handlers=None, interceptors=None, options=None, @@ -148,8 +151,8 @@ def server(thread_pool=None, """Creates a Server with which RPCs can be serviced. Args: - thread_pool: A futures.ThreadPoolExecutor to be used by the Server - to execute RPC handlers. + migration_thread_pool: A futures.ThreadPoolExecutor to be used by the + Server to execute non-AsyncIO RPC handlers for migration purpose. handlers: An optional list of GenericRpcHandlers used for executing RPCs. More handlers may be added by calling add_generic_rpc_handlers any time before the server is started. @@ -169,7 +172,8 @@ def server(thread_pool=None, Returns: A Server object. """ - return Server(thread_pool, () if handlers is None else handlers, () + return Server(migration_thread_pool, () + if handlers is None else handlers, () if interceptors is None else interceptors, () if options is None else options, maximum_concurrent_rpcs, compression) diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 1a744fadc83..22e04478da3 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -44,7 +44,7 @@ class TestServer(unittest.TestCase): async def test_unary_unary_body(): server = aio.server() - port = server.add_insecure_port(('[::]:0').encode('ASCII')) + port = server.add_insecure_port('[::]:0') server.add_generic_rpc_handlers((GenericHandler(),)) await server.start() From d8d272119ac54c161f5c32fa013aacf4fa002d93 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Mon, 21 Oct 2019 08:53:05 +1300 Subject: [PATCH 09/14] Provide length when getting serialization buffer writer --- src/csharp/Grpc.Core.Api/SerializationContext.cs | 4 ++-- .../Internal/DefaultSerializationContextTest.cs | 10 +++++----- .../Grpc.Core/Internal/DefaultSerializationContext.cs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/csharp/Grpc.Core.Api/SerializationContext.cs b/src/csharp/Grpc.Core.Api/SerializationContext.cs index 59e14c12e3b..1cc4b53619f 100644 --- a/src/csharp/Grpc.Core.Api/SerializationContext.cs +++ b/src/csharp/Grpc.Core.Api/SerializationContext.cs @@ -41,7 +41,8 @@ namespace Grpc.Core /// Gets buffer writer that can be used to write the serialized data. Once serialization is finished, /// Complete() needs to be called. /// - public virtual IBufferWriter GetBufferWriter() + /// The total length of the payload in bytes. + public virtual IBufferWriter GetBufferWriter(int payloadLength) { throw new NotImplementedException(); } @@ -52,7 +53,6 @@ namespace Grpc.Core public virtual void Complete() { throw new NotImplementedException(); - } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs index 061230d8ca4..0245c370e66 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs @@ -84,7 +84,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(); + var bufferWriter = context.GetBufferWriter(payloadSize); origPayload.AsSpan().CopyTo(bufferWriter.GetSpan(payloadSize)); bufferWriter.Advance(payloadSize); context.Complete(); @@ -106,7 +106,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(); + var bufferWriter = context.GetBufferWriter(payloadSize); origPayload.AsSpan().CopyTo(bufferWriter.GetMemory(payloadSize).Span); bufferWriter.Advance(payloadSize); context.Complete(); @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(); + var bufferWriter = context.GetBufferWriter(payloadSize); for (int offset = 0; offset < payloadSize; offset += maxSliceSize) { var sliceSize = Math.Min(maxSliceSize, payloadSize - offset); @@ -165,7 +165,7 @@ namespace Grpc.Core.Internal.Tests var origPayload2 = GetTestBuffer(20); - var bufferWriter = context.GetBufferWriter(); + var bufferWriter = context.GetBufferWriter(20); origPayload2.AsSpan().CopyTo(bufferWriter.GetMemory(origPayload2.Length).Span); bufferWriter.Advance(origPayload2.Length); context.Complete(); @@ -185,7 +185,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; context.Complete(GetTestBuffer(10)); - Assert.Throws(typeof(InvalidOperationException), () => context.GetBufferWriter()); + Assert.Throws(typeof(InvalidOperationException), () => context.GetBufferWriter(10)); } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs index 4d45b5c684f..6d8795df970 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs @@ -50,7 +50,7 @@ namespace Grpc.Core.Internal /// /// Expose serializer as buffer writer /// - public override IBufferWriter GetBufferWriter() + public override IBufferWriter GetBufferWriter(int payloadLength) { GrpcPreconditions.CheckState(!isComplete); return sliceBuffer; From 08bbbcd7514a2f74af32816ecb63e825d98f1152 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 21 Oct 2019 10:24:31 -0700 Subject: [PATCH 10/14] Remove `asyncio.sleep(0)` --- src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi index 2768517a9e2..5bab542f467 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi @@ -199,7 +199,6 @@ async def _server_main_loop(Server server, loop) loop.create_task(_handle_rpc(generic_handlers, rpc_state, loop)) - await asyncio.sleep(0) async def _server_start(Server server, @@ -269,7 +268,6 @@ cdef class AioServer: self._cq, self._generic_handlers, )) - await asyncio.sleep(0) # TODO(https://github.com/grpc/grpc/issues/20668) # Implement Destruction Methods for AsyncIO Server From fbe1bc9e8f8826bfc309185a1522b92f8308f7a3 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 21 Oct 2019 16:00:57 -0700 Subject: [PATCH 11/14] Suppress the ImportError of ipaddress for Python 2 --- .../grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 1bb1f4a2ae4..a5811f8a2fe 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -17,7 +17,10 @@ from cpython cimport Py_INCREF, Py_DECREF from libc cimport string import socket as native_socket -import ipaddress # CPython 3.3 and above +try: + import ipaddress # CPython 3.3 and above +except ImportError: + pass cdef grpc_socket_vtable asyncio_socket_vtable cdef grpc_custom_resolver_vtable asyncio_resolver_vtable From a588b5ffb9e35f4498eea8db69854b9c75de533a Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Mon, 21 Oct 2019 17:07:11 -0700 Subject: [PATCH 12/14] Remove the usage of f-string --- src/python/grpcio_tests/tests_aio/unit/server_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/server_test.py b/src/python/grpcio_tests/tests_aio/unit/server_test.py index 22e04478da3..2d543893176 100644 --- a/src/python/grpcio_tests/tests_aio/unit/server_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/server_test.py @@ -48,7 +48,7 @@ class TestServer(unittest.TestCase): server.add_generic_rpc_handlers((GenericHandler(),)) await server.start() - async with aio.insecure_channel(f'localhost:{port}') as channel: + async with aio.insecure_channel('localhost:%d' % port) as channel: unary_call = channel.unary_unary(_TEST_METHOD_PATH) response = await unary_call(_REQUEST) self.assertEqual(response, _RESPONSE) From f4480fa8b2b9e6572208bcd041eff3026648b23f Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 22 Oct 2019 16:37:11 +1300 Subject: [PATCH 13/14] Change to use SetPayloadLength --- src/csharp/Grpc.Core.Api/SerializationContext.cs | 16 +++++++++++++--- .../Internal/DefaultSerializationContextTest.cs | 10 +++++----- .../Internal/DefaultSerializationContext.cs | 7 ++++++- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/csharp/Grpc.Core.Api/SerializationContext.cs b/src/csharp/Grpc.Core.Api/SerializationContext.cs index 1cc4b53619f..9e7d7d3a156 100644 --- a/src/csharp/Grpc.Core.Api/SerializationContext.cs +++ b/src/csharp/Grpc.Core.Api/SerializationContext.cs @@ -39,12 +39,22 @@ namespace Grpc.Core /// /// Gets buffer writer that can be used to write the serialized data. Once serialization is finished, - /// Complete() needs to be called. + /// Complete() needs to be called. A null value will be returned if serialization + /// with a buffer writer is not supported. + /// + public virtual IBufferWriter GetBufferWriter() + { + return null; + } + + /// + /// Sets the payload length when writing serialized data a buffer writer. This method should be called before GetBufferWriter. + /// Calling this method is optional. If the payload length is not set then the length is calculated using the data written to the + /// buffer writer when Complete() is called. /// /// The total length of the payload in bytes. - public virtual IBufferWriter GetBufferWriter(int payloadLength) + public virtual void SetPayloadLength(int payloadLength) { - throw new NotImplementedException(); } /// diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs index 0245c370e66..061230d8ca4 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultSerializationContextTest.cs @@ -84,7 +84,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(payloadSize); + var bufferWriter = context.GetBufferWriter(); origPayload.AsSpan().CopyTo(bufferWriter.GetSpan(payloadSize)); bufferWriter.Advance(payloadSize); context.Complete(); @@ -106,7 +106,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(payloadSize); + var bufferWriter = context.GetBufferWriter(); origPayload.AsSpan().CopyTo(bufferWriter.GetMemory(payloadSize).Span); bufferWriter.Advance(payloadSize); context.Complete(); @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; var origPayload = GetTestBuffer(payloadSize); - var bufferWriter = context.GetBufferWriter(payloadSize); + var bufferWriter = context.GetBufferWriter(); for (int offset = 0; offset < payloadSize; offset += maxSliceSize) { var sliceSize = Math.Min(maxSliceSize, payloadSize - offset); @@ -165,7 +165,7 @@ namespace Grpc.Core.Internal.Tests var origPayload2 = GetTestBuffer(20); - var bufferWriter = context.GetBufferWriter(20); + var bufferWriter = context.GetBufferWriter(); origPayload2.AsSpan().CopyTo(bufferWriter.GetMemory(origPayload2.Length).Span); bufferWriter.Advance(origPayload2.Length); context.Complete(); @@ -185,7 +185,7 @@ namespace Grpc.Core.Internal.Tests var context = scope.Context; context.Complete(GetTestBuffer(10)); - Assert.Throws(typeof(InvalidOperationException), () => context.GetBufferWriter(10)); + Assert.Throws(typeof(InvalidOperationException), () => context.GetBufferWriter()); } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs index 6d8795df970..981ff69dbdc 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs @@ -50,12 +50,17 @@ namespace Grpc.Core.Internal /// /// Expose serializer as buffer writer /// - public override IBufferWriter GetBufferWriter(int payloadLength) + public override IBufferWriter GetBufferWriter() { GrpcPreconditions.CheckState(!isComplete); return sliceBuffer; } + public override void SetPayloadLength(int payloadLength) + { + // Length is calculated using the buffer writer + } + /// /// Complete the payload written so far. /// From b240210521693c3e83de9893b841583fa6b51154 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 22 Oct 2019 21:51:50 +1300 Subject: [PATCH 14/14] PR feedback --- src/csharp/Grpc.Core.Api/SerializationContext.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/csharp/Grpc.Core.Api/SerializationContext.cs b/src/csharp/Grpc.Core.Api/SerializationContext.cs index 9e7d7d3a156..021ca29c3d5 100644 --- a/src/csharp/Grpc.Core.Api/SerializationContext.cs +++ b/src/csharp/Grpc.Core.Api/SerializationContext.cs @@ -39,18 +39,19 @@ namespace Grpc.Core /// /// Gets buffer writer that can be used to write the serialized data. Once serialization is finished, - /// Complete() needs to be called. A null value will be returned if serialization - /// with a buffer writer is not supported. + /// Complete() needs to be called. /// public virtual IBufferWriter GetBufferWriter() { - return null; + throw new NotImplementedException(); } /// - /// Sets the payload length when writing serialized data a buffer writer. This method should be called before GetBufferWriter. - /// Calling this method is optional. If the payload length is not set then the length is calculated using the data written to the - /// buffer writer when Complete() is called. + /// Sets the payload length when writing serialized data into a buffer writer. If the serializer knows the full payload + /// length in advance, providing that information before obtaining the buffer writer using GetBufferWriter() can improve + /// serialization efficiency by avoiding copies. The provided payload length must be the same as the data written to the writer. + /// Calling this method is optional. If the payload length is not set then the length is calculated using the data written to + /// the buffer writer when Complete() is called. /// /// The total length of the payload in bytes. public virtual void SetPayloadLength(int payloadLength)