Merge pull request #18842 from lidizheng/respect-interval

Respect interval_us setting for TestServicer
reviewable/pr18937/r1
Lidi Zheng 6 years ago committed by GitHub
commit f1dfe791ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/python/grpcio_tests/tests/fork/_fork_interop_test.py
  2. 87
      src/python/grpcio_tests/tests/interop/BUILD.bazel
  3. 4
      src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
  4. 4
      src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
  5. 68
      src/python/grpcio_tests/tests/interop/methods.py
  6. 4
      src/python/grpcio_tests/tests/interop/server.py
  7. 97
      src/python/grpcio_tests/tests/interop/service.py

@ -56,12 +56,12 @@ class ForkInteropTest(unittest.TestCase):
import grpc
from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods as interop_methods
from tests.interop import service as interop_service
from tests.unit import test_common
server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(
interop_methods.TestService(), server)
interop_service.TestService(), server)
port = server.add_insecure_port('[::]:0')
server.start()
print(port)

@ -5,45 +5,45 @@ package(default_visibility = ["//visibility:public"])
py_library(
name = "_intraop_test_case",
srcs = ["_intraop_test_case.py"],
imports = ["../../"],
deps = [
":methods",
],
imports=["../../",],
)
py_library(
name = "client",
srcs = ["client.py"],
imports = ["../../"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":methods",
":resources",
"//src/proto/grpc/testing:py_test_proto",
requirement('google-auth'),
"//src/python/grpcio/grpc:grpcio",
requirement("google-auth"),
],
imports=["../../",],
)
py_library(
name = "methods",
srcs = ["methods.py"],
imports = ["../../"],
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests:bazel_namespace_package_hack",
"//src/proto/grpc/testing:py_empty_proto",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:py_test_proto",
requirement('google-auth'),
requirement('requests'),
requirement('urllib3'),
requirement('chardet'),
requirement('certifi'),
requirement('idna'),
requirement("google-auth"),
requirement("requests"),
requirement("urllib3"),
requirement("chardet"),
requirement("certifi"),
requirement("idna"),
] + select({
"//conditions:default": [requirement('enum34'),],
"//conditions:default": [requirement("enum34")],
"//:python3": [],
}),
imports=["../../",],
)
py_library(
@ -54,51 +54,62 @@ py_library(
],
)
py_library(
name = "service",
srcs = ["service.py"],
imports = ["../../"],
deps = [
"//src/proto/grpc/testing:py_empty_proto",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:py_test_proto",
"//src/python/grpcio/grpc:grpcio",
],
)
py_library(
name = "server",
srcs = ["server.py"],
imports = ["../../"],
deps = [
"//src/python/grpcio/grpc:grpcio",
":methods",
":resources",
"//src/python/grpcio_tests/tests/unit:test_common",
":service",
"//src/proto/grpc/testing:py_test_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:test_common",
],
imports=["../../",],
)
py_test(
name="_insecure_intraop_test",
size="small",
srcs=["_insecure_intraop_test.py",],
main="_insecure_intraop_test.py",
deps=[
"//src/python/grpcio/grpc:grpcio",
name = "_insecure_intraop_test",
size = "small",
srcs = ["_insecure_intraop_test.py"],
data = [
"//src/python/grpcio_tests/tests/unit/credentials",
],
imports = ["../../"],
main = "_insecure_intraop_test.py",
deps = [
":_intraop_test_case",
":methods",
":server",
"//src/python/grpcio_tests/tests/unit:test_common",
":service",
"//src/proto/grpc/testing:py_test_proto",
],
imports=["../../",],
data=[
"//src/python/grpcio_tests/tests/unit/credentials",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
py_test(
name="_secure_intraop_test",
size="small",
srcs=["_secure_intraop_test.py",],
main="_secure_intraop_test.py",
deps=[
"//src/python/grpcio/grpc:grpcio",
name = "_secure_intraop_test",
size = "small",
srcs = ["_secure_intraop_test.py"],
imports = ["../../"],
main = "_secure_intraop_test.py",
deps = [
":_intraop_test_case",
":methods",
":server",
"//src/python/grpcio_tests/tests/unit:test_common",
":service",
"//src/proto/grpc/testing:py_test_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:test_common",
],
imports=["../../",],
)

@ -19,7 +19,7 @@ import grpc
from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import service
from tests.interop import server
from tests.unit import test_common
@ -29,7 +29,7 @@ class InsecureIntraopTest(_intraop_test_case.IntraopTestCase,
def setUp(self):
self.server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
test_pb2_grpc.add_TestServiceServicer_to_server(service.TestService(),
self.server)
port = self.server.add_insecure_port('[::]:0')
self.server.start()

@ -19,7 +19,7 @@ import grpc
from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import service
from tests.interop import resources
from tests.unit import test_common
@ -30,7 +30,7 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase):
def setUp(self):
self.server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
test_pb2_grpc.add_TestServiceServicer_to_server(service.TestService(),
self.server)
port = self.server.add_secure_port(
'[::]:0',

@ -25,6 +25,7 @@ import enum
import json
import os
import threading
import time
from google import auth as google_auth
from google.auth import environment_vars as google_auth_environment_vars
@ -34,78 +35,11 @@ import grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
def _maybe_echo_metadata(servicer_context):
"""Copies metadata from request to response if it is present."""
invocation_metadata = dict(servicer_context.invocation_metadata())
if _INITIAL_METADATA_KEY in invocation_metadata:
initial_metadatum = (_INITIAL_METADATA_KEY,
invocation_metadata[_INITIAL_METADATA_KEY])
servicer_context.send_initial_metadata((initial_metadatum,))
if _TRAILING_METADATA_KEY in invocation_metadata:
trailing_metadatum = (_TRAILING_METADATA_KEY,
invocation_metadata[_TRAILING_METADATA_KEY])
servicer_context.set_trailing_metadata((trailing_metadatum,))
def _maybe_echo_status_and_message(request, servicer_context):
"""Sets the response context code and details if the request asks for them"""
if request.HasField('response_status'):
servicer_context.set_code(request.response_status.code)
servicer_context.set_details(request.response_status.message)
class TestService(test_pb2_grpc.TestServiceServicer):
def EmptyCall(self, request, context):
_maybe_echo_metadata(context)
return empty_pb2.Empty()
def UnaryCall(self, request, context):
_maybe_echo_metadata(context)
_maybe_echo_status_and_message(request, context)
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))
def StreamingOutputCall(self, request, context):
_maybe_echo_status_and_message(request, context)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.response_type,
body=b'\x00' * response_parameters.size))
def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0
for request in request_iterator:
if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
def FullDuplexCall(self, request_iterator, context):
_maybe_echo_metadata(context)
for request in request_iterator:
_maybe_echo_status_and_message(request, context)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.payload.type,
body=b'\x00' * response_parameters.size))
# NOTE(nathaniel): Apparently this is the same as the full-duplex call?
# NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
def HalfDuplexCall(self, request_iterator, context):
return self.FullDuplexCall(request_iterator, context)
def _expect_status_code(call, expected_code):
if call.code() != expected_code:
raise ValueError('expected code %s, got %s' % (expected_code,

@ -21,7 +21,7 @@ import time
import grpc
from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods
from tests.interop import service
from tests.interop import resources
from tests.unit import test_common
@ -42,7 +42,7 @@ def serve():
args = parser.parse_args()
server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
test_pb2_grpc.add_TestServiceServicer_to_server(service.TestService(),
server)
if args.use_tls:
private_key = resources.private_key()

@ -0,0 +1,97 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The Python implementation of the TestServicer."""
import time
import grpc
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2_grpc
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial"
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin"
_US_IN_A_SECOND = 1000 * 1000
def _maybe_echo_metadata(servicer_context):
"""Copies metadata from request to response if it is present."""
invocation_metadata = dict(servicer_context.invocation_metadata())
if _INITIAL_METADATA_KEY in invocation_metadata:
initial_metadatum = (_INITIAL_METADATA_KEY,
invocation_metadata[_INITIAL_METADATA_KEY])
servicer_context.send_initial_metadata((initial_metadatum,))
if _TRAILING_METADATA_KEY in invocation_metadata:
trailing_metadatum = (_TRAILING_METADATA_KEY,
invocation_metadata[_TRAILING_METADATA_KEY])
servicer_context.set_trailing_metadata((trailing_metadatum,))
def _maybe_echo_status_and_message(request, servicer_context):
"""Sets the response context code and details if the request asks for them"""
if request.HasField('response_status'):
servicer_context.set_code(request.response_status.code)
servicer_context.set_details(request.response_status.message)
class TestService(test_pb2_grpc.TestServiceServicer):
def EmptyCall(self, request, context):
_maybe_echo_metadata(context)
return empty_pb2.Empty()
def UnaryCall(self, request, context):
_maybe_echo_metadata(context)
_maybe_echo_status_and_message(request, context)
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
body=b'\x00' * request.response_size))
def StreamingOutputCall(self, request, context):
_maybe_echo_status_and_message(request, context)
for response_parameters in request.response_parameters:
if response_parameters.interval_us != 0:
time.sleep(response_parameters.interval_us / _US_IN_A_SECOND)
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.response_type,
body=b'\x00' * response_parameters.size))
def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0
for request in request_iterator:
if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
def FullDuplexCall(self, request_iterator, context):
_maybe_echo_metadata(context)
for request in request_iterator:
_maybe_echo_status_and_message(request, context)
for response_parameters in request.response_parameters:
if response_parameters.interval_us != 0:
time.sleep(
response_parameters.interval_us / _US_IN_A_SECOND)
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
type=request.payload.type,
body=b'\x00' * response_parameters.size))
# NOTE(nathaniel): Apparently this is the same as the full-duplex call?
# NOTE(atash): It isn't even called in the interop spec (Oct 22 2015)...
def HalfDuplexCall(self, request_iterator, context):
return self.FullDuplexCall(request_iterator, context)
Loading…
Cancel
Save