Merge remote-tracking branch 'upstream/master'

pull/6529/head
thinkerou 9 years ago
commit 161b03f5cf
  1. src/core/ext/client_config/subchannel_call_holder.c (1 line changed)
  2. src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs (51 lines changed)
  3. src/csharp/Grpc.Core/Internal/AsyncCall.cs (15 lines changed)
  4. src/csharp/Grpc.Core/Internal/AsyncCallBase.cs (2 lines changed)
  5. src/csharp/Grpc.IntegrationTesting/InteropClient.cs (4 lines changed)
  6. test/cpp/interop/client.cc (13 lines changed)
  7. test/cpp/interop/interop_client.cc (329 lines changed)
  8. test/cpp/interop/interop_client.h (62 lines changed)
  9. test/cpp/interop/stress_interop_client.cc (59 lines changed)
  10. test/cpp/interop/stress_interop_client.h (28 lines changed)
  11. test/cpp/interop/stress_test.cc (21 lines changed)
  12. tools/gce/create_linux_performance_worker.sh (2 lines changed)
  13. tools/gce/linux_performance_worker_init.sh (3 lines changed)
  14. tools/run_tests/performance/bq_upload_result.py (43 lines changed)
  15. tools/run_tests/performance/run_netperf.sh (45 lines changed)
  16. tools/run_tests/run_performance_tests.py (47 lines changed)

@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
fail_locked(exec_ctx, holder);
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
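The hunk above reserves the value 1 in holder->subchannel_call as a "cancelled before the subchannel became ready" sentinel: a no-barrier store marks the slot when the pick fails, and readers distinguish the sentinel from a real call pointer with an acquire load. A minimal C++ sketch of the same idea, assuming illustrative names (CallSlot, kCancelledCall, SubchannelCall are not the actual gRPC types):

#include <atomic>
#include <cstdint>

struct SubchannelCall;  // stand-in for the real call object

// Sentinel stored in the call slot to mean "cancelled before ready".
constexpr std::uintptr_t kCancelledCall = 1;

class CallSlot {
 public:
  // Pick failed: mark the slot so later readers see the sentinel instead of
  // a call pointer (the hunk above uses a no-barrier store for this).
  void MarkCancelled() {
    slot_.store(kCancelledCall, std::memory_order_relaxed);
  }

  // Subchannel became ready: install the call unless the slot was already
  // cancelled in the meantime.
  bool TrySetCall(SubchannelCall* call) {
    std::uintptr_t expected = 0;
    return slot_.compare_exchange_strong(
        expected, reinterpret_cast<std::uintptr_t>(call),
        std::memory_order_acq_rel, std::memory_order_acquire);
  }

  // Mirrors the "1 == gpr_atm_acq_load(...)" check in the hunk above.
  bool AlreadyCancelled() const {
    return slot_.load(std::memory_order_acquire) == kCancelledCall;
  }

 private:
  std::atomic<std::uintptr_t> slot_{0};  // 0 = empty, 1 = cancelled, else call*
};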

@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteFailure()
public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusFails()
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
[Test]
public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
requestStream.CompleteAsync();
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
fakeCall.SendCompletionHandler(true);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestFails()
public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterReceivingStatusFails()
public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestFails()
public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);

@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// Indicates that steaming call has finished.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
}
}
protected override void CheckSendingAllowed(bool allowFinished)
{
base.CheckSendingAllowed(true);
// throwing RpcException if we already received status on client
// side makes the most sense.
// Note that this throws even for StatusCode.OK.
if (!allowFinished && finishedStatus.HasValue)
{
throw new RpcException(finishedStatus.Value.Status);
}
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>

@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
protected void CheckSendingAllowed(bool allowFinished)
protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
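The AsyncCall.cs override above makes a write fail with an RpcException carrying the already-received status (even StatusCode.OK), and AsyncCallBase's CheckSendingAllowed becomes virtual so the subclass can layer that check on top of the base checks. A language-neutral C++ sketch of the same guard, assuming hypothetical names (SendGate, CallStatus) that are not part of Grpc.Core:

#include <optional>
#include <stdexcept>
#include <string>

struct CallStatus {
  int code = 0;  // 0 == OK
  std::string message;
};

class SendGate {
 public:
  // Record the final status once it arrives from the server.
  void OnFinalStatus(const CallStatus& status) { finished_status_ = status; }

  // When allow_finished is false and a final status was already observed,
  // reject the write; mirrors the allowFinished/finishedStatus check above.
  void CheckSendingAllowed(bool allow_finished) const {
    if (!allow_finished && finished_status_.has_value()) {
      throw std::runtime_error("call already finished with status code " +
                               std::to_string(finished_status_->code));
    }
  }

 private:
  std::optional<CallStatus> finished_status_;
};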

@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
{
// Deadline was reached before write has started. Eat the exception and continue.
}
catch (RpcException)
{
// Deadline was reached before write has started. Eat the exception and continue.
}
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.

@ -81,6 +81,14 @@ DEFINE_string(default_service_account, "",
DEFINE_string(service_account_key_file, "",
"Path to service account json key file.");
DEFINE_string(oauth_scope, "", "Scope for OAuth tokens.");
DEFINE_bool(do_not_abort_on_transient_failures, false,
"If set to 'true', abort() is not called in case of transient "
"failures (i.e failures that are temporary and will likely go away "
"on retrying; like a temporary connection failure) and an error "
"message is printed instead. Note that this flag just controls "
"whether abort() is called or not. It does not control whether the "
"test is retried in case of transient failures (and currently the "
"interop tests are not retried even if this flag is set to true)");
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
@ -89,8 +97,9 @@ int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
gpr_log(GPR_INFO, "Testing these cases: %s", FLAGS_test_case.c_str());
int ret = 0;
grpc::testing::InteropClient client(
CreateChannelForTestCase(FLAGS_test_case));
grpc::testing::InteropClient client(CreateChannelForTestCase(FLAGS_test_case),
true,
FLAGS_do_not_abort_on_transient_failures);
if (FLAGS_test_case == "empty_unary") {
client.DoEmpty();
} else if (FLAGS_test_case == "large_unary") {

@ -134,23 +134,43 @@ void InteropClient::Reset(std::shared_ptr<Channel> channel) {
serviceStub_.Reset(channel);
}
InteropClient::InteropClient(std::shared_ptr<Channel> channel)
: serviceStub_(channel, true) {}
InteropClient::InteropClient(std::shared_ptr<Channel> channel,
bool new_stub_every_test_case)
: serviceStub_(channel, new_stub_every_test_case) {}
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures)
: serviceStub_(channel, new_stub_every_test_case),
do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
bool InteropClient::AssertStatusOk(const Status& s) {
if (s.ok()) {
return;
return true;
}
gpr_log(GPR_ERROR, "Error status code: %d, message: %s", s.error_code(),
s.error_message().c_str());
GPR_ASSERT(0);
// Note: At this point, s.error_code is definitely not StatusCode::OK (we
// already checked for s.ok() above). So, the following will call abort()
// (unless s.error_code() corresponds to a transient failure and
// 'do_not_abort_on_transient_failures' is true)
return AssertStatusCode(s, StatusCode::OK);
}
void InteropClient::DoEmpty() {
bool InteropClient::AssertStatusCode(const Status& s,
StatusCode expected_code) {
if (s.error_code() == expected_code) {
return true;
}
gpr_log(GPR_ERROR, "Error status code: %d (expected: %d), message: %s",
s.error_code(), expected_code, s.error_message().c_str());
// In case of transient/retryable failures (like a broken
// connection) we may or may not abort (see TransientFailureOrAbort())
if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
return TransientFailureOrAbort();
}
abort();
}
bool InteropClient::DoEmpty() {
gpr_log(GPR_DEBUG, "Sending an empty rpc...");
Empty request = Empty::default_instance();
@ -158,17 +178,21 @@ void InteropClient::DoEmpty() {
ClientContext context;
Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
gpr_log(GPR_DEBUG, "Empty rpc done.");
return true;
}
void InteropClient::PerformLargeUnary(SimpleRequest* request,
bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response) {
PerformLargeUnary(request, response, NoopChecks);
return PerformLargeUnary(request, response, NoopChecks);
}
void InteropClient::PerformLargeUnary(SimpleRequest* request,
bool InteropClient::PerformLargeUnary(SimpleRequest* request,
SimpleResponse* response,
CheckerFn custom_checks_fn) {
ClientContext context;
@ -180,7 +204,9 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
custom_checks_fn(inspector, request, response);
@ -203,9 +229,11 @@ void InteropClient::PerformLargeUnary(SimpleRequest* request,
default:
GPR_ASSERT(false);
}
return true;
}
void InteropClient::DoComputeEngineCreds(
bool InteropClient::DoComputeEngineCreds(
const grpc::string& default_service_account,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
@ -215,7 +243,11 @@ void InteropClient::DoComputeEngineCreds(
request.set_fill_username(true);
request.set_fill_oauth_scope(true);
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
if (!PerformLargeUnary(&request, &response)) {
return false;
}
gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
GPR_ASSERT(!response.username().empty());
@ -224,9 +256,10 @@ void InteropClient::DoComputeEngineCreds(
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
return true;
}
void InteropClient::DoOauth2AuthToken(const grpc::string& username,
bool InteropClient::DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope) {
gpr_log(GPR_DEBUG,
"Sending a unary rpc with raw oauth2 access token credentials ...");
@ -239,16 +272,20 @@ void InteropClient::DoOauth2AuthToken(const grpc::string& username,
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(!response.oauth_scope().empty());
GPR_ASSERT(username == response.username());
const char* oauth_scope_str = response.oauth_scope().c_str();
GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
return true;
}
void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
bool InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
SimpleRequest request;
SimpleResponse response;
@ -263,35 +300,47 @@ void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
return true;
}
void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
gpr_log(GPR_DEBUG,
"Sending a large unary rpc with JWT token credentials ...");
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
if (!PerformLargeUnary(&request, &response)) {
return false;
}
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
return true;
}
void InteropClient::DoLargeUnary() {
bool InteropClient::DoLargeUnary() {
gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
SimpleRequest request;
SimpleResponse response;
request.set_response_type(PayloadType::COMPRESSABLE);
PerformLargeUnary(&request, &response);
if (!PerformLargeUnary(&request, &response)) {
return false;
}
gpr_log(GPR_DEBUG, "Large unary done.");
return true;
}
void InteropClient::DoLargeCompressedUnary() {
bool InteropClient::DoLargeCompressedUnary() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@ -307,14 +356,32 @@ void InteropClient::DoLargeCompressedUnary() {
SimpleResponse response;
request.set_response_type(payload_types[i]);
request.set_response_compression(compression_types[j]);
PerformLargeUnary(&request, &response, CompressionChecks);
if (!PerformLargeUnary(&request, &response, CompressionChecks)) {
gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix);
gpr_free(log_suffix);
return false;
}
gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix);
gpr_free(log_suffix);
}
}
return true;
}
// Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
// false
bool InteropClient::TransientFailureOrAbort() {
if (do_not_abort_on_transient_failures_) {
return false;
}
abort();
}
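TransientFailureOrAbort() together with the AssertStatusOk()/AssertStatusCode() helpers gives the calling pattern repeated throughout the rewritten test cases below: failed stream operations become TransientFailureOrAbort(), and non-OK statuses become an early false return. A condensed sketch of that idiom (DoSomeStreamingTest is a placeholder name, not a real test case):

bool InteropClient::DoSomeStreamingTest() {
  ClientContext context;
  StreamingOutputCallRequest request;
  StreamingOutputCallResponse response;
  auto stream = serviceStub_.Get()->FullDuplexCall(&context);

  if (!stream->Write(request)) {
    // A failed Write() usually means a broken connection: either abort() or,
    // with --do_not_abort_on_transient_failures, report failure and move on.
    gpr_log(GPR_ERROR, "DoSomeStreamingTest(): stream->Write() failed");
    return TransientFailureOrAbort();
  }
  stream->WritesDone();

  Status s = stream->Finish();
  if (!AssertStatusOk(s)) {
    // AssertStatusOk() already logged the error and decided whether to
    // abort(); here we only propagate the failure to the caller.
    return false;
  }
  return true;
}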
void InteropClient::DoRequestStreaming() {
bool InteropClient::DoRequestStreaming() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
ClientContext context;
@ -328,18 +395,24 @@ void InteropClient::DoRequestStreaming() {
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
Payload* payload = request.mutable_payload();
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
GPR_ASSERT(stream->Write(request));
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
return TransientFailureOrAbort();
}
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
Status s = stream->Finish();
if (!AssertStatusOk(s)) {
return false;
}
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Request streaming done.");
return true;
}
void InteropClient::DoResponseStreaming() {
bool InteropClient::DoResponseStreaming() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc ...");
ClientContext context;
@ -358,13 +431,27 @@ void InteropClient::DoResponseStreaming() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
if (i < response_stream_sizes.size()) {
// stream->Read() failed before reading all the expected messages. This is
// most likely due to connection failure.
gpr_log(GPR_ERROR,
"DoResponseStreaming(): Read fewer streams (%d) than "
"response_stream_sizes.size() (%d)",
i, response_stream_sizes.size());
return TransientFailureOrAbort();
}
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
gpr_log(GPR_DEBUG, "Response streaming done.");
return true;
}
void InteropClient::DoResponseCompressedStreaming() {
bool InteropClient::DoResponseCompressedStreaming() {
const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
@ -432,17 +519,31 @@ void InteropClient::DoResponseCompressedStreaming() {
++k;
}
GPR_ASSERT(response_stream_sizes.size() == k);
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix);
gpr_free(log_suffix);
if (k < response_stream_sizes.size()) {
// stream->Read() failed before reading all the expected messages. This
// is most likely due to a connection failure.
gpr_log(GPR_ERROR,
"DoResponseCompressedStreaming(): Responses read (k=%d) is "
"less than the expected messages (i.e "
"response_stream_sizes.size() (%d)). (i=%d, j=%d)",
k, response_stream_sizes.size(), i, j);
return TransientFailureOrAbort();
}
Status s = stream->Finish();
if (!AssertStatusOk(s)) {
return false;
}
}
}
return true;
}
void InteropClient::DoResponseStreamingWithSlowConsumer() {
bool InteropClient::DoResponseStreamingWithSlowConsumer() {
gpr_log(GPR_DEBUG, "Receiving response steaming rpc with slow consumer ...");
ClientContext context;
@ -464,14 +565,26 @@ void InteropClient::DoResponseStreamingWithSlowConsumer() {
usleep(kReceiveDelayMilliSeconds * 1000);
++i;
}
GPR_ASSERT(kNumResponseMessages == i);
if (i < kNumResponseMessages) {
gpr_log(GPR_ERROR,
"DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
"less than the expected messages (i.e kNumResponseMessages = %d)",
i, kNumResponseMessages);
return TransientFailureOrAbort();
}
Status s = stream->Finish();
if (!AssertStatusOk(s)) {
return false;
}
AssertOkOrPrintErrorStatus(s);
gpr_log(GPR_DEBUG, "Response streaming done.");
return true;
}
void InteropClient::DoHalfDuplex() {
bool InteropClient::DoHalfDuplex() {
gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
ClientContext context;
@ -483,7 +596,11 @@ void InteropClient::DoHalfDuplex() {
ResponseParameters* response_parameter = request.add_response_parameters();
for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
GPR_ASSERT(stream->Write(request));
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
return TransientFailureOrAbort();
}
}
stream->WritesDone();
@ -494,13 +611,27 @@ void InteropClient::DoHalfDuplex() {
grpc::string(response_stream_sizes[i], '\0'));
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
if (i < response_stream_sizes.size()) {
// stream->Read() failed before reading all the expected messages. This is
// most likely due to a connection failure
gpr_log(GPR_ERROR,
"DoHalfDuplex(): Responses read (i=%d) are less than the expected "
"number of messages response_stream_sizes.size() (%d)",
i, response_stream_sizes.size());
return TransientFailureOrAbort();
}
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
return true;
}
void InteropClient::DoPingPong() {
bool InteropClient::DoPingPong() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@ -513,23 +644,39 @@ void InteropClient::DoPingPong() {
ResponseParameters* response_parameter = request.add_response_parameters();
Payload* payload = request.mutable_payload();
StreamingOutputCallResponse response;
for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
response_parameter->set_size(response_stream_sizes[i]);
payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
GPR_ASSERT(stream->Write(request));
GPR_ASSERT(stream->Read(&response));
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
return TransientFailureOrAbort();
}
if (!stream->Read(&response)) {
gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
return TransientFailureOrAbort();
}
GPR_ASSERT(response.payload().body() ==
grpc::string(response_stream_sizes[i], '\0'));
}
stream->WritesDone();
GPR_ASSERT(!stream->Read(&response));
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
gpr_log(GPR_DEBUG, "Ping pong streaming done.");
return true;
}
void InteropClient::DoCancelAfterBegin() {
bool InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
ClientContext context;
@ -542,11 +689,16 @@ void InteropClient::DoCancelAfterBegin() {
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
if (!AssertStatusCode(s, StatusCode::CANCELLED)) {
return false;
}
gpr_log(GPR_DEBUG, "Canceling streaming done.");
return true;
}
void InteropClient::DoCancelAfterFirstResponse() {
bool InteropClient::DoCancelAfterFirstResponse() {
gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
ClientContext context;
@ -560,17 +712,27 @@ void InteropClient::DoCancelAfterFirstResponse() {
response_parameter->set_size(31415);
request.mutable_payload()->set_body(grpc::string(27182, '\0'));
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Write(request));
GPR_ASSERT(stream->Read(&response));
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
return TransientFailureOrAbort();
}
if (!stream->Read(&response)) {
gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
return TransientFailureOrAbort();
}
GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
gpr_log(GPR_DEBUG, "Trying to cancel...");
context.TryCancel();
Status s = stream->Finish();
gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
return true;
}
void InteropClient::DoTimeoutOnSleepingServer() {
bool InteropClient::DoTimeoutOnSleepingServer() {
gpr_log(GPR_DEBUG,
"Sending Ping Pong streaming rpc with a short deadline...");
@ -587,11 +749,15 @@ void InteropClient::DoTimeoutOnSleepingServer() {
stream->Write(request);
Status s = stream->Finish();
GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED)) {
return false;
}
gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
return true;
}
void InteropClient::DoEmptyStream() {
bool InteropClient::DoEmptyStream() {
gpr_log(GPR_DEBUG, "Starting empty_stream.");
ClientContext context;
@ -601,12 +767,17 @@ void InteropClient::DoEmptyStream() {
stream->WritesDone();
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Read(&response) == false);
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
gpr_log(GPR_DEBUG, "empty_stream done.");
return true;
}
void InteropClient::DoStatusWithMessage() {
bool InteropClient::DoStatusWithMessage() {
gpr_log(GPR_DEBUG,
"Sending RPC with a request for status code 2 and message");
@ -620,12 +791,16 @@ void InteropClient::DoStatusWithMessage() {
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN)) {
return false;
}
GPR_ASSERT(s.error_message() == test_msg);
gpr_log(GPR_DEBUG, "Done testing Status and Message");
return true;
}
void InteropClient::DoCustomMetadata() {
bool InteropClient::DoCustomMetadata() {
const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
const grpc::string kInitialMetadataValue("test_initial_metadata_value");
const grpc::string kEchoTrailingBinMetadataKey(
@ -645,7 +820,10 @@ void InteropClient::DoCustomMetadata() {
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@ -675,14 +853,29 @@ void InteropClient::DoCustomMetadata() {
grpc::string payload(kLargeRequestSize, '\0');
request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
StreamingOutputCallResponse response;
GPR_ASSERT(stream->Write(request));
if (!stream->Write(request)) {
gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
return TransientFailureOrAbort();
}
stream->WritesDone();
GPR_ASSERT(stream->Read(&response));
if (!stream->Read(&response)) {
gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
return TransientFailureOrAbort();
}
GPR_ASSERT(response.payload().body() ==
grpc::string(kLargeResponseSize, '\0'));
GPR_ASSERT(!stream->Read(&response));
Status s = stream->Finish();
AssertOkOrPrintErrorStatus(s);
if (!AssertStatusOk(s)) {
return false;
}
const auto& server_initial_metadata = context.GetServerInitialMetadata();
auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
GPR_ASSERT(iter != server_initial_metadata.end());
@ -695,6 +888,8 @@ void InteropClient::DoCustomMetadata() {
gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
}
return true;
}
} // namespace testing

@ -51,41 +51,42 @@ using CheckerFn =
class InteropClient {
public:
explicit InteropClient(std::shared_ptr<Channel> channel);
explicit InteropClient(
std::shared_ptr<Channel> channel,
bool new_stub_every_test_case); // If new_stub_every_test_case is true,
// a new TestService::Stub object is
// created for every test case below
/// If new_stub_every_test_case is true, a new TestService::Stub object is
/// created for every test case
/// If do_not_abort_on_transient_failures is true, abort() is not called in
/// case of transient failures (like connection failures)
explicit InteropClient(std::shared_ptr<Channel> channel,
bool new_stub_every_test_case,
bool do_not_abort_on_transient_failures);
~InteropClient() {}
void Reset(std::shared_ptr<Channel> channel);
void DoEmpty();
void DoLargeUnary();
void DoLargeCompressedUnary();
void DoPingPong();
void DoHalfDuplex();
void DoRequestStreaming();
void DoResponseStreaming();
void DoResponseCompressedStreaming();
void DoResponseStreamingWithSlowConsumer();
void DoCancelAfterBegin();
void DoCancelAfterFirstResponse();
void DoTimeoutOnSleepingServer();
void DoEmptyStream();
void DoStatusWithMessage();
void DoCustomMetadata();
bool DoEmpty();
bool DoLargeUnary();
bool DoLargeCompressedUnary();
bool DoPingPong();
bool DoHalfDuplex();
bool DoRequestStreaming();
bool DoResponseStreaming();
bool DoResponseCompressedStreaming();
bool DoResponseStreamingWithSlowConsumer();
bool DoCancelAfterBegin();
bool DoCancelAfterFirstResponse();
bool DoTimeoutOnSleepingServer();
bool DoEmptyStream();
bool DoStatusWithMessage();
bool DoCustomMetadata();
// Auth tests.
// username is a string containing the user email
void DoJwtTokenCreds(const grpc::string& username);
void DoComputeEngineCreds(const grpc::string& default_service_account,
bool DoJwtTokenCreds(const grpc::string& username);
bool DoComputeEngineCreds(const grpc::string& default_service_account,
const grpc::string& oauth_scope);
// username the GCE default service account email
void DoOauth2AuthToken(const grpc::string& username,
bool DoOauth2AuthToken(const grpc::string& username,
const grpc::string& oauth_scope);
// username is a string containing the user email
void DoPerRpcCreds(const grpc::string& json_key);
bool DoPerRpcCreds(const grpc::string& json_key);
private:
class ServiceStub {
@ -105,13 +106,18 @@ class InteropClient {
// Get() call
};
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response);
/// Run \a custom_check_fn as an additional check.
void PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
bool PerformLargeUnary(SimpleRequest* request, SimpleResponse* response,
CheckerFn custom_checks_fn);
void AssertOkOrPrintErrorStatus(const Status& s);
bool AssertStatusOk(const Status& s);
bool AssertStatusCode(const Status& s, StatusCode expected_code);
bool TransientFailureOrAbort();
ServiceStub serviceStub_;
/// If true, abort() is not called for transient failures
bool do_not_abort_on_transient_failures_;
};
} // namespace testing
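A minimal usage sketch of the new constructor and the bool-returning test methods. Channel setup below uses grpc::CreateChannel with insecure credentials to keep the example self-contained; client.cc itself builds the channel with CreateChannelForTestCase:

#include <memory>
#include <string>

#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>

#include "test/cpp/interop/interop_client.h"

int RunEmptyUnary(const std::string& server_address) {
  std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
      server_address, grpc::InsecureChannelCredentials());
  grpc::testing::InteropClient client(
      channel,
      /*new_stub_every_test_case=*/true,
      /*do_not_abort_on_transient_failures=*/true);
  // With do_not_abort_on_transient_failures=true a transient error makes the
  // test method return false instead of calling abort().
  return client.DoEmpty() ? 0 : 1;
}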

@ -84,11 +84,12 @@ StressTestInteropClient::StressTestInteropClient(
int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector, long test_duration_secs,
long sleep_duration_ms)
long sleep_duration_ms, bool do_not_abort_on_transient_failures)
: test_id_(test_id),
server_address_(server_address),
channel_(channel),
interop_client_(new InteropClient(channel, false)),
interop_client_(new InteropClient(channel, false,
do_not_abort_on_transient_failures)),
test_selector_(test_selector),
test_duration_secs_(test_duration_secs),
sleep_duration_ms_(sleep_duration_ms) {}
@ -126,31 +127,67 @@ void StressTestInteropClient::MainLoop(std::shared_ptr<QpsGauge> qps_gauge) {
}
}
// TODO(sree): Add all interop tests
void StressTestInteropClient::RunTest(TestCaseType test_case) {
bool StressTestInteropClient::RunTest(TestCaseType test_case) {
bool is_success = false;
switch (test_case) {
case EMPTY_UNARY: {
interop_client_->DoEmpty();
is_success = interop_client_->DoEmpty();
break;
}
case LARGE_UNARY: {
interop_client_->DoLargeUnary();
is_success = interop_client_->DoLargeUnary();
break;
}
case LARGE_COMPRESSED_UNARY: {
interop_client_->DoLargeCompressedUnary();
is_success = interop_client_->DoLargeCompressedUnary();
break;
}
case CLIENT_STREAMING: {
interop_client_->DoRequestStreaming();
is_success = interop_client_->DoRequestStreaming();
break;
}
case SERVER_STREAMING: {
interop_client_->DoResponseStreaming();
is_success = interop_client_->DoResponseStreaming();
break;
}
case SERVER_COMPRESSED_STREAMING: {
is_success = interop_client_->DoResponseCompressedStreaming();
break;
}
case SLOW_CONSUMER: {
is_success = interop_client_->DoResponseStreamingWithSlowConsumer();
break;
}
case HALF_DUPLEX: {
is_success = interop_client_->DoHalfDuplex();
break;
}
case PING_PONG: {
is_success = interop_client_->DoPingPong();
break;
}
case CANCEL_AFTER_BEGIN: {
is_success = interop_client_->DoCancelAfterBegin();
break;
}
case CANCEL_AFTER_FIRST_RESPONSE: {
is_success = interop_client_->DoCancelAfterFirstResponse();
break;
}
case TIMEOUT_ON_SLEEPING_SERVER: {
is_success = interop_client_->DoTimeoutOnSleepingServer();
break;
}
case EMPTY_STREAM: {
interop_client_->DoEmptyStream();
is_success = interop_client_->DoEmptyStream();
break;
}
case STATUS_CODE_AND_MESSAGE: {
is_success = interop_client_->DoStatusWithMessage();
break;
}
case CUSTOM_METADATA: {
is_success = interop_client_->DoCustomMetadata();
break;
}
default: {
@ -159,6 +196,8 @@ void StressTestInteropClient::RunTest(TestCaseType test_case) {
break;
}
}
return is_success;
}
} // namespace testing
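RunTest() now reports success or failure to its caller instead of relying on abort() inside the interop client. The real MainLoop body is not shown in this diff; the generic sketch below (placeholder names, not the stress-client API) only illustrates the control flow that the bool return value enables:

#include <cstdio>
#include <functional>
#include <vector>

struct StressStats {
  long successes = 0;
  long transient_failures = 0;
};

// Run each "test case" (here just a bool-returning callable) in a round-robin
// loop; failures are counted and the loop keeps going rather than aborting.
void RunStressLoop(const std::vector<std::function<bool()>>& test_cases,
                   int iterations, StressStats* stats) {
  if (test_cases.empty()) return;
  for (int i = 0; i < iterations; ++i) {
    if (test_cases[i % test_cases.size()]()) {
      ++stats->successes;
    } else {
      // With --do_not_abort_on_transient_failures the interop client returns
      // false instead of calling abort(), so the stress run continues.
      ++stats->transient_failures;
    }
  }
  std::printf("successes=%ld transient_failures=%ld\n", stats->successes,
              stats->transient_failures);
}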

@ -49,7 +49,6 @@ namespace testing {
using std::pair;
using std::vector;
// TODO(sreek): Add more test cases here in future
enum TestCaseType {
UNKNOWN_TEST = -1,
EMPTY_UNARY = 0,
@ -57,7 +56,16 @@ enum TestCaseType {
LARGE_COMPRESSED_UNARY = 2,
CLIENT_STREAMING = 3,
SERVER_STREAMING = 4,
EMPTY_STREAM = 5
SERVER_COMPRESSED_STREAMING = 5,
SLOW_CONSUMER = 6,
HALF_DUPLEX = 7,
PING_PONG = 8,
CANCEL_AFTER_BEGIN = 9,
CANCEL_AFTER_FIRST_RESPONSE = 10,
TIMEOUT_ON_SLEEPING_SERVER = 11,
EMPTY_STREAM = 12,
STATUS_CODE_AND_MESSAGE = 13,
CUSTOM_METADATA = 14
};
const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
@ -66,7 +74,16 @@ const vector<pair<TestCaseType, grpc::string>> kTestCaseList = {
{LARGE_COMPRESSED_UNARY, "large_compressed_unary"},
{CLIENT_STREAMING, "client_streaming"},
{SERVER_STREAMING, "server_streaming"},
{EMPTY_STREAM, "empty_stream"}};
{SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"},
{SLOW_CONSUMER, "slow_consumer"},
{HALF_DUPLEX, "half_duplex"},
{PING_PONG, "ping_pong"},
{CANCEL_AFTER_BEGIN, "cancel_after_begin"},
{CANCEL_AFTER_FIRST_RESPONSE, "cancel_after_first_response"},
{TIMEOUT_ON_SLEEPING_SERVER, "timeout_on_sleeping_server"},
{EMPTY_STREAM, "empty_stream"},
{STATUS_CODE_AND_MESSAGE, "status_code_and_message"},
{CUSTOM_METADATA, "custom_metadata"}};
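kTestCaseList above maps test-case names to TestCaseType values, and stress_test.cc's --test_cases flag (see its help text further down) selects among them with a comma-separated list of name:weight pairs such as "empty_unary:20,large_unary:10,empty_stream:70". A hypothetical parsing helper for that format (not the actual stress_test.cc code, which this diff does not show):

#include <string>
#include <utility>
#include <vector>

// Turn "empty_unary:20,large_unary:10,empty_stream:70" into (name, weight)
// pairs; the names can then be looked up in kTestCaseList and the weights
// fed to a weighted selector.
std::vector<std::pair<std::string, int>> ParseWeightedSpec(
    const std::string& spec) {
  std::vector<std::pair<std::string, int>> result;
  size_t start = 0;
  while (start < spec.size()) {
    size_t comma = spec.find(',', start);
    if (comma == std::string::npos) comma = spec.size();
    const std::string entry = spec.substr(start, comma - start);
    const size_t colon = entry.find(':');
    if (colon != std::string::npos) {
      // Weight is the percentage of iterations assigned to this test case.
      result.emplace_back(entry.substr(0, colon),
                          std::stoi(entry.substr(colon + 1)));
    }
    start = comma + 1;
  }
  return result;
}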
class WeightedRandomTestSelector {
public:
@ -87,14 +104,15 @@ class StressTestInteropClient {
StressTestInteropClient(int test_id, const grpc::string& server_address,
std::shared_ptr<Channel> channel,
const WeightedRandomTestSelector& test_selector,
long test_duration_secs, long sleep_duration_ms);
long test_duration_secs, long sleep_duration_ms,
bool do_not_abort_on_transient_failures);
// The main function. Use this as the thread entry point.
// qps_gauge is the QpsGauge to record the requests per second metric
void MainLoop(std::shared_ptr<QpsGauge> qps_gauge);
private:
void RunTest(TestCaseType test_case);
bool RunTest(TestCaseType test_case);
int test_id_;
const grpc::string& server_address_;

@ -89,7 +89,16 @@ DEFINE_string(test_cases, "",
" large_compressed_unary\n"
" client_streaming\n"
" server_streaming\n"
" server_compressed_streaming\n"
" slow_consumer\n"
" half_duplex\n"
" ping_pong\n"
" cancel_after_begin\n"
" cancel_after_first_response\n"
" timeout_on_sleeping_server\n"
" empty_stream\n"
" status_code_and_message\n"
" custom_metadata\n"
" Example: \"empty_unary:20,large_unary:10,empty_stream:70\"\n"
" The above will execute 'empty_unary', 20% of the time,"
" 'large_unary', 10% of the time and 'empty_stream' the remaining"
@ -101,6 +110,10 @@ DEFINE_int32(log_level, GPR_LOG_SEVERITY_INFO,
"The choices are: 0 (GPR_LOG_SEVERITY_DEBUG), 1 "
"(GPR_LOG_SEVERITY_INFO) and 2 (GPR_LOG_SEVERITY_ERROR)");
DEFINE_bool(do_not_abort_on_transient_failures, true,
"If set to 'true', abort() is not called in case of transient "
"failures like temporary connection failures.");
using grpc::testing::kTestCaseList;
using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl;
@ -189,6 +202,12 @@ void LogParameterInfo(const std::vector<grpc::string>& addresses,
gpr_log(GPR_INFO, "test_cases : %s", FLAGS_test_cases.c_str());
gpr_log(GPR_INFO, "sleep_duration_ms: %d", FLAGS_sleep_duration_ms);
gpr_log(GPR_INFO, "test_duration_secs: %d", FLAGS_test_duration_secs);
gpr_log(GPR_INFO, "num_channels_per_server: %d",
FLAGS_num_channels_per_server);
gpr_log(GPR_INFO, "num_stubs_per_channel: %d", FLAGS_num_stubs_per_channel);
gpr_log(GPR_INFO, "log_level: %d", FLAGS_log_level);
gpr_log(GPR_INFO, "do_not_abort_on_transient_failures: %s",
FLAGS_do_not_abort_on_transient_failures ? "true" : "false");
int num = 0;
for (auto it = addresses.begin(); it != addresses.end(); it++) {
@ -272,7 +291,7 @@ int main(int argc, char** argv) {
stub_idx++) {
StressTestInteropClient* client = new StressTestInteropClient(
++thread_idx, *it, channel, test_selector, FLAGS_test_duration_secs,
FLAGS_sleep_duration_ms);
FLAGS_sleep_duration_ms, FLAGS_do_not_abort_on_transient_failures);
bool is_already_created = false;
// QpsGauge name

@ -50,7 +50,7 @@ gcloud compute instances create $INSTANCE_NAME \
--machine-type $MACHINE_TYPE \
--image ubuntu-15-10 \
--boot-disk-size 300 \
--scope https://www.googleapis.com/auth/bigquery
--scopes https://www.googleapis.com/auth/bigquery
echo 'Created GCE instance, waiting 60 seconds for it to come online.'
sleep 60

@ -77,6 +77,9 @@ sudo apt-get install -y \
# perftools
sudo apt-get install -y google-perftools libgoogle-perftools-dev
# netperf
sudo apt-get install -y netperf
# C++ dependencies
sudo apt-get install -y libgflags-dev libgtest-dev libc++-dev clang

@ -48,20 +48,47 @@ import big_query_utils
_PROJECT_ID='grpc-testing'
def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file):
def _upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, result_file):
with open(result_file, 'r') as f:
(col1, col2, col3) = f.read().split(',')
latency50 = float(col1.strip()) * 1000
latency90 = float(col2.strip()) * 1000
latency99 = float(col3.strip()) * 1000
scenario_result = {
'scenario': {
'name': 'netperf_tcp_rr'
},
'summary': {
'latency50': latency50,
'latency90': latency90,
'latency99': latency99
}
}
bq = big_query_utils.create_big_query()
_create_results_table(bq, dataset_id, table_id)
if not _insert_result(bq, dataset_id, table_id, scenario_result, flatten=False):
print 'Error uploading result to bigquery.'
sys.exit(1)
def _upload_scenario_result_to_bigquery(dataset_id, table_id, result_file):
with open(result_file, 'r') as f:
scenario_result = json.loads(f.read())
bq = big_query_utils.create_big_query()
_create_results_table(bq, dataset_id, table_id)
if not _insert_result(bq, dataset_id, table_id, scenario_result):
print 'Error uploading result to bigquery.'
sys.exit(1)
def _insert_result(bq, dataset_id, table_id, scenario_result):
_flatten_result_inplace(scenario_result)
def _insert_result(bq, dataset_id, table_id, scenario_result, flatten=True):
if flatten:
_flatten_result_inplace(scenario_result)
_populate_metadata_inplace(scenario_result)
row = big_query_utils.make_row(str(uuid.uuid4()), scenario_result)
return big_query_utils.insert_rows(bq,
@ -127,9 +154,17 @@ argp.add_argument('--bq_result_table', required=True, default=None, type=str,
help='Bigquery "dataset.table" to upload results to.')
argp.add_argument('--file_to_upload', default='scenario_result.json', type=str,
help='Report file to upload.')
argp.add_argument('--file_format',
choices=['scenario_result','netperf_latency_csv'],
default='scenario_result',
help='Format of the file to upload.')
args = argp.parse_args()
dataset_id, table_id = args.bq_result_table.split('.', 2)
_upload_scenario_result_to_bigquery(dataset_id, table_id, args.file_to_upload)
if args.file_format == 'netperf_latency_csv':
_upload_netperf_latency_csv_to_bigquery(dataset_id, table_id, args.file_to_upload)
else:
_upload_scenario_result_to_bigquery(dataset_id, table_id, args.file_to_upload)
print 'Successfully uploaded %s to BigQuery.\n' % args.file_to_upload
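The new _upload_netperf_latency_csv_to_bigquery reads a single comma-separated line of three latency values (P50, P90, P99), scales each by 1000, and uploads them under the netperf_tcp_rr scenario. A small C++ sketch of just the parsing step, standing in for the Python above (NetperfLatencies and ParseNetperfCsvLine are illustrative names):

#include <sstream>
#include <stdexcept>
#include <string>

struct NetperfLatencies {
  double latency50;
  double latency90;
  double latency99;
};

// Parse a line like "0.052, 0.101, 0.210" (P50, P90, P99) and scale each
// value by 1000, mirroring the Python upload script above.
NetperfLatencies ParseNetperfCsvLine(const std::string& line) {
  std::istringstream in(line);
  double p50, p90, p99;
  char comma1, comma2;
  if (!(in >> p50 >> comma1 >> p90 >> comma2 >> p99) || comma1 != ',' ||
      comma2 != ',') {
    throw std::runtime_error("expected three comma-separated latency values");
  }
  return NetperfLatencies{p50 * 1000, p90 * 1000, p99 * 1000};
}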

@ -0,0 +1,45 @@
#!/bin/bash
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set -ex
cd $(dirname $0)/../../..
netperf >netperf_latency.txt -P 0 -t TCP_RR -H "$NETPERF_SERVER_HOST" -- -r 1,1 -o P50_LATENCY,P90_LATENCY,P99_LATENCY
cat netperf_latency.txt
if [ "$BQ_RESULT_TABLE" != "" ]
then
tools/run_tests/performance/bq_upload_result.py \
--file_to_upload=netperf_latency.txt \
--file_format=netperf_latency_csv \
--bq_result_table="$BQ_RESULT_TABLE"
fi

@ -131,6 +131,25 @@ def create_quit_jobspec(workers, remote_host=None):
verbose_success=True)
def create_netperf_jobspec(server_host='localhost', client_host=None,
bq_result_table=None):
"""Runs netperf benchmark."""
cmd = 'NETPERF_SERVER_HOST="%s" ' % server_host
if bq_result_table:
cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
cmd += 'tools/run_tests/performance/run_netperf.sh'
if client_host:
user_at_host = '%s@%s' % (_REMOTE_HOST_USERNAME, client_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (user_at_host, pipes.quote(cmd))
return jobset.JobSpec(
cmdline=[cmd],
shortname='netperf',
timeout_seconds=60,
shell=True,
verbose_success=True)
def archive_repo(languages):
"""Archives local version of repo including submodules."""
cmdline=['tar', '-cf', '../grpc.tar', '../grpc/']
@ -244,12 +263,28 @@ def start_qpsworkers(languages, worker_hosts):
def create_scenarios(languages, workers_by_lang, remote_host=None, regex='.*',
category='all', bq_result_table=None):
category='all', bq_result_table=None,
netperf=False, netperf_hosts=[]):
"""Create jobspecs for scenarios to run."""
all_workers = [worker
for workers in workers_by_lang.values()
for worker in workers]
scenarios = []
if netperf:
if not netperf_hosts:
netperf_server='localhost'
netperf_client=None
elif len(netperf_hosts) == 1:
netperf_server=netperf_hosts[0]
netperf_client=netperf_hosts[0]
else:
netperf_server=netperf_hosts[0]
netperf_client=netperf_hosts[1]
scenarios.append(create_netperf_jobspec(server_host=netperf_server,
client_host=netperf_client,
bq_result_table=bq_result_table))
for language in languages:
for scenario_json in language.scenarios():
if re.search(args.regex, scenario_json['name']):
@ -316,6 +351,11 @@ argp.add_argument('--category',
choices=['smoketest','all'],
default='smoketest',
help='Select a category of tests to run. Smoketest runs by default.')
argp.add_argument('--netperf',
default=False,
action='store_const',
const=True,
help='Run netperf benchmark as one of the scenarios.')
args = argp.parse_args()
@ -360,7 +400,10 @@ try:
remote_host=args.remote_driver_host,
regex=args.regex,
category=args.category,
bq_result_table=args.bq_result_table)
bq_result_table=args.bq_result_table,
netperf=args.netperf,
netperf_hosts=args.remote_worker_host)
if not scenarios:
raise Exception('No scenarios to run')
