Revert "Revert "server: add method to expose authority seen by server (#29768)" (#29806)" (#29807)

This reverts commit e39e943529.
pull/29845/head
Mark D. Roth 3 years ago committed by GitHub
parent b073407d84
commit 65a7ce3f04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      include/grpcpp/impl/codegen/server_context.h
  2. 12
      src/core/lib/surface/call.cc
  3. 4
      src/core/lib/surface/call.h
  4. 4
      src/core/lib/surface/server.cc
  5. 8
      src/cpp/server/server_context.cc
  6. 1
      src/proto/grpc/testing/echo_messages.proto
  7. 184
      test/cpp/end2end/end2end_test.cc
  8. 5
      test/cpp/end2end/test_service_impl.cc
  9. 5
      test/cpp/end2end/test_service_impl.h

@ -297,6 +297,10 @@ class ServerContextBase {
return call_metric_recorder_;
}
/// EXPERIMENTAL API
/// Returns the call's authority.
grpc::string_ref ExperimentalGetAuthority() const;
protected:
/// Async only. Has to be called before the rpc starts.
/// Returns the tag in completion queue when the rpc finishes.

@ -104,6 +104,7 @@ class Call : public CppImplOf<Call, grpc_call> {
bool is_notify_tag_closure) = 0;
virtual bool failed_before_recv_message() const = 0;
virtual bool is_trailers_only() const = 0;
virtual absl::string_view GetServerAuthority() const = 0;
virtual void ExternalRef() = 0;
virtual void ExternalUnref() = 0;
virtual void InternalRef(const char* reason) = 0;
@ -234,6 +235,13 @@ class FilterStackCall final : public Call {
return call_failed_before_recv_message_;
}
absl::string_view GetServerAuthority() const override {
const Slice* authority_metadata =
recv_initial_metadata_.get_pointer(HttpAuthorityMetadata());
if (authority_metadata == nullptr) return "";
return authority_metadata->as_string_view();
}
grpc_compression_algorithm test_only_compression_algorithm() override {
return incoming_compression_algorithm_;
}
@ -1942,6 +1950,10 @@ int grpc_call_failed_before_recv_message(const grpc_call* c) {
return grpc_core::Call::FromC(c)->failed_before_recv_message();
}
absl::string_view grpc_call_server_authority(const grpc_call* call) {
return grpc_core::Call::FromC(call)->GetServerAuthority();
}
const char* grpc_call_error_to_string(grpc_call_error error) {
switch (error) {
case GRPC_CALL_ERROR:

@ -24,6 +24,7 @@
#include <stddef.h>
#include <stdint.h>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/codegen/compression_types.h>
@ -125,6 +126,9 @@ grpc_compression_algorithm grpc_call_compression_for_level(
Move to surface API if requested by other languages. */
bool grpc_call_is_trailers_only(const grpc_call* call);
// Returns the authority for the call, as seen on the server side.
absl::string_view grpc_call_server_authority(const grpc_call* call);
extern grpc_core::TraceFlag grpc_call_error_trace;
extern grpc_core::TraceFlag grpc_compression_trace;

@ -1374,7 +1374,9 @@ void Server::CallData::RecvInitialMetadataReady(void* arg,
CallData* calld = static_cast<CallData*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
calld->path_ = calld->recv_initial_metadata_->Take(HttpPathMetadata());
calld->host_ = calld->recv_initial_metadata_->Take(HttpAuthorityMetadata());
auto* host =
calld->recv_initial_metadata_->get_pointer(HttpAuthorityMetadata());
if (host != nullptr) calld->host_.emplace(host->Ref());
} else {
(void)GRPC_ERROR_REF(error);
}

@ -27,6 +27,8 @@
#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
@ -52,6 +54,7 @@
#include <grpcpp/support/interceptor.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/server_interceptor.h>
#include <grpcpp/support/string_ref.h>
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
@ -407,4 +410,9 @@ void ServerContextBase::CreateCallMetricRecorder() {
call_metric_recorder_ = arena->New<experimental::CallMetricRecorder>(arena);
}
grpc::string_ref ServerContextBase::ExperimentalGetAuthority() const {
absl::string_view authority = grpc_call_server_authority(call_.call);
return grpc::string_ref(authority.data(), authority.size());
}
} // namespace grpc

@ -54,6 +54,7 @@ message RequestParams {
bool echo_metadata_initially = 17;
bool server_notify_client_when_started = 18;
xds.data.orca.v3.OrcaLoadReport backend_metrics = 19;
bool echo_host_from_authority_header = 20;
}
message EchoRequest {

@ -20,6 +20,7 @@
#include <thread>
#include "absl/memory/memory.h"
#include "absl/strings/ascii.h"
#include "absl/strings/match.h"
#include "absl/strings/str_format.h"
@ -275,35 +276,47 @@ class TestServiceImplDupPkg
class TestScenario {
public:
TestScenario(bool interceptors, bool proxy, bool inproc_stub,
const std::string& creds_type, bool use_callback_server)
: use_interceptors(interceptors),
use_proxy(proxy),
inproc(inproc_stub),
credentials_type(creds_type),
callback_server(use_callback_server) {}
void Log() const;
bool use_interceptors;
bool use_proxy;
bool inproc;
const std::string credentials_type;
bool callback_server;
};
TestScenario(bool use_interceptors, bool use_proxy, bool inproc,
const std::string& credentials_type, bool callback_server)
: use_interceptors_(use_interceptors),
use_proxy_(use_proxy),
inproc_(inproc),
credentials_type_(credentials_type),
callback_server_(callback_server) {}
std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
return out << "TestScenario{use_interceptors="
<< (scenario.use_interceptors ? "true" : "false")
<< ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
<< ", inproc=" << (scenario.inproc ? "true" : "false")
<< ", server_type="
<< (scenario.callback_server ? "callback" : "sync")
<< ", credentials='" << scenario.credentials_type << "'}";
}
bool use_interceptors() const { return use_interceptors_; }
bool use_proxy() const { return use_proxy_; }
bool inproc() const { return inproc_; }
const std::string& credentials_type() const { return credentials_type_; }
bool callback_server() const { return callback_server_; }
std::string AsString() const;
void TestScenario::Log() const {
std::ostringstream out;
out << *this;
gpr_log(GPR_DEBUG, "%s", out.str().c_str());
static std::string Name(const ::testing::TestParamInfo<TestScenario>& info) {
return info.param.AsString();
}
private:
bool use_interceptors_;
bool use_proxy_;
bool inproc_;
const std::string credentials_type_;
bool callback_server_;
};
std::string TestScenario::AsString() const {
std::string retval = use_interceptors_ ? "Interceptor" : "";
if (use_proxy_) retval += "Proxy";
if (inproc_) retval += "Inproc";
if (callback_server_) retval += "CallbackServer";
if (credentials_type_ == kInsecureCredentialsType) {
retval += "Insecure";
} else {
std::string creds_type = absl::AsciiStrToLower(credentials_type_);
if (!creds_type.empty()) creds_type[0] = absl::ascii_toupper(creds_type[0]);
retval += creds_type;
}
return retval;
}
class End2endTest : public ::testing::TestWithParam<TestScenario> {
@ -314,9 +327,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
: is_server_started_(false),
kMaxMessageSize_(8192),
special_service_("special"),
first_picked_port_(0) {
GetParam().Log();
}
first_picked_port_(0) {}
void TearDown() override {
if (is_server_started_) {
@ -348,11 +359,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
ServerBuilder builder;
ConfigureServerBuilder(&builder);
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
if (GetParam().credentials_type != kInsecureCredentialsType) {
GetParam().credentials_type());
if (GetParam().credentials_type() != kInsecureCredentialsType) {
server_creds->SetAuthMetadataProcessor(processor);
}
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
creators;
@ -364,7 +375,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
builder.experimental().SetInterceptorCreators(std::move(creators));
}
builder.AddListeningPort(server_address_.str(), server_creds);
if (!GetParam().callback_server) {
if (!GetParam().callback_server()) {
builder.RegisterService(&service_);
} else {
builder.RegisterService(&callback_service_);
@ -395,14 +406,14 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
EXPECT_TRUE(is_server_started_);
ChannelArguments args;
auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &args);
GetParam().credentials_type(), &args);
if (!user_agent_prefix_.empty()) {
args.SetUserAgentPrefix(user_agent_prefix_);
}
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
if (!GetParam().inproc) {
if (!GetParam().use_interceptors) {
if (!GetParam().inproc()) {
if (!GetParam().use_interceptors()) {
channel_ = grpc::CreateCustomChannel(server_address_.str(),
channel_creds, args);
} else {
@ -412,7 +423,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
: std::move(interceptor_creators));
}
} else {
if (!GetParam().use_interceptors) {
if (!GetParam().use_interceptors()) {
channel_ = server_->InProcessChannel(args);
} else {
channel_ = server_->experimental().InProcessChannelWithInterceptors(
@ -428,7 +439,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
interceptor_creators = {}) {
ResetChannel(std::move(interceptor_creators));
if (GetParam().use_proxy) {
if (GetParam().use_proxy()) {
proxy_service_ = absl::make_unique<Proxy>(channel_);
int port = grpc_pick_unused_port_or_die();
std::ostringstream proxyaddr;
@ -564,7 +575,7 @@ class End2endServerTryCancelTest : public End2endTest {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -645,7 +656,7 @@ class End2endServerTryCancelTest : public End2endTest {
EXPECT_FALSE(s.ok());
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -733,7 +744,7 @@ class End2endServerTryCancelTest : public End2endTest {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -803,7 +814,7 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
// User-Agent is an HTTP header for HTTP transports only
if (GetParam().inproc) {
if (GetParam().inproc()) {
return;
}
user_agent_prefix_ = "custom_prefix";
@ -872,8 +883,28 @@ TEST_P(End2endTest, EmptyBinaryMetadata) {
EXPECT_TRUE(s.ok());
}
TEST_P(End2endTest, AuthoritySeenOnServerSide) {
ResetStub();
EchoRequest request;
request.mutable_param()->set_echo_host_from_authority_header(true);
EchoResponse response;
request.set_message("Live long and prosper.");
ClientContext context;
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
if (GetParam().credentials_type() == kTlsCredentialsType) {
// SSL creds overrides the authority.
EXPECT_EQ("foo.test.google.fr", response.param().host());
} else if (GetParam().inproc()) {
EXPECT_EQ("inproc", response.param().host());
} else {
EXPECT_EQ(server_address_.str(), response.param().host());
}
EXPECT_TRUE(s.ok());
}
TEST_P(End2endTest, ReconnectChannel) {
if (GetParam().inproc) {
if (GetParam().inproc()) {
return;
}
int poller_slowdown_factor = 1;
@ -1160,7 +1191,7 @@ TEST_P(End2endTest, CancelRpcBeforeStart) {
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -1178,7 +1209,7 @@ TEST_P(End2endTest, CancelRpcAfterStart) {
s = stub_->Echo(&context, request, &response);
EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
});
if (!GetParam().callback_server) {
if (!GetParam().callback_server()) {
service_.ClientWaitUntilRpcStarted();
} else {
callback_service_.ClientWaitUntilRpcStarted();
@ -1186,7 +1217,7 @@ TEST_P(End2endTest, CancelRpcAfterStart) {
context.TryCancel();
if (!GetParam().callback_server) {
if (!GetParam().callback_server()) {
service_.SignalServerToContinue();
} else {
callback_service_.SignalServerToContinue();
@ -1195,7 +1226,7 @@ TEST_P(End2endTest, CancelRpcAfterStart) {
echo_thread.join();
EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -1218,7 +1249,7 @@ TEST_P(End2endTest, ClientCancelsRequestStream) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(response.message(), "");
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -1253,7 +1284,7 @@ TEST_P(End2endTest, ClientCancelsResponseStream) {
// The final status could be either of CANCELLED or OK depending on
// who won the race.
EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -1292,7 +1323,7 @@ TEST_P(End2endTest, ClientCancelsBidi) {
Status s = stream->Finish();
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors) {
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
}
@ -1334,7 +1365,7 @@ TEST_P(End2endTest, SimultaneousReadWritesDone) {
}
TEST_P(End2endTest, ChannelState) {
if (GetParam().inproc) {
if (GetParam().inproc()) {
return;
}
@ -1361,8 +1392,8 @@ TEST_P(End2endTest, ChannelState) {
// Takes 10s.
TEST_P(End2endTest, ChannelStateTimeout) {
if ((GetParam().credentials_type != kInsecureCredentialsType) ||
GetParam().inproc) {
if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
GetParam().inproc()) {
return;
}
int port = grpc_pick_unused_port_or_die();
@ -1383,8 +1414,8 @@ TEST_P(End2endTest, ChannelStateTimeout) {
}
TEST_P(End2endTest, ChannelStateOnLameChannel) {
if ((GetParam().credentials_type != kInsecureCredentialsType) ||
GetParam().inproc) {
if ((GetParam().credentials_type() != kInsecureCredentialsType) ||
GetParam().inproc()) {
return;
}
// Channel using invalid target URI. This creates a lame channel.
@ -1630,7 +1661,7 @@ TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
ClientContext context;
std::thread cancel_thread;
if (!GetParam().callback_server) {
if (!GetParam().callback_server()) {
cancel_thread = std::thread(
[&context, this](int delay) { CancelRpc(&context, delay, &service_); },
kCancelDelayUs);
@ -1687,7 +1718,7 @@ TEST_P(ProxyEnd2endTest, HugeResponse) {
TEST_P(ProxyEnd2endTest, Peer) {
// Peer is not meaningful for inproc
if (GetParam().inproc) {
if (GetParam().inproc()) {
return;
}
ResetStub();
@ -1708,8 +1739,8 @@ TEST_P(ProxyEnd2endTest, Peer) {
class SecureEnd2endTest : public End2endTest {
protected:
SecureEnd2endTest() {
GPR_ASSERT(!GetParam().use_proxy);
GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
GPR_ASSERT(!GetParam().use_proxy());
GPR_ASSERT(GetParam().credentials_type() != kInsecureCredentialsType);
}
};
@ -1757,7 +1788,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
request.mutable_param()->set_expected_client_identity(
TestAuthMetadataProcessor::kGoodGuy);
request.mutable_param()->set_expected_transport_security_type(
GetParam().credentials_type);
GetParam().credentials_type());
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(request.message(), response.message());
@ -1836,7 +1867,7 @@ class CredentialsInterceptorFactory
};
TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
if (!GetParam().use_interceptors) {
if (!GetParam().use_interceptors()) {
return;
}
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
@ -1865,7 +1896,7 @@ TEST_P(SecureEnd2endTest, CallCredentialsInterception) {
}
TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) {
if (!GetParam().use_interceptors) {
if (!GetParam().use_interceptors()) {
return;
}
std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
@ -2063,7 +2094,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
request.mutable_param()->set_expected_client_identity(
TestAuthMetadataProcessor::kGoodGuy);
request.mutable_param()->set_expected_transport_security_type(
GetParam().credentials_type);
GetParam().credentials_type());
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(request.message(), response.message());
@ -2156,10 +2187,10 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
kTlsCredentialsType);
request.mutable_param()->set_check_auth_context(
GetParam().credentials_type() == kTlsCredentialsType);
request.mutable_param()->set_expected_transport_security_type(
GetParam().credentials_type);
GetParam().credentials_type());
ClientContext context;
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(response.message(), request.message());
@ -2169,8 +2200,8 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
std::vector<grpc::string_ref> tst =
auth_ctx->FindPropertyValues("transport_security_type");
ASSERT_EQ(1u, tst.size());
EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
if (GetParam().credentials_type == kTlsCredentialsType) {
EXPECT_EQ(GetParam().credentials_type(), ToString(tst[0]));
if (GetParam().credentials_type() == kTlsCredentialsType) {
EXPECT_EQ("x509_subject_alternative_name",
auth_ctx->GetPeerIdentityPropertyName());
EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
@ -2268,23 +2299,28 @@ std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
INSTANTIATE_TEST_SUITE_P(
End2end, End2endTest,
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
&TestScenario::Name);
INSTANTIATE_TEST_SUITE_P(
End2endServerTryCancel, End2endServerTryCancelTest,
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
&TestScenario::Name);
INSTANTIATE_TEST_SUITE_P(
ProxyEnd2end, ProxyEnd2endTest,
::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)));
::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)),
&TestScenario::Name);
INSTANTIATE_TEST_SUITE_P(
SecureEnd2end, SecureEnd2endTest,
::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)));
::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)),
&TestScenario::Name);
INSTANTIATE_TEST_SUITE_P(
ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)),
&TestScenario::Name);
} // namespace
} // namespace testing

@ -232,6 +232,11 @@ ServerUnaryReactor* CallbackTestServiceImpl::Echo(
internal::MaybeEchoDeadline(ctx_, req_, resp_);
if (service_->host_) {
resp_->mutable_param()->set_host(*service_->host_);
} else if (req_->has_param() &&
req_->param().echo_host_from_authority_header()) {
auto authority = ctx_->ExperimentalGetAuthority();
std::string authority_str(authority.data(), authority.size());
resp_->mutable_param()->set_host(std::move(authority_str));
}
if (req_->has_param() && req_->param().client_cancel_after_us()) {
{

@ -161,6 +161,11 @@ class TestMultipleServiceImpl : public RpcService {
internal::MaybeEchoDeadline(context, request, response);
if (host_) {
response->mutable_param()->set_host(*host_);
} else if (request->has_param() &&
request->param().echo_host_from_authority_header()) {
auto authority = context->ExperimentalGetAuthority();
std::string authority_str(authority.data(), authority.size());
response->mutable_param()->set_host(std::move(authority_str));
}
if (request->has_param() && request->param().client_cancel_after_us()) {
{

Loading…
Cancel
Save