Merge github.com:grpc/grpc into we-dont-need-no-backup

pull/1888/head
Craig Tiller 10 years ago
commit 63f6136431
  1. 45
      Makefile
  2. 16
      build.json
  3. 5
      include/grpc++/async_unary_call.h
  4. 8
      include/grpc++/impl/call.h
  5. 16
      src/compiler/cpp_generator.cc
  6. 4
      src/ruby/bin/math_server.rb
  7. 2
      src/ruby/lib/grpc/core/time_consts.rb
  8. 6
      src/ruby/lib/grpc/generic/rpc_server.rb
  9. 18
      test/cpp/end2end/async_end2end_test.cc
  10. 12
      test/cpp/end2end/end2end_test.cc
  11. 8
      test/cpp/end2end/mock_test.cc
  12. 242
      test/cpp/end2end/thread_stress_test.cc
  13. 23
      test/cpp/qps/client_async.cc
  14. 129
      tools/profile_analyzer/profile_analyzer.py
  15. 9
      tools/run_tests/tests.json

File diff suppressed because one or more lines are too long

@ -2251,6 +2251,22 @@
"gpr_test_util", "gpr_test_util",
"gpr" "gpr"
] ]
},
{
"name": "thread_stress_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/thread_stress_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
} }
] ]
} }

@ -60,9 +60,8 @@ class ClientAsyncResponseReader GRPC_FINAL
public: public:
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context, const RpcMethod& method, ClientContext* context,
const grpc::protobuf::Message& request, void* tag) const grpc::protobuf::Message& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) { : context_(context), call_(channel->CreateCall(method, context, cq)) {
init_buf_.Reset(tag);
init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
init_buf_.AddSendMessage(request); init_buf_.AddSendMessage(request);
init_buf_.AddClientSendClose(); init_buf_.AddClientSendClose();
@ -90,7 +89,7 @@ class ClientAsyncResponseReader GRPC_FINAL
private: private:
ClientContext* context_; ClientContext* context_;
Call call_; Call call_;
CallOpBuffer init_buf_; SneakyCallOpBuffer init_buf_;
CallOpBuffer meta_buf_; CallOpBuffer meta_buf_;
CallOpBuffer finish_buf_; CallOpBuffer finish_buf_;
}; };

@ -123,6 +123,14 @@ class CallOpBuffer : public CompletionQueueTag {
bool* recv_closed_; bool* recv_closed_;
}; };
// SneakyCallOpBuffer does not post completions to the completion queue
class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer {
public:
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
return CallOpBuffer::FinalizeResult(tag, status) && false;
}
};
// Channel and Server implement this to allow them to hook performing ops // Channel and Server implement this to allow them to hook performing ops
class CallHook { class CallHook {
public: public:

@ -161,13 +161,13 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReaderInterface< $Response$>> " "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "Async$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
@ -258,7 +258,7 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* " "virtual ::grpc::ClientAsyncResponseReaderInterface< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n"); "::grpc::CompletionQueue* cq) = 0;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -317,13 +317,13 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> " "std::unique_ptr< ::grpc::ClientAsyncResponseReader< $Response$>> "
"Async$Method$(::grpc::ClientContext* context, " "Async$Method$(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Indent(); printer->Indent();
printer->Print( printer->Print(
*vars, *vars,
"return std::unique_ptr< " "return std::unique_ptr< "
"::grpc::ClientAsyncResponseReader< $Response$>>(" "::grpc::ClientAsyncResponseReader< $Response$>>("
"Async$Method$Raw(context, request, cq, tag));\n"); "Async$Method$Raw(context, request, cq));\n");
printer->Outdent(); printer->Outdent();
printer->Print("}\n"); printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
@ -412,7 +412,7 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"Async$Method$Raw(::grpc::ClientContext* context, " "Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print( printer->Print(
*vars, *vars,
@ -746,13 +746,13 @@ void PrintSourceClientMethod(grpc::protobuf::io::Printer *printer,
"::grpc::ClientAsyncResponseReader< $Response$>* " "::grpc::ClientAsyncResponseReader< $Response$>* "
"$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, "
"const $Request$& request, " "const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n"); "::grpc::CompletionQueue* cq) {\n");
printer->Print(*vars, printer->Print(*vars,
" return new " " return new "
"::grpc::ClientAsyncResponseReader< $Response$>(" "::grpc::ClientAsyncResponseReader< $Response$>("
"channel(), cq, " "channel(), cq, "
"rpcmethod_$Method$_, " "rpcmethod_$Method$_, "
"context, request, tag);\n" "context, request);\n"
"}\n\n"); "}\n\n");
} else if (ClientOnlyStreaming(method)) { } else if (ClientOnlyStreaming(method)) {
printer->Print(*vars, printer->Print(*vars,

@ -55,7 +55,7 @@ class Fibber
return enum_for(:generator) unless block_given? return enum_for(:generator) unless block_given?
idx, current, previous = 0, 1, 1 idx, current, previous = 0, 1, 1
until idx == @limit until idx == @limit
if idx == 0 || idx == 1 if idx.zero? || idx == 1
yield Math::Num.new(num: 1) yield Math::Num.new(num: 1)
idx += 1 idx += 1
next next
@ -94,7 +94,7 @@ end
# package. That practice should be avoided by defining real services. # package. That practice should be avoided by defining real services.
class Calculator < Math::Math::Service class Calculator < Math::Math::Service
def div(div_args, _call) def div(div_args, _call)
if div_args.divisor == 0 if div_args.divisor.zero?
# To send non-OK status handlers raise a StatusError with the code and # To send non-OK status handlers raise a StatusError with the code and
# and detail they want sent as a Status. # and detail they want sent as a Status.
fail GRPC::StatusError.new(GRPC::Status::INVALID_ARGUMENT, fail GRPC::StatusError.new(GRPC::Status::INVALID_ARGUMENT,

@ -58,7 +58,7 @@ module GRPC
"Cannot make an absolute deadline from #{timeish.inspect}") "Cannot make an absolute deadline from #{timeish.inspect}")
elsif timeish < 0 elsif timeish < 0
TimeConsts::INFINITE_FUTURE TimeConsts::INFINITE_FUTURE
elsif timeish == 0 elsif timeish.zero?
TimeConsts::ZERO TimeConsts::ZERO
else else
Time.now + timeish Time.now + timeish

@ -146,7 +146,7 @@ module GRPC
def remove_current_thread def remove_current_thread
@stop_mutex.synchronize do @stop_mutex.synchronize do
@workers.delete(Thread.current) @workers.delete(Thread.current)
@stop_cond.signal if @workers.size == 0 @stop_cond.signal if @workers.size.zero?
end end
end end
@ -364,7 +364,7 @@ module GRPC
# - #running? returns true after this is called, until #stop cause the # - #running? returns true after this is called, until #stop cause the
# the server to stop. # the server to stop.
def run def run
if rpc_descs.size == 0 if rpc_descs.size.zero?
logger.warn('did not run as no services were present') logger.warn('did not run as no services were present')
return return
end end
@ -455,7 +455,7 @@ module GRPC
unless cls.include?(GenericService) unless cls.include?(GenericService)
fail "#{cls} must 'include GenericService'" fail "#{cls} must 'include GenericService'"
end end
if cls.rpc_descs.size == 0 if cls.rpc_descs.size.zero?
fail "#{cls} should specify some rpc descriptions" fail "#{cls} should specify some rpc descriptions"
end end
cls.assert_rpc_descs_have_methods cls.assert_rpc_descs_have_methods

@ -141,14 +141,13 @@ class AsyncEnd2endTest : public ::testing::Test {
send_request.set_message("Hello"); send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, service_.RequestEcho(&srv_ctx, &recv_request, &response_writer,
srv_cq_.get(), srv_cq_.get(), tag(2)); srv_cq_.get(), srv_cq_.get(), tag(2));
server_ok(2); server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -196,7 +195,7 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
send_request.set_message("Hello"); send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
std::chrono::system_clock::time_point time_now( std::chrono::system_clock::time_point time_now(
std::chrono::system_clock::now()); std::chrono::system_clock::now());
@ -210,7 +209,6 @@ TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
verify_timed_ok(srv_cq_.get(), 2, true, time_limit); verify_timed_ok(srv_cq_.get(), 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
verify_timed_ok(&cli_cq_, 1, true, time_limit);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -402,7 +400,7 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
cli_ctx.AddMetadata(meta2.first, meta2.second); cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
@ -412,7 +410,6 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size());
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3)); response_writer.Finish(send_response, Status::OK, tag(3));
@ -444,7 +441,7 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
@ -452,7 +449,6 @@ TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second); srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second); srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
client_ok(1);
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
server_ok(3); server_ok(3);
@ -492,7 +488,7 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::pair<grpc::string, grpc::string> meta2("key2", "val2"); std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
@ -500,7 +496,6 @@ TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
EXPECT_EQ(send_request.message(), recv_request.message()); EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3)); response_writer.SendInitialMetadata(tag(3));
server_ok(3); server_ok(3);
client_ok(1);
send_response.set_message(recv_request.message()); send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
@ -553,7 +548,7 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
cli_ctx.AddMetadata(meta2.first, meta2.second); cli_ctx.AddMetadata(meta2.first, meta2.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader( std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1))); stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(), service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
@ -563,7 +558,6 @@ TEST_F(AsyncEnd2endTest, MetadataRpc) {
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size()); EXPECT_EQ(static_cast<size_t>(2), client_initial_metadata.size());
client_ok(1);
srv_ctx.AddInitialMetadata(meta3.first, meta3.second); srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second); srv_ctx.AddInitialMetadata(meta4.first, meta4.second);

