@ -13,18 +13,31 @@
# limitations under the License.
""" Service-side implementation of gRPC Python. """
from __future__ import annotations
import collections
from concurrent import futures
import enum
import logging
import threading
import time
from typing import ( Any , Callable , Iterable , Iterator , List , Mapping , Optional ,
Sequence , Set , Tuple , Union )
import grpc
from grpc import _common
from grpc import _compression
from grpc import _interceptor
import grpc # pytype: disable=pyi-error
from grpc import _common # pytype: disable=pyi-error
from grpc import _compression # pytype: disable=pyi-error
from grpc import _interceptor # pytype: disable=pyi-error
from grpc . _cython import cygrpc
from grpc . _typing import ArityAgnosticMethodHandler
from grpc . _typing import ChannelArgumentType
from grpc . _typing import DeserializingFunction
from grpc . _typing import MetadataType
from grpc . _typing import NullaryCallbackType
from grpc . _typing import ResponseType
from grpc . _typing import SerializingFunction
from grpc . _typing import ServerCallbackTag
from grpc . _typing import ServerTagCallbackType
_LOGGER = logging . getLogger ( __name__ )
@ -51,30 +64,31 @@ _DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
_INF_TIMEOUT = 1e9
def _serialized_request ( request_event ) :
def _serialized_request ( request_event : cygrpc . BaseEvent ) - > bytes :
return request_event . batch_operations [ 0 ] . message ( )
def _application_code ( code ) :
def _application_code ( code : grpc . StatusCode ) - > cygrpc . StatusCode :
cygrpc_code = _common . STATUS_CODE_TO_CYGRPC_STATUS_CODE . get ( code )
return cygrpc . StatusCode . unknown if cygrpc_code is None else cygrpc_code
def _completion_code ( state ) :
def _completion_code ( state : _RPCState ) - > cygrpc . StatusCode :
if state . code is None :
return cygrpc . StatusCode . ok
else :
return _application_code ( state . code )
def _abortion_code ( state , code ) :
def _abortion_code ( state : _RPCState ,
code : cygrpc . StatusCode ) - > cygrpc . StatusCode :
if state . code is None :
return code
else :
return _application_code ( state . code )
def _details ( state ) :
def _details ( state : _RPCState ) - > bytes :
return b ' ' if state . details is None else state . details
@ -87,6 +101,20 @@ class _HandlerCallDetails(
class _RPCState ( object ) :
condition : threading . Condition
due = Set [ str ]
request : Any
client : str
initial_metadata_allowed : bool
compression_algorithm : Optional [ grpc . Compression ]
disable_next_compression : bool
trailing_metadata : Optional [ MetadataType ]
code : Optional [ grpc . StatusCode ]
details : Optional [ bytes ]
statused : bool
rpc_errors : List [ Exception ]
callbacks : Optional [ List [ NullaryCallbackType ] ]
aborted : bool
def __init__ ( self ) :
self . condition = threading . Condition ( )
@ -105,13 +133,14 @@ class _RPCState(object):
self . aborted = False
def _raise_rpc_error ( state ) :
def _raise_rpc_error ( state : _RPCState ) - > None :
rpc_error = grpc . RpcError ( )
state . rpc_errors . append ( rpc_error )
raise rpc_error
def _possibly_finish_call ( state , token ) :
def _possibly_finish_call ( state : _RPCState ,
token : str ) - > ServerTagCallbackType :
state . due . remove ( token )
if not _is_rpc_state_active ( state ) and not state . due :
callbacks = state . callbacks
@ -121,7 +150,7 @@ def _possibly_finish_call(state, token):
return None , ( )
def _send_status_from_server ( state , token ) :
def _send_status_from_server ( state : _RPCState , token : str ) - > ServerCallbackTag :
def send_status_from_server ( unused_send_status_from_server_event ) :
with state . condition :
@ -130,7 +159,9 @@ def _send_status_from_server(state, token):
return send_status_from_server
def _get_initial_metadata ( state , metadata ) :
def _get_initial_metadata (
state : _RPCState ,
metadata : Optional [ MetadataType ] ) - > Optional [ MetadataType ] :
with state . condition :
if state . compression_algorithm :
compression_metadata = (
@ -144,13 +175,15 @@ def _get_initial_metadata(state, metadata):
return metadata
def _get_initial_metadata_operation ( state , metadata ) :
def _get_initial_metadata_operation (
state : _RPCState , metadata : Optional [ MetadataType ] ) - > cygrpc . Operation :
operation = cygrpc . SendInitialMetadataOperation (
_get_initial_metadata ( state , metadata ) , _EMPTY_FLAGS )
return operation
def _abort ( state , call , code , details ) :
def _abort ( state : _RPCState , call : cygrpc . Call , code : cygrpc . StatusCode ,
details : bytes ) - > None :
if state . client is not _CANCELLED :
effective_code = _abortion_code ( state , code )
effective_details = details if state . details is None else state . details
@ -174,7 +207,7 @@ def _abort(state, call, code, details):
state . due . add ( token )
def _receive_close_on_server ( state ) :
def _receive_close_on_server ( state : _RPCState ) - > ServerCallbackTag :
def receive_close_on_server ( receive_close_on_server_event ) :
with state . condition :
@ -188,7 +221,10 @@ def _receive_close_on_server(state):
return receive_close_on_server
def _receive_message ( state , call , request_deserializer ) :
def _receive_message (
state : _RPCState , call : cygrpc . Call ,
request_deserializer : Optional [ DeserializingFunction ]
) - > ServerCallbackTag :
def receive_message ( receive_message_event ) :
serialized_request = _serialized_request ( receive_message_event )
@ -213,7 +249,7 @@ def _receive_message(state, call, request_deserializer):
return receive_message
def _send_initial_metadata ( state ) :
def _send_initial_metadata ( state : _RPCState ) - > ServerCallbackTag :
def send_initial_metadata ( unused_send_initial_metadata_event ) :
with state . condition :
@ -222,7 +258,7 @@ def _send_initial_metadata(state):
return send_initial_metadata
def _send_message ( state , token ) :
def _send_message ( state : _RPCState , token : str ) - > ServerCallbackTag :
def send_message ( unused_send_message_event ) :
with state . condition :
@ -233,23 +269,27 @@ def _send_message(state, token):
class _Context ( grpc . ServicerContext ) :
_rpc_event : cygrpc . BaseEvent
_state : _RPCState
request_deserializer : Optional [ DeserializingFunction ]
def __init__ ( self , rpc_event , state , request_deserializer ) :
def __init__ ( self , rpc_event : cygrpc . BaseEvent , state : _RPCState ,
request_deserializer : Optional [ DeserializingFunction ] ) :
self . _rpc_event = rpc_event
self . _state = state
self . _request_deserializer = request_deserializer
def is_active ( self ) :
def is_active ( self ) - > bool :
with self . _state . condition :
return _is_rpc_state_active ( self . _state )
def time_remaining ( self ) :
def time_remaining ( self ) - > float :
return max ( self . _rpc_event . call_details . deadline - time . time ( ) , 0 )
def cancel ( self ) :
def cancel ( self ) - > None :
self . _rpc_event . call . cancel ( )
def add_callback ( self , callback ) :
def add_callback ( self , callback : NullaryCallbackType ) - > bool :
with self . _state . condition :
if self . _state . callbacks is None :
return False
@ -257,24 +297,24 @@ class _Context(grpc.ServicerContext):
self . _state . callbacks . append ( callback )
return True
def disable_next_message_compression ( self ) :
def disable_next_message_compression ( self ) - > None :
with self . _state . condition :
self . _state . disable_next_compression = True
def invocation_metadata ( self ) :
def invocation_metadata ( self ) - > Optional [ MetadataType ] :
return self . _rpc_event . invocation_metadata
def peer ( self ) :
def peer ( self ) - > str :
return _common . decode ( self . _rpc_event . call . peer ( ) )
def peer_identities ( self ) :
def peer_identities ( self ) - > Optional [ Sequence [ bytes ] ] :
return cygrpc . peer_identities ( self . _rpc_event . call )
def peer_identity_key ( self ) :
def peer_identity_key ( self ) - > Optional [ str ] :
id_key = cygrpc . peer_identity_key ( self . _rpc_event . call )
return id_key if id_key is None else _common . decode ( id_key )
def auth_context ( self ) :
def auth_context ( self ) - > Mapping [ str , Sequence [ bytes ] ] :
auth_context = cygrpc . auth_context ( self . _rpc_event . call )
auth_context_dict = { } if auth_context is None else auth_context
return {
@ -282,11 +322,11 @@ class _Context(grpc.ServicerContext):
for key , value in auth_context_dict . items ( )
}
def set_compression ( self , compression ) :
def set_compression ( self , compression : grpc . Compression ) - > None :
with self . _state . condition :
self . _state . compression_algorithm = compression
def send_initial_metadata ( self , initial_metadata ) :
def send_initial_metadata ( self , initial_metadata : MetadataType ) - > None :
with self . _state . condition :
if self . _state . client is _CANCELLED :
_raise_rpc_error ( self . _state )
@ -301,14 +341,14 @@ class _Context(grpc.ServicerContext):
else :
raise ValueError ( ' Initial metadata no longer allowed! ' )
def set_trailing_metadata ( self , trailing_metadata ) :
def set_trailing_metadata ( self , trailing_metadata : MetadataType ) - > None :
with self . _state . condition :
self . _state . trailing_metadata = trailing_metadata
def trailing_metadata ( self ) :
def trailing_metadata ( self ) - > Optional [ MetadataType ] :
return self . _state . trailing_metadata
def abort ( self , code , details ) :
def abort ( self , code : grpc . StatusCode , details : str ) - > None :
# treat OK like other invalid arguments: fail the RPC
if code == grpc . StatusCode . OK :
_LOGGER . error (
@ -321,36 +361,40 @@ class _Context(grpc.ServicerContext):
self . _state . aborted = True
raise Exception ( )
def abort_with_status ( self , status ) :
def abort_with_status ( self , status : grpc . Status ) - > None :
self . _state . trailing_metadata = status . trailing_metadata
self . abort ( status . code , status . details )
def set_code ( self , code ) :
def set_code ( self , code : grpc . StatusCode ) - > None :
with self . _state . condition :
self . _state . code = code
def code ( self ) :
def code ( self ) - > grpc . StatusCode :
return self . _state . code
def set_details ( self , details ) :
def set_details ( self , details : str ) - > None :
with self . _state . condition :
self . _state . details = _common . encode ( details )
def details ( self ) :
def details ( self ) - > bytes :
return self . _state . details
def _finalize_state ( self ) :
def _finalize_state ( self ) - > None :
pass
class _RequestIterator ( object ) :
_state : _RPCState
_call : cygrpc . Call
_request_deserializer : Optional [ DeserializingFunction ]
def __init__ ( self , state , call , request_deserializer ) :
def __init__ ( self , state : _RPCState , call : cygrpc . Call ,
request_deserializer : Optional [ DeserializingFunction ] ) :
self . _state = state
self . _call = call
self . _request_deserializer = request_deserializer
def _raise_or_start_receive_message ( self ) :
def _raise_or_start_receive_message ( self ) - > None :
if self . _state . client is _CANCELLED :
_raise_rpc_error ( self . _state )
elif not _is_rpc_state_active ( self . _state ) :
@ -362,7 +406,7 @@ class _RequestIterator(object):
self . _request_deserializer ) )
self . _state . due . add ( _RECEIVE_MESSAGE_TOKEN )
def _look_for_request ( self ) :
def _look_for_request ( self ) - > Any :
if self . _state . client is _CANCELLED :
_raise_rpc_error ( self . _state )
elif ( self . _state . request is None and
@ -375,7 +419,7 @@ class _RequestIterator(object):
raise AssertionError ( ) # should never run
def _next ( self ) :
def _next ( self ) - > Any :
with self . _state . condition :
self . _raise_or_start_receive_message ( )
while True :
@ -384,17 +428,20 @@ class _RequestIterator(object):
if request is not None :
return request
def __iter__ ( self ) :
def __iter__ ( self ) - > _RequestIterator :
return self
def __next__ ( self ) :
def __next__ ( self ) - > Any :
return self . _next ( )
def next ( self ) :
def next ( self ) - > Any :
return self . _next ( )
def _unary_request ( rpc_event , state , request_deserializer ) :
def _unary_request (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
request_deserializer : Optional [ DeserializingFunction ]
) - > Callable [ [ ] , Any ] :
def unary_request ( ) :
with state . condition :
@ -426,13 +473,15 @@ def _unary_request(rpc_event, state, request_deserializer):
return unary_request
def _call_behavior ( rpc_event ,
state ,
behavior ,
argument ,
request_deserializer ,
send_response_callback = None ) :
from grpc import _create_servicer_context
def _call_behavior (
rpc_event : cygrpc . BaseEvent ,
state : _RPCState ,
behavior : ArityAgnosticMethodHandler ,
argument : Any ,
request_deserializer : Optional [ DeserializingFunction ] ,
send_response_callback : Optional [ Callable [ [ ResponseType ] , None ] ] = None
) - > Tuple [ Union [ ResponseType , Iterator [ ResponseType ] ] , bool ] :
from grpc import _create_servicer_context # pytype: disable=pyi-error
with _create_servicer_context ( rpc_event , state ,
request_deserializer ) as context :
try :
@ -457,7 +506,9 @@ def _call_behavior(rpc_event,
return None , False
def _take_response_from_response_iterator ( rpc_event , state , response_iterator ) :
def _take_response_from_response_iterator (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
response_iterator : Iterator [ ResponseType ] ) - > Tuple [ ResponseType , bool ] :
try :
return next ( response_iterator ) , True
except StopIteration :
@ -475,7 +526,9 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
return None , False
def _serialize_response ( rpc_event , state , response , response_serializer ) :
def _serialize_response (
rpc_event : cygrpc . BaseEvent , state : _RPCState , response : Any ,
response_serializer : Optional [ SerializingFunction ] ) - > Optional [ bytes ] :
serialized_response = _common . serialize ( response , response_serializer )
if serialized_response is None :
with state . condition :
@ -486,19 +539,21 @@ def _serialize_response(rpc_event, state, response, response_serializer):
return serialized_response
def _get_send_message_op_flags_from_state ( state ) :
def _get_send_message_op_flags_from_state (
state : _RPCState ) - > Union [ int , cygrpc . WriteFlag ] :
if state . disable_next_compression :
return cygrpc . WriteFlag . no_compress
else :
return _EMPTY_FLAGS
def _reset_per_message_state ( state ) :
def _reset_per_message_state ( state : _RPCState ) - > None :
with state . condition :
state . disable_next_compression = False
def _send_response ( rpc_event , state , serialized_response ) :
def _send_response ( rpc_event : cygrpc . BaseEvent , state : _RPCState ,
serialized_response : bytes ) - > bool :
with state . condition :
if not _is_rpc_state_active ( state ) :
return False
@ -527,7 +582,8 @@ def _send_response(rpc_event, state, serialized_response):
return _is_rpc_state_active ( state )
def _status ( rpc_event , state , serialized_response ) :
def _status ( rpc_event : cygrpc . BaseEvent , state : _RPCState ,
serialized_response : Optional [ bytes ] ) - > None :
with state . condition :
if state . client is not _CANCELLED :
code = _completion_code ( state )
@ -552,8 +608,11 @@ def _status(rpc_event, state, serialized_response):
state . due . add ( _SEND_STATUS_FROM_SERVER_TOKEN )
def _unary_response_in_pool ( rpc_event , state , behavior , argument_thunk ,
request_deserializer , response_serializer ) :
def _unary_response_in_pool (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
behavior : ArityAgnosticMethodHandler , argument_thunk : Callable [ [ ] , Any ] ,
request_deserializer : Optional [ SerializingFunction ] ,
response_serializer : Optional [ SerializingFunction ] ) - > None :
cygrpc . install_context_from_request_call_event ( rpc_event )
try :
argument = argument_thunk ( )
@ -569,11 +628,14 @@ def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
cygrpc . uninstall_context ( )
def _stream_response_in_pool ( rpc_event , state , behavior , argument_thunk ,
request_deserializer , response_serializer ) :
def _stream_response_in_pool (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
behavior : ArityAgnosticMethodHandler , argument_thunk : Callable [ [ ] , Any ] ,
request_deserializer : Optional [ DeserializingFunction ] ,
response_serializer : Optional [ SerializingFunction ] ) - > None :
cygrpc . install_context_from_request_call_event ( rpc_event )
def send_response ( response ) :
def send_response ( response : Any ) - > None :
if response is None :
_status ( rpc_event , state , None )
else :
@ -604,13 +666,14 @@ def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
cygrpc . uninstall_context ( )
def _is_rpc_state_active ( state ) :
def _is_rpc_state_active ( state : _RPCState ) - > bool :
return state . client is not _CANCELLED and not state . statused
def _send_message_callback_to_blocking_iterator_adapter ( rpc_event , state ,
send_response_callback ,
response_iterator ) :
def _send_message_callback_to_blocking_iterator_adapter (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
send_response_callback : Callable [ [ ResponseType ] , None ] ,
response_iterator : Iterator [ ResponseType ] ) - > None :
while True :
response , proceed = _take_response_from_response_iterator (
rpc_event , state , response_iterator )
@ -622,7 +685,10 @@ def _send_message_callback_to_blocking_iterator_adapter(rpc_event, state,
break
def _select_thread_pool_for_behavior ( behavior , default_thread_pool ) :
def _select_thread_pool_for_behavior (
behavior : ArityAgnosticMethodHandler ,
default_thread_pool : futures . ThreadPoolExecutor
) - > futures . ThreadPoolExecutor :
if hasattr ( behavior , ' experimental_thread_pool ' ) and isinstance (
behavior . experimental_thread_pool , futures . ThreadPoolExecutor ) :
return behavior . experimental_thread_pool
@ -630,7 +696,10 @@ def _select_thread_pool_for_behavior(behavior, default_thread_pool):
return default_thread_pool
def _handle_unary_unary ( rpc_event , state , method_handler , default_thread_pool ) :
def _handle_unary_unary (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
method_handler : grpc . RpcMethodHandler ,
default_thread_pool : futures . ThreadPoolExecutor ) - > futures . Future :
unary_request = _unary_request ( rpc_event , state ,
method_handler . request_deserializer )
thread_pool = _select_thread_pool_for_behavior ( method_handler . unary_unary ,
@ -641,7 +710,10 @@ def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):
method_handler . response_serializer )
def _handle_unary_stream ( rpc_event , state , method_handler , default_thread_pool ) :
def _handle_unary_stream (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
method_handler : grpc . RpcMethodHandler ,
default_thread_pool : futures . ThreadPoolExecutor ) - > futures . Future :
unary_request = _unary_request ( rpc_event , state ,
method_handler . request_deserializer )
thread_pool = _select_thread_pool_for_behavior ( method_handler . unary_stream ,
@ -652,7 +724,10 @@ def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool):
method_handler . response_serializer )
def _handle_stream_unary ( rpc_event , state , method_handler , default_thread_pool ) :
def _handle_stream_unary (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
method_handler : grpc . RpcMethodHandler ,
default_thread_pool : futures . ThreadPoolExecutor ) - > futures . Future :
request_iterator = _RequestIterator ( state , rpc_event . call ,
method_handler . request_deserializer )
thread_pool = _select_thread_pool_for_behavior ( method_handler . stream_unary ,
@ -664,8 +739,10 @@ def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool):
method_handler . response_serializer )
def _handle_stream_stream ( rpc_event , state , method_handler ,
default_thread_pool ) :
def _handle_stream_stream (
rpc_event : cygrpc . BaseEvent , state : _RPCState ,
method_handler : grpc . RpcMethodHandler ,
default_thread_pool : futures . ThreadPoolExecutor ) - > futures . Future :
request_iterator = _RequestIterator ( state , rpc_event . call ,
method_handler . request_deserializer )
thread_pool = _select_thread_pool_for_behavior ( method_handler . stream_stream ,
@ -677,9 +754,14 @@ def _handle_stream_stream(rpc_event, state, method_handler,
method_handler . response_serializer )
def _find_method_handler ( rpc_event , generic_handlers , interceptor_pipeline ) :
def _find_method_handler (
rpc_event : cygrpc . BaseEvent , generic_handlers : List [ grpc . GenericRpcHandler ] ,
interceptor_pipeline : Optional [ _interceptor . _ServicePipeline ]
) - > Optional [ grpc . RpcMethodHandler ] :
def query_handlers ( handler_call_details ) :
def query_handlers (
handler_call_details : _HandlerCallDetails
) - > Optional [ grpc . RpcMethodHandler ] :
for generic_handler in generic_handlers :
method_handler = generic_handler . service ( handler_call_details )
if method_handler is not None :
@ -697,7 +779,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
return query_handlers ( handler_call_details )
def _reject_rpc ( rpc_event , status , details ) :
def _reject_rpc ( rpc_event : cygrpc . BaseEvent , status : cygrpc . StatusCode ,
details : bytes ) - > _RPCState :
rpc_state = _RPCState ( )
operations = (
_get_initial_metadata_operation ( rpc_state , None ) ,
@ -712,7 +795,10 @@ def _reject_rpc(rpc_event, status, details):
return rpc_state
def _handle_with_method_handler ( rpc_event , method_handler , thread_pool ) :
def _handle_with_method_handler (
rpc_event : cygrpc . BaseEvent , method_handler : grpc . RpcMethodHandler ,
thread_pool : futures . ThreadPoolExecutor
) - > Tuple [ _RPCState , futures . Future ] :
state = _RPCState ( )
with state . condition :
rpc_event . call . start_server_batch (
@ -735,8 +821,11 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
method_handler , thread_pool )
def _handle_call ( rpc_event , generic_handlers , interceptor_pipeline , thread_pool ,
concurrency_exceeded ) :
def _handle_call (
rpc_event : cygrpc . BaseEvent , generic_handlers : List [ grpc . GenericRpcHandler ] ,
interceptor_pipeline : Optional [ _interceptor . _ServicePipeline ] ,
thread_pool : futures . ThreadPoolExecutor , concurrency_exceeded : bool
) - > Tuple [ Optional [ _RPCState ] , Optional [ futures . Future ] ] :
if not rpc_event . success :
return None , None
if rpc_event . call_details . method is not None :
@ -769,10 +858,28 @@ class _ServerStage(enum.Enum):
class _ServerState ( object ) :
lock : threading . RLock
completion_queue : cygrpc . CompletionQueue
server : cygrpc . Server
generic_handlers : List [ grpc . GenericRpcHandler ]
interceptor_pipeline : Optional [ _interceptor . _ServicePipeline ]
thread_pool : futures . ThreadPoolExecutor
stage : _ServerStage
termination_event : threading . Event
shutdown_events : List [ threading . Event ]
maximum_concurrent_rpcs : Optional [ int ]
active_rpc_count : int
rpc_states : Set [ _RPCState ]
due : Set [ str ]
server_deallocated : bool
# pylint: disable=too-many-arguments
def __init__ ( self , completion_queue , server , generic_handlers ,
interceptor_pipeline , thread_pool , maximum_concurrent_rpcs ) :
def __init__ ( self , completion_queue : cygrpc . CompletionQueue ,
server : cygrpc . Server ,
generic_handlers : Sequence [ grpc . GenericRpcHandler ] ,
interceptor_pipeline : Optional [ _interceptor . _ServicePipeline ] ,
thread_pool : futures . ThreadPoolExecutor ,
maximum_concurrent_rpcs : Optional [ int ] ) :
self . lock = threading . RLock ( )
self . completion_queue = completion_queue
self . server = server
@ -793,30 +900,33 @@ class _ServerState(object):
self . server_deallocated = False
def _add_generic_handlers ( state , generic_handlers ) :
def _add_generic_handlers (
state : _ServerState ,
generic_handlers : Iterable [ grpc . GenericRpcHandler ] ) - > None :
with state . lock :
state . generic_handlers . extend ( generic_handlers )
def _add_insecure_port ( state , address ) :
def _add_insecure_port ( state : _ServerState , address : bytes ) - > int :
with state . lock :
return state . server . add_http2_port ( address )
def _add_secure_port ( state , address , server_credentials ) :
def _add_secure_port ( state : _ServerState , address : bytes ,
server_credentials : grpc . ServerCredentials ) - > int :
with state . lock :
return state . server . add_http2_port ( address ,
server_credentials . _credentials )
def _request_call ( state ) :
def _request_call ( state : _ServerState ) - > None :
state . server . request_call ( state . completion_queue , state . completion_queue ,
_REQUEST_CALL_TAG )
state . due . add ( _REQUEST_CALL_TAG )
# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving ( state ) :
def _stop_serving ( state : _ServerState ) - > bool :
if not state . rpc_states and not state . due :
state . server . destroy ( )
for shutdown_event in state . shutdown_events :
@ -827,12 +937,13 @@ def _stop_serving(state):
return False
def _on_call_completed ( state ) :
def _on_call_completed ( state : _ServerState ) - > None :
with state . lock :
state . active_rpc_count - = 1
def _process_event_and_continue ( state , event ) :
def _process_event_and_continue ( state : _ServerState ,
event : cygrpc . BaseEvent ) - > bool :
should_continue = True
if event . tag is _SHUTDOWN_TAG :
with state . lock :
@ -874,7 +985,7 @@ def _process_event_and_continue(state, event):
return should_continue
def _serve ( state ) :
def _serve ( state : _ServerState ) - > None :
while True :
timeout = time . time ( ) + _DEALLOCATED_SERVER_CHECK_PERIOD_S
event = state . completion_queue . poll ( timeout )
@ -889,7 +1000,7 @@ def _serve(state):
event = None
def _begin_shutdown_once ( state ) :
def _begin_shutdown_once ( state : _ServerState ) - > None :
with state . lock :
if state . stage is _ServerStage . STARTED :
state . server . shutdown ( state . completion_queue , _SHUTDOWN_TAG )
@ -897,7 +1008,7 @@ def _begin_shutdown_once(state):
state . due . add ( _SHUTDOWN_TAG )
def _stop ( state , grace ) :
def _stop ( state : _ServerState , grace : Optional [ float ] ) - > threading . Event :
with state . lock :
if state . stage is _ServerStage . STOPPED :
shutdown_event = threading . Event ( )
@ -923,7 +1034,7 @@ def _stop(state, grace):
return shutdown_event
def _start ( state ) :
def _start ( state : _ServerState ) - > None :
with state . lock :
if state . stage is not _ServerStage . STOPPED :
raise ValueError ( ' Cannot start already-started server! ' )
@ -936,7 +1047,8 @@ def _start(state):
thread . start ( )
def _validate_generic_rpc_handlers ( generic_rpc_handlers ) :
def _validate_generic_rpc_handlers (
generic_rpc_handlers : Iterable [ grpc . GenericRpcHandler ] ) - > None :
for generic_rpc_handler in generic_rpc_handlers :
service_attribute = getattr ( generic_rpc_handler , ' service ' , None )
if service_attribute is None :
@ -945,16 +1057,24 @@ def _validate_generic_rpc_handlers(generic_rpc_handlers):
' not have " service " method! ' . format ( generic_rpc_handler ) )
def _augment_options ( base_options , compression ) :
def _augment_options (
base_options : Sequence [ ChannelArgumentType ] ,
compression : Optional [ grpc . Compression ]
) - > Sequence [ ChannelArgumentType ] :
compression_option = _compression . create_channel_option ( compression )
return tuple ( base_options ) + compression_option
class _Server ( grpc . Server ) :
_state : _ServerState
# pylint: disable=too-many-arguments
def __init__ ( self , thread_pool , generic_handlers , interceptors , options ,
maximum_concurrent_rpcs , compression , xds ) :
def __init__ ( self , thread_pool : futures . ThreadPoolExecutor ,
generic_handlers : Sequence [ grpc . GenericRpcHandler ] ,
interceptors : Sequence [ grpc . ServerInterceptor ] ,
options : Sequence [ ChannelArgumentType ] ,
maximum_concurrent_rpcs : Optional [ int ] ,
compression : Optional [ grpc . Compression ] , xds : bool ) :
completion_queue = cygrpc . CompletionQueue ( )
server = cygrpc . Server ( _augment_options ( options , compression ) , xds )
server . register_completion_queue ( completion_queue )
@ -962,24 +1082,27 @@ class _Server(grpc.Server):
_interceptor . service_pipeline ( interceptors ) ,
thread_pool , maximum_concurrent_rpcs )
def add_generic_rpc_handlers ( self , generic_rpc_handlers ) :
def add_generic_rpc_handlers (
self ,
generic_rpc_handlers : Iterable [ grpc . GenericRpcHandler ] ) - > None :
_validate_generic_rpc_handlers ( generic_rpc_handlers )
_add_generic_handlers ( self . _state , generic_rpc_handlers )
def add_insecure_port ( self , address ) :
def add_insecure_port ( self , address : str ) - > int :
return _common . validate_port_binding_result (
address , _add_insecure_port ( self . _state , _common . encode ( address ) ) )
def add_secure_port ( self , address , server_credentials ) :
def add_secure_port ( self , address : str ,
server_credentials : grpc . ServerCredentials ) - > int :
return _common . validate_port_binding_result (
address ,
_add_secure_port ( self . _state , _common . encode ( address ) ,
server_credentials ) )
def start ( self ) :
def start ( self ) - > None :
_start ( self . _state )
def wait_for_termination ( self , timeout = None ) :
def wait_for_termination ( self , timeout : Optional [ float ] = None ) - > bool :
# NOTE(https://bugs.python.org/issue35935)
# Remove this workaround once threading.Event.wait() is working with
# CTRL+C across platforms.
@ -987,7 +1110,7 @@ class _Server(grpc.Server):
self . _state . termination_event . is_set ,
timeout = timeout )
def stop ( self , grace ) :
def stop ( self , grace : Optional [ float ] ) - > threading . Event :
return _stop ( self . _state , grace )
def __del__ ( self ) :
@ -997,8 +1120,13 @@ class _Server(grpc.Server):
self . _state . server_deallocated = True
def create_server ( thread_pool , generic_rpc_handlers , interceptors , options ,
maximum_concurrent_rpcs , compression , xds ) :
def create_server ( thread_pool : futures . ThreadPoolExecutor ,
generic_rpc_handlers : Sequence [ grpc . GenericRpcHandler ] ,
interceptors : Sequence [ grpc . ServerInterceptor ] ,
options : Sequence [ ChannelArgumentType ] ,
maximum_concurrent_rpcs : Optional [ int ] ,
compression : Optional [ grpc . Compression ] ,
xds : bool ) - > _Server :
_validate_generic_rpc_handlers ( generic_rpc_handlers )
return _Server ( thread_pool , generic_rpc_handlers , interceptors , options ,
maximum_concurrent_rpcs , compression , xds )