commit
ceae5835e6
169 changed files with 11006 additions and 15912 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,103 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/logical_thread.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
DebugOnlyTraceFlag grpc_logical_thread_trace(false, "logical_thread"); |
||||
|
||||
struct CallbackWrapper { |
||||
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc) |
||||
: callback(std::move(cb)), location(loc) {} |
||||
|
||||
MultiProducerSingleConsumerQueue::Node mpscq_node; |
||||
const std::function<void()> callback; |
||||
const DebugLocation location; |
||||
}; |
||||
|
||||
void LogicalThread::Run(std::function<void()> callback, |
||||
const grpc_core::DebugLocation& location) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]", |
||||
this, location.file(), location.line()); |
||||
} |
||||
const size_t prev_size = size_.FetchAdd(1); |
||||
if (prev_size == 0) { |
||||
// There is no other closure executing right now on this logical thread.
|
||||
// Execute this closure immediately.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, " Executing immediately"); |
||||
} |
||||
callback(); |
||||
// Loan this thread to the logical thread and drain the queue.
|
||||
DrainQueue(); |
||||
} else { |
||||
CallbackWrapper* cb_wrapper = |
||||
new CallbackWrapper(std::move(callback), location); |
||||
// There already are closures executing on this logical thread. Simply add
|
||||
// this closure to the queue.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper); |
||||
} |
||||
queue_.Push(&cb_wrapper->mpscq_node); |
||||
} |
||||
} |
||||
|
||||
// The thread that calls this loans itself to the logical thread so as to
|
||||
// execute all the scheduled callback. This is called from within
|
||||
// LogicalThread::Run() after executing a callback immediately, and hence size_
|
||||
// is atleast 1.
|
||||
void LogicalThread::DrainQueue() { |
||||
while (true) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this); |
||||
} |
||||
size_t prev_size = size_.FetchSub(1); |
||||
// prev_size should be atleast 1 since
|
||||
GPR_DEBUG_ASSERT(prev_size >= 1); |
||||
if (prev_size == 1) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, " Queue Drained"); |
||||
} |
||||
break; |
||||
} |
||||
// There is atleast one callback on the queue. Pop the callback from the
|
||||
// queue and execute it.
|
||||
CallbackWrapper* cb_wrapper = nullptr; |
||||
bool empty_unused; |
||||
while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>( |
||||
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) { |
||||
// This can happen either due to a race condition within the mpscq
|
||||
// implementation or because of a race with Run()
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, " Queue returned nullptr, trying again"); |
||||
} |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) { |
||||
gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]", |
||||
cb_wrapper, cb_wrapper->location.file(), |
||||
cb_wrapper->location.line()); |
||||
} |
||||
cb_wrapper->callback(); |
||||
delete cb_wrapper; |
||||
} |
||||
} |
||||
} // namespace grpc_core
|
@ -1,52 +0,0 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <functional> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/atomic.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/mpscq.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H |
||||
#define GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H |
||||
|
||||
namespace grpc_core { |
||||
extern DebugOnlyTraceFlag grpc_logical_thread_trace; |
||||
|
||||
// LogicalThread is a mechanism to schedule callbacks in a synchronized manner.
|
||||
// All callbacks scheduled on a LogicalThread instance will be executed serially
|
||||
// in a borrowed thread. The API provides a FIFO guarantee to the execution of
|
||||
// callbacks scheduled on the thread.
|
||||
class LogicalThread : public RefCounted<LogicalThread> { |
||||
public: |
||||
void Run(std::function<void()> callback, |
||||
const grpc_core::DebugLocation& location); |
||||
|
||||
private: |
||||
void DrainQueue(); |
||||
|
||||
Atomic<size_t> size_{0}; |
||||
MultiProducerSingleConsumerQueue queue_; |
||||
}; |
||||
} /* namespace grpc_core */ |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H */ |
@ -0,0 +1,155 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/iomgr/work_serializer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
DebugOnlyTraceFlag grpc_work_serializer_trace(false, "work_serializer"); |
||||
|
||||
struct CallbackWrapper { |
||||
CallbackWrapper(std::function<void()> cb, const grpc_core::DebugLocation& loc) |
||||
: callback(std::move(cb)), location(loc) {} |
||||
|
||||
MultiProducerSingleConsumerQueue::Node mpscq_node; |
||||
const std::function<void()> callback; |
||||
const DebugLocation location; |
||||
}; |
||||
|
||||
class WorkSerializer::WorkSerializerImpl : public Orphanable { |
||||
public: |
||||
void Run(std::function<void()> callback, |
||||
const grpc_core::DebugLocation& location); |
||||
|
||||
void Orphan() override; |
||||
|
||||
private: |
||||
void DrainQueue(); |
||||
|
||||
// An initial size of 1 keeps track of whether the work serializer has been
|
||||
// orphaned.
|
||||
Atomic<size_t> size_{1}; |
||||
MultiProducerSingleConsumerQueue queue_; |
||||
}; |
||||
|
||||
void WorkSerializer::WorkSerializerImpl::Run( |
||||
std::function<void()> callback, const grpc_core::DebugLocation& location) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, "WorkSerializer::Run() %p Scheduling callback [%s:%d]", |
||||
this, location.file(), location.line()); |
||||
} |
||||
const size_t prev_size = size_.FetchAdd(1); |
||||
// The work serializer should not have been orphaned.
|
||||
GPR_DEBUG_ASSERT(prev_size > 0); |
||||
if (prev_size == 1) { |
||||
// There is no other closure executing right now on this work serializer.
|
||||
// Execute this closure immediately.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Executing immediately"); |
||||
} |
||||
callback(); |
||||
// Loan this thread to the work serializer thread and drain the queue.
|
||||
DrainQueue(); |
||||
} else { |
||||
CallbackWrapper* cb_wrapper = |
||||
new CallbackWrapper(std::move(callback), location); |
||||
// There already are closures executing on this work serializer. Simply add
|
||||
// this closure to the queue.
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Scheduling on queue : item %p", cb_wrapper); |
||||
} |
||||
queue_.Push(&cb_wrapper->mpscq_node); |
||||
} |
||||
} |
||||
|
||||
void WorkSerializer::WorkSerializerImpl::Orphan() { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, "WorkSerializer::Orphan() %p", this); |
||||
} |
||||
size_t prev_size = size_.FetchSub(1); |
||||
if (prev_size == 1) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Destroying"); |
||||
} |
||||
delete this; |
||||
} |
||||
} |
||||
|
||||
// The thread that calls this loans itself to the work serializer so as to
|
||||
// execute all the scheduled callback. This is called from within
|
||||
// WorkSerializer::Run() after executing a callback immediately, and hence size_
|
||||
// is at least 1.
|
||||
void WorkSerializer::WorkSerializerImpl::DrainQueue() { |
||||
while (true) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, "WorkSerializer::DrainQueue() %p", this); |
||||
} |
||||
size_t prev_size = size_.FetchSub(1); |
||||
GPR_DEBUG_ASSERT(prev_size >= 1); |
||||
// It is possible that while draining the queue, one of the callbacks ended
|
||||
// up orphaning the work serializer. In that case, delete the object.
|
||||
if (prev_size == 1) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Queue Drained. Destroying"); |
||||
} |
||||
delete this; |
||||
return; |
||||
} |
||||
if (prev_size == 2) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Queue Drained"); |
||||
} |
||||
return; |
||||
} |
||||
// There is at least one callback on the queue. Pop the callback from the
|
||||
// queue and execute it.
|
||||
CallbackWrapper* cb_wrapper = nullptr; |
||||
bool empty_unused; |
||||
while ((cb_wrapper = reinterpret_cast<CallbackWrapper*>( |
||||
queue_.PopAndCheckEnd(&empty_unused))) == nullptr) { |
||||
// This can happen either due to a race condition within the mpscq
|
||||
// implementation or because of a race with Run()
|
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Queue returned nullptr, trying again"); |
||||
} |
||||
} |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_work_serializer_trace)) { |
||||
gpr_log(GPR_INFO, " Running item %p : callback scheduled at [%s:%d]", |
||||
cb_wrapper, cb_wrapper->location.file(), |
||||
cb_wrapper->location.line()); |
||||
} |
||||
cb_wrapper->callback(); |
||||
delete cb_wrapper; |
||||
} |
||||
} |
||||
|
||||
// WorkSerializer
|
||||
|
||||
WorkSerializer::WorkSerializer() |
||||
: impl_(MakeOrphanable<WorkSerializerImpl>()) {} |
||||
|
||||
WorkSerializer::~WorkSerializer() {} |
||||
|
||||
void WorkSerializer::Run(std::function<void()> callback, |
||||
const grpc_core::DebugLocation& location) { |
||||
impl_->Run(std::move(callback), location); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,65 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2019 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <functional> |
||||
|
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gprpp/atomic.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/mpscq.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
|
||||
#ifndef GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H |
||||
#define GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// WorkSerializer is a mechanism to schedule callbacks in a synchronized manner.
|
||||
// All callbacks scheduled on a WorkSerializer instance will be executed
|
||||
// serially in a borrowed thread. The API provides a FIFO guarantee to the
|
||||
// execution of callbacks scheduled on the thread.
|
||||
// When a thread calls Run() with a callback, the thread is considered borrowed.
|
||||
// The callback might run inline, or it might run asynchronously in a different
|
||||
// thread that is already inside of Run(). If the callback runs directly inline,
|
||||
// other callbacks from other threads might also be executed before Run()
|
||||
// returns. Since an arbitrary set of callbacks might be executed when Run() is
|
||||
// called, generally no locks should be held while calling Run().
|
||||
class WorkSerializer { |
||||
public: |
||||
WorkSerializer(); |
||||
|
||||
~WorkSerializer(); |
||||
|
||||
// TODO(yashkt): Replace grpc_core::DebugLocation with absl::SourceLocation
|
||||
// once we can start using it directly.
|
||||
void Run(std::function<void()> callback, |
||||
const grpc_core::DebugLocation& location); |
||||
|
||||
private: |
||||
class WorkSerializerImpl; |
||||
|
||||
OrphanablePtr<WorkSerializerImpl> impl_; |
||||
}; |
||||
|
||||
} /* namespace grpc_core */ |
||||
|
||||
#endif /* GRPC_CORE_LIB_IOMGR_WORK_SERIALIZER_H */ |
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,74 @@ |
||||
# Copyright 2019 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
load("@grpc_python_dependencies//:requirements.bzl", "requirement") |
||||
|
||||
package(default_testonly = 1) |
||||
|
||||
py_library( |
||||
name = "methods", |
||||
srcs = ["methods.py"], |
||||
imports = ["../../"], |
||||
deps = [ |
||||
"//src/proto/grpc/testing:empty_py_pb2", |
||||
"//src/proto/grpc/testing:py_messages_proto", |
||||
"//src/proto/grpc/testing:py_test_proto", |
||||
"//src/proto/grpc/testing:test_py_pb2_grpc", |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
requirement("google-auth"), |
||||
requirement("requests"), |
||||
requirement("urllib3"), |
||||
requirement("chardet"), |
||||
requirement("certifi"), |
||||
requirement("idna"), |
||||
], |
||||
) |
||||
|
||||
py_test( |
||||
name = "local_interop_test", |
||||
size = "small", |
||||
srcs = ["local_interop_test.py"], |
||||
imports = ["../../"], |
||||
python_version = "PY3", |
||||
deps = [ |
||||
":methods", |
||||
"//src/python/grpcio_tests/tests/interop:resources", |
||||
"//src/python/grpcio_tests/tests_aio/unit:_test_base", |
||||
"//src/python/grpcio_tests/tests_aio/unit:_test_server", |
||||
], |
||||
) |
||||
|
||||
py_binary( |
||||
name = "server", |
||||
srcs = ["server.py"], |
||||
imports = ["../../"], |
||||
python_version = "PY3", |
||||
deps = [ |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
"//src/python/grpcio_tests/tests/interop:server", |
||||
"//src/python/grpcio_tests/tests_aio/unit:_test_server", |
||||
], |
||||
) |
||||
|
||||
py_binary( |
||||
name = "client", |
||||
srcs = ["client.py"], |
||||
imports = ["../../"], |
||||
python_version = "PY3", |
||||
deps = [ |
||||
":methods", |
||||
"//src/python/grpcio/grpc:grpcio", |
||||
"//src/python/grpcio_tests/tests/interop:client", |
||||
], |
||||
) |
@ -0,0 +1,13 @@ |
||||
# Copyright 2019 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
@ -0,0 +1,62 @@ |
||||
# Copyright 2019 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
import argparse |
||||
import asyncio |
||||
import logging |
||||
import os |
||||
|
||||
import grpc |
||||
from grpc.experimental import aio |
||||
|
||||
from tests.interop import client as interop_client_lib |
||||
from tests_aio.interop import methods |
||||
|
||||
_LOGGER = logging.getLogger(__name__) |
||||
_LOGGER.setLevel(logging.DEBUG) |
||||
|
||||
|
||||
def _create_channel(args): |
||||
target = f'{args.server_host}:{args.server_port}' |
||||
|
||||
if args.use_tls: |
||||
channel_credentials, options = interop_client_lib.get_secure_channel_parameters( |
||||
args) |
||||
return aio.secure_channel(target, channel_credentials, options) |
||||
else: |
||||
return aio.insecure_channel(target) |
||||
|
||||
|
||||
def _test_case_from_arg(test_case_arg): |
||||
for test_case in methods.TestCase: |
||||
if test_case_arg == test_case.value: |
||||
return test_case |
||||
else: |
||||
raise ValueError('No test case "%s"!' % test_case_arg) |
||||
|
||||
|
||||
async def test_interoperability(): |
||||
aio.init_grpc_aio() |
||||
|
||||
args = interop_client_lib.parse_interop_client_args() |
||||
channel = _create_channel(args) |
||||
stub = interop_client_lib.create_stub(channel, args) |
||||
test_case = _test_case_from_arg(args.test_case) |
||||
await methods.test_interoperability(test_case, stub, args) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
logging.basicConfig(level=logging.DEBUG) |
||||
asyncio.get_event_loop().set_debug(True) |
||||
asyncio.get_event_loop().run_until_complete(test_interoperability()) |
@ -0,0 +1,135 @@ |
||||
# Copyright 2020 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Conducts interop tests locally.""" |
||||
|
||||
import logging |
||||
import unittest |
||||
|
||||
import grpc |
||||
from grpc.experimental import aio |
||||
|
||||
from src.proto.grpc.testing import test_pb2_grpc |
||||
from tests.interop import resources |
||||
from tests_aio.interop import methods |
||||
from tests_aio.unit._test_base import AioTestBase |
||||
from tests_aio.unit._test_server import start_test_server |
||||
|
||||
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' |
||||
|
||||
|
||||
class InteropTestCaseMixin: |
||||
"""Unit test methods. |
||||
|
||||
This class must be mixed in with unittest.TestCase and a class that defines |
||||
setUp and tearDown methods that manage a stub attribute. |
||||
""" |
||||
_stub: test_pb2_grpc.TestServiceStub |
||||
|
||||
async def test_empty_unary(self): |
||||
await methods.test_interoperability(methods.TestCase.EMPTY_UNARY, |
||||
self._stub, None) |
||||
|
||||
async def test_large_unary(self): |
||||
await methods.test_interoperability(methods.TestCase.LARGE_UNARY, |
||||
self._stub, None) |
||||
|
||||
async def test_server_streaming(self): |
||||
await methods.test_interoperability(methods.TestCase.SERVER_STREAMING, |
||||
self._stub, None) |
||||
|
||||
async def test_client_streaming(self): |
||||
await methods.test_interoperability(methods.TestCase.CLIENT_STREAMING, |
||||
self._stub, None) |
||||
|
||||
async def test_ping_pong(self): |
||||
await methods.test_interoperability(methods.TestCase.PING_PONG, |
||||
self._stub, None) |
||||
|
||||
async def test_cancel_after_begin(self): |
||||
await methods.test_interoperability(methods.TestCase.CANCEL_AFTER_BEGIN, |
||||
self._stub, None) |
||||
|
||||
async def test_cancel_after_first_response(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE, self._stub, None) |
||||
|
||||
@unittest.skip('TODO(https://github.com/grpc/grpc/issues/21707)') |
||||
async def test_timeout_on_sleeping_server(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER, self._stub, None) |
||||
|
||||
async def test_empty_stream(self): |
||||
await methods.test_interoperability(methods.TestCase.EMPTY_STREAM, |
||||
self._stub, None) |
||||
|
||||
async def test_status_code_and_message(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.STATUS_CODE_AND_MESSAGE, self._stub, None) |
||||
|
||||
async def test_unimplemented_method(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.UNIMPLEMENTED_METHOD, self._stub, None) |
||||
|
||||
async def test_unimplemented_service(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.UNIMPLEMENTED_SERVICE, self._stub, None) |
||||
|
||||
async def test_custom_metadata(self): |
||||
await methods.test_interoperability(methods.TestCase.CUSTOM_METADATA, |
||||
self._stub, None) |
||||
|
||||
async def test_special_status_message(self): |
||||
await methods.test_interoperability( |
||||
methods.TestCase.SPECIAL_STATUS_MESSAGE, self._stub, None) |
||||
|
||||
|
||||
class InsecureLocalInteropTest(InteropTestCaseMixin, AioTestBase): |
||||
|
||||
async def setUp(self): |
||||
address, self._server = await start_test_server() |
||||
self._channel = aio.insecure_channel(address) |
||||
self._stub = test_pb2_grpc.TestServiceStub(self._channel) |
||||
|
||||
async def tearDown(self): |
||||
await self._channel.close() |
||||
await self._server.stop(None) |
||||
|
||||
|
||||
class SecureLocalInteropTest(InteropTestCaseMixin, AioTestBase): |
||||
|
||||
async def setUp(self): |
||||
server_credentials = grpc.ssl_server_credentials([ |
||||
(resources.private_key(), resources.certificate_chain()) |
||||
]) |
||||
channel_credentials = grpc.ssl_channel_credentials( |
||||
resources.test_root_certificates()) |
||||
channel_options = (( |
||||
'grpc.ssl_target_name_override', |
||||
_SERVER_HOST_OVERRIDE, |
||||
),) |
||||
|
||||
address, self._server = await start_test_server( |
||||
secure=True, server_credentials=server_credentials) |
||||
self._channel = aio.secure_channel(address, channel_credentials, |
||||
channel_options) |
||||
self._stub = test_pb2_grpc.TestServiceStub(self._channel) |
||||
|
||||
async def tearDown(self): |
||||
await self._channel.close() |
||||
await self._server.stop(None) |
||||
|
||||
|
||||
if __name__ == '__main__': |
||||
logging.basicConfig(level=logging.INFO) |
||||
unittest.main(verbosity=2) |
@ -0,0 +1,445 @@ |
||||
# Copyright 2019 The gRPC Authors |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
"""Implementations of interoperability test methods.""" |
||||
|
||||
import argparse |
||||
import asyncio |
||||
import enum |
||||
import collections |
||||
import inspect |
||||
import json |
||||
import os |
||||
import threading |
||||
import time |
||||
from typing import Any, Optional, Union |
||||
|
||||
import grpc |
||||
from google import auth as google_auth |
||||
from google.auth import environment_vars as google_auth_environment_vars |
||||
from google.auth.transport import grpc as google_auth_transport_grpc |
||||
from google.auth.transport import requests as google_auth_transport_requests |
||||
from grpc.experimental import aio |
||||
|
||||
from src.proto.grpc.testing import empty_pb2, messages_pb2, test_pb2_grpc |
||||
|
||||
_INITIAL_METADATA_KEY = "x-grpc-test-echo-initial" |
||||
_TRAILING_METADATA_KEY = "x-grpc-test-echo-trailing-bin" |
||||
|
||||
|
||||
async def _expect_status_code(call: aio.Call, |
||||
expected_code: grpc.StatusCode) -> None: |
||||
code = await call.code() |
||||
if code != expected_code: |
||||
raise ValueError('expected code %s, got %s' % |
||||
(expected_code, await call.code())) |
||||
|
||||
|
||||
async def _expect_status_details(call: aio.Call, expected_details: str) -> None: |
||||
details = await call.details() |
||||
if details != expected_details: |
||||
raise ValueError('expected message %s, got %s' % |
||||
(expected_details, await call.details())) |
||||
|
||||
|
||||
async def _validate_status_code_and_details(call: aio.Call, |
||||
expected_code: grpc.StatusCode, |
||||
expected_details: str) -> None: |
||||
await _expect_status_code(call, expected_code) |
||||
await _expect_status_details(call, expected_details) |
||||
|
||||
|
||||
def _validate_payload_type_and_length( |
||||
response: Union[messages_pb2.SimpleResponse, messages_pb2. |
||||
StreamingOutputCallResponse], expected_type: Any, |
||||
expected_length: int) -> None: |
||||
if response.payload.type is not expected_type: |
||||
raise ValueError('expected payload type %s, got %s' % |
||||
(expected_type, type(response.payload.type))) |
||||
elif len(response.payload.body) != expected_length: |
||||
raise ValueError('expected payload body size %d, got %d' % |
||||
(expected_length, len(response.payload.body))) |
||||
|
||||
|
||||
async def _large_unary_common_behavior( |
||||
stub: test_pb2_grpc.TestServiceStub, fill_username: bool, |
||||
fill_oauth_scope: bool, call_credentials: Optional[grpc.CallCredentials] |
||||
) -> messages_pb2.SimpleResponse: |
||||
size = 314159 |
||||
request = messages_pb2.SimpleRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_size=size, |
||||
payload=messages_pb2.Payload(body=b'\x00' * 271828), |
||||
fill_username=fill_username, |
||||
fill_oauth_scope=fill_oauth_scope) |
||||
response = await stub.UnaryCall(request, credentials=call_credentials) |
||||
_validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) |
||||
return response |
||||
|
||||
|
||||
async def _empty_unary(stub: test_pb2_grpc.TestServiceStub) -> None: |
||||
response = await stub.EmptyCall(empty_pb2.Empty()) |
||||
if not isinstance(response, empty_pb2.Empty): |
||||
raise TypeError('response is of type "%s", not empty_pb2.Empty!' % |
||||
type(response)) |
||||
|
||||
|
||||
async def _large_unary(stub: test_pb2_grpc.TestServiceStub) -> None: |
||||
await _large_unary_common_behavior(stub, False, False, None) |
||||
|
||||
|
||||
async def _client_streaming(stub: test_pb2_grpc.TestServiceStub) -> None: |
||||
payload_body_sizes = ( |
||||
27182, |
||||
8, |
||||
1828, |
||||
45904, |
||||
) |
||||
|
||||
async def request_gen(): |
||||
for size in payload_body_sizes: |
||||
yield messages_pb2.StreamingInputCallRequest( |
||||
payload=messages_pb2.Payload(body=b'\x00' * size)) |
||||
|
||||
response = await stub.StreamingInputCall(request_gen()) |
||||
if response.aggregated_payload_size != sum(payload_body_sizes): |
||||
raise ValueError('incorrect size %d!' % |
||||
response.aggregated_payload_size) |
||||
|
||||
|
||||
async def _server_streaming(stub: test_pb2_grpc.TestServiceStub) -> None: |
||||
sizes = ( |
||||
31415, |
||||
9, |
||||
2653, |
||||
58979, |
||||
) |
||||
|
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_parameters=( |
||||
messages_pb2.ResponseParameters(size=sizes[0]), |
||||
messages_pb2.ResponseParameters(size=sizes[1]), |
||||
messages_pb2.ResponseParameters(size=sizes[2]), |
||||
messages_pb2.ResponseParameters(size=sizes[3]), |
||||
)) |
||||
call = stub.StreamingOutputCall(request) |
||||
for size in sizes: |
||||
response = await call.read() |
||||
_validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, |
||||
size) |
||||
|
||||
|
||||
async def _ping_pong(stub: test_pb2_grpc.TestServiceStub) -> None: |
||||
request_response_sizes = ( |
||||
31415, |
||||
9, |
||||
2653, |
||||
58979, |
||||
) |
||||
request_payload_sizes = ( |
||||
27182, |
||||
8, |
||||
1828, |
||||
45904, |
||||
) |
||||
|
||||
call = stub.FullDuplexCall() |
||||
for response_size, payload_size in zip(request_response_sizes, |
||||
request_payload_sizes): |
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_parameters=(messages_pb2.ResponseParameters( |
||||
size=response_size),), |
||||
payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
||||
|
||||
await call.write(request) |
||||
response = await call.read() |
||||
_validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, |
||||
response_size) |
||||
await call.done_writing() |
||||
await _validate_status_code_and_details(call, grpc.StatusCode.OK, '') |
||||
|
||||
|
||||
async def _cancel_after_begin(stub: test_pb2_grpc.TestServiceStub): |
||||
call = stub.StreamingInputCall() |
||||
call.cancel() |
||||
if not call.cancelled(): |
||||
raise ValueError('expected cancelled method to return True') |
||||
code = await call.code() |
||||
if code is not grpc.StatusCode.CANCELLED: |
||||
raise ValueError('expected status code CANCELLED') |
||||
|
||||
|
||||
async def _cancel_after_first_response(stub: test_pb2_grpc.TestServiceStub): |
||||
request_response_sizes = ( |
||||
31415, |
||||
9, |
||||
2653, |
||||
58979, |
||||
) |
||||
request_payload_sizes = ( |
||||
27182, |
||||
8, |
||||
1828, |
||||
45904, |
||||
) |
||||
|
||||
call = stub.FullDuplexCall() |
||||
|
||||
response_size = request_response_sizes[0] |
||||
payload_size = request_payload_sizes[0] |
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_parameters=(messages_pb2.ResponseParameters( |
||||
size=response_size),), |
||||
payload=messages_pb2.Payload(body=b'\x00' * payload_size)) |
||||
|
||||
await call.write(request) |
||||
await call.read() |
||||
|
||||
call.cancel() |
||||
|
||||
try: |
||||
await call.read() |
||||
except asyncio.CancelledError: |
||||
assert await call.code() is grpc.StatusCode.CANCELLED |
||||
else: |
||||
raise ValueError('expected call to be cancelled') |
||||
|
||||
|
||||
async def _timeout_on_sleeping_server(stub: test_pb2_grpc.TestServiceStub): |
||||
request_payload_size = 27182 |
||||
|
||||
call = stub.FullDuplexCall(timeout=0.001) |
||||
|
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) |
||||
await call.write(request) |
||||
await call.done_writing() |
||||
try: |
||||
await call.read() |
||||
except aio.AioRpcError as rpc_error: |
||||
if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED: |
||||
raise |
||||
else: |
||||
raise ValueError('expected call to exceed deadline') |
||||
|
||||
|
||||
async def _empty_stream(stub: test_pb2_grpc.TestServiceStub): |
||||
call = stub.FullDuplexCall() |
||||
await call.done_writing() |
||||
assert await call.read() == aio.EOF |
||||
|
||||
|
||||
async def _status_code_and_message(stub: test_pb2_grpc.TestServiceStub): |
||||
details = 'test status message' |
||||
status = grpc.StatusCode.UNKNOWN # code = 2 |
||||
|
||||
# Test with a UnaryCall |
||||
request = messages_pb2.SimpleRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_size=1, |
||||
payload=messages_pb2.Payload(body=b'\x00'), |
||||
response_status=messages_pb2.EchoStatus(code=status.value[0], |
||||
message=details)) |
||||
call = stub.UnaryCall(request) |
||||
await _validate_status_code_and_details(call, status, details) |
||||
|
||||
# Test with a FullDuplexCall |
||||
call = stub.FullDuplexCall() |
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_parameters=(messages_pb2.ResponseParameters(size=1),), |
||||
payload=messages_pb2.Payload(body=b'\x00'), |
||||
response_status=messages_pb2.EchoStatus(code=status.value[0], |
||||
message=details)) |
||||
await call.write(request) # sends the initial request. |
||||
await call.done_writing() |
||||
await _validate_status_code_and_details(call, status, details) |
||||
|
||||
|
||||
async def _unimplemented_method(stub: test_pb2_grpc.TestServiceStub): |
||||
call = stub.UnimplementedCall(empty_pb2.Empty()) |
||||
await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED) |
||||
|
||||
|
||||
async def _unimplemented_service(stub: test_pb2_grpc.UnimplementedServiceStub): |
||||
call = stub.UnimplementedCall(empty_pb2.Empty()) |
||||
await _expect_status_code(call, grpc.StatusCode.UNIMPLEMENTED) |
||||
|
||||
|
||||
async def _custom_metadata(stub: test_pb2_grpc.TestServiceStub): |
||||
initial_metadata_value = "test_initial_metadata_value" |
||||
trailing_metadata_value = b"\x0a\x0b\x0a\x0b\x0a\x0b" |
||||
metadata = ((_INITIAL_METADATA_KEY, initial_metadata_value), |
||||
(_TRAILING_METADATA_KEY, trailing_metadata_value)) |
||||
|
||||
async def _validate_metadata(call): |
||||
initial_metadata = dict(await call.initial_metadata()) |
||||
if initial_metadata[_INITIAL_METADATA_KEY] != initial_metadata_value: |
||||
raise ValueError('expected initial metadata %s, got %s' % |
||||
(initial_metadata_value, |
||||
initial_metadata[_INITIAL_METADATA_KEY])) |
||||
trailing_metadata = dict(await call.trailing_metadata()) |
||||
if trailing_metadata[_TRAILING_METADATA_KEY] != trailing_metadata_value: |
||||
raise ValueError('expected trailing metadata %s, got %s' % |
||||
(trailing_metadata_value, |
||||
trailing_metadata[_TRAILING_METADATA_KEY])) |
||||
|
||||
# Testing with UnaryCall |
||||
request = messages_pb2.SimpleRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_size=1, |
||||
payload=messages_pb2.Payload(body=b'\x00')) |
||||
call = stub.UnaryCall(request, metadata=metadata) |
||||
await _validate_metadata(call) |
||||
|
||||
# Testing with FullDuplexCall |
||||
call = stub.FullDuplexCall(metadata=metadata) |
||||
request = messages_pb2.StreamingOutputCallRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_parameters=(messages_pb2.ResponseParameters(size=1),)) |
||||
await call.write(request) |
||||
await call.read() |
||||
await call.done_writing() |
||||
await _validate_metadata(call) |
||||
|
||||
|
||||
async def _compute_engine_creds(stub: test_pb2_grpc.TestServiceStub, |
||||
args: argparse.Namespace): |
||||
response = await _large_unary_common_behavior(stub, True, True, None) |
||||
if args.default_service_account != response.username: |
||||
raise ValueError('expected username %s, got %s' % |
||||
(args.default_service_account, response.username)) |
||||
|
||||
|
||||
async def _oauth2_auth_token(stub: test_pb2_grpc.TestServiceStub, |
||||
args: argparse.Namespace): |
||||
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] |
||||
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] |
||||
response = await _large_unary_common_behavior(stub, True, True, None) |
||||
if wanted_email != response.username: |
||||
raise ValueError('expected username %s, got %s' % |
||||
(wanted_email, response.username)) |
||||
if args.oauth_scope.find(response.oauth_scope) == -1: |
||||
raise ValueError( |
||||
'expected to find oauth scope "{}" in received "{}"'.format( |
||||
response.oauth_scope, args.oauth_scope)) |
||||
|
||||
|
||||
async def _jwt_token_creds(stub: test_pb2_grpc.TestServiceStub): |
||||
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] |
||||
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] |
||||
response = await _large_unary_common_behavior(stub, True, False, None) |
||||
if wanted_email != response.username: |
||||
raise ValueError('expected username %s, got %s' % |
||||
(wanted_email, response.username)) |
||||
|
||||
|
||||
async def _per_rpc_creds(stub: test_pb2_grpc.TestServiceStub, |
||||
args: argparse.Namespace): |
||||
json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] |
||||
wanted_email = json.load(open(json_key_filename, 'r'))['client_email'] |
||||
google_credentials, unused_project_id = google_auth.default( |
||||
scopes=[args.oauth_scope]) |
||||
call_credentials = grpc.metadata_call_credentials( |
||||
google_auth_transport_grpc.AuthMetadataPlugin( |
||||
credentials=google_credentials, |
||||
request=google_auth_transport_requests.Request())) |
||||
response = await _large_unary_common_behavior(stub, True, False, |
||||
call_credentials) |
||||
if wanted_email != response.username: |
||||
raise ValueError('expected username %s, got %s' % |
||||
(wanted_email, response.username)) |
||||
|
||||
|
||||
async def _special_status_message(stub: test_pb2_grpc.TestServiceStub): |
||||
details = b'\t\ntest with whitespace\r\nand Unicode BMP \xe2\x98\xba and non-BMP \xf0\x9f\x98\x88\t\n'.decode( |
||||
'utf-8') |
||||
status = grpc.StatusCode.UNKNOWN # code = 2 |
||||
|
||||
# Test with a UnaryCall |
||||
request = messages_pb2.SimpleRequest( |
||||
response_type=messages_pb2.COMPRESSABLE, |
||||
response_size=1, |
||||
payload=messages_pb2.Payload(body=b'\x00'), |
||||
response_status=messages_pb2.EchoStatus(code=status.value[0], |
||||
message=details)) |
||||
call = stub.UnaryCall(request) |
||||
await _validate_status_code_and_details(call, status, details) |
||||
|
||||
|
||||
@enum.unique |
||||
class TestCase(enum.Enum): |
||||
EMPTY_UNARY = 'empty_unary' |
||||
LARGE_UNARY = 'large_unary' |
||||
SERVER_STREAMING = 'server_streaming' |
||||
CLIENT_STREAMING = 'client_streaming' |
||||
PING_PONG = 'ping_pong' |
||||
CANCEL_AFTER_BEGIN = 'cancel_after_begin' |
||||
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' |
||||
TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' |
||||
EMPTY_STREAM = 'empty_stream' |
||||
STATUS_CODE_AND_MESSAGE = 'status_code_and_message' |
||||
UNIMPLEMENTED_METHOD = 'unimplemented_method' |
||||
UNIMPLEMENTED_SERVICE = 'unimplemented_service' |
||||
CUSTOM_METADATA = "custom_metadata" |
||||
COMPUTE_ENGINE_CREDS = 'compute_engine_creds' |
||||
OAUTH2_AUTH_TOKEN = 'oauth2_auth_token' |
||||
JWT_TOKEN_CREDS = 'jwt_token_creds' |
||||
PER_RPC_CREDS = 'per_rpc_creds' |
||||
SPECIAL_STATUS_MESSAGE = 'special_status_message' |
||||
|
||||
|
||||
_TEST_CASE_IMPLEMENTATION_MAPPING = { |
||||
TestCase.EMPTY_UNARY: _empty_unary, |
||||
TestCase.LARGE_UNARY: _large_unary, |
||||
TestCase.SERVER_STREAMING: _server_streaming, |
||||
TestCase.CLIENT_STREAMING: _client_streaming, |
||||
TestCase.PING_PONG: _ping_pong, |
||||
TestCase.CANCEL_AFTER_BEGIN: _cancel_after_begin, |
||||
TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response, |
||||
TestCase.TIMEOUT_ON_SLEEPING_SERVER: _timeout_on_sleeping_server, |
||||
TestCase.EMPTY_STREAM: _empty_stream, |
||||
TestCase.STATUS_CODE_AND_MESSAGE: _status_code_and_message, |
||||
TestCase.UNIMPLEMENTED_METHOD: _unimplemented_method, |
||||
TestCase.UNIMPLEMENTED_SERVICE: _unimplemented_service, |
||||
TestCase.CUSTOM_METADATA: _custom_metadata, |
||||
TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds, |
||||
TestCase.OAUTH2_AUTH_TOKEN: _oauth2_auth_token, |
||||
TestCase.JWT_TOKEN_CREDS: _jwt_token_creds, |
||||
TestCase.PER_RPC_CREDS: _per_rpc_creds, |
||||
TestCase.SPECIAL_STATUS_MESSAGE: _special_status_message, |
||||
} |
||||
|
||||
|
||||
async def test_interoperability(case: TestCase, |
||||
stub: test_pb2_grpc.TestServiceStub, |
||||
args: Optional[argparse.Namespace] = None |
||||
) -> None: |
||||
method = _TEST_CASE_IMPLEMENTATION_MAPPING.get(case) |
||||
if method is None: |
||||
raise NotImplementedError(f'Test case "{case}" not implemented!') |
||||
else: |
||||
num_params = len(inspect.signature(method).parameters) |
||||
if num_params == 1: |
||||
await method(stub) |
||||
elif num_params == 2: |
||||
if args is not None: |
||||
await method(stub, args) |
||||
else: |
||||
raise ValueError(f'Failed to run case [{case}]: args is None') |
||||
else: |
||||
raise ValueError(f'Invalid number of parameters [{num_params}]') |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue