Merge pull request #2813 from ctiller/every-day-i-propagate

C++ propagation API
pull/2754/head^2
Yang Gao 9 years ago
commit aa7a75a4d9
  1. 62
      include/grpc++/client_context.h
  2. 2
      include/grpc++/server_context.h
  3. 6
      src/cpp/client/channel.cc
  4. 12
      src/cpp/client/client_context.cc
  5. 122
      test/cpp/end2end/end2end_test.cc

@ -39,6 +39,7 @@
#include <string>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc++/auth_context.h>
@ -46,8 +47,6 @@
#include <grpc++/status.h>
#include <grpc++/time.h>
struct grpc_call;
struct grpc_completion_queue;
struct census_context;
namespace grpc {
@ -70,12 +69,68 @@ template <class R, class W>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
class ServerContext;
class PropagationOptions {
public:
PropagationOptions() : propagate_(GRPC_PROPAGATE_DEFAULTS) {}
PropagationOptions& enable_deadline_propagation() {
propagate_ |= GRPC_PROPAGATE_DEADLINE;
return *this;
}
PropagationOptions& disable_deadline_propagation() {
propagate_ &= ~GRPC_PROPAGATE_DEADLINE;
return *this;
}
PropagationOptions& enable_census_stats_propagation() {
propagate_ |= GRPC_PROPAGATE_CENSUS_STATS_CONTEXT;
return *this;
}
PropagationOptions& disable_census_stats_propagation() {
propagate_ &= ~GRPC_PROPAGATE_CENSUS_STATS_CONTEXT;
return *this;
}
PropagationOptions& enable_census_tracing_propagation() {
propagate_ |= GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT;
return *this;
}
PropagationOptions& disable_census_tracing_propagation() {
propagate_ &= ~GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT;
return *this;
}
PropagationOptions& enable_cancellation_propagation() {
propagate_ |= GRPC_PROPAGATE_CANCELLATION;
return *this;
}
PropagationOptions& disable_cancellation_propagation() {
propagate_ &= ~GRPC_PROPAGATE_CANCELLATION;
return *this;
}
gpr_uint32 c_bitmask() const { return propagate_; }
private:
gpr_uint32 propagate_;
};
class ClientContext {
public:
ClientContext();
~ClientContext();
/// Create a new ClientContext that propagates some or all of its attributes
static std::unique_ptr<ClientContext> FromServerContext(
const ServerContext& server_context,
PropagationOptions options = PropagationOptions());
void AddMetadata(const grpc::string& meta_key,
const grpc::string& meta_value);
@ -181,6 +236,9 @@ class ClientContext {
std::multimap<grpc::string, grpc::string> recv_initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
grpc_call* propagate_from_call_;
PropagationOptions propagation_options_;
grpc_compression_algorithm compression_algorithm_;
};

@ -50,6 +50,7 @@ struct census_context;
namespace grpc {
class ClientContext;
template <class W, class R>
class ServerAsyncReader;
template <class W>
@ -158,6 +159,7 @@ class ServerContext {
friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler;
friend class ::grpc::ClientContext;
// Prevent copying.
ServerContext(const ServerContext&);

@ -63,10 +63,12 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
const char* host_str = host_.empty() ? NULL : host_.c_str();
auto c_call = method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(
c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(), cq->cq(),
method.channel_tag(), context->raw_deadline())
: grpc_channel_create_call(
c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(),
c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(), cq->cq(),
method.name(), context->authority().empty()
? host_str
: context->authority().c_str(),

@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc++/credentials.h>
#include <grpc++/server_context.h>
#include <grpc++/time.h>
#include "src/core/channel/compress_filter.h"
@ -48,7 +49,8 @@ ClientContext::ClientContext()
: initial_metadata_received_(false),
call_(nullptr),
cq_(nullptr),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
propagate_from_call_(nullptr) {}
ClientContext::~ClientContext() {
if (call_) {
@ -64,6 +66,14 @@ ClientContext::~ClientContext() {
}
}
std::unique_ptr<ClientContext> ClientContext::FromServerContext(
const ServerContext& context, PropagationOptions options) {
std::unique_ptr<ClientContext> ctx(new ClientContext);
ctx->propagate_from_call_ = context.call_;
ctx->propagation_options_ = options;
return ctx;
}
void ClientContext::AddMetadata(const grpc::string& meta_key,
const grpc::string& meta_value) {
send_initial_metadata_.insert(std::make_pair(meta_key, meta_value));

@ -104,6 +104,22 @@ bool CheckIsLocalhost(const grpc::string& addr) {
} // namespace
class Proxy : public ::grpc::cpp::test::util::TestService::Service {
public:
Proxy(std::shared_ptr<ChannelInterface> channel)
: stub_(grpc::cpp::test::util::TestService::NewStub(channel)) {}
Status Echo(ServerContext* server_context, const EchoRequest* request,
EchoResponse* response) GRPC_OVERRIDE {
std::unique_ptr<ClientContext> client_context =
ClientContext::FromServerContext(*server_context);
return stub_->Echo(client_context.get(), *request, response);
}
private:
std::unique_ptr<::grpc::cpp::test::util::TestService::Stub> stub_;
};
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
public:
TestServiceImpl() : signal_client_(false), host_() {}
@ -241,7 +257,9 @@ class TestServiceImplDupPkg
}
};
class End2endTest : public ::testing::Test {
/* Param is whether or not to use a proxy -- some tests use TEST_F as they don't
need this functionality */
class End2endTest : public ::testing::TestWithParam<bool> {
protected:
End2endTest()
: kMaxMessageSize_(8192), special_service_("special"), thread_pool_(2) {}
@ -267,21 +285,41 @@ class End2endTest : public ::testing::Test {
server_ = builder.BuildAndStart();
}
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void TearDown() GRPC_OVERRIDE {
server_->Shutdown();
if (proxy_server_) proxy_server_->Shutdown();
}
void ResetStub() {
void ResetStub(bool use_proxy) {
SslCredentialsOptions ssl_opts = {test_root_cert, "", ""};
ChannelArguments args;
args.SetSslTargetNameOverride("foo.test.google.fr");
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
channel_ = CreateChannel(server_address_.str(), SslCredentials(ssl_opts),
args);
if (use_proxy) {
proxy_service_.reset(new Proxy(channel_));
int port = grpc_pick_unused_port_or_die();
std::ostringstream proxyaddr;
proxyaddr << "localhost:" << port;
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
builder.SetThreadPool(&thread_pool_);
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureCredentials(),
ChannelArguments());
}
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::unique_ptr<Server> proxy_server_;
std::unique_ptr<Proxy> proxy_service_;
std::ostringstream server_address_;
const int kMaxMessageSize_;
TestServiceImpl service_;
@ -306,7 +344,7 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
}
TEST_F(End2endTest, SimpleRpcWithHost) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
@ -321,13 +359,13 @@ TEST_F(End2endTest, SimpleRpcWithHost) {
EXPECT_TRUE(s.ok());
}
TEST_F(End2endTest, SimpleRpc) {
ResetStub();
TEST_P(End2endTest, SimpleRpc) {
ResetStub(GetParam());
SendRpc(stub_.get(), 1);
}
TEST_F(End2endTest, MultipleRpcs) {
ResetStub();
TEST_P(End2endTest, MultipleRpcs) {
ResetStub(GetParam());
std::vector<std::thread*> threads;
for (int i = 0; i < 10; ++i) {
threads.push_back(new std::thread(SendRpc, stub_.get(), 10));
@ -339,8 +377,8 @@ TEST_F(End2endTest, MultipleRpcs) {
}
// Set a 10us deadline and make sure proper error is returned.
TEST_F(End2endTest, RpcDeadlineExpires) {
ResetStub();
TEST_P(End2endTest, RpcDeadlineExpires) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -354,8 +392,8 @@ TEST_F(End2endTest, RpcDeadlineExpires) {
}
// Set a long but finite deadline.
TEST_F(End2endTest, RpcLongDeadline) {
ResetStub();
TEST_P(End2endTest, RpcLongDeadline) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -370,8 +408,8 @@ TEST_F(End2endTest, RpcLongDeadline) {
}
// Ask server to echo back the deadline it sees.
TEST_F(End2endTest, EchoDeadline) {
ResetStub();
TEST_P(End2endTest, EchoDeadline) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -392,8 +430,8 @@ TEST_F(End2endTest, EchoDeadline) {
}
// Ask server to echo back the deadline it sees. The rpc has no deadline.
TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) {
ResetStub();
TEST_P(End2endTest, EchoDeadlineForNoDeadlineRpc) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -407,8 +445,8 @@ TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) {
gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
}
TEST_F(End2endTest, UnimplementedRpc) {
ResetStub();
TEST_P(End2endTest, UnimplementedRpc) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -422,7 +460,7 @@ TEST_F(End2endTest, UnimplementedRpc) {
}
TEST_F(End2endTest, RequestStreamOneRequest) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -437,7 +475,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) {
}
TEST_F(End2endTest, RequestStreamTwoRequests) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -453,7 +491,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) {
}
TEST_F(End2endTest, ResponseStream) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -473,7 +511,7 @@ TEST_F(End2endTest, ResponseStream) {
}
TEST_F(End2endTest, BidiStream) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -506,7 +544,7 @@ TEST_F(End2endTest, BidiStream) {
// Talk to the two services with the same name but different package names.
// The two stubs are created on the same channel.
TEST_F(End2endTest, DiffPackageServices) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -561,8 +599,8 @@ void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) {
}
// Client cancels rpc after 10ms
TEST_F(End2endTest, ClientCancelsRpc) {
ResetStub();
TEST_P(End2endTest, ClientCancelsRpc) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -578,8 +616,8 @@ TEST_F(End2endTest, ClientCancelsRpc) {
}
// Server cancels rpc after 1ms
TEST_F(End2endTest, ServerCancelsRpc) {
ResetStub();
TEST_P(End2endTest, ServerCancelsRpc) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -593,7 +631,7 @@ TEST_F(End2endTest, ServerCancelsRpc) {
// Client cancels request stream after sending two messages
TEST_F(End2endTest, ClientCancelsRequestStream) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -613,7 +651,7 @@ TEST_F(End2endTest, ClientCancelsRequestStream) {
// Client cancels server stream after sending some messages
TEST_F(End2endTest, ClientCancelsResponseStream) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -645,7 +683,7 @@ TEST_F(End2endTest, ClientCancelsResponseStream) {
// Client cancels bidi stream after sending some messages
TEST_F(End2endTest, ClientCancelsBidi) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -677,7 +715,7 @@ TEST_F(End2endTest, ClientCancelsBidi) {
}
TEST_F(End2endTest, RpcMaxMessageSize) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
request.set_message(string(kMaxMessageSize_ * 2, 'a'));
@ -702,7 +740,7 @@ bool MetadataContains(const std::multimap<grpc::string, grpc::string>& metadata,
}
TEST_F(End2endTest, SetPerCallCredentials) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -724,7 +762,7 @@ TEST_F(End2endTest, SetPerCallCredentials) {
}
TEST_F(End2endTest, InsecurePerCallCredentials) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -739,7 +777,7 @@ TEST_F(End2endTest, InsecurePerCallCredentials) {
}
TEST_F(End2endTest, OverridePerCallCredentials) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -772,7 +810,7 @@ TEST_F(End2endTest, OverridePerCallCredentials) {
// Client sends 20 requests and the server returns CANCELLED status after
// reading 10 requests.
TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
ClientContext context;
@ -791,7 +829,7 @@ TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) {
}
TEST_F(End2endTest, ClientAuthContext) {
ResetStub();
ResetStub(false);
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@ -816,8 +854,8 @@ TEST_F(End2endTest, ClientAuthContext) {
}
// Make the response larger than the flow control window.
TEST_F(End2endTest, HugeResponse) {
ResetStub();
TEST_P(End2endTest, HugeResponse) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("huge response");
@ -842,7 +880,7 @@ void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, gpr
// Run a Read and a WritesDone simultaneously.
TEST_F(End2endTest, SimultaneousReadWritesDone) {
ResetStub();
ResetStub(false);
ClientContext context;
gpr_event ev;
gpr_event_init(&ev);
@ -855,8 +893,8 @@ TEST_F(End2endTest, SimultaneousReadWritesDone) {
reader_thread.join();
}
TEST_F(End2endTest, Peer) {
ResetStub();
TEST_P(End2endTest, Peer) {
ResetStub(GetParam());
EchoRequest request;
EchoResponse response;
request.set_message("hello");
@ -870,6 +908,8 @@ TEST_F(End2endTest, Peer) {
EXPECT_TRUE(CheckIsLocalhost(context.peer()));
}
INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true));
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save