Support mutating a value used for a response in grpcio_testing

pull/21290/head
Richard Belleville 5 years ago
parent b4d6916552
commit aa156984b1
  1. 3
      src/python/grpcio_testing/grpc_testing/_server/_service.py
  2. 2
      src/python/grpcio_tests/tests/testing/_application_common.py
  3. 12
      src/python/grpcio_tests/tests/testing/_server_application.py
  4. 25
      src/python/grpcio_tests/tests/testing/_server_test.py

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
import grpc import grpc
@ -59,7 +60,7 @@ def _stream_response(argument, implementation, rpc, servicer_context):
else: else:
while True: while True:
try: try:
response = next(response_iterator) response = copy.deepcopy(next(response_iterator))
except StopIteration: except StopIteration:
rpc.stream_response_complete() rpc.stream_response_complete()
break break

@ -37,5 +37,7 @@ ABORT_SUCCESS_QUERY = requests_pb2.Up(first_up_field=43)
ABORT_NO_STATUS_RESPONSE = services_pb2.Down(first_down_field=50) ABORT_NO_STATUS_RESPONSE = services_pb2.Down(first_down_field=50)
ABORT_SUCCESS_RESPONSE = services_pb2.Down(first_down_field=51) ABORT_SUCCESS_RESPONSE = services_pb2.Down(first_down_field=51)
ABORT_FAILURE_RESPONSE = services_pb2.Down(first_down_field=52) ABORT_FAILURE_RESPONSE = services_pb2.Down(first_down_field=52)
STREAM_STREAM_MUTATING_REQUEST = requests_pb2.Top(first_top_field=24601)
STREAM_STREAM_MUTATING_COUNT = 2
INFINITE_REQUEST_STREAM_TIMEOUT = 0.2 INFINITE_REQUEST_STREAM_TIMEOUT = 0.2

@ -75,13 +75,21 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
return _application_common.STREAM_UNARY_RESPONSE return _application_common.STREAM_UNARY_RESPONSE
def StreStre(self, request_iterator, context): def StreStre(self, request_iterator, context):
valid_requests = (_application_common.STREAM_STREAM_REQUEST,
_application_common.STREAM_STREAM_MUTATING_REQUEST)
for request in request_iterator: for request in request_iterator:
if request != _application_common.STREAM_STREAM_REQUEST: if request not in valid_requests:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details('Something is wrong with your request!') context.set_details('Something is wrong with your request!')
return return
elif not context.is_active(): elif not context.is_active():
return return
else: elif request == _application_common.STREAM_STREAM_REQUEST:
yield _application_common.STREAM_STREAM_RESPONSE yield _application_common.STREAM_STREAM_RESPONSE
yield _application_common.STREAM_STREAM_RESPONSE yield _application_common.STREAM_STREAM_RESPONSE
elif request == _application_common.STREAM_STREAM_MUTATING_REQUEST:
response = services_pb2.Bottom()
for i in range(
_application_common.STREAM_STREAM_MUTATING_COUNT):
response.first_bottom_field = i
yield response

@ -21,6 +21,7 @@ import grpc_testing
from tests.testing import _application_common from tests.testing import _application_common
from tests.testing import _application_testing_common from tests.testing import _application_testing_common
from tests.testing import _server_application from tests.testing import _server_application
from tests.testing.proto import services_pb2
class FirstServiceServicerTest(unittest.TestCase): class FirstServiceServicerTest(unittest.TestCase):
@ -94,6 +95,30 @@ class FirstServiceServicerTest(unittest.TestCase):
response) response)
self.assertIs(code, grpc.StatusCode.OK) self.assertIs(code, grpc.StatusCode.OK)
def test_mutating_stream_stream(self):
rpc = self._real_time_server.invoke_stream_stream(
_application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
initial_metadata = rpc.initial_metadata()
responses = [
rpc.take_response()
for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
]
rpc.send_request(_application_common.STREAM_STREAM_MUTATING_REQUEST)
responses.extend([
rpc.take_response()
for _ in range(_application_common.STREAM_STREAM_MUTATING_COUNT)
])
rpc.requests_closed()
_, _, _ = rpc.termination()
expected_responses = (
services_pb2.Bottom(first_bottom_field=0),
services_pb2.Bottom(first_bottom_field=1),
services_pb2.Bottom(first_bottom_field=0),
services_pb2.Bottom(first_bottom_field=1),
)
self.assertSequenceEqual(expected_responses, responses)
def test_server_rpc_idempotence(self): def test_server_rpc_idempotence(self):
rpc = self._real_time_server.invoke_unary_unary( rpc = self._real_time_server.invoke_unary_unary(
_application_testing_common.FIRST_SERVICE_UNUN, (), _application_testing_common.FIRST_SERVICE_UNUN, (),

Loading…
Cancel
Save