@ -577,18 +577,6 @@ TEST_F(End2endTest, ClientCancelsBidi) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.code());
} }
TEST_F(End2endTest, ThreadStress) {
ResetStub();
std::vector<std::thread*> threads;
for (int i = 0; i < 100; ++i) {
threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
}
for (int i = 0; i < 100; ++i) {
threads[i]->join();
delete threads[i];
}
}
TEST_F(End2endTest, RpcMaxMessageSize) { TEST_F(End2endTest, RpcMaxMessageSize) {
ResetStub(); ResetStub();
EchoRequest request; EchoRequest request;

@ -119,8 +119,8 @@ class MockStub : public TestService::StubInterface {
private: private:
ClientAsyncResponseReaderInterface<EchoResponse>* AsyncEchoRaw( ClientAsyncResponseReaderInterface<EchoResponse>* AsyncEchoRaw(
ClientContext* context, const EchoRequest& request, CompletionQueue* cq, ClientContext* context, const EchoRequest& request,
void* tag) GRPC_OVERRIDE { CompletionQueue* cq) GRPC_OVERRIDE {
return nullptr; return nullptr;
} }
ClientWriterInterface<EchoRequest>* RequestStreamRaw( ClientWriterInterface<EchoRequest>* RequestStreamRaw(
@ -151,8 +151,8 @@ class MockStub : public TestService::StubInterface {
return nullptr; return nullptr;
} }
ClientAsyncResponseReaderInterface<EchoResponse>* AsyncUnimplementedRaw( ClientAsyncResponseReaderInterface<EchoResponse>* AsyncUnimplementedRaw(
ClientContext* context, const EchoRequest& request, CompletionQueue* cq, ClientContext* context, const EchoRequest& request,
void* tag) GRPC_OVERRIDE { CompletionQueue* cq) GRPC_OVERRIDE {
return nullptr; return nullptr;
} }
}; };

@ -0,0 +1,242 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <thread>
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/echo_duplicate.grpc.pb.h"
#include "test/cpp/util/echo.grpc.pb.h"
#include "src/cpp/server/thread_pool.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include <grpc++/time.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse;
using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
// When echo_deadline is requested, deadline seen in the ServerContext is set in
// the response in seconds.
void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
if (request->has_param() && request->param().echo_deadline()) {
gpr_timespec deadline = gpr_inf_future;
if (context->deadline() != system_clock::time_point::max()) {
Timepoint2Timespec(context->deadline(), &deadline);
}
response->mutable_param()->set_request_deadline(deadline.tv_sec);
}
}
} // namespace
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
TestServiceImpl() : signal_client_(false) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
response->set_message(request->message());
MaybeEchoDeadline(context, request, response);
if (request->has_param() && request->param().client_cancel_after_us()) {
{
std::unique_lock<std::mutex> lock(mu_);
signal_client_ = true;
}
while (!context->IsCancelled()) {
std::this_thread::sleep_for(std::chrono::microseconds(
request->param().client_cancel_after_us()));
}
return Status::Cancelled;
} else if (request->has_param() &&
request->param().server_cancel_after_us()) {
std::this_thread::sleep_for(
std::chrono::microseconds(request->param().server_cancel_after_us()));
return Status::Cancelled;
} else {
EXPECT_FALSE(context->IsCancelled());
}
return Status::OK;
}
// Unimplemented is left unimplemented to test the returned error.
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) GRPC_OVERRIDE {
EchoRequest request;
response->set_message("");
while (reader->Read(&request)) {
response->mutable_message()->append(request.message());
}
return Status::OK;
}
// Return 3 messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(ServerContext* context, const EchoRequest* request,
ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
EchoResponse response;
response.set_message(request->message() + "0");
writer->Write(response);
response.set_message(request->message() + "1");
writer->Write(response);
response.set_message(request->message() + "2");
writer->Write(response);
return Status::OK;
}
Status BidiStream(ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream)
GRPC_OVERRIDE {
EchoRequest request;
EchoResponse response;
while (stream->Read(&request)) {
gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
response.set_message(request.message());
stream->Write(response);
}
return Status::OK;
}
bool signal_client() {
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
private:
bool signal_client_;
std::mutex mu_;
};
class TestServiceImplDupPkg
: public ::grpc::cpp::test::util::duplicate::TestService::Service {
public:
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
response->set_message("no package");
return Status::OK;
}
};
class End2endTest : public ::testing::Test {
protected:
End2endTest() : kMaxMessageSize_(8192), thread_pool_(2) {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
builder.SetMaxMessageSize(
kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
builder.SetThreadPool(&thread_pool_);
server_ = builder.BuildAndStart();
}
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() {
std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
const int kMaxMessageSize_;
TestServiceImpl service_;
TestServiceImplDupPkg dup_pkg_service_;
ThreadPool thread_pool_;
};
static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
int num_rpcs) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
}
}
TEST_F(End2endTest, ThreadStress) {
ResetStub();
std::vector<std::thread*> threads;
for (int i = 0; i < 100; ++i) {
threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
}
for (int i = 0; i < 100; ++i) {
threads[i]->join();
delete threads[i];
}
}
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -75,19 +75,20 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
TestService::Stub* stub, const RequestType& req, TestService::Stub* stub, const RequestType& req,
std::function< std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&, TestService::Stub*, grpc::ClientContext*, const RequestType&)>
void*)> start_req, start_req,
std::function<void(grpc::Status, ResponseType*)> on_done) std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(), : context_(),
stub_(stub), stub_(stub),
req_(req), req_(req),
response_(), response_(),
next_state_(&ClientRpcContextUnaryImpl::ReqSent), next_state_(&ClientRpcContextUnaryImpl::RespDone),
callback_(on_done), callback_(on_done),
start_req_(start_req), start_req_(start_req),
start_(Timer::Now()), start_(Timer::Now()),
response_reader_( response_reader_(start_req(stub_, &context_, req_)) {
start_req(stub_, &context_, req_, ClientRpcContext::tag(this))) {} response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
}
~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool ret = (this->*next_state_)(ok); bool ret = (this->*next_state_)(ok);
@ -102,11 +103,6 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
} }
private: private:
bool ReqSent(bool) {
next_state_ = &ClientRpcContextUnaryImpl::RespDone;
response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
return true;
}
bool RespDone(bool) { bool RespDone(bool) {
next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
return false; return false;
@ -122,8 +118,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
bool (ClientRpcContextUnaryImpl::*next_state_)(bool); bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_; std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
TestService::Stub*, grpc::ClientContext*, const RequestType&, void*)> TestService::Stub*, grpc::ClientContext*, const RequestType&)> start_req_;
start_req_;
grpc::Status status_; grpc::Status status_;
double start_; double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@ -198,8 +193,8 @@ private:
const SimpleRequest& req) { const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {}; auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx, auto start_req = [cq](TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, void* tag) { const SimpleRequest& request) {
return stub->AsyncUnaryCall(ctx, request, cq, tag); return stub->AsyncUnaryCall(ctx, request, cq);
}; };
new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
stub, req, start_req, check_done); stub, req, start_req, check_done);

