Use wait_for_ready in tests

pull/22254/head
Richard Belleville 5 years ago
parent 6b42bcc03d
commit c9d90e5c1b
  1. 14
      src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py

@ -544,7 +544,9 @@ class SimpleStubsPluginTest(unittest.TestCase):
def testUnaryCall(self): def testUnaryCall(self):
request = request_pb2.SimpleRequest(response_size=13) request = request_pb2.SimpleRequest(response_size=13)
response = service_pb2_grpc.TestService.UnaryCall(request, self._target) response = service_pb2_grpc.TestService.UnaryCall(request,
self._target,
wait_for_ready=True)
expected_response = self.servicer_methods.UnaryCall( expected_response = self.servicer_methods.UnaryCall(
request, 'not a real context!') request, 'not a real context!')
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
@ -554,21 +556,23 @@ class SimpleStubsPluginTest(unittest.TestCase):
expected_responses = self.servicer_methods.StreamingOutputCall( expected_responses = self.servicer_methods.StreamingOutputCall(
request, 'not a real RpcContext!') request, 'not a real RpcContext!')
responses = service_pb2_grpc.TestService.StreamingOutputCall( responses = service_pb2_grpc.TestService.StreamingOutputCall(
request, self._target) request, self._target, wait_for_ready=True)
for expected_response, response in moves.zip_longest( for expected_response, response in moves.zip_longest(
expected_responses, responses): expected_responses, responses):
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
def testStreamingInputCall(self): def testStreamingInputCall(self):
response = service_pb2_grpc.TestService.StreamingInputCall( response = service_pb2_grpc.TestService.StreamingInputCall(
_streaming_input_request_iterator(), self._target) _streaming_input_request_iterator(),
self._target,
wait_for_ready=True)
expected_response = self.servicer_methods.StreamingInputCall( expected_response = self.servicer_methods.StreamingInputCall(
_streaming_input_request_iterator(), 'not a real RpcContext!') _streaming_input_request_iterator(), 'not a real RpcContext!')
self.assertEqual(expected_response, response) self.assertEqual(expected_response, response)
def testFullDuplexCall(self): def testFullDuplexCall(self):
responses = service_pb2_grpc.TestService.FullDuplexCall( responses = service_pb2_grpc.TestService.FullDuplexCall(
_full_duplex_request_iterator(), self._target) _full_duplex_request_iterator(), self._target, wait_for_ready=True)
expected_responses = self.servicer_methods.FullDuplexCall( expected_responses = self.servicer_methods.FullDuplexCall(
_full_duplex_request_iterator(), 'not a real RpcContext!') _full_duplex_request_iterator(), 'not a real RpcContext!')
for expected_response, response in moves.zip_longest( for expected_response, response in moves.zip_longest(
@ -587,7 +591,7 @@ class SimpleStubsPluginTest(unittest.TestCase):
yield request yield request
responses = service_pb2_grpc.TestService.HalfDuplexCall( responses = service_pb2_grpc.TestService.HalfDuplexCall(
half_duplex_request_iterator(), self._target) half_duplex_request_iterator(), self._target, wait_for_ready=True)
expected_responses = self.servicer_methods.HalfDuplexCall( expected_responses = self.servicer_methods.HalfDuplexCall(
half_duplex_request_iterator(), 'not a real RpcContext!') half_duplex_request_iterator(), 'not a real RpcContext!')
for expected_response, response in moves.zip_longest( for expected_response, response in moves.zip_longest(

Loading…
Cancel
Save