Merge branch 'master' into server_channel_affinity

pull/6149/head
Sree Kuchibhotla 9 years ago
commit 96c3236c41
  1. 11
      include/grpc++/impl/codegen/proto_utils.h
  2. 4
      src/core/lib/iomgr/ev_poll_posix.c
  3. 17
      src/core/lib/iomgr/exec_ctx.c
  4. 29
      src/core/lib/iomgr/exec_ctx.h
  5. 2
      src/core/lib/support/string_util_win32.c
  6. 2
      src/python/grpcio/grpc/beta/interfaces.py
  7. 28
      src/python/grpcio/tests/health_check/__init__.py
  8. 75
      src/python/grpcio/tests/health_check/_health_servicer_test.py
  9. 1
      src/python/grpcio/tests/tests.json
  10. 2
      src/python/grpcio/tests/unit/_cython/_channel_test.py
  11. 9
      src/python/grpcio/tests/unit/framework/common/test_constants.py
  12. 10
      src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
  13. 8
      src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
  14. 5
      src/python/grpcio_health_checking/.gitignore
  15. 5
      src/python/grpcio_health_checking/MANIFEST.in
  16. 49
      src/python/grpcio_health_checking/grpc/health/v1/health.proto
  17. 129
      src/python/grpcio_health_checking/grpc/health/v1/health.py
  18. 0
      src/python/grpcio_health_checking/grpc_health/__init__.py
  19. 0
      src/python/grpcio_health_checking/grpc_health/health/__init__.py
  20. 0
      src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py
  21. 66
      src/python/grpcio_health_checking/grpc_health/health/v1/health.py
  22. 30
      src/python/grpcio_health_checking/health_commands.py
  23. 15
      src/python/grpcio_health_checking/setup.py
  24. 5
      test/core/end2end/dualstack_socket_test.c
  25. 10
      test/core/util/port.h
  26. 27
      test/core/util/port_posix.c
  27. 26
      test/core/util/port_windows.c
  28. 6
      test/cpp/end2end/async_end2end_test.cc
  29. 4
      tools/distrib/check_include_guards.py
  30. 4
      tools/run_tests/build_python.sh
  31. 109
      tools/run_tests/performance/scenario_config.py
  32. 18
      tools/run_tests/run_performance_tests.py
  33. 4
      tools/run_tests/run_tests.py

@ -49,7 +49,7 @@ namespace grpc {
extern CoreCodegenInterface* g_core_codegen_interface;
namespace {
namespace internal {
const int kGrpcBufferWriterMaxBufferLength = 8192;
@ -166,7 +166,7 @@ class GrpcBufferReader GRPC_FINAL
grpc_byte_buffer_reader reader_;
gpr_slice slice_;
};
} // namespace
} // namespace internal
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
@ -176,7 +176,7 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true;
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = g_core_codegen_interface->gpr_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GPR_SLICE_END_PTR(slice) ==
@ -185,7 +185,8 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
g_core_codegen_interface->gpr_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
internal::GrpcBufferWriter writer(
bp, internal::kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
@ -200,7 +201,7 @@ class SerializationTraits<T, typename std::enable_if<std::is_base_of<
}
Status result = g_core_codegen_interface->ok();
{
GrpcBufferReader reader(buffer);
internal::GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);

@ -850,6 +850,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
re-evaluate our pollers (this allows poll() based pollers to
ensure they don't miss wakeups) */
keep_polling = 1;
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
while (keep_polling) {
keep_polling = 0;
if (!pollset->kicked_without_pollers) {
@ -858,7 +859,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
added_worker = 1;
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
}
gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
@ -952,7 +952,6 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_free(watchers);
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
} else {
GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
pollset->kicked_without_pollers = 0;
@ -984,6 +983,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
now = gpr_now(now.clock_type);
}
}
gpr_tls_set(&g_current_thread_poller, 0);
if (added_worker) {
remove_worker(pollset, &worker);
gpr_tls_set(&g_current_thread_worker, 0);

@ -39,6 +39,22 @@
#include "src/core/lib/profiling/timers.h"
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
if (!exec_ctx->cached_ready_to_finish) {
exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish(
exec_ctx, exec_ctx->check_ready_to_finish_arg);
}
return exec_ctx->cached_ready_to_finish;
}
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
return false;
}
bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
return true;
}
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
@ -61,6 +77,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
exec_ctx->cached_ready_to_finish = true;
grpc_exec_ctx_flush(exec_ctx);
}

