pull/18095/head
Eric Gribkoff 6 years ago
parent 2ba9a5aaa8
commit bf7107b9dc
  1. 20
      src/python/grpcio/grpc/_server.py
  2. 39
      src/python/grpcio_health_checking/grpc_health/v1/health.py
  3. 7
      src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
  4. 1
      src/python/grpcio_tests/tests/unit/_rpc_test.py

@ -391,11 +391,11 @@ def _call_behavior(rpc_event,
behavior,
argument,
request_deserializer,
on_next_callback=None):
send_response_callback=None):
context = _Context(rpc_event, state, request_deserializer)
try:
if on_next_callback is not None:
return behavior(argument, context, on_next_callback), True
if send_response_callback is not None:
return behavior(argument, context, send_response_callback), True
else:
return behavior(argument, context), True
except Exception as exception: # pylint: disable=broad-except
@ -510,7 +510,7 @@ def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
cygrpc.install_context_from_call(rpc_event.call)
def on_next(response):
def send_response(response):
if response is None:
_status(rpc_event, state, None)
else:
@ -530,13 +530,13 @@ def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
behavior,
argument,
request_deserializer,
on_next_callback=on_next)
send_response_callback=send_response)
else:
response_iterator, proceed = _call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
_stream_response_iterator_adapter(rpc_event, state, on_next,
response_iterator)
_send_message_callback_to_blocking_iterator_adapter(
rpc_event, state, send_response, response_iterator)
finally:
cygrpc.uninstall_context()
@ -545,13 +545,13 @@ def _is_rpc_state_active(state):
return state.client is not _CANCELLED and not state.statused
def _stream_response_iterator_adapter(rpc_event, state, on_next_callback,
response_iterator):
def _send_message_callback_to_blocking_iterator_adapter(
rpc_event, state, send_response_callback, response_iterator):
while True:
response, proceed = _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
on_next_callback(response)
send_response_callback(response)
if not _is_rpc_state_active(state):
break
else:

@ -60,15 +60,15 @@ class _Watcher():
self._condition.notify()
def _watcher_to_on_next_callback_adapter(watcher):
def _watcher_to_send_response_callback_adapter(watcher):
def on_next_callback(response):
def send_response_callback(response):
if response is None:
watcher.close()
else:
watcher.add(response)
return on_next_callback
return send_response_callback
class HealthServicer(_health_pb2_grpc.HealthServicer):
@ -79,16 +79,17 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
experimental_thread_pool=None):
self._lock = threading.RLock()
self._server_status = {}
self._on_next_callbacks = {}
self._send_response_callbacks = {}
self.Watch.__func__.experimental_non_blocking = experimental_non_blocking
self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
def _on_close_callback(self, on_next_callback, service):
def _on_close_callback(self, send_response_callback, service):
def callback():
with self._lock:
self._on_next_callbacks[service].remove(on_next_callback)
on_next_callback(None)
self._send_response_callbacks[service].remove(
send_response_callback)
send_response_callback(None)
return callback
@ -102,26 +103,27 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
return _health_pb2.HealthCheckResponse(status=status)
# pylint: disable=arguments-differ
def Watch(self, request, context, on_next_callback=None):
def Watch(self, request, context, send_response_callback=None):
blocking_watcher = None
if on_next_callback is None:
if send_response_callback is None:
# The server does not support the experimental_non_blocking
# parameter. For backwards compatibility, return a blocking response
# generator.
blocking_watcher = _Watcher()
on_next_callback = _watcher_to_on_next_callback_adapter(
send_response_callback = _watcher_to_send_response_callback_adapter(
blocking_watcher)
service = request.service
with self._lock:
status = self._server_status.get(service)
if status is None:
status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member
on_next_callback(_health_pb2.HealthCheckResponse(status=status))
if service not in self._on_next_callbacks:
self._on_next_callbacks[service] = set()
self._on_next_callbacks[service].add(on_next_callback)
send_response_callback(
_health_pb2.HealthCheckResponse(status=status))
if service not in self._send_response_callbacks:
self._send_response_callbacks[service] = set()
self._send_response_callbacks[service].add(send_response_callback)
context.add_callback(
self._on_close_callback(on_next_callback, service))
self._on_close_callback(send_response_callback, service))
return blocking_watcher
def set(self, service, status):
@ -134,7 +136,8 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
"""
with self._lock:
self._server_status[service] = status
if service in self._on_next_callbacks:
for on_next_callback in self._on_next_callbacks[service]:
on_next_callback(
if service in self._send_response_callbacks:
for send_response_callback in self._send_response_callbacks[
service]:
send_response_callback(
_health_pb2.HealthCheckResponse(status=status))

@ -196,10 +196,11 @@ class BaseWatchTests(object):
# Wait, if necessary, for serving thread to process client cancellation
timeout = time.time() + test_constants.SHORT_TIMEOUT
while time.time(
) < timeout and self._servicer._on_next_callbacks[_WATCH_SERVICE]:
) < timeout and self._servicer._send_response_callbacks[_WATCH_SERVICE]:
time.sleep(1)
self.assertFalse(self._servicer._on_next_callbacks[_WATCH_SERVICE],
'watch set should be empty')
self.assertFalse(
self._servicer._send_response_callbacks[_WATCH_SERVICE],
'watch set should be empty')
self.assertTrue(response_queue.empty())

@ -101,7 +101,6 @@ class _Handler(object):
for _ in range(test_constants.STREAM_LENGTH):
self._control.control()
on_next(request)
# yield request
self._control.control()
if servicer_context is not None:
servicer_context.set_trailing_metadata(((

Loading…
Cancel
Save