diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 10c967d85bc..34945f32820 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -46,8 +47,6 @@ #include #include -struct grpc_call; -struct grpc_completion_queue; struct census_context; namespace grpc { @@ -70,12 +69,68 @@ template class ClientAsyncReaderWriter; template class ClientAsyncResponseReader; +class ServerContext; + +class PropagationOptions { + public: + PropagationOptions() : propagate_(GRPC_PROPAGATE_DEFAULTS) {} + + PropagationOptions& enable_deadline_propagation() { + propagate_ |= GRPC_PROPAGATE_DEADLINE; + return *this; + } + + PropagationOptions& disable_deadline_propagation() { + propagate_ &= ~GRPC_PROPAGATE_DEADLINE; + return *this; + } + + PropagationOptions& enable_census_stats_propagation() { + propagate_ |= GRPC_PROPAGATE_CENSUS_STATS_CONTEXT; + return *this; + } + + PropagationOptions& disable_census_stats_propagation() { + propagate_ &= ~GRPC_PROPAGATE_CENSUS_STATS_CONTEXT; + return *this; + } + + PropagationOptions& enable_census_tracing_propagation() { + propagate_ |= GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT; + return *this; + } + + PropagationOptions& disable_census_tracing_propagation() { + propagate_ &= ~GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT; + return *this; + } + + PropagationOptions& enable_cancellation_propagation() { + propagate_ |= GRPC_PROPAGATE_CANCELLATION; + return *this; + } + + PropagationOptions& disable_cancellation_propagation() { + propagate_ &= ~GRPC_PROPAGATE_CANCELLATION; + return *this; + } + + gpr_uint32 c_bitmask() const { return propagate_; } + + private: + gpr_uint32 propagate_; +}; class ClientContext { public: ClientContext(); ~ClientContext(); + /// Create a new ClientContext that propagates some or all of its attributes + static std::unique_ptr FromServerContext( + const ServerContext& server_context, + PropagationOptions options = PropagationOptions()); + void AddMetadata(const grpc::string& meta_key, const grpc::string& meta_value); @@ -181,6 +236,9 @@ class ClientContext { std::multimap recv_initial_metadata_; std::multimap trailing_metadata_; + grpc_call* propagate_from_call_; + PropagationOptions propagation_options_; + grpc_compression_algorithm compression_algorithm_; }; diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 23273f43e67..4f7fc54ef13 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -50,6 +50,7 @@ struct census_context; namespace grpc { +class ClientContext; template class ServerAsyncReader; template @@ -158,6 +159,7 @@ class ServerContext { friend class ServerStreamingHandler; template friend class BidiStreamingHandler; + friend class ::grpc::ClientContext; // Prevent copying. ServerContext(const ServerContext&); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 1912e5e4c8c..5f54e7fcc17 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -63,10 +63,12 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, const char* host_str = host_.empty() ? NULL : host_.c_str(); auto c_call = method.channel_tag() && context->authority().empty() ? grpc_channel_create_registered_call( - c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(), + c_channel_, context->propagate_from_call_, + context->propagation_options_.c_bitmask(), cq->cq(), method.channel_tag(), context->raw_deadline()) : grpc_channel_create_call( - c_channel_, NULL, GRPC_PROPAGATE_DEFAULTS, cq->cq(), + c_channel_, context->propagate_from_call_, + context->propagation_options_.c_bitmask(), cq->cq(), method.name(), context->authority().empty() ? host_str : context->authority().c_str(), diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index c38d0c1df69..1ed2d389616 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -37,6 +37,7 @@ #include #include #include +#include #include #include "src/core/channel/compress_filter.h" @@ -48,7 +49,8 @@ ClientContext::ClientContext() : initial_metadata_received_(false), call_(nullptr), cq_(nullptr), - deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {} + deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), + propagate_from_call_(nullptr) {} ClientContext::~ClientContext() { if (call_) { @@ -64,6 +66,14 @@ ClientContext::~ClientContext() { } } +std::unique_ptr ClientContext::FromServerContext( + const ServerContext& context, PropagationOptions options) { + std::unique_ptr ctx(new ClientContext); + ctx->propagate_from_call_ = context.call_; + ctx->propagation_options_ = options; + return ctx; +} + void ClientContext::AddMetadata(const grpc::string& meta_key, const grpc::string& meta_value) { send_initial_metadata_.insert(std::make_pair(meta_key, meta_value)); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 3144ca4dc71..24d417d9e65 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -104,6 +104,22 @@ bool CheckIsLocalhost(const grpc::string& addr) { } // namespace +class Proxy : public ::grpc::cpp::test::util::TestService::Service { + public: + Proxy(std::shared_ptr channel) + : stub_(grpc::cpp::test::util::TestService::NewStub(channel)) {} + + Status Echo(ServerContext* server_context, const EchoRequest* request, + EchoResponse* response) GRPC_OVERRIDE { + std::unique_ptr client_context = + ClientContext::FromServerContext(*server_context); + return stub_->Echo(client_context.get(), *request, response); + } + + private: + std::unique_ptr<::grpc::cpp::test::util::TestService::Stub> stub_; +}; + class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { public: TestServiceImpl() : signal_client_(false), host_() {} @@ -241,7 +257,9 @@ class TestServiceImplDupPkg } }; -class End2endTest : public ::testing::Test { +/* Param is whether or not to use a proxy -- some tests use TEST_F as they don't + need this functionality */ +class End2endTest : public ::testing::TestWithParam { protected: End2endTest() : kMaxMessageSize_(8192), special_service_("special"), thread_pool_(2) {} @@ -267,21 +285,41 @@ class End2endTest : public ::testing::Test { server_ = builder.BuildAndStart(); } - void TearDown() GRPC_OVERRIDE { server_->Shutdown(); } + void TearDown() GRPC_OVERRIDE { + server_->Shutdown(); + if (proxy_server_) proxy_server_->Shutdown(); + } - void ResetStub() { + void ResetStub(bool use_proxy) { SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; ChannelArguments args; args.SetSslTargetNameOverride("foo.test.google.fr"); args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); channel_ = CreateChannel(server_address_.str(), SslCredentials(ssl_opts), args); + if (use_proxy) { + proxy_service_.reset(new Proxy(channel_)); + int port = grpc_pick_unused_port_or_die(); + std::ostringstream proxyaddr; + proxyaddr << "localhost:" << port; + ServerBuilder builder; + builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); + builder.RegisterService(proxy_service_.get()); + builder.SetThreadPool(&thread_pool_); + proxy_server_ = builder.BuildAndStart(); + + channel_ = CreateChannel(proxyaddr.str(), InsecureCredentials(), + ChannelArguments()); + } + stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_)); } std::shared_ptr channel_; std::unique_ptr stub_; std::unique_ptr server_; + std::unique_ptr proxy_server_; + std::unique_ptr proxy_service_; std::ostringstream server_address_; const int kMaxMessageSize_; TestServiceImpl service_; @@ -306,7 +344,7 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, } TEST_F(End2endTest, SimpleRpcWithHost) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; @@ -321,13 +359,13 @@ TEST_F(End2endTest, SimpleRpcWithHost) { EXPECT_TRUE(s.ok()); } -TEST_F(End2endTest, SimpleRpc) { - ResetStub(); +TEST_P(End2endTest, SimpleRpc) { + ResetStub(GetParam()); SendRpc(stub_.get(), 1); } -TEST_F(End2endTest, MultipleRpcs) { - ResetStub(); +TEST_P(End2endTest, MultipleRpcs) { + ResetStub(GetParam()); std::vector threads; for (int i = 0; i < 10; ++i) { threads.push_back(new std::thread(SendRpc, stub_.get(), 10)); @@ -339,8 +377,8 @@ TEST_F(End2endTest, MultipleRpcs) { } // Set a 10us deadline and make sure proper error is returned. -TEST_F(End2endTest, RpcDeadlineExpires) { - ResetStub(); +TEST_P(End2endTest, RpcDeadlineExpires) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -354,8 +392,8 @@ TEST_F(End2endTest, RpcDeadlineExpires) { } // Set a long but finite deadline. -TEST_F(End2endTest, RpcLongDeadline) { - ResetStub(); +TEST_P(End2endTest, RpcLongDeadline) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -370,8 +408,8 @@ TEST_F(End2endTest, RpcLongDeadline) { } // Ask server to echo back the deadline it sees. -TEST_F(End2endTest, EchoDeadline) { - ResetStub(); +TEST_P(End2endTest, EchoDeadline) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -392,8 +430,8 @@ TEST_F(End2endTest, EchoDeadline) { } // Ask server to echo back the deadline it sees. The rpc has no deadline. -TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) { - ResetStub(); +TEST_P(End2endTest, EchoDeadlineForNoDeadlineRpc) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -407,8 +445,8 @@ TEST_F(End2endTest, EchoDeadlineForNoDeadlineRpc) { gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec); } -TEST_F(End2endTest, UnimplementedRpc) { - ResetStub(); +TEST_P(End2endTest, UnimplementedRpc) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -422,7 +460,7 @@ TEST_F(End2endTest, UnimplementedRpc) { } TEST_F(End2endTest, RequestStreamOneRequest) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -437,7 +475,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) { } TEST_F(End2endTest, RequestStreamTwoRequests) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -453,7 +491,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) { } TEST_F(End2endTest, ResponseStream) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -473,7 +511,7 @@ TEST_F(End2endTest, ResponseStream) { } TEST_F(End2endTest, BidiStream) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -506,7 +544,7 @@ TEST_F(End2endTest, BidiStream) { // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_F(End2endTest, DiffPackageServices) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -561,8 +599,8 @@ void CancelRpc(ClientContext* context, int delay_us, TestServiceImpl* service) { } // Client cancels rpc after 10ms -TEST_F(End2endTest, ClientCancelsRpc) { - ResetStub(); +TEST_P(End2endTest, ClientCancelsRpc) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -578,8 +616,8 @@ TEST_F(End2endTest, ClientCancelsRpc) { } // Server cancels rpc after 1ms -TEST_F(End2endTest, ServerCancelsRpc) { - ResetStub(); +TEST_P(End2endTest, ServerCancelsRpc) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -593,7 +631,7 @@ TEST_F(End2endTest, ServerCancelsRpc) { // Client cancels request stream after sending two messages TEST_F(End2endTest, ClientCancelsRequestStream) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -613,7 +651,7 @@ TEST_F(End2endTest, ClientCancelsRequestStream) { // Client cancels server stream after sending some messages TEST_F(End2endTest, ClientCancelsResponseStream) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -645,7 +683,7 @@ TEST_F(End2endTest, ClientCancelsResponseStream) { // Client cancels bidi stream after sending some messages TEST_F(End2endTest, ClientCancelsBidi) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -677,7 +715,7 @@ TEST_F(End2endTest, ClientCancelsBidi) { } TEST_F(End2endTest, RpcMaxMessageSize) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; request.set_message(string(kMaxMessageSize_ * 2, 'a')); @@ -702,7 +740,7 @@ bool MetadataContains(const std::multimap& metadata, } TEST_F(End2endTest, SetPerCallCredentials) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -724,7 +762,7 @@ TEST_F(End2endTest, SetPerCallCredentials) { } TEST_F(End2endTest, InsecurePerCallCredentials) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -739,7 +777,7 @@ TEST_F(End2endTest, InsecurePerCallCredentials) { } TEST_F(End2endTest, OverridePerCallCredentials) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -772,7 +810,7 @@ TEST_F(End2endTest, OverridePerCallCredentials) { // Client sends 20 requests and the server returns CANCELLED status after // reading 10 requests. TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; ClientContext context; @@ -791,7 +829,7 @@ TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) { } TEST_F(End2endTest, ClientAuthContext) { - ResetStub(); + ResetStub(false); EchoRequest request; EchoResponse response; request.set_message("Hello"); @@ -816,8 +854,8 @@ TEST_F(End2endTest, ClientAuthContext) { } // Make the response larger than the flow control window. -TEST_F(End2endTest, HugeResponse) { - ResetStub(); +TEST_P(End2endTest, HugeResponse) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("huge response"); @@ -842,7 +880,7 @@ void ReaderThreadFunc(ClientReaderWriter* stream, gpr // Run a Read and a WritesDone simultaneously. TEST_F(End2endTest, SimultaneousReadWritesDone) { - ResetStub(); + ResetStub(false); ClientContext context; gpr_event ev; gpr_event_init(&ev); @@ -855,8 +893,8 @@ TEST_F(End2endTest, SimultaneousReadWritesDone) { reader_thread.join(); } -TEST_F(End2endTest, Peer) { - ResetStub(); +TEST_P(End2endTest, Peer) { + ResetStub(GetParam()); EchoRequest request; EchoResponse response; request.set_message("hello"); @@ -870,6 +908,8 @@ TEST_F(End2endTest, Peer) { EXPECT_TRUE(CheckIsLocalhost(context.peer())); } +INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true)); + } // namespace testing } // namespace grpc