@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue;
* - track a list of work that needs to be delayed until the top of the
* call stack (this provides a convenient mechanism to run callbacks
* without worrying about locking issues)
* - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that provides
* signal as to whether a borrowed thread should continue to do work or
* should actively try to finish up and get this thread back to its owner
*
* CONVENTIONS:
* Instance of this must ALWAYS be constructed on the stack, never
@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
int unused;
bool cached_ready_to_finish;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
#define GRPC_EXEC_CTX_INIT \
{ 0 }
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
{ false, finish_check_arg, finish_check }
#endif
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@ -86,6 +97,14 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
bool success,
grpc_workqueue *offload_target_or_null);
/** Returns true if we'd like to leave this execution context as soon as
possible: useful for deciding whether to do something more or not depending
on outside context */
bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
/** A finish check that is never ready to finish */
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** A finish check that is always ready to finish */
bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,

@ -83,7 +83,7 @@ char *gpr_format_message(int messageid) {
DWORD status = FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
NULL, (DWORD)messageid, MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);

@ -235,7 +235,7 @@ class Server(six.with_metaclass(abc.ABCMeta)):
This method may be called at any time and is idempotent. Passing a smaller
grace value than has been passed in a previous call will have the effect of
stopping the Server sooner. Passing a larger grace value than has been
passed in a previous call will not have the effect of stopping the sooner
passed in a previous call will not have the effect of stopping the server
later.
Args:

@ -0,0 +1,28 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,75 @@
# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Tests of grpc_health.health.v1.health."""
import unittest
from grpc_health.health.v1 import health
from grpc_health.health.v1 import health_pb2
class HealthServicerTest(unittest.TestCase):
def setUp(self):
self.servicer = health.HealthServicer()
self.servicer.set('', health_pb2.HealthCheckResponse.SERVING)
self.servicer.set('grpc.test.TestServiceServing',
health_pb2.HealthCheckResponse.SERVING)
self.servicer.set('grpc.test.TestServiceUnknown',
health_pb2.HealthCheckResponse.UNKNOWN)
self.servicer.set('grpc.test.TestServiceNotServing',
health_pb2.HealthCheckResponse.NOT_SERVING)
def test_empty_service(self):
request = health_pb2.HealthCheckRequest()
resp = self.servicer.Check(request, None)
self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
def test_serving_service(self):
request = health_pb2.HealthCheckRequest(
service='grpc.test.TestServiceServing')
resp = self.servicer.Check(request, None)
self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
def test_unknown_serivce(self):
request = health_pb2.HealthCheckRequest(
service='grpc.test.TestServiceUnknown')
resp = self.servicer.Check(request, None)
self.assertEqual(resp.status, health_pb2.HealthCheckResponse.UNKNOWN)
def test_not_serving_service(self):
request = health_pb2.HealthCheckRequest(
service='grpc.test.TestServiceNotServing')
resp = self.servicer.Check(request, None)
self.assertEqual(resp.status, health_pb2.HealthCheckResponse.NOT_SERVING)
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -29,6 +29,7 @@
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_health_servicer_test.HealthServicerTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_intermediary_low_test.CancellationTest",

@ -60,7 +60,7 @@ def _create_loop_destroy():
def _in_parallel(behavior, arguments):
threads = tuple(
threading.Thread(target=behavior, args=arguments)
for _ in range(test_constants.PARALLELISM))
for _ in range(test_constants.THREAD_CONCURRENCY))
for thread in threads:
thread.start()
for thread in threads:

@ -49,8 +49,13 @@ STREAM_LENGTH = 200
# The size of payloads to transmit in tests.
PAYLOAD_SIZE = 256 * 1024 + 17
# The parallelism to use in tests of parallel RPCs.
PARALLELISM = 200
# The concurrency to use in tests of concurrent RPCs that will not create as
# many threads as RPCs.
RPC_CONCURRENCY = 200
# The concurrency to use in tests of concurrent RPCs that will create as many
# threads as RPCs.
THREAD_CONCURRENCY = 25
# The size of thread pools to use in tests.
POOL_SIZE = 10

@ -146,13 +146,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.PARALLELISM):
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@ -168,13 +168,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
pool.shutdown(wait=True)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.PARALLELISM):
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@ -184,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM // 2)
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)

@ -249,7 +249,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
for test_messages in test_messages_sequence:
requests = []
response_futures = []
for _ in range(test_constants.PARALLELISM):
for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@ -263,13 +263,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(request, response, self)
def testWaitingForSomeButNotAllParallelInvocations(self):
pool = logging_pool.pool(test_constants.PARALLELISM)
pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
for index in range(test_constants.PARALLELISM):
for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
inner_response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@ -279,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
test_constants.PARALLELISM // 2)
test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)

@ -0,0 +1,5 @@
*.proto
*_pb2.py
build/
grpcio_health_checking.egg-info/
dist/

@ -1,2 +1,3 @@
graft grpc
include commands.py
include health_commands.py
graft grpc_health
global-exclude *.pyc

@ -1,49 +0,0 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.health.v1;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

@ -1,129 +0,0 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Reference implementation for health checking in gRPC Python."""
import abc
import enum
import threading
from grpc.health.v1 import health_pb2
@enum.unique
class HealthStatus(enum.Enum):
"""Statuses for a service mirroring the reference health.proto's values."""
UNKNOWN = health_pb2.HealthCheckResponse.UNKNOWN
SERVING = health_pb2.HealthCheckResponse.SERVING
NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING
class _HealthServicer(health_pb2.EarlyAdopterHealthServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self):
self._server_status_lock = threading.Lock()
self._server_status = {}
def Check(self, request, context):
with self._server_status_lock:
if request.service not in self._server_status:
# TODO(atash): once the Python API has a way of setting the server
# status, bring us into conformance with the health check spec by
# returning the NOT_FOUND status here.
raise NotImplementedError()
else:
return health_pb2.HealthCheckResponse(
status=self._server_status[request.service].value)
def set(service, status):
if not isinstance(status, HealthStatus):
raise TypeError('expected grpc.health.v1.health.HealthStatus '
'for argument `status` but got {}'.format(status))
with self._server_status_lock:
self._server_status[service] = status
class HealthServer(health_pb2.EarlyAdopterHealthServer):
"""Interface for the reference gRPC Python health server."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
raise NotImplementedError()
@abc.abstractmethod
def set(self, service, status):
"""Set the status of the given service.
Args:
service (str): service name of the service to set the reported status of
status (HealthStatus): status to set for the specified service
"""
raise NotImplementedError()
class _HealthServerImplementation(HealthServer):
"""Implementation for the reference gRPC Python health server."""
def __init__(self, server, servicer):
self._server = server
self._servicer = servicer
def start(self):
self._server.start()
def stop(self):
self._server.stop()
def set(self, service, status):
self._servicer.set(service, status)
def create_Health_server(port, private_key=None, certificate_chain=None):
"""Get a HealthServer instance.
Args:
port (int): port number passed through to health_pb2 server creation
routine.
private_key (str): to-be-created server's desired private key
certificate_chain (str): to-be-created server's desired certificate chain
Returns:
An instance of HealthServer (conforming thus to
EarlyAdopterHealthServer and providing a method to set server status)."""
servicer = _HealthServicer()
server = health_pb2.early_adopter_create_Health_server(
servicer, port=port, private_key=private_key,
certificate_chain=certificate_chain)
return _HealthServerImplementation(server, servicer)

@ -0,0 +1,66 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Reference implementation for health checking in gRPC Python."""
import threading
from grpc_health.health.v1 import health_pb2
class HealthServicer(health_pb2.BetaHealthServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self):
self._server_status_lock = threading.Lock()
self._server_status = {}
def Check(self, request, context):
with self._server_status_lock:
if request.service not in self._server_status:
# TODO(atash): once the Python API has a way of setting the server
# status, bring us into conformance with the health check spec by
# returning the NOT_FOUND status here.
raise NotImplementedError()
else:
return health_pb2.HealthCheckResponse(
status=self._server_status[request.service])
def set(self, service, status):
"""Sets the status of a service.
Args:
service: string, the name of the service.
NOTE, '' must be set.
status: HealthCheckResponse.status enum value indicating
the status of the service
"""
with self._server_status_lock:
self._server_status[service] = status

