|
|
|
@ -164,6 +164,19 @@ class FirstServiceServicerTest(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) |
|
|
|
|
|
|
|
|
|
def test_servicer_context_abort(self): |
|
|
|
|
rpc = self._real_time_server.invoke_unary_unary( |
|
|
|
|
_application_testing_common.FIRST_SERVICE_UNUN, (), |
|
|
|
|
_application_common.ABORT_REQUEST, None) |
|
|
|
|
response, trailing_metadata, code, details = rpc.termination() |
|
|
|
|
self.assertIs(code, grpc.StatusCode.PERMISSION_DENIED) |
|
|
|
|
rpc = self._real_time_server.invoke_unary_unary( |
|
|
|
|
_application_testing_common.FIRST_SERVICE_UNUN, (), |
|
|
|
|
_application_common.ABORT_SUCCESS_QUERY, None) |
|
|
|
|
response, trailing_metadata, code, details = rpc.termination() |
|
|
|
|
self.assertEqual(_application_common.ABORT_SUCCESS_RESPONSE, response) |
|
|
|
|
self.assertIs(code, grpc.StatusCode.OK) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|
unittest.main(verbosity=2) |
|
|
|
|