@ -39,8 +39,8 @@ Usage:
import collections import collections
import itertools import itertools
import math
import re import re
import sys
# Create a regex to parse output of the C core basic profiler, # Create a regex to parse output of the C core basic profiler,
# as defined in src/core/profiling/basic_timers.c. # as defined in src/core/profiling/basic_timers.c.
@ -64,19 +64,68 @@ class ImportantMark(object):
def entry(self): def entry(self):
return self._entry return self._entry
def append_post_entry(self, entry): @property
if self._n > 0: def max_depth(self):
self._post_stack.append(entry) return self._n
def append_post_entry(self, post_entry):
if self._n > 0 and post_entry.thread == self._entry.thread:
self._post_stack.append(post_entry)
self._n -= 1 self._n -= 1
def get_deltas(self): def get_deltas(self):
pre_and_post_stacks = itertools.chain(self._pre_stack, self._post_stack) pre_and_post_stacks = itertools.chain(self._pre_stack, self._post_stack)
return collections.OrderedDict((stack_entry, return collections.OrderedDict((stack_entry,
(self._entry.time - stack_entry.time)) abs(self._entry.time - stack_entry.time))
for stack_entry in pre_and_post_stacks) for stack_entry in pre_and_post_stacks)
def entries():
for line in sys.stdin: def print_block_statistics(block_times):
print '{:<12s} {:>12s} {:>12s} {:>12s} {:>12s}'.format(
'Block tag', '50th p.', '90th p.', '95th p.', '99th p.')
for tag, tag_times in sorted(block_times.iteritems()):
times = sorted(tag_times)
print '{:<12d}: {:>12.3f} {:>12.3f} {:>12.3f} {:>12.3f}'.format(
tag, percentile(times, 50), percentile(times, 90),
percentile(times, 95), percentile(times, 99))
print
def print_grouped_imark_statistics(group_key, imarks_group):
values = collections.OrderedDict()
for imark in imarks_group:
deltas = imark.get_deltas()
for relative_entry, time_delta_us in deltas.iteritems():
key = '{tag} {type} ({file}:{line})'.format(**relative_entry._asdict())
l = values.setdefault(key, list())
l.append(time_delta_us)
print group_key
print '{:<50s} {:>12s} {:>12s} {:>12s} {:>12s}'.format(
'Relative mark', '50th p.', '90th p.', '95th p.', '99th p.')
for key, time_values in values.iteritems():
time_values = sorted(time_values)
print '{:<50s}: {:>12.3f} {:>12.3f} {:>12.3f} {:>12.3f}'.format(
key, percentile(time_values, 50), percentile(time_values, 90),
percentile(time_values, 95), percentile(time_values, 99))
print
def percentile(vals, percent):
""" Calculates the interpolated percentile given a sorted sequence and a
percent (in the usual 0-100 range)."""
assert vals, "Empty input sequence."
percent /= 100.0
k = (len(vals)-1) * percent
f = math.floor(k)
c = math.ceil(k)
if f == c:
return vals[int(k)]
# else, interpolate
d0 = vals[int(f)] * (c-k)
d1 = vals[int(c)] * (k-f)
return d0 + d1
def entries(f):
for line in f:
m = _RE_LINE.match(line) m = _RE_LINE.match(line)
if not m: continue if not m: continue
yield Entry(time=float(m.group(1)), yield Entry(time=float(m.group(1)),
@ -87,53 +136,53 @@ def entries():
file=m.group(6), file=m.group(6),
line=m.group(7)) line=m.group(7))
def main(f):
percentiles = (50, 90, 95, 99)
threads = collections.defaultdict(lambda: collections.defaultdict(list)) threads = collections.defaultdict(lambda: collections.defaultdict(list))
times = collections.defaultdict(list) times = collections.defaultdict(list)
# Indexed by the mark's tag. Items in the value list correspond to the mark in
# different stack situations.
important_marks = collections.defaultdict(list) important_marks = collections.defaultdict(list)
stack_depth = collections.defaultdict(int)
for entry in entries(): for entry in entries(f):
thread = threads[entry.thread] thread = threads[entry.thread]
if entry.type == '{': if entry.type == '{':
thread[entry.tag].append(entry) thread[entry.tag].append(entry)
stack_depth[entry.thread] += 1
if entry.type == '!': if entry.type == '!':
# Save a snapshot of the current stack inside a new ImportantMark instance. # Save a snapshot of the current stack inside a new ImportantMark instance.
# Get all entries with type '{' from "thread". # Get all entries _for any tag in the thread_.
stack = [e for entries_for_tag in thread.values() stack = [e for entries_for_tag in thread.itervalues()
for e in entries_for_tag if e.type == '{'] for e in entries_for_tag]
important_marks[entry.tag].append(ImportantMark(entry, stack)) imark_group_key = '{tag}/{thread}@{file}:{line}'.format(**entry._asdict())
important_marks[imark_group_key].append(ImportantMark(entry, stack))
elif entry.type == '}': elif entry.type == '}':
last = thread[entry.tag].pop() last = thread[entry.tag].pop()
times[entry.tag].append(entry.time - last.time) times[entry.tag].append(entry.time - last.time)
# Update accounting for important marks. # only access the last "depth" imarks for the tag.
for imarks_for_tag in important_marks.itervalues(): depth = stack_depth[entry.thread]
for imark in imarks_for_tag: for imarks_group in important_marks.itervalues():
for imark in imarks_group[-depth:]:
# if at a '}' deeper than where the current "imark" was found, ignore.
if depth > imark.max_depth: continue
imark.append_post_entry(entry) imark.append_post_entry(entry)
stack_depth[entry.thread] -= 1
def percentile(vals, pct): print
return sorted(vals)[int(len(vals) * pct / 100.0)] print 'Block marks:'
print '============'
print 'tag 50%/90%/95%/99% us' print_block_statistics(times)
for tag in sorted(times.keys()):
vals = times[tag]
print '%d %.2f/%.2f/%.2f/%.2f' % (tag,
percentile(vals, 50),
percentile(vals, 90),
percentile(vals, 95),
percentile(vals, 99))
print print
print 'Important marks:' print 'Important marks:'
print '================' print '================'
for tag, imark_for_tag in important_marks.iteritems(): for group_key, imarks_group in important_marks.iteritems():
for imark in imarks_for_tag: print_grouped_imark_statistics(group_key, imarks_group)
deltas = imark.get_deltas()
print '{tag} @ {file}:{line}'.format(**imark.entry._asdict())
for entry, time_delta_us in deltas.iteritems(): if __name__ == '__main__':
format_dict = entry._asdict() # If invoked without arguments, read off sys.stdin. If one argument is given,
format_dict['time_delta_us'] = time_delta_us # take it as a file name and open it for reading.
print '{tag} {type} ({file}:{line}): {time_delta_us:12.3f} us'.format( import sys
**format_dict) f = sys.stdin
print if len(sys.argv) == 2:
f = open(sys.argv[1], 'r')
main(f)

@ -686,6 +686,15 @@
"posix" "posix"
] ]
}, },
{
"flaky": false,
"language": "c++",
"name": "thread_stress_test",
"platforms": [
"windows",
"posix"
]
},
{ {
"flaky": false, "flaky": false,
"language": "c", "language": "c",

Loading…
Cancel
Save