@ -33,11 +33,16 @@ import distutils
import glob
import os
import os.path
import shutil
import subprocess
import sys
import setuptools
from setuptools.command import build_py
from setuptools.command import sdist
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
HEALTH_PROTO = os.path.join(ROOT_DIR, '../../proto/grpc/health/v1/health.proto')
class BuildProtoModules(setuptools.Command):
@ -76,9 +81,34 @@ class BuildProtoModules(setuptools.Command):
raise Exception('{}\nOutput:\n{}'.format(e.message, e.output))
class CopyProtoModules(setuptools.Command):
"""Command to copy proto modules from grpc/src/proto."""
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
if os.path.isfile(HEALTH_PROTO):
shutil.copyfile(
HEALTH_PROTO,
os.path.join(ROOT_DIR, 'grpc_health/health/v1/health.proto'))
class BuildPy(build_py.build_py):
"""Custom project build command."""
def run(self):
self.run_command('copy_proto_modules')
self.run_command('build_proto_modules')
build_py.build_py.run(self)
class SDist(sdist.sdist):
"""Custom project build command."""
def run(self):
self.run_command('copy_proto_modules')
sdist.sdist.run(self)

@ -40,7 +40,7 @@ import setuptools
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Break import-style to ensure we can actually find our commands module.
import commands
import health_commands
_PACKAGES = (
setuptools.find_packages('.')
@ -51,22 +51,21 @@ _PACKAGE_DIRECTORIES = {
}
_INSTALL_REQUIRES = (
'grpcio>=0.11.0b0',
'grpcio>=0.13.1',
)
_SETUP_REQUIRES = _INSTALL_REQUIRES
_COMMAND_CLASS = {
'build_proto_modules': commands.BuildProtoModules,
'build_py': commands.BuildPy,
'copy_proto_modules': health_commands.CopyProtoModules,
'build_proto_modules': health_commands.BuildProtoModules,
'build_py': health_commands.BuildPy,
'sdist': health_commands.SDist,
}
setuptools.setup(
name='grpcio_health_checking',
version='0.11.0b0',
version='0.14.0b0',
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=_INSTALL_REQUIRES,
setup_requires=_SETUP_REQUIRES,
cmdclass=_COMMAND_CLASS
)

@ -88,9 +88,11 @@ void test_connect(const char *server_host, const char *client_host, int port,
int was_cancelled = 2;
grpc_call_details call_details;
char *peer;
int picked_port = 0;
if (port == 0) {
port = grpc_pick_unused_port_or_die();
picked_port = 1;
}
gpr_join_host_port(&server_hostport, server_host, port);
@ -263,6 +265,9 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_call_details_destroy(&call_details);
gpr_free(details);
if (picked_port) {
grpc_recycle_unused_port(port);
}
}
int external_dns_works(const char *host) {

@ -40,10 +40,16 @@ extern "C" {
/* pick a port number that is currently unused by either tcp or udp. return
0 on failure. */
int grpc_pick_unused_port();
int grpc_pick_unused_port(void);
/* pick a port number that is currently unused by either tcp or udp. abort
on failure. */
int grpc_pick_unused_port_or_die();
int grpc_pick_unused_port_or_die(void);
/* Return a port which was previously returned by grpc_pick_unused_port().
* Implementations of grpc_pick_unused_port() backed by a portserver may limit
* the total number of ports available; this lets a binary return its allocated
* ports back to the server if it is going to allocate a large number. */
void grpc_recycle_unused_port(int port);
#ifdef __cplusplus
}

@ -68,6 +68,31 @@ static int has_port_been_chosen(int port) {
return 0;
}
static int free_chosen_port(int port) {
size_t i;
int found = 0;
size_t found_at = 0;
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
/* Find the port and erase it from the list, then tell the server it can be
freed. */
for (i = 0; i < num_chosen_ports; i++) {
if (chosen_ports[i] == port) {
GPR_ASSERT(found == 0);
found = 1;
found_at = i;
}
}
if (found) {
chosen_ports[found_at] = chosen_ports[num_chosen_ports - 1];
num_chosen_ports--;
if (env) {
grpc_free_port_using_server(env, port);
}
}
gpr_free(env);
return found;
}
static void free_chosen_ports(void) {
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
if (env != NULL) {
@ -210,4 +235,6 @@ int grpc_pick_unused_port_or_die(void) {
return port;
}
void grpc_recycle_unused_port(int port) { GPR_ASSERT(free_chosen_port(port)); }
#endif /* GPR_POSIX_SOCKET && GRPC_TEST_PICK_PORT */

@ -71,6 +71,30 @@ static int has_port_been_chosen(int port) {
return 0;
}
static int free_chosen_port(int port) {
size_t i;
int found = 0;
size_t found_at = 0;
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
if (env != NULL) {
/* Find the port and erase it from the list, then tell the server it can be
freed. */
for (i = 0; i < num_chosen_ports; i++) {
if (chosen_ports[i] == port) {
GPR_ASSERT(found == 0);
found = 1;
found_at = i;
}
}
if (found) {
chosen_ports[found_at] = chosen_ports[num_chosen_ports - 1];
grpc_free_port_using_server(env, port);
num_chosen_ports--;
}
}
return found;
}
static void free_chosen_ports(void) {
char *env = gpr_getenv("GRPC_TEST_PORT_SERVER");
if (env != NULL) {
@ -216,4 +240,6 @@ int grpc_pick_unused_port_or_die(void) {
return port;
}
void grpc_recycle_unused_port(int port) { GPR_ASSERT(free_chosen_port(port)); }
#endif /* GPR_WINSOCK_SOCKET && GRPC_TEST_PICK_PORT */

@ -245,8 +245,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
void SetUp() GRPC_OVERRIDE {
poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
port_ = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port_;
// Setup server
ServerBuilder builder;
@ -274,6 +274,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
;
poll_overrider_.reset();
gpr_tls_set(&g_is_async_end2end_test, 0);
grpc_recycle_unused_port(port_);
}
void ResetStub() {
@ -325,6 +326,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
std::ostringstream server_address_;
int port_;
std::unique_ptr<PollingOverrider> poll_overrider_;
};

@ -56,7 +56,7 @@ class GuardValidator(object):
def __init__(self):
self.ifndef_re = re.compile(r'#ifndef ([A-Z][A-Z_1-9]*)')
self.define_re = re.compile(r'#define ([A-Z][A-Z_1-9]*)')
self.endif_c_re = re.compile(r'#endif /\* ([A-Z][A-Z_1-9]*) \*/')
self.endif_c_re = re.compile(r'#endif /\* ([A-Z][A-Z_1-9]*) (?:\\ *\n *)?\*/')
self.endif_cpp_re = re.compile(r'#endif // ([A-Z][A-Z_1-9]*)')
self.failed = False
@ -132,7 +132,7 @@ class GuardValidator(object):
# Is there a properly commented #endif?
endif_re = self.endif_cpp_re if cpp_header else self.endif_c_re
flines = fcontents.rstrip().splitlines()
match = endif_re.search(flines[-1])
match = endif_re.search('\n'.join(flines[-2:]))
if not match:
# No endif. Check if we have the last line as just '#endif' and if so
# replace it with a properly commented one.

@ -55,3 +55,7 @@ $ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build_py
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py build_ext --inplace
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/setup.py gather --test
# Build the health checker
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/src/python/grpcio_health_checking/setup.py build
$ROOT/.tox/${TOX_PYTHON_ENV}/bin/python $ROOT/src/python/grpcio_health_checking/setup.py build_py

@ -69,6 +69,10 @@ DEEP=100
# wide is the number of client channels in multi-channel tests (1 otherwise)
WIDE=64
# For most synchronous clients, DEEP*WIDE threads will be created.
SYNC_DEEP=10
SYNC_WIDE=8
def _get_secargs(is_secure):
if is_secure:
@ -81,6 +85,7 @@ def remove_nonproto_fields(scenario):
"""Remove special-purpose that contains some extra info about the scenario
but don't belong to the ScenarioConfig protobuf message"""
scenario.pop('CATEGORIES', None)
scenario.pop('CLIENT_LANGUAGE', None)
scenario.pop('SERVER_LANGUAGE', None)
return scenario
@ -89,7 +94,8 @@ def _ping_pong_scenario(name, rpc_type,
client_type, server_type,
secure=True,
use_generic_payload=False,
use_unconstrained_client=False,
unconstrained_client=None,
client_language=None,
server_language=None,
server_core_limit=0,
async_server_threads=0,
@ -130,18 +136,28 @@ def _ping_pong_scenario(name, rpc_type,
# For proto payload, only the client should get the config.
scenario['client_config']['payload_config'] = EMPTY_PROTO_PAYLOAD
if use_unconstrained_client:
if unconstrained_client:
if unconstrained_client == 'async':
deep = DEEP
wide = WIDE
elif unconstrained_client == 'sync':
deep = SYNC_DEEP
wide = SYNC_WIDE
else:
raise Exception('Illegal value of unconstrained_client option.')
scenario['num_clients'] = 0 # use as many client as available.
# TODO(jtattermusch): for SYNC_CLIENT, this will create 100*64 threads
# and that's probably too much (at least for wrapped languages).
scenario['client_config']['outstanding_rpcs_per_channel'] = DEEP
scenario['client_config']['client_channels'] = WIDE
scenario['client_config']['outstanding_rpcs_per_channel'] = deep
scenario['client_config']['client_channels'] = wide
scenario['client_config']['async_client_threads'] = 0
else:
scenario['client_config']['outstanding_rpcs_per_channel'] = 1
scenario['client_config']['client_channels'] = 1
scenario['client_config']['async_client_threads'] = 1
if client_language:
# the CLIENT_LANGUAGE field is recognized by run_performance_tests.py
scenario['CLIENT_LANGUAGE'] = client_language
if server_language:
# the SERVER_LANGUAGE field is recognized by run_performance_tests.py
scenario['SERVER_LANGUAGE'] = server_language
@ -196,27 +212,27 @@ class CXXLanguage:
yield _ping_pong_scenario(
'cpp_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure,
categories=smoketest_categories)
yield _ping_pong_scenario(
'cpp_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure)
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_unconstrained_client=True, use_generic_payload=True,
unconstrained_client='async', use_generic_payload=True,
secure=secure,
categories=smoketest_categories)
yield _ping_pong_scenario(
'cpp_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_unconstrained_client=True, use_generic_payload=True,
unconstrained_client='async', use_generic_payload=True,
server_core_limit=1, async_server_threads=1,
secure=secure)
@ -258,13 +274,13 @@ class CSharpLanguage:
yield _ping_pong_scenario(
'csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
categories=[SMOKETEST])
yield _ping_pong_scenario(
'csharp_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True)
unconstrained_client='async')
yield _ping_pong_scenario(
'csharp_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
@ -277,6 +293,22 @@ class CSharpLanguage:
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
server_language='c++', server_core_limit=1, async_server_threads=1)
yield _ping_pong_scenario(
'csharp_to_cpp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='async', server_language='c++')
yield _ping_pong_scenario(
'csharp_to_cpp_protobuf_sync_to_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='sync', server_language='c++')
yield _ping_pong_scenario(
'cpp_to_csharp_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
unconstrained_client='async', client_language='c++')
def __str__(self):
return 'csharp'
@ -313,14 +345,14 @@ class NodeLanguage:
yield _ping_pong_scenario(
'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
categories=[SMOKETEST])
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
# 'node_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
# client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
# use_unconstrained_client=True)
# unconstrained_client='async')
# TODO(jtattermusch): make this scenario work
#yield _ping_pong_scenario(
@ -369,19 +401,15 @@ class PythonLanguage:
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
categories=[SMOKETEST])
# TODO(jtattermusch):
# The qps_worker server gets thread starved with ~6400 threads, the GIL
# enforces that a single thread runs at a time, with no way to set thread
# priority. Re-evaluate after changing DEEP and WIDE.
#yield _ping_pong_scenario(
# 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# use_unconstrained_client=True)
yield _ping_pong_scenario(
'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='SYNC_SERVER',
use_unconstrained_client=True)
unconstrained_client='async')
yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
@ -420,17 +448,15 @@ class RubyLanguage:
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
categories=[SMOKETEST])
# TODO: scenario reports QPS of 0.0
#yield _ping_pong_scenario(
# 'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# use_unconstrained_client=True)
yield _ping_pong_scenario(
'ruby_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
unconstrained_client='sync')
# TODO: scenario reports QPS of 0.0
#yield _ping_pong_scenario(
# 'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# use_unconstrained_client=True)
yield _ping_pong_scenario(
'ruby_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
unconstrained_client='sync')
yield _ping_pong_scenario(
'ruby_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
@ -492,26 +518,26 @@ class JavaLanguage:
yield _ping_pong_scenario(
'java_protobuf_async_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS,
categories=smoketest_categories)
yield _ping_pong_scenario(
'java_protobuf_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
'java_generic_async_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_unconstrained_client=True, use_generic_payload=True,
unconstrained_client='async', use_generic_payload=True,
secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
yield _ping_pong_scenario(
'java_generic_async_streaming_qps_one_server_core_%s' % secstr, rpc_type='STREAMING',
client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_unconstrained_client=True, use_generic_payload=True,
unconstrained_client='async', use_generic_payload=True,
async_server_threads=1,
secure=secure, warmup_seconds=JAVA_WARMUP_SECONDS)
@ -560,25 +586,28 @@ class GoLanguage:
secure=secure,
categories=smoketest_categories)
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
'go_protobuf_sync_unary_qps_unconstrained_%s' % secstr, rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure,
categories=smoketest_categories)
# unconstrained_client='async' is intended (client uses goroutines)
yield _ping_pong_scenario(
'go_protobuf_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
use_unconstrained_client=True,
unconstrained_client='async',
secure=secure)
# unconstrained_client='async' is intended (client uses goroutines)
# ASYNC_GENERIC_SERVER for Go actually uses a sync streaming server,
# but that's mostly because of lack of better name of the enum value.
yield _ping_pong_scenario(
'go_generic_sync_streaming_qps_unconstrained_%s' % secstr, rpc_type='STREAMING',
client_type='SYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
use_unconstrained_client=True, use_generic_payload=True,
unconstrained_client='async', use_generic_payload=True,
secure=secure)
# TODO(jtattermusch): add scenarios go vs C++

@ -304,7 +304,11 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
# 'SERVER_LANGUAGE' is an indicator for this script to pick
# a server in different language.
custom_server_lang = scenario_json.get('SERVER_LANGUAGE', None)
custom_client_lang = scenario_json.get('CLIENT_LANGUAGE', None)
scenario_json = scenario_config.remove_nonproto_fields(scenario_json)
if custom_server_lang and custom_client_lang:
raise Exception('Cannot set both custom CLIENT_LANGUAGE and SERVER_LANGUAGE'
'in the same scenario')
if custom_server_lang:
if not workers_by_lang.get(custom_server_lang, []):
print 'Warning: Skipping scenario %s as' % scenario_json['name']
@ -314,6 +318,16 @@ def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
for idx in range(0, scenario_json['num_servers']):
# replace first X workers by workers of a different language
workers[idx] = workers_by_lang[custom_server_lang][idx]
if custom_client_lang:
if not workers_by_lang.get(custom_client_lang, []):
print 'Warning: Skipping scenario %s as' % scenario_json['name']
print('CLIENT_LANGUAGE is set to %s yet the language has '
'not been selected with -l' % custom_client_lang)
continue
for idx in range(scenario_json['num_servers'], len(workers)):
# replace all client workers by workers of a different language,
# leave num_server workers as they are server workers.
workers[idx] = workers_by_lang[custom_client_lang][idx]
scenario = create_scenario_jobspec(scenario_json,
workers,
remote_host=remote_host,
@ -360,8 +374,8 @@ argp.add_argument('--bq_result_table', default=None, type=str,
help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--category',
choices=['smoketest','all'],
default='smoketest',
help='Select a category of tests to run. Smoketest runs by default.')
default='all',
help='Select a category of tests to run.')
argp.add_argument('--netperf',
default=False,
action='store_const',

@ -383,7 +383,9 @@ class PythonLanguage(object):
with open('src/python/grpcio/tests/tests.json') as tests_json_file:
tests_json = json.load(tests_json_file)
environment = dict(_FORCE_ENVIRON_FOR_WRAPPERS)
environment['PYTHONPATH'] = os.path.abspath('src/python/gens')
environment['PYTHONPATH'] = '{}:{}'.format(
os.path.abspath('src/python/gens'),
os.path.abspath('src/python/grpcio_health_checking'))
if self.config.build_config != 'gcov':
return [self.config.job_spec(
['tools/run_tests/run_python.sh', self._tox_env],

Loading…
Cancel
Save