[call creds] refactor token-fetching call creds to allow cleaner reuse (#37510)

Previously, `grpc_oauth2_token_fetcher_credentials` provided functionality for on-demand token-fetching, but it was integrated into the oauth2 code, so it was not possible to use that same code for on-demand fetching of (e.g.) JWT tokens.  This PR splits that class into two parts:

1. A base `TokenFetcherCredentials` class that provides a framework for on-demand fetching of any arbitrary type of auth token.
2. An `Oauth2TokenFetcherCredentials` subclass that derives from `TokenFetcherCredentials` and provides handling for oauth2 tokens.

The `grpc_compute_engine_token_fetcher_credentials`, `StsTokenFetcherCredentials`, and `grpc_google_refresh_token_credentials` classes that previously derived from `grpc_oauth2_token_fetcher_credentials` now derive from `Oauth2TokenFetcherCredentials` instead, so there's not much change to those classes (other than a cleaner interface with the base class functionality).

The `ExternalAccountCredentials` class and its subclasses got more extensive changes here.  Previously, this class inheritted from `grpc_oauth2_token_fetcher_credentials` and fooled the base class into thinking that it directly fetched the oauth2 token, when in fact it actually performed a number of steps to gather data and then constructed a synthetic HTTP response to pass back to the base class.  I have changed this to instead derive directly from `TokenFetcherCredentials` to provide a much cleaner interface with the parent class.

In addition, I have changed `grpc_call_credentials` from `RefCounted<>` to `DualRefCounted<>` to provide a clean way to shut down any in-flight token fetch when the credentials are unreffed.

This PR paves the way for subsequent work that will allow implementing an on-demand JWT token fetcher call credential, as part of gRFC A83 (https://github.com/grpc/proposal/pull/438).

Closes #37510

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37510 from markdroth:token_fetcher_call_creds_refactor 3bd398a762
PiperOrigin-RevId: 666547985
pull/37531/head^2
Mark D. Roth 7 months ago committed by Copybara-Service
parent c49fc0f78e
commit 71c8629e75
  1. 8
      CMakeLists.txt
  2. 1
      Makefile
  3. 2
      Package.swift
  4. 8
      build_autogenerated.yaml
  5. 2
      config.m4
  6. 2
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      package.xml
  11. 37
      src/core/BUILD
  12. 2
      src/core/lib/security/credentials/composite/composite_credentials.h
  13. 2
      src/core/lib/security/credentials/credentials.h
  14. 645
      src/core/lib/security/credentials/external/aws_external_account_credentials.cc
  15. 95
      src/core/lib/security/credentials/external/aws_external_account_credentials.h
  16. 748
      src/core/lib/security/credentials/external/external_account_credentials.cc
  17. 177
      src/core/lib/security/credentials/external/external_account_credentials.h
  18. 128
      src/core/lib/security/credentials/external/file_external_account_credentials.cc
  19. 34
      src/core/lib/security/credentials/external/file_external_account_credentials.h
  20. 206
      src/core/lib/security/credentials/external/url_external_account_credentials.cc
  21. 31
      src/core/lib/security/credentials/external/url_external_account_credentials.h
  22. 2
      src/core/lib/security/credentials/fake/fake_credentials.h
  23. 68
      src/core/lib/security/credentials/google_default/google_default_credentials.cc
  24. 2
      src/core/lib/security/credentials/iam/iam_credentials.h
  25. 2
      src/core/lib/security/credentials/jwt/jwt_credentials.h
  26. 413
      src/core/lib/security/credentials/oauth2/oauth2_credentials.cc
  27. 90
      src/core/lib/security/credentials/oauth2/oauth2_credentials.h
  28. 2
      src/core/lib/security/credentials/plugin/plugin_credentials.h
  29. 130
      src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc
  30. 115
      src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h
  31. 1
      src/python/grpcio/grpc_core_dependencies.py
  32. 2
      test/core/filters/client_auth_filter_test.cc
  33. 2
      test/core/security/BUILD
  34. 845
      test/core/security/credentials_test.cc
  35. 2
      tools/doxygen/Doxyfile.c++.internal
  36. 2
      tools/doxygen/Doxyfile.core.internal

8
CMakeLists.txt generated

@ -2458,6 +2458,7 @@ add_library(grpc
src/core/lib/security/credentials/tls/grpc_tls_crl_provider.cc
src/core/lib/security/credentials/tls/tls_credentials.cc
src/core/lib/security/credentials/tls/tls_utils.cc
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc
src/core/lib/security/credentials/xds/xds_credentials.cc
src/core/lib/security/security_connector/alts/alts_security_connector.cc
src/core/lib/security/security_connector/fake/fake_security_connector.cc
@ -30600,6 +30601,12 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(test_core_security_credentials_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
test/core/event_engine/event_engine_test_utils.cc
test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
test/core/security/credentials_test.cc
test/core/test_util/cmdline.cc
test/core/test_util/fuzzer_util.cc
@ -30643,6 +30650,7 @@ target_include_directories(test_core_security_credentials_test
target_link_libraries(test_core_security_credentials_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
${_gRPC_PROTOBUF_LIBRARIES}
grpc_test_util
)

1
Makefile generated

@ -1295,6 +1295,7 @@ LIBGRPC_SRC = \
src/core/lib/security/credentials/tls/grpc_tls_crl_provider.cc \
src/core/lib/security/credentials/tls/tls_credentials.cc \
src/core/lib/security/credentials/tls/tls_utils.cc \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc \
src/core/lib/security/credentials/xds/xds_credentials.cc \
src/core/lib/security/security_connector/alts/alts_security_connector.cc \
src/core/lib/security/security_connector/fake/fake_security_connector.cc \

2
Package.swift generated

@ -1601,6 +1601,8 @@ let package = Package(
"src/core/lib/security/credentials/tls/tls_credentials.h",
"src/core/lib/security/credentials/tls/tls_utils.cc",
"src/core/lib/security/credentials/tls/tls_utils.h",
"src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc",
"src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h",
"src/core/lib/security/credentials/xds/xds_credentials.cc",
"src/core/lib/security/credentials/xds/xds_credentials.h",
"src/core/lib/security/security_connector/alts/alts_security_connector.cc",

@ -1055,6 +1055,7 @@ libs:
- src/core/lib/security/credentials/tls/grpc_tls_crl_provider.h
- src/core/lib/security/credentials/tls/tls_credentials.h
- src/core/lib/security/credentials/tls/tls_utils.h
- src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h
- src/core/lib/security/credentials/xds/xds_credentials.h
- src/core/lib/security/security_connector/alts/alts_security_connector.h
- src/core/lib/security/security_connector/fake/fake_security_connector.h
@ -1870,6 +1871,7 @@ libs:
- src/core/lib/security/credentials/tls/grpc_tls_crl_provider.cc
- src/core/lib/security/credentials/tls/tls_credentials.cc
- src/core/lib/security/credentials/tls/tls_utils.cc
- src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc
- src/core/lib/security/credentials/xds/xds_credentials.cc
- src/core/lib/security/security_connector/alts/alts_security_connector.cc
- src/core/lib/security/security_connector/fake/fake_security_connector.cc
@ -19649,6 +19651,8 @@ targets:
build: test
language: c++
headers:
- test/core/event_engine/event_engine_test_utils.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- test/core/test_util/cmdline.h
- test/core/test_util/evaluate_args_test_util.h
- test/core/test_util/fuzzer_util.h
@ -19660,6 +19664,9 @@ targets:
- test/core/test_util/slice_splitter.h
- test/core/test_util/tracer_util.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- test/core/event_engine/event_engine_test_utils.cc
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
- test/core/security/credentials_test.cc
- test/core/test_util/cmdline.cc
- test/core/test_util/fuzzer_util.cc
@ -19672,6 +19679,7 @@ targets:
- test/core/test_util/tracer_util.cc
deps:
- gtest
- protobuf
- grpc_test_util
- name: test_core_security_ssl_credentials_test
gtest: true

2
config.m4 generated

@ -670,6 +670,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/security/credentials/tls/grpc_tls_crl_provider.cc \
src/core/lib/security/credentials/tls/tls_credentials.cc \
src/core/lib/security/credentials/tls/tls_utils.cc \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc \
src/core/lib/security/credentials/xds/xds_credentials.cc \
src/core/lib/security/security_connector/alts/alts_security_connector.cc \
src/core/lib/security/security_connector/fake/fake_security_connector.cc \
@ -1557,6 +1558,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/credentials/plugin)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/credentials/ssl)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/credentials/tls)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/credentials/token_fetcher)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/credentials/xds)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/security_connector)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/lib/security/security_connector/alts)

2
config.w32 generated

@ -635,6 +635,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\security\\credentials\\tls\\grpc_tls_crl_provider.cc " +
"src\\core\\lib\\security\\credentials\\tls\\tls_credentials.cc " +
"src\\core\\lib\\security\\credentials\\tls\\tls_utils.cc " +
"src\\core\\lib\\security\\credentials\\token_fetcher\\token_fetcher_credentials.cc " +
"src\\core\\lib\\security\\credentials\\xds\\xds_credentials.cc " +
"src\\core\\lib\\security\\security_connector\\alts\\alts_security_connector.cc " +
"src\\core\\lib\\security\\security_connector\\fake\\fake_security_connector.cc " +
@ -1695,6 +1696,7 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\credentials\\plugin");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\credentials\\ssl");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\credentials\\tls");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\credentials\\token_fetcher");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\credentials\\xds");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\security_connector");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\lib\\security\\security_connector\\alts");

2
gRPC-C++.podspec generated

@ -1159,6 +1159,7 @@ Pod::Spec.new do |s|
'src/core/lib/security/credentials/tls/grpc_tls_crl_provider.h',
'src/core/lib/security/credentials/tls/tls_credentials.h',
'src/core/lib/security/credentials/tls/tls_utils.h',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h',
'src/core/lib/security/credentials/xds/xds_credentials.h',
'src/core/lib/security/security_connector/alts/alts_security_connector.h',
'src/core/lib/security/security_connector/fake/fake_security_connector.h',
@ -2444,6 +2445,7 @@ Pod::Spec.new do |s|
'src/core/lib/security/credentials/tls/grpc_tls_crl_provider.h',
'src/core/lib/security/credentials/tls/tls_credentials.h',
'src/core/lib/security/credentials/tls/tls_utils.h',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h',
'src/core/lib/security/credentials/xds/xds_credentials.h',
'src/core/lib/security/security_connector/alts/alts_security_connector.h',
'src/core/lib/security/security_connector/fake/fake_security_connector.h',

3
gRPC-Core.podspec generated

@ -1717,6 +1717,8 @@ Pod::Spec.new do |s|
'src/core/lib/security/credentials/tls/tls_credentials.h',
'src/core/lib/security/credentials/tls/tls_utils.cc',
'src/core/lib/security/credentials/tls/tls_utils.h',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h',
'src/core/lib/security/credentials/xds/xds_credentials.cc',
'src/core/lib/security/credentials/xds/xds_credentials.h',
'src/core/lib/security/security_connector/alts/alts_security_connector.cc',
@ -3222,6 +3224,7 @@ Pod::Spec.new do |s|
'src/core/lib/security/credentials/tls/grpc_tls_crl_provider.h',
'src/core/lib/security/credentials/tls/tls_credentials.h',
'src/core/lib/security/credentials/tls/tls_utils.h',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h',
'src/core/lib/security/credentials/xds/xds_credentials.h',
'src/core/lib/security/security_connector/alts/alts_security_connector.h',
'src/core/lib/security/security_connector/fake/fake_security_connector.h',

2
grpc.gemspec generated

@ -1603,6 +1603,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/security/credentials/tls/tls_credentials.h )
s.files += %w( src/core/lib/security/credentials/tls/tls_utils.cc )
s.files += %w( src/core/lib/security/credentials/tls/tls_utils.h )
s.files += %w( src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc )
s.files += %w( src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h )
s.files += %w( src/core/lib/security/credentials/xds/xds_credentials.cc )
s.files += %w( src/core/lib/security/credentials/xds/xds_credentials.h )
s.files += %w( src/core/lib/security/security_connector/alts/alts_security_connector.cc )

2
package.xml generated

@ -1585,6 +1585,8 @@
<file baseinstalldir="/" name="src/core/lib/security/credentials/tls/tls_credentials.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/tls/tls_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/tls/tls_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/xds/xds_credentials.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/credentials/xds/xds_credentials.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/security/security_connector/alts/alts_security_connector.cc" role="src" />

@ -4317,6 +4317,40 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "token_fetcher_credentials",
srcs = [
"lib/security/credentials/token_fetcher/token_fetcher_credentials.cc",
],
hdrs = [
"lib/security/credentials/token_fetcher/token_fetcher_credentials.h",
],
external_deps = [
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
"absl/status:statusor",
"absl/types:variant",
],
language = "c++",
deps = [
"arena_promise",
"context",
"metadata",
"poll",
"pollset_set",
"ref_counted",
"time",
"useful",
"//:gpr",
"//:grpc_security_base",
"//:httpcli",
"//:iomgr",
"//:orphanable",
"//:promise",
"//:ref_counted_ptr",
],
)
grpc_cc_library(
name = "grpc_oauth2_credentials",
srcs = [
@ -4354,6 +4388,7 @@ grpc_cc_library(
"slice_refcount",
"status_helper",
"time",
"token_fetcher_credentials",
"unique_type_name",
"useful",
"//:gpr",
@ -4401,6 +4436,7 @@ grpc_cc_library(
language = "c++",
deps = [
"closure",
"default_event_engine",
"env",
"error",
"error_utils",
@ -4414,6 +4450,7 @@ grpc_cc_library(
"slice_refcount",
"status_helper",
"time",
"token_fetcher_credentials",
"//:gpr",
"//:grpc_base",
"//:grpc_core_credentials_header",

@ -102,6 +102,8 @@ class grpc_composite_call_credentials : public grpc_call_credentials {
grpc_core::RefCountedPtr<grpc_call_credentials> creds2);
~grpc_composite_call_credentials() override = default;
void Orphaned() override { inner_.clear(); }
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;

@ -183,7 +183,7 @@ using CredentialsMetadataArray = std::vector<std::pair<Slice, Slice>>;
// class. Otherwise, compiler will complain about type mismatch due to
// -Wmismatched-tags.
struct grpc_call_credentials
: public grpc_core::RefCounted<grpc_call_credentials> {
: public grpc_core::DualRefCounted<grpc_call_credentials> {
public:
// TODO(roth): Consider whether security connector actually needs to
// be part of this interface. Currently, it is here only for the

@ -58,6 +58,13 @@ const char* kAccessKeyIdEnvVar = "AWS_ACCESS_KEY_ID";
const char* kSecretAccessKeyEnvVar = "AWS_SECRET_ACCESS_KEY";
const char* kSessionTokenEnvVar = "AWS_SESSION_TOKEN";
bool ShouldUseMetadataServer() {
return !((GetEnv(kRegionEnvVar).has_value() ||
GetEnv(kDefaultRegionEnvVar).has_value()) &&
(GetEnv(kAccessKeyIdEnvVar).has_value() &&
GetEnv(kSecretAccessKeyEnvVar).has_value()));
}
std::string UrlEncode(const absl::string_view& s) {
const char* hex = "0123456789ABCDEF";
std::string result;
@ -78,281 +85,202 @@ std::string UrlEncode(const absl::string_view& s) {
} // namespace
RefCountedPtr<AwsExternalAccountCredentials>
AwsExternalAccountCredentials::Create(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error) {
auto creds = MakeRefCounted<AwsExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
if (error->ok()) {
return creds;
} else {
return nullptr;
}
}
//
// AwsExternalAccountCredentials::AwsFetchBody
//
AwsExternalAccountCredentials::AwsExternalAccountCredentials(
Options options, std::vector<std::string> scopes, grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes)) {
audience_ = options.audience;
auto it = options.credential_source.object().find("environment_id");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("environment_id field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("environment_id field must be a string.");
return;
}
if (it->second.string() != kExpectedEnvironmentId) {
*error = GRPC_ERROR_CREATE("environment_id does not match.");
return;
}
it = options.credential_source.object().find("region_url");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("region_url field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("region_url field must be a string.");
return;
}
region_url_ = it->second.string();
it = options.credential_source.object().find("url");
if (it != options.credential_source.object().end() &&
it->second.type() == Json::Type::kString) {
url_ = it->second.string();
}
it =
options.credential_source.object().find("regional_cred_verification_url");
if (it == options.credential_source.object().end()) {
*error =
GRPC_ERROR_CREATE("regional_cred_verification_url field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE(
"regional_cred_verification_url field must be a string.");
return;
}
regional_cred_verification_url_ = it->second.string();
it = options.credential_source.object().find("imdsv2_session_token_url");
if (it != options.credential_source.object().end() &&
it->second.type() == Json::Type::kString) {
imdsv2_session_token_url_ = it->second.string();
}
AwsExternalAccountCredentials::AwsFetchBody::AwsFetchBody(
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
AwsExternalAccountCredentials* creds, Timestamp deadline)
: FetchBody(std::move(on_done)), creds_(creds), deadline_(deadline) {
MutexLock lock(&mu_);
// Do a quick async hop here, so that we can invoke the callback at
// any time without deadlocking.
fetch_body_ = MakeOrphanable<NoOpFetchBody>(
creds->event_engine(),
[self = RefAsSubclass<AwsFetchBody>()](
absl::StatusOr<std::string> /*result*/) { self->Start(); },
"");
}
bool AwsExternalAccountCredentials::ShouldUseMetadataServer() {
return !((GetEnv(kRegionEnvVar).has_value() ||
GetEnv(kDefaultRegionEnvVar).has_value()) &&
(GetEnv(kAccessKeyIdEnvVar).has_value() &&
GetEnv(kSecretAccessKeyEnvVar).has_value()));
void AwsExternalAccountCredentials::AwsFetchBody::Shutdown() {
MutexLock lock(&mu_);
fetch_body_.reset();
}
void AwsExternalAccountCredentials::RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& /*options*/,
std::function<void(std::string, grpc_error_handle)> cb) {
if (ctx == nullptr) {
FinishRetrieveSubjectToken(
"",
GRPC_ERROR_CREATE(
"Missing HTTPRequestContext to start subject token retrieval."));
return;
}
ctx_ = ctx;
cb_ = cb;
if (!imdsv2_session_token_url_.empty() && ShouldUseMetadataServer()) {
RetrieveImdsV2SessionToken();
} else if (signer_ != nullptr) {
BuildSubjectToken();
} else {
RetrieveRegion();
}
void AwsExternalAccountCredentials::AwsFetchBody::AsyncFinish(
absl::StatusOr<std::string> result) {
creds_->event_engine().Run(
[this, self = Ref(), result = std::move(result)]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
Finish(std::move(result));
self.reset();
});
}
void AwsExternalAccountCredentials::RetrieveImdsV2SessionToken() {
absl::StatusOr<URI> uri = URI::Parse(imdsv2_session_token_url_);
if (!uri.ok()) {
return;
bool AwsExternalAccountCredentials::AwsFetchBody::MaybeFail(
absl::Status status) {
if (!status.ok()) {
AsyncFinish(std::move(status));
return true;
}
grpc_http_header* headers =
static_cast<grpc_http_header*>(gpr_malloc(sizeof(grpc_http_header)));
headers[0].key = gpr_strdup("x-aws-ec2-metadata-token-ttl-seconds");
headers[0].value = gpr_strdup("300");
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.hdr_count = 1;
request.hdrs = headers;
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
GRPC_CLOSURE_INIT(&ctx_->closure, OnRetrieveImdsV2SessionToken, this,
nullptr);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
if (fetch_body_ == nullptr) {
AsyncFinish(
absl::CancelledError("external account credentials fetch cancelled"));
return true;
}
http_request_ =
HttpRequest::Put(std::move(*uri), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
grpc_http_request_destroy(&request);
}
void AwsExternalAccountCredentials::OnRetrieveImdsV2SessionToken(
void* arg, grpc_error_handle error) {
AwsExternalAccountCredentials* self =
static_cast<AwsExternalAccountCredentials*>(arg);
self->OnRetrieveImdsV2SessionTokenInternal(error);
return false;
}
void AwsExternalAccountCredentials::OnRetrieveImdsV2SessionTokenInternal(
grpc_error_handle error) {
if (!error.ok()) {
FinishRetrieveSubjectToken("", error);
return;
}
imdsv2_session_token_ =
std::string(ctx_->response.body, ctx_->response.body_length);
if (signer_ != nullptr) {
void AwsExternalAccountCredentials::AwsFetchBody::Start() {
MutexLock lock(&mu_);
if (MaybeFail(absl::OkStatus())) return;
if (!creds_->imdsv2_session_token_url_.empty() && ShouldUseMetadataServer()) {
RetrieveImdsV2SessionToken();
} else if (creds_->signer_ != nullptr) {
BuildSubjectToken();
} else {
RetrieveRegion();
}
}
void AwsExternalAccountCredentials::AddMetadataRequestHeaders(
grpc_http_request* request) {
if (!imdsv2_session_token_.empty()) {
CHECK_EQ(request->hdr_count, 0u);
CHECK_EQ(request->hdrs, nullptr);
grpc_http_header* headers =
static_cast<grpc_http_header*>(gpr_malloc(sizeof(grpc_http_header)));
headers[0].key = gpr_strdup("x-aws-ec2-metadata-token");
headers[0].value = gpr_strdup(imdsv2_session_token_.c_str());
request->hdr_count = 1;
request->hdrs = headers;
void AwsExternalAccountCredentials::AwsFetchBody::RetrieveImdsV2SessionToken() {
absl::StatusOr<URI> uri = URI::Parse(creds_->imdsv2_session_token_url_);
if (!uri.ok()) {
AsyncFinish(uri.status());
return;
}
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_header* headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header)));
headers[0].key = gpr_strdup("x-aws-ec2-metadata-token-ttl-seconds");
headers[0].value = gpr_strdup("300");
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.hdr_count = 1;
request.hdrs = headers;
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Put(
std::move(*uri), /*args=*/nullptr, creds_->pollent(), &request,
deadline_, on_http_response, response,
std::move(http_request_creds));
http_request->Start();
grpc_http_request_destroy(&request);
return http_request;
},
[self =
RefAsSubclass<AwsFetchBody>()](absl::StatusOr<std::string> result) {
MutexLock lock(&self->mu_);
if (self->MaybeFail(result.status())) return;
self->imdsv2_session_token_ = std::move(*result);
if (self->creds_->signer_ != nullptr) {
self->BuildSubjectToken();
} else {
self->RetrieveRegion();
}
});
}
void AwsExternalAccountCredentials::RetrieveRegion() {
void AwsExternalAccountCredentials::AwsFetchBody::RetrieveRegion() {
auto region_from_env = GetEnv(kRegionEnvVar);
if (!region_from_env.has_value()) {
region_from_env = GetEnv(kDefaultRegionEnvVar);
}
if (region_from_env.has_value()) {
region_ = std::move(*region_from_env);
if (url_.empty()) {
if (creds_->url_.empty()) {
RetrieveSigningKeys();
} else {
RetrieveRoleName();
}
return;
}
absl::StatusOr<URI> uri = URI::Parse(region_url_);
absl::StatusOr<URI> uri = URI::Parse(creds_->region_url_);
if (!uri.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(absl::StrFormat("Invalid region url. %s",
uri.status().ToString())));
return;
}
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
AddMetadataRequestHeaders(&request);
GRPC_CLOSURE_INIT(&ctx_->closure, OnRetrieveRegion, this, nullptr);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ =
HttpRequest::Get(std::move(*uri), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
grpc_http_request_destroy(&request);
}
void AwsExternalAccountCredentials::OnRetrieveRegion(void* arg,
grpc_error_handle error) {
AwsExternalAccountCredentials* self =
static_cast<AwsExternalAccountCredentials*>(arg);
self->OnRetrieveRegionInternal(error);
}
void AwsExternalAccountCredentials::OnRetrieveRegionInternal(
grpc_error_handle error) {
if (!error.ok()) {
FinishRetrieveSubjectToken("", error);
AsyncFinish(GRPC_ERROR_CREATE(
absl::StrFormat("Invalid region url. %s", uri.status().ToString())));
return;
}
// Remove the last letter of availability zone to get pure region
absl::string_view response_body(ctx_->response.body,
ctx_->response.body_length);
region_ = std::string(response_body.substr(0, response_body.size() - 1));
if (url_.empty()) {
RetrieveSigningKeys();
} else {
RetrieveRoleName();
}
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
AddMetadataRequestHeaders(&request);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Get(
std::move(*uri), /*args=*/nullptr, creds_->pollent(), &request,
deadline_, on_http_response, response,
std::move(http_request_creds));
http_request->Start();
grpc_http_request_destroy(&request);
return http_request;
},
[self =
RefAsSubclass<AwsFetchBody>()](absl::StatusOr<std::string> result) {
MutexLock lock(&self->mu_);
if (self->MaybeFail(result.status())) return;
// Remove the last letter of availability zone to get pure region
self->region_ = result->substr(0, result->size() - 1);
if (self->creds_->url_.empty()) {
self->RetrieveSigningKeys();
} else {
self->RetrieveRoleName();
}
});
}
void AwsExternalAccountCredentials::RetrieveRoleName() {
absl::StatusOr<URI> uri = URI::Parse(url_);
void AwsExternalAccountCredentials::AwsFetchBody::RetrieveRoleName() {
absl::StatusOr<URI> uri = URI::Parse(creds_->url_);
if (!uri.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(
absl::StrFormat("Invalid url: %s.", uri.status().ToString())));
AsyncFinish(GRPC_ERROR_CREATE(
absl::StrFormat("Invalid url: %s.", uri.status().ToString())));
return;
}
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
AddMetadataRequestHeaders(&request);
GRPC_CLOSURE_INIT(&ctx_->closure, OnRetrieveRoleName, this, nullptr);
// TODO(ctiller): use the caller's resource quota.
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ =
HttpRequest::Get(std::move(*uri), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
grpc_http_request_destroy(&request);
}
void AwsExternalAccountCredentials::OnRetrieveRoleName(
void* arg, grpc_error_handle error) {
AwsExternalAccountCredentials* self =
static_cast<AwsExternalAccountCredentials*>(arg);
self->OnRetrieveRoleNameInternal(error);
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
AddMetadataRequestHeaders(&request);
// TODO(ctiller): use the caller's resource quota.
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Get(
std::move(*uri), /*args=*/nullptr, creds_->pollent(), &request,
deadline_, on_http_response, response,
std::move(http_request_creds));
http_request->Start();
grpc_http_request_destroy(&request);
return http_request;
},
[self =
RefAsSubclass<AwsFetchBody>()](absl::StatusOr<std::string> result) {
MutexLock lock(&self->mu_);
if (self->MaybeFail(result.status())) return;
self->role_name_ = std::move(*result);
self->RetrieveSigningKeys();
});
}
void AwsExternalAccountCredentials::OnRetrieveRoleNameInternal(
grpc_error_handle error) {
if (!error.ok()) {
FinishRetrieveSubjectToken("", error);
return;
}
role_name_ = std::string(ctx_->response.body, ctx_->response.body_length);
RetrieveSigningKeys();
}
void AwsExternalAccountCredentials::RetrieveSigningKeys() {
void AwsExternalAccountCredentials::AwsFetchBody::RetrieveSigningKeys() {
auto access_key_id_from_env = GetEnv(kAccessKeyIdEnvVar);
auto secret_access_key_from_env = GetEnv(kSecretAccessKeyEnvVar);
auto token_from_env = GetEnv(kSessionTokenEnvVar);
@ -367,122 +295,106 @@ void AwsExternalAccountCredentials::RetrieveSigningKeys() {
return;
}
if (role_name_.empty()) {
FinishRetrieveSubjectToken(
"",
AsyncFinish(
GRPC_ERROR_CREATE("Missing role name when retrieving signing keys."));
return;
}
std::string url_with_role_name = absl::StrCat(url_, "/", role_name_);
std::string url_with_role_name = absl::StrCat(creds_->url_, "/", role_name_);
absl::StatusOr<URI> uri = URI::Parse(url_with_role_name);
if (!uri.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(absl::StrFormat("Invalid url with role name: %s.",
uri.status().ToString())));
AsyncFinish(GRPC_ERROR_CREATE(absl::StrFormat(
"Invalid url with role name: %s.", uri.status().ToString())));
return;
}
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
AddMetadataRequestHeaders(&request);
GRPC_CLOSURE_INIT(&ctx_->closure, OnRetrieveSigningKeys, this, nullptr);
// TODO(ctiller): use the caller's resource quota.
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ =
HttpRequest::Get(std::move(*uri), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
grpc_http_request_destroy(&request);
}
void AwsExternalAccountCredentials::OnRetrieveSigningKeys(
void* arg, grpc_error_handle error) {
AwsExternalAccountCredentials* self =
static_cast<AwsExternalAccountCredentials*>(arg);
self->OnRetrieveSigningKeysInternal(error);
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
AddMetadataRequestHeaders(&request);
// TODO(ctiller): use the caller's resource quota.
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Get(
std::move(*uri), /*args=*/nullptr, creds_->pollent(), &request,
deadline_, on_http_response, response,
std::move(http_request_creds));
http_request->Start();
grpc_http_request_destroy(&request);
return http_request;
},
[self =
RefAsSubclass<AwsFetchBody>()](absl::StatusOr<std::string> result) {
MutexLock lock(&self->mu_);
if (self->MaybeFail(result.status())) return;
self->OnRetrieveSigningKeys(std::move(*result));
});
}
void AwsExternalAccountCredentials::OnRetrieveSigningKeysInternal(
grpc_error_handle error) {
if (!error.ok()) {
FinishRetrieveSubjectToken("", error);
return;
}
absl::string_view response_body(ctx_->response.body,
ctx_->response.body_length);
auto json = JsonParse(response_body);
void AwsExternalAccountCredentials::AwsFetchBody::OnRetrieveSigningKeys(
std::string result) {
auto json = JsonParse(result);
if (!json.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(
absl::StrCat("Invalid retrieve signing keys response: ",
json.status().ToString())));
AsyncFinish(GRPC_ERROR_CREATE(absl::StrCat(
"Invalid retrieve signing keys response: ", json.status().ToString())));
return;
}
if (json->type() != Json::Type::kObject) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE("Invalid retrieve signing keys response: "
"JSON type is not object"));
AsyncFinish(
GRPC_ERROR_CREATE("Invalid retrieve signing keys response: "
"JSON type is not object"));
return;
}
auto it = json->object().find("AccessKeyId");
if (it != json->object().end() && it->second.type() == Json::Type::kString) {
access_key_id_ = it->second.string();
} else {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid AccessKeyId in %s.", response_body)));
AsyncFinish(GRPC_ERROR_CREATE(
absl::StrFormat("Missing or invalid AccessKeyId in %s.", result)));
return;
}
it = json->object().find("SecretAccessKey");
if (it != json->object().end() && it->second.type() == Json::Type::kString) {
secret_access_key_ = it->second.string();
} else {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid SecretAccessKey in %s.", response_body)));
AsyncFinish(GRPC_ERROR_CREATE(
absl::StrFormat("Missing or invalid SecretAccessKey in %s.", result)));
return;
}
it = json->object().find("Token");
if (it != json->object().end() && it->second.type() == Json::Type::kString) {
token_ = it->second.string();
} else {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(absl::StrFormat("Missing or invalid Token in %s.",
response_body)));
AsyncFinish(GRPC_ERROR_CREATE(
absl::StrFormat("Missing or invalid Token in %s.", result)));
return;
}
BuildSubjectToken();
}
void AwsExternalAccountCredentials::BuildSubjectToken() {
void AwsExternalAccountCredentials::AwsFetchBody::BuildSubjectToken() {
grpc_error_handle error;
if (signer_ == nullptr) {
cred_verification_url_ = absl::StrReplaceAll(
regional_cred_verification_url_, {{"{region}", region_}});
signer_ = std::make_unique<AwsRequestSigner>(
if (creds_->signer_ == nullptr) {
creds_->cred_verification_url_ = absl::StrReplaceAll(
creds_->regional_cred_verification_url_, {{"{region}", region_}});
creds_->signer_ = std::make_unique<AwsRequestSigner>(
access_key_id_, secret_access_key_, token_, "POST",
cred_verification_url_, region_, "",
creds_->cred_verification_url_, region_, "",
std::map<std::string, std::string>(), &error);
if (!error.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE_REFERENCING(
"Creating aws request signer failed.", &error, 1));
AsyncFinish(GRPC_ERROR_CREATE_REFERENCING(
"Creating aws request signer failed.", &error, 1));
return;
}
}
auto signed_headers = signer_->GetSignedRequestHeaders();
auto signed_headers = creds_->signer_->GetSignedRequestHeaders();
if (!error.ok()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE_REFERENCING("Invalid getting signed request"
"headers.",
&error, 1));
AsyncFinish(GRPC_ERROR_CREATE_REFERENCING(
"Invalid getting signed request headers.", &error, 1));
return;
}
// Construct subject token
@ -501,30 +413,117 @@ void AwsExternalAccountCredentials::BuildSubjectToken() {
{"value", Json::FromString(signed_headers["x-amz-security-token"])}}));
headers.push_back(Json::FromObject(
{{"key", Json::FromString("x-goog-cloud-target-resource")},
{"value", Json::FromString(audience_)}}));
Json subject_token_json =
Json::FromObject({{"url", Json::FromString(cred_verification_url_)},
{"method", Json::FromString("POST")},
{"headers", Json::FromArray(headers)}});
{"value", Json::FromString(creds_->audience_)}}));
Json subject_token_json = Json::FromObject(
{{"url", Json::FromString(creds_->cred_verification_url_)},
{"method", Json::FromString("POST")},
{"headers", Json::FromArray(headers)}});
std::string subject_token = UrlEncode(JsonDump(subject_token_json));
FinishRetrieveSubjectToken(subject_token, absl::OkStatus());
AsyncFinish(std::move(subject_token));
}
void AwsExternalAccountCredentials::FinishRetrieveSubjectToken(
std::string subject_token, grpc_error_handle error) {
// Reset context
ctx_ = nullptr;
// Move object state into local variables.
auto cb = cb_;
cb_ = nullptr;
// Invoke the callback.
if (!error.ok()) {
cb("", error);
} else {
cb(subject_token, absl::OkStatus());
void AwsExternalAccountCredentials::AwsFetchBody::AddMetadataRequestHeaders(
grpc_http_request* request) {
if (!imdsv2_session_token_.empty()) {
CHECK_EQ(request->hdr_count, 0u);
CHECK_EQ(request->hdrs, nullptr);
grpc_http_header* headers =
static_cast<grpc_http_header*>(gpr_malloc(sizeof(grpc_http_header)));
headers[0].key = gpr_strdup("x-aws-ec2-metadata-token");
headers[0].value = gpr_strdup(imdsv2_session_token_.c_str());
request->hdr_count = 1;
request->hdrs = headers;
}
}
//
// AwsExternalAccountCredentials
//
absl::StatusOr<RefCountedPtr<AwsExternalAccountCredentials>>
AwsExternalAccountCredentials::Create(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine) {
grpc_error_handle error;
auto creds = MakeRefCounted<AwsExternalAccountCredentials>(
std::move(options), std::move(scopes), std::move(event_engine), &error);
if (!error.ok()) return error;
return creds;
}
AwsExternalAccountCredentials::AwsExternalAccountCredentials(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes),
std::move(event_engine)) {
audience_ = options.audience;
auto it = options.credential_source.object().find("environment_id");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("environment_id field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("environment_id field must be a string.");
return;
}
if (it->second.string() != kExpectedEnvironmentId) {
*error = GRPC_ERROR_CREATE("environment_id does not match.");
return;
}
it = options.credential_source.object().find("region_url");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("region_url field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("region_url field must be a string.");
return;
}
region_url_ = it->second.string();
it = options.credential_source.object().find("url");
if (it != options.credential_source.object().end() &&
it->second.type() == Json::Type::kString) {
url_ = it->second.string();
}
it =
options.credential_source.object().find("regional_cred_verification_url");
if (it == options.credential_source.object().end()) {
*error =
GRPC_ERROR_CREATE("regional_cred_verification_url field not present.");
return;
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE(
"regional_cred_verification_url field must be a string.");
return;
}
regional_cred_verification_url_ = it->second.string();
it = options.credential_source.object().find("imdsv2_session_token_url");
if (it != options.credential_source.object().end() &&
it->second.type() == Json::Type::kString) {
imdsv2_session_token_url_ = it->second.string();
}
}
std::string AwsExternalAccountCredentials::debug_string() {
return absl::StrCat("AwsExternalAccountCredentials{Audience:", audience(),
")");
}
UniqueTypeName AwsExternalAccountCredentials::type() const {
static UniqueTypeName::Factory kFactory("AwsExternalAccountCredentials");
return kFactory.Create();
}
OrphanablePtr<ExternalAccountCredentials::FetchBody>
AwsExternalAccountCredentials::RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) {
return MakeOrphanable<AwsFetchBody>(std::move(on_done), this, deadline);
}
absl::string_view AwsExternalAccountCredentials::CredentialSourceType() {
return "aws";
}

@ -38,46 +38,67 @@ namespace grpc_core {
class AwsExternalAccountCredentials final : public ExternalAccountCredentials {
public:
static RefCountedPtr<AwsExternalAccountCredentials> Create(
static absl::StatusOr<RefCountedPtr<AwsExternalAccountCredentials>> Create(
Options options, std::vector<std::string> scopes,
grpc_error_handle* error);
AwsExternalAccountCredentials(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error);
private:
bool ShouldUseMetadataServer();
void RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& options,
std::function<void(std::string, grpc_error_handle)> cb) override;
void RetrieveRegion();
static void OnRetrieveRegion(void* arg, grpc_error_handle error);
void OnRetrieveRegionInternal(grpc_error_handle error);
void RetrieveImdsV2SessionToken();
static void OnRetrieveImdsV2SessionToken(void* arg, grpc_error_handle error);
void OnRetrieveImdsV2SessionTokenInternal(grpc_error_handle error);
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine = nullptr);
void RetrieveRoleName();
static void OnRetrieveRoleName(void* arg, grpc_error_handle error);
void OnRetrieveRoleNameInternal(grpc_error_handle error);
AwsExternalAccountCredentials(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
grpc_error_handle* error);
void RetrieveSigningKeys();
static void OnRetrieveSigningKeys(void* arg, grpc_error_handle error);
void OnRetrieveSigningKeysInternal(grpc_error_handle error);
std::string debug_string() override;
void BuildSubjectToken();
void FinishRetrieveSubjectToken(std::string subject_token,
grpc_error_handle error);
UniqueTypeName type() const override;
void AddMetadataRequestHeaders(grpc_http_request* request);
private:
// A FetchBody impl that itself performs a sequence of FetchBody operations.
class AwsFetchBody : public FetchBody {
public:
AwsFetchBody(absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
AwsExternalAccountCredentials* creds, Timestamp deadline);
private:
void Shutdown() override;
void AsyncFinish(absl::StatusOr<std::string> result);
bool MaybeFail(absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void Start();
void RetrieveImdsV2SessionToken() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void RetrieveRegion() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void RetrieveRoleName() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void RetrieveSigningKeys() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void OnRetrieveSigningKeys(std::string result)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void BuildSubjectToken() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
void AddMetadataRequestHeaders(grpc_http_request* request);
AwsExternalAccountCredentials* creds_;
Timestamp deadline_;
Mutex mu_;
OrphanablePtr<FetchBody> fetch_body_ ABSL_GUARDED_BY(&mu_);
// Information required by request signer
std::string region_;
std::string role_name_;
std::string access_key_id_;
std::string secret_access_key_;
std::string token_;
std::string imdsv2_session_token_;
};
OrphanablePtr<FetchBody> RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) override;
absl::string_view CredentialSourceType() override;
std::string audience_;
OrphanablePtr<HttpRequest> http_request_;
// Fields of credential source
std::string region_url_;
@ -85,19 +106,9 @@ class AwsExternalAccountCredentials final : public ExternalAccountCredentials {
std::string regional_cred_verification_url_;
std::string imdsv2_session_token_url_;
// Information required by request signer
std::string region_;
std::string role_name_;
std::string access_key_id_;
std::string secret_access_key_;
std::string token_;
std::string imdsv2_session_token_;
// These fields are set on the first fetch attempt and cached after that.
std::unique_ptr<AwsRequestSigner> signer_;
std::string cred_verification_url_;
HTTPRequestContext* ctx_ = nullptr;
std::function<void(std::string, grpc_error_handle)> cb_ = nullptr;
};
} // namespace grpc_core

@ -45,6 +45,7 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/external/aws_external_account_credentials.h"
@ -69,9 +70,94 @@
namespace grpc_core {
//
// ExternalAccountCredentials::NoOpFetchBody
//
ExternalAccountCredentials::NoOpFetchBody::NoOpFetchBody(
grpc_event_engine::experimental::EventEngine& event_engine,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
absl::StatusOr<std::string> result)
: FetchBody(std::move(on_done)) {
event_engine.Run([self = RefAsSubclass<NoOpFetchBody>(),
result = std::move(result)]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
self->Finish(std::move(result));
});
}
//
// ExternalAccountCredentials::HttpFetchBody
//
ExternalAccountCredentials::HttpFetchBody::HttpFetchBody(
absl::FunctionRef<OrphanablePtr<HttpRequest>(grpc_http_response*,
grpc_closure*)>
start_http_request,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done)
: FetchBody(std::move(on_done)) {
GRPC_CLOSURE_INIT(&on_http_response_, OnHttpResponse, this, nullptr);
Ref().release(); // Ref held by HTTP request callback.
http_request_ = start_http_request(&response_, &on_http_response_);
}
void ExternalAccountCredentials::HttpFetchBody::OnHttpResponse(
void* arg, grpc_error_handle error) {
RefCountedPtr<HttpFetchBody> self(static_cast<HttpFetchBody*>(arg));
if (!error.ok()) {
self->Finish(std::move(error));
return;
}
absl::string_view response_body(self->response_.body,
self->response_.body_length);
if (self->response_.status != 200) {
self->Finish(absl::UnavailableError(
absl::StrCat("Call to HTTP server ended with status ",
self->response_.status, " [", response_body, "]")));
return;
}
self->Finish(std::string(response_body));
}
//
// ExternalAccountCredentials::ExternalFetchRequest
//
// The token fetching flow:
// 1. Retrieve subject token - Subclass's RetrieveSubjectToken() gets called
// and the subject token is received in ExchangeToken().
// 2. Exchange token - ExchangeToken() gets called with the
// subject token from #1.
// 3. (Optional) Impersonate service account - ImpersonateServiceAccount() gets
// called with the access token of the response from #2. Get an impersonated
// access token in OnImpersonateServiceAccountInternal().
// 4. Finish token fetch - Return back the response that contains an access
// token in FinishTokenFetch().
ExternalAccountCredentials::ExternalFetchRequest::ExternalFetchRequest(
ExternalAccountCredentials* creds, Timestamp deadline,
absl::AnyInvocable<
void(absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done)
: creds_(creds), deadline_(deadline), on_done_(std::move(on_done)) {
fetch_body_ = creds_->RetrieveSubjectToken(
deadline, [self = RefAsSubclass<ExternalFetchRequest>()](
absl::StatusOr<std::string> result) {
self->ExchangeToken(std::move(result));
});
}
void ExternalAccountCredentials::ExternalFetchRequest::Orphan() {
{
MutexLock lock(&mu_);
fetch_body_.reset();
}
Unref();
}
namespace {
std::string UrlEncode(const absl::string_view& s) {
std::string UrlEncode(const absl::string_view s) {
const char* hex = "0123456789ABCDEF";
std::string result;
result.reserve(s.length());
@ -89,6 +175,270 @@ std::string UrlEncode(const absl::string_view& s) {
return result;
}
} // namespace
void ExternalAccountCredentials::ExternalFetchRequest::ExchangeToken(
absl::StatusOr<std::string> subject_token) {
MutexLock lock(&mu_);
if (MaybeFailLocked(subject_token.status())) return;
// Parse URI.
absl::StatusOr<URI> uri = URI::Parse(options().token_url);
if (!uri.ok()) {
return FinishTokenFetch(GRPC_ERROR_CREATE(
absl::StrFormat("Invalid token url: %s. Error: %s", options().token_url,
uri.status().ToString())));
}
// Start HTTP request.
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
const bool add_authorization_header =
!options().client_id.empty() && !options().client_secret.empty();
request.hdr_count = add_authorization_header ? 3 : 2;
auto* headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
headers[0].key = gpr_strdup("Content-Type");
headers[0].value = gpr_strdup("application/x-www-form-urlencoded");
headers[1].key = gpr_strdup("x-goog-api-client");
headers[1].value = gpr_strdup(creds_->MetricsHeaderValue().c_str());
if (add_authorization_header) {
std::string raw_cred = absl::StrFormat("%s:%s", options().client_id,
options().client_secret);
std::string str =
absl::StrFormat("Basic %s", absl::Base64Escape(raw_cred));
headers[2].key = gpr_strdup("Authorization");
headers[2].value = gpr_strdup(str.c_str());
}
request.hdrs = headers;
std::vector<std::string> body_parts;
body_parts.push_back(absl::StrFormat(
"audience=%s", UrlEncode(options().audience).c_str()));
body_parts.push_back(absl::StrFormat(
"grant_type=%s",
UrlEncode(EXTERNAL_ACCOUNT_CREDENTIALS_GRANT_TYPE).c_str()));
body_parts.push_back(absl::StrFormat(
"requested_token_type=%s",
UrlEncode(EXTERNAL_ACCOUNT_CREDENTIALS_REQUESTED_TOKEN_TYPE)
.c_str()));
body_parts.push_back(
absl::StrFormat("subject_token_type=%s",
UrlEncode(options().subject_token_type).c_str()));
body_parts.push_back(absl::StrFormat(
"subject_token=%s", UrlEncode(*subject_token).c_str()));
std::string scope = GOOGLE_CLOUD_PLATFORM_DEFAULT_SCOPE;
if (options().service_account_impersonation_url.empty()) {
scope = absl::StrJoin(creds_->scopes_, " ");
}
body_parts.push_back(
absl::StrFormat("scope=%s", UrlEncode(scope).c_str()));
Json::Object addtional_options_json_object;
if (options().client_id.empty() && options().client_secret.empty()) {
addtional_options_json_object["userProject"] =
Json::FromString(options().workforce_pool_user_project);
}
Json addtional_options_json =
Json::FromObject(std::move(addtional_options_json_object));
body_parts.push_back(absl::StrFormat(
"options=%s", UrlEncode(JsonDump(addtional_options_json)).c_str()));
std::string body = absl::StrJoin(body_parts, "&");
request.body = const_cast<char*>(body.c_str());
request.body_length = body.size();
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Post(
std::move(*uri), /*args=*/nullptr, pollent(), &request, deadline(),
on_http_response, response, std::move(http_request_creds));
http_request->Start();
request.body = nullptr;
grpc_http_request_destroy(&request);
return http_request;
},
[self = RefAsSubclass<ExternalFetchRequest>()](
absl::StatusOr<std::string> result) {
self->MaybeImpersonateServiceAccount(std::move(result));
});
}
void ExternalAccountCredentials::ExternalFetchRequest::
MaybeImpersonateServiceAccount(absl::StatusOr<std::string> response_body) {
MutexLock lock(&mu_);
if (MaybeFailLocked(response_body.status())) return;
// If not doing impersonation, response_body contains oauth token.
if (options().service_account_impersonation_url.empty()) {
return FinishTokenFetch(std::move(response_body));
}
// Do impersonation.
auto json = JsonParse(*response_body);
if (!json.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrCat(
"Invalid token exchange response: ", json.status().ToString())));
return;
}
if (json->type() != Json::Type::kObject) {
FinishTokenFetch(GRPC_ERROR_CREATE(
"Invalid token exchange response: JSON type is not object"));
return;
}
auto it = json->object().find("access_token");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid access_token in %s.", *response_body)));
return;
}
absl::string_view access_token = it->second.string();
absl::StatusOr<URI> uri =
URI::Parse(options().service_account_impersonation_url);
if (!uri.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Invalid service account impersonation url: %s. Error: %s",
options().service_account_impersonation_url, uri.status().ToString())));
return;
}
// Start HTTP request.
fetch_body_ = MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.hdr_count = 2;
grpc_http_header* headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
headers[0].key = gpr_strdup("Content-Type");
headers[0].value = gpr_strdup("application/x-www-form-urlencoded");
std::string str = absl::StrFormat("Bearer %s", access_token);
headers[1].key = gpr_strdup("Authorization");
headers[1].value = gpr_strdup(str.c_str());
request.hdrs = headers;
std::vector<std::string> body_members;
std::string scope = absl::StrJoin(creds_->scopes_, " ");
body_members.push_back(
absl::StrFormat("scope=%s", UrlEncode(scope).c_str()));
body_members.push_back(absl::StrFormat(
"lifetime=%ds",
options().service_account_impersonation.token_lifetime_seconds));
std::string body = absl::StrJoin(body_members, "&");
request.body = const_cast<char*>(body.c_str());
request.body_length = body.size();
// TODO(ctiller): Use the callers resource quota.
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request = HttpRequest::Post(
std::move(*uri), nullptr, pollent(), &request, deadline(),
on_http_response, response, std::move(http_request_creds));
http_request->Start();
request.body = nullptr;
grpc_http_request_destroy(&request);
return http_request;
},
[self = RefAsSubclass<ExternalFetchRequest>()](
absl::StatusOr<std::string> result) {
self->OnImpersonateServiceAccount(std::move(result));
});
}
void ExternalAccountCredentials::ExternalFetchRequest::
OnImpersonateServiceAccount(absl::StatusOr<std::string> response_body) {
MutexLock lock(&mu_);
if (MaybeFailLocked(response_body.status())) return;
auto json = JsonParse(*response_body);
if (!json.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(
absl::StrCat("Invalid service account impersonation response: ",
json.status().ToString())));
return;
}
if (json->type() != Json::Type::kObject) {
FinishTokenFetch(
GRPC_ERROR_CREATE("Invalid service account impersonation response: "
"JSON type is not object"));
return;
}
auto it = json->object().find("accessToken");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid accessToken in %s.", *response_body)));
return;
}
absl::string_view access_token = it->second.string();
it = json->object().find("expireTime");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid expireTime in %s.", *response_body)));
return;
}
absl::string_view expire_time = it->second.string();
absl::Time t;
if (!absl::ParseTime(absl::RFC3339_full, expire_time, &t, nullptr)) {
FinishTokenFetch(GRPC_ERROR_CREATE(
"Invalid expire time of service account impersonation response."));
return;
}
int64_t expire_in = (t - absl::Now()) / absl::Seconds(1);
std::string body = absl::StrFormat(
"{\"access_token\":\"%s\",\"expires_in\":%d,\"token_type\":\"Bearer\"}",
access_token, expire_in);
FinishTokenFetch(std::move(body));
}
void ExternalAccountCredentials::ExternalFetchRequest::FinishTokenFetch(
absl::StatusOr<std::string> response_body) {
absl::StatusOr<RefCountedPtr<Token>> result;
if (!response_body.ok()) {
LOG(ERROR) << "Fetch external account credentials access token: "
<< response_body.status();
result = absl::Status(response_body.status().code(),
absl::StrCat("error fetching oauth2 token: ",
response_body.status().message()));
} else {
absl::optional<Slice> token_value;
Duration token_lifetime;
if (grpc_oauth2_token_fetcher_credentials_parse_server_response_body(
*response_body, &token_value, &token_lifetime) !=
GRPC_CREDENTIALS_OK) {
result = GRPC_ERROR_CREATE("Could not parse oauth token");
} else {
result = MakeRefCounted<Token>(std::move(*token_value),
Timestamp::Now() + token_lifetime);
}
}
creds_->event_engine().Run([on_done = std::exchange(on_done_, nullptr),
result = std::move(result)]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
std::exchange(on_done, nullptr)(std::move(result));
});
}
bool ExternalAccountCredentials::ExternalFetchRequest::MaybeFailLocked(
absl::Status status) {
if (!status.ok()) {
FinishTokenFetch(std::move(status));
return true;
}
if (fetch_body_ == nullptr) { // Will be set by Orphan() on cancellation.
FinishTokenFetch(
absl::CancelledError("external account credentials fetch cancelled"));
return true;
}
return false;
}
//
// ExternalAccountCredentials
//
namespace {
// Expression to match:
// //iam.googleapis.com/locations/[^/]+/workforcePools/[^/]+/providers/.+
bool MatchWorkforcePoolAudience(absl::string_view audience) {
@ -108,49 +458,41 @@ bool MatchWorkforcePoolAudience(absl::string_view audience) {
} // namespace
RefCountedPtr<ExternalAccountCredentials> ExternalAccountCredentials::Create(
absl::StatusOr<RefCountedPtr<ExternalAccountCredentials>>
ExternalAccountCredentials::Create(
const Json& json, std::vector<std::string> scopes,
grpc_error_handle* error) {
CHECK(error->ok());
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine) {
Options options;
options.type = GRPC_AUTH_JSON_TYPE_INVALID;
if (json.type() != Json::Type::kObject) {
*error =
GRPC_ERROR_CREATE("Invalid json to construct credentials options.");
return nullptr;
return GRPC_ERROR_CREATE("Invalid json to construct credentials options.");
}
auto it = json.object().find("type");
if (it == json.object().end()) {
*error = GRPC_ERROR_CREATE("type field not present.");
return nullptr;
return GRPC_ERROR_CREATE("type field not present.");
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("type field must be a string.");
return nullptr;
return GRPC_ERROR_CREATE("type field must be a string.");
}
if (it->second.string() != GRPC_AUTH_JSON_TYPE_EXTERNAL_ACCOUNT) {
*error = GRPC_ERROR_CREATE("Invalid credentials json type.");
return nullptr;
return GRPC_ERROR_CREATE("Invalid credentials json type.");
}
options.type = GRPC_AUTH_JSON_TYPE_EXTERNAL_ACCOUNT;
it = json.object().find("audience");
if (it == json.object().end()) {
*error = GRPC_ERROR_CREATE("audience field not present.");
return nullptr;
return GRPC_ERROR_CREATE("audience field not present.");
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("audience field must be a string.");
return nullptr;
return GRPC_ERROR_CREATE("audience field must be a string.");
}
options.audience = it->second.string();
it = json.object().find("subject_token_type");
if (it == json.object().end()) {
*error = GRPC_ERROR_CREATE("subject_token_type field not present.");
return nullptr;
return GRPC_ERROR_CREATE("subject_token_type field not present.");
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("subject_token_type field must be a string.");
return nullptr;
return GRPC_ERROR_CREATE("subject_token_type field must be a string.");
}
options.subject_token_type = it->second.string();
it = json.object().find("service_account_impersonation_url");
@ -159,12 +501,10 @@ RefCountedPtr<ExternalAccountCredentials> ExternalAccountCredentials::Create(
}
it = json.object().find("token_url");
if (it == json.object().end()) {
*error = GRPC_ERROR_CREATE("token_url field not present.");
return nullptr;
return GRPC_ERROR_CREATE("token_url field not present.");
}
if (it->second.type() != Json::Type::kString) {
*error = GRPC_ERROR_CREATE("token_url field must be a string.");
return nullptr;
return GRPC_ERROR_CREATE("token_url field must be a string.");
}
options.token_url = it->second.string();
it = json.object().find("token_info_url");
@ -173,8 +513,7 @@ RefCountedPtr<ExternalAccountCredentials> ExternalAccountCredentials::Create(
}
it = json.object().find("credential_source");
if (it == json.object().end()) {
*error = GRPC_ERROR_CREATE("credential_source field not present.");
return nullptr;
return GRPC_ERROR_CREATE("credential_source field not present.");
}
options.credential_source = it->second;
it = json.object().find("quota_project_id");
@ -194,10 +533,9 @@ RefCountedPtr<ExternalAccountCredentials> ExternalAccountCredentials::Create(
if (MatchWorkforcePoolAudience(options.audience)) {
options.workforce_pool_user_project = it->second.string();
} else {
*error = GRPC_ERROR_CREATE(
return GRPC_ERROR_CREATE(
"workforce_pool_user_project should not be set for non-workforce "
"pool credentials");
return nullptr;
}
}
it = json.object().find("service_account_impersonation");
@ -211,53 +549,53 @@ RefCountedPtr<ExternalAccountCredentials> ExternalAccountCredentials::Create(
if (!absl::SimpleAtoi(
service_acc_imp_obj_it->second.string(),
&options.service_account_impersonation.token_lifetime_seconds)) {
*error = GRPC_ERROR_CREATE("token_lifetime_seconds must be a number");
return nullptr;
return GRPC_ERROR_CREATE("token_lifetime_seconds must be a number");
}
if (options.service_account_impersonation.token_lifetime_seconds >
IMPERSONATED_CRED_MAX_LIFETIME_IN_SECONDS) {
*error = GRPC_ERROR_CREATE(
return GRPC_ERROR_CREATE(
absl::StrFormat("token_lifetime_seconds must be less than %ds",
IMPERSONATED_CRED_MAX_LIFETIME_IN_SECONDS));
return nullptr;
}
if (options.service_account_impersonation.token_lifetime_seconds <
IMPERSONATED_CRED_MIN_LIFETIME_IN_SECONDS) {
*error = GRPC_ERROR_CREATE(
return GRPC_ERROR_CREATE(
absl::StrFormat("token_lifetime_seconds must be more than %ds",
IMPERSONATED_CRED_MIN_LIFETIME_IN_SECONDS));
return nullptr;
}
}
}
RefCountedPtr<ExternalAccountCredentials> creds;
grpc_error_handle error;
if (options.credential_source.object().find("environment_id") !=
options.credential_source.object().end()) {
creds = MakeRefCounted<AwsExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
std::move(options), std::move(scopes), std::move(event_engine), &error);
} else if (options.credential_source.object().find("file") !=
options.credential_source.object().end()) {
creds = MakeRefCounted<FileExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
std::move(options), std::move(scopes), std::move(event_engine), &error);
} else if (options.credential_source.object().find("url") !=
options.credential_source.object().end()) {
creds = MakeRefCounted<UrlExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
std::move(options), std::move(scopes), std::move(event_engine), &error);
} else {
*error = GRPC_ERROR_CREATE(
return GRPC_ERROR_CREATE(
"Invalid options credential source to create "
"ExternalAccountCredentials.");
}
if (error->ok()) {
return creds;
} else {
return nullptr;
}
if (!error.ok()) return error;
return creds;
}
ExternalAccountCredentials::ExternalAccountCredentials(
Options options, std::vector<std::string> scopes)
: options_(std::move(options)) {
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
: event_engine_(
event_engine == nullptr
? grpc_event_engine::experimental::GetDefaultEventEngine()
: std::move(event_engine)),
options_(std::move(options)) {
if (scopes.empty()) {
scopes.push_back(GOOGLE_CLOUD_PLATFORM_DEFAULT_SCOPE);
}
@ -266,12 +604,6 @@ ExternalAccountCredentials::ExternalAccountCredentials(
ExternalAccountCredentials::~ExternalAccountCredentials() {}
std::string ExternalAccountCredentials::debug_string() {
return absl::StrFormat("ExternalAccountCredentials{Audience:%s,%s}",
options_.audience,
grpc_oauth2_token_fetcher_credentials::debug_string());
}
std::string ExternalAccountCredentials::MetricsHeaderValue() {
return absl::StrFormat(
"gl-cpp/unknown auth/%s google-byoid-sdk source/%s sa-impersonation/%v "
@ -286,300 +618,12 @@ absl::string_view ExternalAccountCredentials::CredentialSourceType() {
return "unknown";
}
// The token fetching flow:
// 1. Retrieve subject token - Subclass's RetrieveSubjectToken() gets called
// and the subject token is received in OnRetrieveSubjectTokenInternal().
// 2. Exchange token - ExchangeToken() gets called with the
// subject token from #1. Receive the response in OnExchangeTokenInternal().
// 3. (Optional) Impersonate service account - ImpersenateServiceAccount() gets
// called with the access token of the response from #2. Get an impersonated
// access token in OnImpersenateServiceAccountInternal().
// 4. Finish token fetch - Return back the response that contains an access
// token in FinishTokenFetch().
// TODO(chuanr): Avoid starting the remaining requests if the channel gets shut
// down.
void ExternalAccountCredentials::fetch_oauth2(
grpc_credentials_metadata_request* metadata_req,
grpc_polling_entity* pollent, grpc_iomgr_cb_func response_cb,
Timestamp deadline) {
CHECK_EQ(ctx_, nullptr);
ctx_ = new HTTPRequestContext(pollent, deadline);
metadata_req_ = metadata_req;
response_cb_ = response_cb;
auto cb = [this](std::string token, grpc_error_handle error) {
OnRetrieveSubjectTokenInternal(token, error);
};
RetrieveSubjectToken(ctx_, options_, cb);
}
void ExternalAccountCredentials::OnRetrieveSubjectTokenInternal(
absl::string_view subject_token, grpc_error_handle error) {
if (!error.ok()) {
FinishTokenFetch(error);
} else {
ExchangeToken(subject_token);
}
}
void ExternalAccountCredentials::ExchangeToken(
absl::string_view subject_token) {
absl::StatusOr<URI> uri = URI::Parse(options_.token_url);
if (!uri.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(
absl::StrFormat("Invalid token url: %s. Error: %s", options_.token_url,
uri.status().ToString())));
return;
}
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
const bool add_authorization_header =
!options_.client_id.empty() && !options_.client_secret.empty();
request.hdr_count = add_authorization_header ? 3 : 2;
auto* headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
headers[0].key = gpr_strdup("Content-Type");
headers[0].value = gpr_strdup("application/x-www-form-urlencoded");
headers[1].key = gpr_strdup("x-goog-api-client");
headers[1].value = gpr_strdup(MetricsHeaderValue().c_str());
if (add_authorization_header) {
std::string raw_cred =
absl::StrFormat("%s:%s", options_.client_id, options_.client_secret);
std::string str = absl::StrFormat("Basic %s", absl::Base64Escape(raw_cred));
headers[2].key = gpr_strdup("Authorization");
headers[2].value = gpr_strdup(str.c_str());
}
request.hdrs = headers;
std::vector<std::string> body_parts;
body_parts.push_back(
absl::StrFormat("audience=%s", UrlEncode(options_.audience).c_str()));
body_parts.push_back(absl::StrFormat(
"grant_type=%s",
UrlEncode(EXTERNAL_ACCOUNT_CREDENTIALS_GRANT_TYPE).c_str()));
body_parts.push_back(absl::StrFormat(
"requested_token_type=%s",
UrlEncode(EXTERNAL_ACCOUNT_CREDENTIALS_REQUESTED_TOKEN_TYPE).c_str()));
body_parts.push_back(absl::StrFormat(
"subject_token_type=%s", UrlEncode(options_.subject_token_type).c_str()));
body_parts.push_back(
absl::StrFormat("subject_token=%s", UrlEncode(subject_token).c_str()));
std::string scope = GOOGLE_CLOUD_PLATFORM_DEFAULT_SCOPE;
if (options_.service_account_impersonation_url.empty()) {
scope = absl::StrJoin(scopes_, " ");
}
body_parts.push_back(absl::StrFormat("scope=%s", UrlEncode(scope).c_str()));
Json::Object addtional_options_json_object;
if (options_.client_id.empty() && options_.client_secret.empty()) {
addtional_options_json_object["userProject"] =
Json::FromString(options_.workforce_pool_user_project);
}
Json addtional_options_json =
Json::FromObject(std::move(addtional_options_json_object));
body_parts.push_back(absl::StrFormat(
"options=%s", UrlEncode(JsonDump(addtional_options_json)).c_str()));
std::string body = absl::StrJoin(body_parts, "&");
request.body = const_cast<char*>(body.c_str());
request.body_length = body.size();
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
GRPC_CLOSURE_INIT(&ctx_->closure, OnExchangeToken, this, nullptr);
CHECK(http_request_ == nullptr);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ =
HttpRequest::Post(std::move(*uri), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
request.body = nullptr;
grpc_http_request_destroy(&request);
}
void ExternalAccountCredentials::OnExchangeToken(void* arg,
grpc_error_handle error) {
ExternalAccountCredentials* self =
static_cast<ExternalAccountCredentials*>(arg);
self->OnExchangeTokenInternal(error);
}
void ExternalAccountCredentials::OnExchangeTokenInternal(
grpc_error_handle error) {
http_request_.reset();
if (!error.ok()) {
FinishTokenFetch(error);
} else {
if (options_.service_account_impersonation_url.empty()) {
metadata_req_->response = ctx_->response;
metadata_req_->response.body = gpr_strdup(
std::string(ctx_->response.body, ctx_->response.body_length).c_str());
metadata_req_->response.hdrs = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * ctx_->response.hdr_count));
for (size_t i = 0; i < ctx_->response.hdr_count; i++) {
metadata_req_->response.hdrs[i].key =
gpr_strdup(ctx_->response.hdrs[i].key);
metadata_req_->response.hdrs[i].value =
gpr_strdup(ctx_->response.hdrs[i].value);
}
FinishTokenFetch(absl::OkStatus());
} else {
ImpersenateServiceAccount();
}
}
}
void ExternalAccountCredentials::ImpersenateServiceAccount() {
absl::string_view response_body(ctx_->response.body,
ctx_->response.body_length);
auto json = JsonParse(response_body);
if (!json.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrCat(
"Invalid token exchange response: ", json.status().ToString())));
return;
}
if (json->type() != Json::Type::kObject) {
FinishTokenFetch(GRPC_ERROR_CREATE(
"Invalid token exchange response: JSON type is not object"));
return;
}
auto it = json->object().find("access_token");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid access_token in %s.", response_body)));
return;
}
std::string access_token = it->second.string();
absl::StatusOr<URI> uri =
URI::Parse(options_.service_account_impersonation_url);
if (!uri.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Invalid service account impersonation url: %s. Error: %s",
options_.service_account_impersonation_url, uri.status().ToString())));
return;
}
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.hdr_count = 2;
grpc_http_header* headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
headers[0].key = gpr_strdup("Content-Type");
headers[0].value = gpr_strdup("application/x-www-form-urlencoded");
std::string str = absl::StrFormat("Bearer %s", access_token);
headers[1].key = gpr_strdup("Authorization");
headers[1].value = gpr_strdup(str.c_str());
request.hdrs = headers;
std::vector<std::string> body_members;
std::string scope = absl::StrJoin(scopes_, " ");
body_members.push_back(absl::StrFormat("scope=%s", UrlEncode(scope).c_str()));
body_members.push_back(absl::StrFormat(
"lifetime=%ds",
options_.service_account_impersonation.token_lifetime_seconds));
std::string body = absl::StrJoin(body_members, "&");
request.body = const_cast<char*>(body.c_str());
request.body_length = body.size();
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
GRPC_CLOSURE_INIT(&ctx_->closure, OnImpersenateServiceAccount, this, nullptr);
// TODO(ctiller): Use the callers resource quota.
CHECK(http_request_ == nullptr);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (uri->scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ = HttpRequest::Post(
std::move(*uri), nullptr, ctx_->pollent, &request, ctx_->deadline,
&ctx_->closure, &ctx_->response, std::move(http_request_creds));
http_request_->Start();
request.body = nullptr;
grpc_http_request_destroy(&request);
}
void ExternalAccountCredentials::OnImpersenateServiceAccount(
void* arg, grpc_error_handle error) {
ExternalAccountCredentials* self =
static_cast<ExternalAccountCredentials*>(arg);
self->OnImpersenateServiceAccountInternal(error);
}
void ExternalAccountCredentials::OnImpersenateServiceAccountInternal(
grpc_error_handle error) {
http_request_.reset();
if (!error.ok()) {
FinishTokenFetch(error);
return;
}
absl::string_view response_body(ctx_->response.body,
ctx_->response.body_length);
auto json = JsonParse(response_body);
if (!json.ok()) {
FinishTokenFetch(GRPC_ERROR_CREATE(
absl::StrCat("Invalid service account impersonation response: ",
json.status().ToString())));
return;
}
if (json->type() != Json::Type::kObject) {
FinishTokenFetch(
GRPC_ERROR_CREATE("Invalid service account impersonation response: "
"JSON type is not object"));
return;
}
auto it = json->object().find("accessToken");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid accessToken in %s.", response_body)));
return;
}
std::string access_token = it->second.string();
it = json->object().find("expireTime");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
FinishTokenFetch(GRPC_ERROR_CREATE(absl::StrFormat(
"Missing or invalid expireTime in %s.", response_body)));
return;
}
std::string expire_time = it->second.string();
absl::Time t;
if (!absl::ParseTime(absl::RFC3339_full, expire_time, &t, nullptr)) {
FinishTokenFetch(GRPC_ERROR_CREATE(
"Invalid expire time of service account impersonation response."));
return;
}
int64_t expire_in = (t - absl::Now()) / absl::Seconds(1);
std::string body = absl::StrFormat(
"{\"access_token\":\"%s\",\"expires_in\":%d,\"token_type\":\"Bearer\"}",
access_token, expire_in);
metadata_req_->response = ctx_->response;
metadata_req_->response.body = gpr_strdup(body.c_str());
metadata_req_->response.body_length = body.length();
metadata_req_->response.hdrs = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * ctx_->response.hdr_count));
for (size_t i = 0; i < ctx_->response.hdr_count; i++) {
metadata_req_->response.hdrs[i].key =
gpr_strdup(ctx_->response.hdrs[i].key);
metadata_req_->response.hdrs[i].value =
gpr_strdup(ctx_->response.hdrs[i].value);
}
FinishTokenFetch(absl::OkStatus());
}
void ExternalAccountCredentials::FinishTokenFetch(grpc_error_handle error) {
GRPC_LOG_IF_ERROR("Fetch external account credentials access token", error);
// Move object state into local variables.
auto* cb = response_cb_;
response_cb_ = nullptr;
auto* metadata_req = metadata_req_;
metadata_req_ = nullptr;
auto* ctx = ctx_;
ctx_ = nullptr;
// Invoke the callback.
cb(metadata_req, error);
// Delete context.
delete ctx;
OrphanablePtr<ExternalAccountCredentials::FetchRequest>
ExternalAccountCredentials::FetchToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<RefCountedPtr<Token>>)> on_done) {
return MakeOrphanable<ExternalFetchRequest>(this, deadline,
std::move(on_done));
}
} // namespace grpc_core
@ -593,14 +637,12 @@ grpc_call_credentials* grpc_external_account_credentials_create(
return nullptr;
}
std::vector<std::string> scopes = absl::StrSplit(scopes_string, ',');
grpc_error_handle error;
auto creds = grpc_core::ExternalAccountCredentials::Create(
*json, std::move(scopes), &error)
.release();
if (!error.ok()) {
auto creds =
grpc_core::ExternalAccountCredentials::Create(*json, std::move(scopes));
if (!creds.ok()) {
LOG(ERROR) << "External account credentials creation failed. Error: "
<< grpc_core::StatusToString(error);
<< grpc_core::StatusToString(creds.status());
return nullptr;
}
return creds;
return creds->release();
}

@ -20,11 +20,13 @@
#include <stdint.h>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/orphanable.h"
@ -34,6 +36,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include "src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h"
#include "src/core/util/http_client/httpcli.h"
#include "src/core/util/http_client/parser.h"
#include "src/core/util/json/json.h"
@ -44,8 +47,7 @@ namespace grpc_core {
// exchanging external account credentials for GCP access token to authorize
// requests to GCP APIs. The specific logic of retrieving subject token is
// implemented in subclasses.
class ExternalAccountCredentials
: public grpc_oauth2_token_fetcher_credentials {
class ExternalAccountCredentials : public TokenFetcherCredentials {
public:
struct ServiceAccountImpersonation {
int32_t token_lifetime_seconds;
@ -66,72 +68,145 @@ class ExternalAccountCredentials
std::string workforce_pool_user_project;
};
static RefCountedPtr<ExternalAccountCredentials> Create(
static absl::StatusOr<RefCountedPtr<ExternalAccountCredentials>> Create(
const Json& json, std::vector<std::string> scopes,
grpc_error_handle* error);
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine = nullptr);
ExternalAccountCredentials(Options options, std::vector<std::string> scopes);
ExternalAccountCredentials(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine = nullptr);
~ExternalAccountCredentials() override;
std::string debug_string() override;
protected:
// This is a helper struct to pass information between multiple callback based
// asynchronous calls.
struct HTTPRequestContext {
HTTPRequestContext(grpc_polling_entity* pollent, Timestamp deadline)
: pollent(pollent), deadline(deadline) {}
~HTTPRequestContext() { grpc_http_response_destroy(&response); }
// Contextual parameters passed from
// grpc_oauth2_token_fetcher_credentials::fetch_oauth2().
grpc_polling_entity* pollent;
Timestamp deadline;
// Reusable token fetch http response and closure.
grpc_closure closure;
grpc_http_response response;
// A base class for a cancellable fetch operation.
class FetchBody : public InternallyRefCounted<FetchBody> {
public:
explicit FetchBody(
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done)
: on_done_(std::move(on_done)) {}
void Orphan() override {
Shutdown();
Unref();
}
protected:
// The subclass must call this when the fetch is complete, even if
// cancelled.
void Finish(absl::StatusOr<std::string> result) {
std::exchange(on_done_, nullptr)(std::move(result));
}
private:
virtual void Shutdown() = 0;
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done_;
};
// Subclasses of base external account credentials need to override this
// method to implement the specific subject token retrieval logic.
// Once the subject token is ready, subclasses need to invoke
// the callback function (cb) to pass the subject token (or error)
// back.
virtual void RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& options,
std::function<void(std::string, grpc_error_handle)> cb) = 0;
// A simple no-op implementation, used for async execution of the
// on_done callback.
class NoOpFetchBody final : public FetchBody {
public:
NoOpFetchBody(grpc_event_engine::experimental::EventEngine& event_engine,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
absl::StatusOr<std::string> result);
virtual absl::string_view CredentialSourceType();
private:
void Shutdown() override {}
};
std::string MetricsHeaderValue();
// An implementation for HTTP requests.
class HttpFetchBody final : public FetchBody {
public:
HttpFetchBody(
absl::FunctionRef<OrphanablePtr<HttpRequest>(grpc_http_response*,
grpc_closure*)>
start_http_request,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done);
private:
// This method implements the common token fetch logic and it will be called
// when grpc_oauth2_token_fetcher_credentials request a new access token.
void fetch_oauth2(grpc_credentials_metadata_request* req,
grpc_polling_entity* pollent, grpc_iomgr_cb_func cb,
Timestamp deadline) override;
~HttpFetchBody() override { grpc_http_response_destroy(&response_); }
private:
void Shutdown() override { http_request_.reset(); }
static void OnHttpResponse(void* arg, grpc_error_handle error);
OrphanablePtr<HttpRequest> http_request_;
grpc_http_response response_;
grpc_closure on_http_response_;
};
void OnRetrieveSubjectTokenInternal(absl::string_view subject_token,
grpc_error_handle error);
// An implementation of TokenFetcherCredentials::FetchRequest that
// executes a series of FetchBody operations to ultimately get to a
// token result.
class ExternalFetchRequest : public FetchRequest {
public:
ExternalFetchRequest(
ExternalAccountCredentials* creds, Timestamp deadline,
absl::AnyInvocable<
void(absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done);
void Orphan() override;
protected:
Timestamp deadline() const { return deadline_; }
grpc_polling_entity* pollent() const { return creds_->pollent(); }
const Options& options() const { return creds_->options_; }
private:
void ExchangeToken(absl::StatusOr<std::string> subject_token);
void MaybeImpersonateServiceAccount(
absl::StatusOr<std::string> response_body);
void OnImpersonateServiceAccount(absl::StatusOr<std::string> response_body);
void FinishTokenFetch(absl::StatusOr<std::string> response_body);
// If status is non-OK or we've been shut down, calls FinishTokenFetch()
// and returns true.
bool MaybeFailLocked(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
ExternalAccountCredentials* creds_;
Timestamp deadline_;
absl::AnyInvocable<void(
absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done_;
Mutex mu_;
OrphanablePtr<FetchBody> fetch_body_ ABSL_GUARDED_BY(&mu_);
};
void ExchangeToken(absl::string_view subject_token);
static void OnExchangeToken(void* arg, grpc_error_handle error);
void OnExchangeTokenInternal(grpc_error_handle error);
virtual absl::string_view CredentialSourceType();
void ImpersenateServiceAccount();
static void OnImpersenateServiceAccount(void* arg, grpc_error_handle error);
void OnImpersenateServiceAccountInternal(grpc_error_handle error);
std::string MetricsHeaderValue();
void FinishTokenFetch(grpc_error_handle error);
absl::string_view audience() const { return options_.audience; }
grpc_event_engine::experimental::EventEngine& event_engine() const {
return *event_engine_;
}
private:
OrphanablePtr<FetchRequest> FetchToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<RefCountedPtr<Token>>)> on_done)
final;
// Subclasses of ExternalAccountCredentials need to override this
// method to implement the specific-subject token retrieval logic.
// The caller will save the resulting FetchBody object, which will
// be orphaned upon cancellation. The FetchBody object must
// eventually invoke on_done.
virtual OrphanablePtr<FetchBody> RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) = 0;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
Options options_;
std::vector<std::string> scopes_;
OrphanablePtr<HttpRequest> http_request_;
HTTPRequestContext* ctx_ = nullptr;
grpc_credentials_metadata_request* metadata_req_ = nullptr;
grpc_iomgr_cb_func response_cb_ = nullptr;
};
} // namespace grpc_core

@ -26,6 +26,7 @@
#include <grpc/support/json.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/load_file.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
@ -34,22 +35,78 @@
namespace grpc_core {
RefCountedPtr<FileExternalAccountCredentials>
FileExternalAccountCredentials::Create(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error) {
auto creds = MakeRefCounted<FileExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
if (error->ok()) {
return creds;
} else {
return nullptr;
//
// FileExternalAccountCredentials::FileFetchBody
//
FileExternalAccountCredentials::FileFetchBody::FileFetchBody(
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
FileExternalAccountCredentials* creds)
: FetchBody(std::move(on_done)), creds_(creds) {
// Start work asynchronously, since we can't invoke the callback
// synchronously without causing a deadlock.
creds->event_engine().Run([self = RefAsSubclass<FileFetchBody>()]() mutable {
ApplicationCallbackExecCtx application_exec_ctx;
ExecCtx exec_ctx;
self->ReadFile();
self.reset();
});
}
void FileExternalAccountCredentials::FileFetchBody::ReadFile() {
// To retrieve the subject token, we read the file every time we make a
// request because it may have changed since the last request.
auto content_slice = LoadFile(creds_->file_, /*add_null_terminator=*/false);
if (!content_slice.ok()) {
Finish(content_slice.status());
return;
}
absl::string_view content = content_slice->as_string_view();
if (creds_->format_type_ == "json") {
auto content_json = JsonParse(content);
if (!content_json.ok() || content_json->type() != Json::Type::kObject) {
Finish(GRPC_ERROR_CREATE(
"The content of the file is not a valid json object."));
return;
}
auto content_it =
content_json->object().find(creds_->format_subject_token_field_name_);
if (content_it == content_json->object().end()) {
Finish(GRPC_ERROR_CREATE("Subject token field not present."));
return;
}
if (content_it->second.type() != Json::Type::kString) {
Finish(GRPC_ERROR_CREATE("Subject token field must be a string."));
return;
}
Finish(content_it->second.string());
return;
}
Finish(std::string(content));
}
//
// FileExternalAccountCredentials
//
absl::StatusOr<RefCountedPtr<FileExternalAccountCredentials>>
FileExternalAccountCredentials::Create(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine) {
grpc_error_handle error;
auto creds = MakeRefCounted<FileExternalAccountCredentials>(
std::move(options), std::move(scopes), std::move(event_engine), &error);
if (!error.ok()) return error;
return creds;
}
FileExternalAccountCredentials::FileExternalAccountCredentials(
Options options, std::vector<std::string> scopes, grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes)) {
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes),
std::move(event_engine)) {
auto it = options.credential_source.object().find("file");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("file field not present.");
@ -96,38 +153,21 @@ FileExternalAccountCredentials::FileExternalAccountCredentials(
}
}
void FileExternalAccountCredentials::RetrieveSubjectToken(
HTTPRequestContext* /*ctx*/, const Options& /*options*/,
std::function<void(std::string, grpc_error_handle)> cb) {
// To retrieve the subject token, we read the file every time we make a
// request because it may have changed since the last request.
auto content_slice = LoadFile(file_, /*add_null_terminator=*/false);
if (!content_slice.ok()) {
cb("", content_slice.status());
return;
}
absl::string_view content = content_slice->as_string_view();
if (format_type_ == "json") {
auto content_json = JsonParse(content);
if (!content_json.ok() || content_json->type() != Json::Type::kObject) {
cb("", GRPC_ERROR_CREATE(
"The content of the file is not a valid json object."));
return;
}
auto content_it =
content_json->object().find(format_subject_token_field_name_);
if (content_it == content_json->object().end()) {
cb("", GRPC_ERROR_CREATE("Subject token field not present."));
return;
}
if (content_it->second.type() != Json::Type::kString) {
cb("", GRPC_ERROR_CREATE("Subject token field must be a string."));
return;
}
cb(content_it->second.string(), absl::OkStatus());
return;
}
cb(std::string(content), absl::OkStatus());
std::string FileExternalAccountCredentials::debug_string() {
return absl::StrCat("FileExternalAccountCredentials{Audience:", audience(),
")");
}
UniqueTypeName FileExternalAccountCredentials::type() const {
static UniqueTypeName::Factory kFactory("FileExternalAccountCredentials");
return kFactory.Create();
}
OrphanablePtr<ExternalAccountCredentials::FetchBody>
FileExternalAccountCredentials::RetrieveSubjectToken(
Timestamp /*deadline*/,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) {
return MakeOrphanable<FileFetchBody>(std::move(on_done), this);
}
absl::string_view FileExternalAccountCredentials::CredentialSourceType() {

@ -33,18 +33,38 @@ namespace grpc_core {
class FileExternalAccountCredentials final : public ExternalAccountCredentials {
public:
static RefCountedPtr<FileExternalAccountCredentials> Create(
static absl::StatusOr<RefCountedPtr<FileExternalAccountCredentials>> Create(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine = nullptr);
FileExternalAccountCredentials(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
grpc_error_handle* error);
FileExternalAccountCredentials(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error);
std::string debug_string() override;
UniqueTypeName type() const override;
private:
void RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& options,
std::function<void(std::string, grpc_error_handle)> cb) override;
class FileFetchBody final : public FetchBody {
public:
FileFetchBody(absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done,
FileExternalAccountCredentials* creds);
private:
void Shutdown() override {}
void ReadFile();
FileExternalAccountCredentials* creds_;
};
OrphanablePtr<FetchBody> RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) override;
absl::string_view CredentialSourceType() override;

@ -46,22 +46,24 @@
namespace grpc_core {
RefCountedPtr<UrlExternalAccountCredentials>
UrlExternalAccountCredentials::Create(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error) {
absl::StatusOr<RefCountedPtr<UrlExternalAccountCredentials>>
UrlExternalAccountCredentials::Create(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine) {
grpc_error_handle error;
auto creds = MakeRefCounted<UrlExternalAccountCredentials>(
std::move(options), std::move(scopes), error);
if (error->ok()) {
return creds;
} else {
return nullptr;
}
std::move(options), std::move(scopes), std::move(event_engine), &error);
if (!error.ok()) return error;
return creds;
}
UrlExternalAccountCredentials::UrlExternalAccountCredentials(
Options options, std::vector<std::string> scopes, grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes)) {
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
grpc_error_handle* error)
: ExternalAccountCredentials(options, std::move(scopes),
std::move(event_engine)) {
auto it = options.credential_source.object().find("url");
if (it == options.credential_source.object().end()) {
*error = GRPC_ERROR_CREATE("url field not present.");
@ -130,114 +132,88 @@ UrlExternalAccountCredentials::UrlExternalAccountCredentials(
}
}
void UrlExternalAccountCredentials::RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& /*options*/,
std::function<void(std::string, grpc_error_handle)> cb) {
if (ctx == nullptr) {
FinishRetrieveSubjectToken(
"",
GRPC_ERROR_CREATE(
"Missing HTTPRequestContext to start subject token retrieval."));
return;
}
auto url_for_request =
URI::Create(url_.scheme(), url_.authority(), url_full_path_,
{} /* query params */, "" /* fragment */);
if (!url_for_request.ok()) {
FinishRetrieveSubjectToken(
"", absl_status_to_grpc_error(url_for_request.status()));
return;
}
ctx_ = ctx;
cb_ = cb;
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.path = gpr_strdup(url_full_path_.c_str());
grpc_http_header* headers = nullptr;
request.hdr_count = headers_.size();
headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
int i = 0;
for (auto const& header : headers_) {
headers[i].key = gpr_strdup(header.first.c_str());
headers[i].value = gpr_strdup(header.second.c_str());
++i;
}
request.hdrs = headers;
grpc_http_response_destroy(&ctx_->response);
ctx_->response = {};
GRPC_CLOSURE_INIT(&ctx_->closure, OnRetrieveSubjectToken, this, nullptr);
CHECK(http_request_ == nullptr);
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (url_.scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ =
HttpRequest::Get(std::move(*url_for_request), nullptr /* channel args */,
ctx_->pollent, &request, ctx_->deadline, &ctx_->closure,
&ctx_->response, std::move(http_request_creds));
http_request_->Start();
grpc_http_request_destroy(&request);
std::string UrlExternalAccountCredentials::debug_string() {
return absl::StrCat("UrlExternalAccountCredentials{Audience:", audience(),
")");
}
void UrlExternalAccountCredentials::OnRetrieveSubjectToken(
void* arg, grpc_error_handle error) {
UrlExternalAccountCredentials* self =
static_cast<UrlExternalAccountCredentials*>(arg);
self->OnRetrieveSubjectTokenInternal(error);
UniqueTypeName UrlExternalAccountCredentials::type() const {
static UniqueTypeName::Factory kFactory("UrlExternalAccountCredentials");
return kFactory.Create();
}
void UrlExternalAccountCredentials::OnRetrieveSubjectTokenInternal(
grpc_error_handle error) {
http_request_.reset();
if (!error.ok()) {
FinishRetrieveSubjectToken("", error);
return;
}
absl::string_view response_body(ctx_->response.body,
ctx_->response.body_length);
if (format_type_ == "json") {
auto response_json = JsonParse(response_body);
if (!response_json.ok() || response_json->type() != Json::Type::kObject) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE(
"The format of response is not a valid json object."));
return;
}
auto response_it =
response_json->object().find(format_subject_token_field_name_);
if (response_it == response_json->object().end()) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE("Subject token field not present."));
return;
}
if (response_it->second.type() != Json::Type::kString) {
FinishRetrieveSubjectToken(
"", GRPC_ERROR_CREATE("Subject token field must be a string."));
return;
}
FinishRetrieveSubjectToken(response_it->second.string(), error);
return;
}
FinishRetrieveSubjectToken(std::string(response_body), absl::OkStatus());
}
void UrlExternalAccountCredentials::FinishRetrieveSubjectToken(
std::string subject_token, grpc_error_handle error) {
// Reset context
ctx_ = nullptr;
// Move object state into local variables.
auto cb = cb_;
cb_ = nullptr;
// Invoke the callback.
if (!error.ok()) {
cb("", error);
} else {
cb(subject_token, absl::OkStatus());
OrphanablePtr<ExternalAccountCredentials::FetchBody>
UrlExternalAccountCredentials::RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) {
auto url_for_request =
URI::Create(url_.scheme(), url_.authority(), url_full_path_,
{} /* query params */, "" /* fragment */);
if (!url_for_request.ok()) {
return MakeOrphanable<NoOpFetchBody>(
event_engine(), std::move(on_done),
absl_status_to_grpc_error(url_for_request.status()));
}
return MakeOrphanable<HttpFetchBody>(
[&](grpc_http_response* response, grpc_closure* on_http_response) {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.path = gpr_strdup(url_full_path_.c_str());
grpc_http_header* headers = nullptr;
request.hdr_count = headers_.size();
headers = static_cast<grpc_http_header*>(
gpr_malloc(sizeof(grpc_http_header) * request.hdr_count));
int i = 0;
for (auto const& header : headers_) {
headers[i].key = gpr_strdup(header.first.c_str());
headers[i].value = gpr_strdup(header.second.c_str());
++i;
}
request.hdrs = headers;
RefCountedPtr<grpc_channel_credentials> http_request_creds;
if (url_.scheme() == "http") {
http_request_creds = RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create());
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
auto http_request =
HttpRequest::Get(std::move(*url_for_request), /*args=*/nullptr,
pollent(), &request, deadline, on_http_response,
response, std::move(http_request_creds));
http_request->Start();
grpc_http_request_destroy(&request);
return http_request;
},
[this, on_done = std::move(on_done)](
absl::StatusOr<std::string> response_body) mutable {
if (!response_body.ok()) {
on_done(std::move(response_body));
return;
}
if (format_type_ == "json") {
auto response_json = JsonParse(*response_body);
if (!response_json.ok() ||
response_json->type() != Json::Type::kObject) {
on_done(GRPC_ERROR_CREATE(
"The format of response is not a valid json object."));
return;
}
auto response_it =
response_json->object().find(format_subject_token_field_name_);
if (response_it == response_json->object().end()) {
on_done(GRPC_ERROR_CREATE("Subject token field not present."));
return;
}
if (response_it->second.type() != Json::Type::kString) {
on_done(GRPC_ERROR_CREATE("Subject token field must be a string."));
return;
}
on_done(response_it->second.string());
return;
}
on_done(std::move(response_body));
});
}
absl::string_view UrlExternalAccountCredentials::CredentialSourceType() {

@ -37,37 +37,34 @@ namespace grpc_core {
class UrlExternalAccountCredentials final : public ExternalAccountCredentials {
public:
static RefCountedPtr<UrlExternalAccountCredentials> Create(
static absl::StatusOr<RefCountedPtr<UrlExternalAccountCredentials>> Create(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine = nullptr);
UrlExternalAccountCredentials(
Options options, std::vector<std::string> scopes,
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine,
grpc_error_handle* error);
UrlExternalAccountCredentials(Options options,
std::vector<std::string> scopes,
grpc_error_handle* error);
std::string debug_string() override;
UniqueTypeName type() const override;
private:
void RetrieveSubjectToken(
HTTPRequestContext* ctx, const Options& options,
std::function<void(std::string, grpc_error_handle)> cb) override;
OrphanablePtr<FetchBody> RetrieveSubjectToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<std::string>)> on_done) override;
absl::string_view CredentialSourceType() override;
static void OnRetrieveSubjectToken(void* arg, grpc_error_handle error);
void OnRetrieveSubjectTokenInternal(grpc_error_handle error);
void FinishRetrieveSubjectToken(std::string subject_token,
grpc_error_handle error);
// Fields of credential source
URI url_;
std::string url_full_path_;
std::map<std::string, std::string> headers_;
std::string format_type_;
std::string format_subject_token_field_name_;
OrphanablePtr<HttpRequest> http_request_;
HTTPRequestContext* ctx_ = nullptr;
std::function<void(std::string, grpc_error_handle)> cb_ = nullptr;
};
} // namespace grpc_core

@ -99,6 +99,8 @@ class grpc_md_only_test_credentials : public grpc_call_credentials {
key_(grpc_core::Slice::FromCopiedString(md_key)),
value_(grpc_core::Slice::FromCopiedString(md_value)) {}
void Orphaned() override {}
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;

@ -258,68 +258,54 @@ static int is_metadata_server_reachable() {
static grpc_error_handle create_default_creds_from_path(
const std::string& creds_path,
grpc_core::RefCountedPtr<grpc_call_credentials>* creds) {
grpc_auth_json_key key;
grpc_auth_refresh_token token;
grpc_core::RefCountedPtr<grpc_call_credentials> result;
absl::StatusOr<grpc_core::Slice> creds_data;
grpc_error_handle error;
Json json;
if (creds_path.empty()) {
error = GRPC_ERROR_CREATE("creds_path unset");
goto end;
return GRPC_ERROR_CREATE("creds_path unset");
}
creds_data = grpc_core::LoadFile(creds_path, /*add_null_terminator=*/false);
auto creds_data =
grpc_core::LoadFile(creds_path, /*add_null_terminator=*/false);
if (!creds_data.ok()) {
error = absl_status_to_grpc_error(creds_data.status());
goto end;
return absl_status_to_grpc_error(creds_data.status());
}
{
auto json_or = grpc_core::JsonParse(creds_data->as_string_view());
if (!json_or.ok()) {
error = absl_status_to_grpc_error(json_or.status());
goto end;
}
json = std::move(*json_or);
auto json = grpc_core::JsonParse(creds_data->as_string_view());
if (!json.ok()) {
return absl_status_to_grpc_error(json.status());
}
if (json.type() != Json::Type::kObject) {
error = GRPC_ERROR_CREATE(absl::StrCat("Failed to parse JSON \"",
creds_data->as_string_view(), "\""));
goto end;
if (json->type() != Json::Type::kObject) {
return GRPC_ERROR_CREATE(absl::StrCat("Failed to parse JSON \"",
creds_data->as_string_view(), "\""));
}
// First, try an auth json key.
key = grpc_auth_json_key_create_from_json(json);
grpc_auth_json_key key = grpc_auth_json_key_create_from_json(*json);
if (grpc_auth_json_key_is_valid(&key)) {
result =
*creds =
grpc_service_account_jwt_access_credentials_create_from_auth_json_key(
key, grpc_max_auth_token_lifetime());
if (result == nullptr) {
error = GRPC_ERROR_CREATE(
if (*creds == nullptr) {
return GRPC_ERROR_CREATE(
"grpc_service_account_jwt_access_credentials_create_from_auth_json_"
"key failed");
}
goto end;
return absl::OkStatus();
}
// Then try a refresh token if the auth json key was invalid.
token = grpc_auth_refresh_token_create_from_json(json);
grpc_auth_refresh_token token =
grpc_auth_refresh_token_create_from_json(*json);
if (grpc_auth_refresh_token_is_valid(&token)) {
result =
*creds =
grpc_refresh_token_credentials_create_from_auth_refresh_token(token);
if (result == nullptr) {
error = GRPC_ERROR_CREATE(
if (*creds == nullptr) {
return GRPC_ERROR_CREATE(
"grpc_refresh_token_credentials_create_from_auth_refresh_token "
"failed");
}
goto end;
return absl::OkStatus();
}
result = grpc_core::ExternalAccountCredentials::Create(json, {}, &error);
end:
CHECK((result == nullptr) + (error.ok()) == 1);
*creds = result;
return error;
// Use external creds.
auto external_creds =
grpc_core::ExternalAccountCredentials::Create(*json, {});
if (!external_creds.ok()) return external_creds.status();
*creds = std::move(*external_creds);
return absl::OkStatus();
}
static void update_tenancy() {

@ -40,6 +40,8 @@ class grpc_google_iam_credentials : public grpc_call_credentials {
grpc_google_iam_credentials(const char* token,
const char* authority_selector);
void Orphaned() override {}
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;

@ -51,6 +51,8 @@ class grpc_service_account_jwt_access_credentials
gpr_timespec token_lifetime);
~grpc_service_account_jwt_access_credentials() override;
void Orphaned() override {}
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;

@ -18,7 +18,6 @@
#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include <stdlib.h>
#include <string.h>
#include <algorithm>
@ -30,6 +29,7 @@
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
@ -141,13 +141,49 @@ void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token* refresh_token) {
}
//
// Oauth2 Token Fetcher credentials.
// Oauth2 Token parsing.
//
grpc_oauth2_token_fetcher_credentials::
~grpc_oauth2_token_fetcher_credentials() {
gpr_mu_destroy(&mu_);
grpc_pollset_set_destroy(grpc_polling_entity_pollset_set(&pollent_));
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response_body(
absl::string_view body, absl::optional<grpc_core::Slice>* token_value,
grpc_core::Duration* token_lifetime) {
auto json = grpc_core::JsonParse(body);
if (!json.ok()) {
LOG(ERROR) << "Could not parse JSON from " << body << ": " << json.status();
return GRPC_CREDENTIALS_ERROR;
}
if (json->type() != Json::Type::kObject) {
LOG(ERROR) << "Response should be a JSON object";
return GRPC_CREDENTIALS_ERROR;
}
auto it = json->object().find("access_token");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
LOG(ERROR) << "Missing or invalid access_token in JSON.";
return GRPC_CREDENTIALS_ERROR;
}
absl::string_view access_token = it->second.string();
it = json->object().find("token_type");
if (it == json->object().end() || it->second.type() != Json::Type::kString) {
LOG(ERROR) << "Missing or invalid token_type in JSON.";
return GRPC_CREDENTIALS_ERROR;
}
absl::string_view token_type = it->second.string();
it = json->object().find("expires_in");
if (it == json->object().end() || it->second.type() != Json::Type::kNumber) {
LOG(ERROR) << "Missing or invalid expires_in in JSON.";
return GRPC_CREDENTIALS_ERROR;
}
absl::string_view expires_in = it->second.string();
long seconds;
if (!absl::SimpleAtoi(expires_in, &seconds)) {
LOG(ERROR) << "Invalid expires_in in JSON.";
return GRPC_CREDENTIALS_ERROR;
}
*token_lifetime = grpc_core::Duration::Seconds(seconds);
*token_value = grpc_core::Slice::FromCopiedString(
absl::StrCat(token_type, " ", access_token));
return GRPC_CREDENTIALS_OK;
}
grpc_credentials_status
@ -155,217 +191,99 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response(
const grpc_http_response* response,
absl::optional<grpc_core::Slice>* token_value,
grpc_core::Duration* token_lifetime) {
char* null_terminated_body = nullptr;
grpc_credentials_status status = GRPC_CREDENTIALS_OK;
*token_value = absl::nullopt;
if (response == nullptr) {
LOG(ERROR) << "Received NULL response.";
status = GRPC_CREDENTIALS_ERROR;
goto end;
return GRPC_CREDENTIALS_ERROR;
}
if (response->body_length > 0) {
null_terminated_body =
static_cast<char*>(gpr_malloc(response->body_length + 1));
null_terminated_body[response->body_length] = '\0';
memcpy(null_terminated_body, response->body, response->body_length);
}
absl::string_view body(response->body, response->body_length);
if (response->status != 200) {
LOG(ERROR) << "Call to http server ended with error " << response->status
<< " ["
<< (null_terminated_body != nullptr ? null_terminated_body : "")
<< "]";
status = GRPC_CREDENTIALS_ERROR;
goto end;
} else {
const char* access_token = nullptr;
const char* token_type = nullptr;
const char* expires_in = nullptr;
Json::Object::const_iterator it;
auto json = grpc_core::JsonParse(
null_terminated_body != nullptr ? null_terminated_body : "");
if (!json.ok()) {
LOG(ERROR) << "Could not parse JSON from " << null_terminated_body << ": "
<< json.status();
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
if (json->type() != Json::Type::kObject) {
LOG(ERROR) << "Response should be a JSON object";
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
it = json->object().find("access_token");
if (it == json->object().end() ||
it->second.type() != Json::Type::kString) {
LOG(ERROR) << "Missing or invalid access_token in JSON.";
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
access_token = it->second.string().c_str();
it = json->object().find("token_type");
if (it == json->object().end() ||
it->second.type() != Json::Type::kString) {
LOG(ERROR) << "Missing or invalid token_type in JSON.";
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
token_type = it->second.string().c_str();
it = json->object().find("expires_in");
if (it == json->object().end() ||
it->second.type() != Json::Type::kNumber) {
LOG(ERROR) << "Missing or invalid expires_in in JSON.";
status = GRPC_CREDENTIALS_ERROR;
goto end;
}
expires_in = it->second.string().c_str();
*token_lifetime =
grpc_core::Duration::Seconds(strtol(expires_in, nullptr, 10));
*token_value = grpc_core::Slice::FromCopiedString(
absl::StrCat(token_type, " ", access_token));
status = GRPC_CREDENTIALS_OK;
<< " [" << body << "]";
return GRPC_CREDENTIALS_ERROR;
}
end:
if (status != GRPC_CREDENTIALS_OK) *token_value = absl::nullopt;
gpr_free(null_terminated_body);
return status;
return grpc_oauth2_token_fetcher_credentials_parse_server_response_body(
body, token_value, token_lifetime);
}
static void on_oauth2_token_fetcher_http_response(void* user_data,
grpc_error_handle error) {
GRPC_LOG_IF_ERROR("oauth_fetch", error);
grpc_credentials_metadata_request* r =
static_cast<grpc_credentials_metadata_request*>(user_data);
grpc_oauth2_token_fetcher_credentials* c =
reinterpret_cast<grpc_oauth2_token_fetcher_credentials*>(r->creds.get());
c->on_http_response(r, error);
}
//
// Oauth2TokenFetcherCredentials
//
void grpc_oauth2_token_fetcher_credentials::on_http_response(
grpc_credentials_metadata_request* r, grpc_error_handle error) {
absl::optional<grpc_core::Slice> access_token_value;
grpc_core::Duration token_lifetime;
grpc_credentials_status status =
error.ok() ? grpc_oauth2_token_fetcher_credentials_parse_server_response(
&r->response, &access_token_value, &token_lifetime)
: GRPC_CREDENTIALS_ERROR;
// Update cache and grab list of pending requests.
gpr_mu_lock(&mu_);
token_fetch_pending_ = false;
if (access_token_value.has_value()) {
access_token_value_ = access_token_value->Ref();
} else {
access_token_value_ = absl::nullopt;
}
token_expiration_ = status == GRPC_CREDENTIALS_OK
? gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
token_lifetime.as_timespec())
: gpr_inf_past(GPR_CLOCK_MONOTONIC);
grpc_oauth2_pending_get_request_metadata* pending_request = pending_requests_;
pending_requests_ = nullptr;
gpr_mu_unlock(&mu_);
// Invoke callbacks for all pending requests.
while (pending_request != nullptr) {
if (status == GRPC_CREDENTIALS_OK) {
pending_request->result = access_token_value->Ref();
} else {
auto err = GRPC_ERROR_CREATE_REFERENCING(
"Error occurred when fetching oauth2 token.", &error, 1);
pending_request->result = grpc_error_to_absl_status(err);
}
pending_request->done.store(true, std::memory_order_release);
pending_request->waker.Wakeup();
grpc_polling_entity_del_from_pollset_set(
pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_));
grpc_oauth2_pending_get_request_metadata* prev = pending_request;
pending_request = pending_request->next;
prev->Unref();
}
delete r;
}
namespace grpc_core {
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
grpc_oauth2_token_fetcher_credentials::GetRequestMetadata(
grpc_core::ClientMetadataHandle initial_metadata,
const grpc_call_credentials::GetRequestMetadataArgs*) {
// Check if we can use the cached token.
absl::optional<grpc_core::Slice> cached_access_token_value;
gpr_mu_lock(&mu_);
if (access_token_value_.has_value() &&
gpr_time_cmp(
gpr_time_sub(token_expiration_, gpr_now(GPR_CLOCK_MONOTONIC)),
gpr_time_from_seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS,
GPR_TIMESPAN)) > 0) {
cached_access_token_value = access_token_value_->Ref();
}
if (cached_access_token_value.has_value()) {
gpr_mu_unlock(&mu_);
initial_metadata->Append(
GRPC_AUTHORIZATION_METADATA_KEY, std::move(*cached_access_token_value),
[](absl::string_view, const grpc_core::Slice&) { abort(); });
return grpc_core::Immediate(std::move(initial_metadata));
// State held for a pending HTTP request.
class Oauth2TokenFetcherCredentials::HttpFetchRequest final
: public TokenFetcherCredentials::FetchRequest {
public:
HttpFetchRequest(
Oauth2TokenFetcherCredentials* creds, Timestamp deadline,
absl::AnyInvocable<
void(absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done)
: on_done_(std::move(on_done)) {
GRPC_CLOSURE_INIT(&on_http_response_, OnHttpResponse, this, nullptr);
Ref().release(); // Ref held by HTTP request callback.
http_request_ = creds->StartHttpRequest(creds->pollent(), deadline,
&response_, &on_http_response_);
}
// Couldn't get the token from the cache.
// Add request to pending_requests_ and start a new fetch if needed.
grpc_core::Duration refresh_threshold =
grpc_core::Duration::Seconds(GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS);
auto pending_request =
grpc_core::MakeRefCounted<grpc_oauth2_pending_get_request_metadata>();
pending_request->pollent = grpc_core::GetContext<grpc_polling_entity>();
pending_request->waker =
grpc_core::GetContext<grpc_core::Activity>()->MakeNonOwningWaker();
grpc_polling_entity_add_to_pollset_set(
pending_request->pollent, grpc_polling_entity_pollset_set(&pollent_));
pending_request->next = pending_requests_;
pending_request->md = std::move(initial_metadata);
pending_requests_ = pending_request->Ref().release();
bool start_fetch = false;
if (!token_fetch_pending_) {
token_fetch_pending_ = true;
start_fetch = true;
~HttpFetchRequest() override { grpc_http_response_destroy(&response_); }
void Orphan() override {
http_request_.reset();
Unref();
}
gpr_mu_unlock(&mu_);
if (start_fetch) {
fetch_oauth2(new grpc_credentials_metadata_request(Ref()), &pollent_,
on_oauth2_token_fetcher_http_response,
grpc_core::Timestamp::Now() + refresh_threshold);
private:
static void OnHttpResponse(void* arg, grpc_error_handle error) {
RefCountedPtr<HttpFetchRequest> self(static_cast<HttpFetchRequest*>(arg));
if (!error.ok()) {
self->on_done_(std::move(error));
return;
}
// Parse oauth2 token.
absl::optional<Slice> access_token_value;
Duration token_lifetime;
grpc_credentials_status status =
grpc_oauth2_token_fetcher_credentials_parse_server_response(
&self->response_, &access_token_value, &token_lifetime);
if (status != GRPC_CREDENTIALS_OK) {
self->on_done_(absl::UnavailableError("error parsing oauth2 token"));
return;
}
self->on_done_(MakeRefCounted<Token>(std::move(*access_token_value),
Timestamp::Now() + token_lifetime));
}
return
[pending_request]()
-> grpc_core::Poll<absl::StatusOr<grpc_core::ClientMetadataHandle>> {
if (!pending_request->done.load(std::memory_order_acquire)) {
return grpc_core::Pending{};
}
if (pending_request->result.ok()) {
pending_request->md->Append(
GRPC_AUTHORIZATION_METADATA_KEY,
std::move(*pending_request->result),
[](absl::string_view, const grpc_core::Slice&) { abort(); });
return std::move(pending_request->md);
} else {
return pending_request->result.status();
}
};
}
grpc_oauth2_token_fetcher_credentials::grpc_oauth2_token_fetcher_credentials()
: token_expiration_(gpr_inf_past(GPR_CLOCK_MONOTONIC)),
pollent_(grpc_polling_entity_create_from_pollset_set(
grpc_pollset_set_create())) {
gpr_mu_init(&mu_);
}
OrphanablePtr<HttpRequest> http_request_;
grpc_closure on_http_response_;
grpc_http_response response_;
absl::AnyInvocable<void(
absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done_;
};
std::string grpc_oauth2_token_fetcher_credentials::debug_string() {
std::string Oauth2TokenFetcherCredentials::debug_string() {
return "OAuth2TokenFetcherCredentials";
}
grpc_core::UniqueTypeName grpc_oauth2_token_fetcher_credentials::type() const {
static grpc_core::UniqueTypeName::Factory kFactory("Oauth2");
UniqueTypeName Oauth2TokenFetcherCredentials::type() const {
static UniqueTypeName::Factory kFactory("Oauth2");
return kFactory.Create();
}
OrphanablePtr<TokenFetcherCredentials::FetchRequest>
Oauth2TokenFetcherCredentials::FetchToken(
Timestamp deadline,
absl::AnyInvocable<
void(absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done) {
return MakeOrphanable<HttpFetchRequest>(this, deadline, std::move(on_done));
}
} // namespace grpc_core
//
// Google Compute Engine credentials.
//
@ -373,16 +291,21 @@ grpc_core::UniqueTypeName grpc_oauth2_token_fetcher_credentials::type() const {
namespace {
class grpc_compute_engine_token_fetcher_credentials
: public grpc_oauth2_token_fetcher_credentials {
: public grpc_core::Oauth2TokenFetcherCredentials {
public:
grpc_compute_engine_token_fetcher_credentials() = default;
~grpc_compute_engine_token_fetcher_credentials() override = default;
protected:
void fetch_oauth2(grpc_credentials_metadata_request* metadata_req,
grpc_polling_entity* pollent,
grpc_iomgr_cb_func response_cb,
grpc_core::Timestamp deadline) override {
std::string debug_string() override {
return absl::StrFormat(
"GoogleComputeEngineTokenFetcherCredentials{%s}",
grpc_core::Oauth2TokenFetcherCredentials::debug_string());
}
private:
grpc_core::OrphanablePtr<grpc_core::HttpRequest> StartHttpRequest(
grpc_polling_entity* pollent, grpc_core::Timestamp deadline,
grpc_http_response* response, grpc_closure* on_complete) override {
grpc_http_header header = {const_cast<char*>("Metadata-Flavor"),
const_cast<char*>("Google")};
grpc_http_request request;
@ -396,26 +319,14 @@ class grpc_compute_engine_token_fetcher_credentials
GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH,
{} /* query params */, "" /* fragment */);
CHECK(uri.ok()); // params are hardcoded
http_request_ = grpc_core::HttpRequest::Get(
std::move(*uri), nullptr /* channel args */, pollent, &request,
deadline,
GRPC_CLOSURE_INIT(&http_get_cb_closure_, response_cb, metadata_req,
grpc_schedule_on_exec_ctx),
&metadata_req->response,
auto http_request = grpc_core::HttpRequest::Get(
std::move(*uri), /*args=*/nullptr, pollent, &request, deadline,
on_complete, response,
grpc_core::RefCountedPtr<grpc_channel_credentials>(
grpc_insecure_credentials_create()));
http_request_->Start();
http_request->Start();
return http_request;
}
std::string debug_string() override {
return absl::StrFormat(
"GoogleComputeEngineTokenFetcherCredentials{%s}",
grpc_oauth2_token_fetcher_credentials::debug_string());
}
private:
grpc_closure http_get_cb_closure_;
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request_;
};
} // namespace
@ -434,22 +345,26 @@ grpc_call_credentials* grpc_google_compute_engine_credentials_create(
// Google Refresh Token credentials.
//
grpc_google_refresh_token_credentials::grpc_google_refresh_token_credentials(
grpc_auth_refresh_token refresh_token)
: refresh_token_(refresh_token) {}
grpc_google_refresh_token_credentials::
~grpc_google_refresh_token_credentials() {
grpc_auth_refresh_token_destruct(&refresh_token_);
}
void grpc_google_refresh_token_credentials::fetch_oauth2(
grpc_credentials_metadata_request* metadata_req,
grpc_polling_entity* pollent, grpc_iomgr_cb_func response_cb,
grpc_core::Timestamp deadline) {
grpc_core::OrphanablePtr<grpc_core::HttpRequest>
grpc_google_refresh_token_credentials::StartHttpRequest(
grpc_polling_entity* pollent, grpc_core::Timestamp deadline,
grpc_http_response* response, grpc_closure* on_complete) {
grpc_http_header header = {
const_cast<char*>("Content-Type"),
const_cast<char*>("application/x-www-form-urlencoded")};
grpc_http_request request;
std::string body = absl::StrFormat(
GRPC_REFRESH_TOKEN_POST_BODY_FORMAT_STRING, refresh_token_.client_id,
refresh_token_.client_secret, refresh_token_.refresh_token);
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
request.hdr_count = 1;
request.hdrs = &header;
@ -462,18 +377,13 @@ void grpc_google_refresh_token_credentials::fetch_oauth2(
GRPC_GOOGLE_OAUTH2_SERVICE_TOKEN_PATH,
{} /* query params */, "" /* fragment */);
CHECK(uri.ok()); // params are hardcoded
http_request_ = grpc_core::HttpRequest::Post(
std::move(*uri), nullptr /* channel args */, pollent, &request, deadline,
GRPC_CLOSURE_INIT(&http_post_cb_closure_, response_cb, metadata_req,
grpc_schedule_on_exec_ctx),
&metadata_req->response, grpc_core::CreateHttpRequestSSLCredentials());
http_request_->Start();
auto http_request = grpc_core::HttpRequest::Post(
std::move(*uri), /*args=*/nullptr, pollent, &request, deadline,
on_complete, response, grpc_core::CreateHttpRequestSSLCredentials());
http_request->Start();
return http_request;
}
grpc_google_refresh_token_credentials::grpc_google_refresh_token_credentials(
grpc_auth_refresh_token refresh_token)
: refresh_token_(refresh_token) {}
grpc_core::RefCountedPtr<grpc_call_credentials>
grpc_refresh_token_credentials_create_from_auth_refresh_token(
grpc_auth_refresh_token refresh_token) {
@ -486,9 +396,9 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token(
}
std::string grpc_google_refresh_token_credentials::debug_string() {
return absl::StrFormat("GoogleRefreshToken{ClientID:%s,%s}",
refresh_token_.client_id,
grpc_oauth2_token_fetcher_credentials::debug_string());
return absl::StrFormat(
"GoogleRefreshToken{ClientID:%s,%s}", refresh_token_.client_id,
grpc_core::Oauth2TokenFetcherCredentials::debug_string());
}
grpc_core::UniqueTypeName grpc_google_refresh_token_credentials::type() const {
@ -545,8 +455,7 @@ grpc_error_handle LoadTokenFile(const char* path, grpc_slice* token) {
return absl::OkStatus();
}
class StsTokenFetcherCredentials
: public grpc_oauth2_token_fetcher_credentials {
class StsTokenFetcherCredentials : public Oauth2TokenFetcherCredentials {
public:
StsTokenFetcherCredentials(URI sts_url,
const grpc_sts_credentials_options* options)
@ -563,21 +472,19 @@ class StsTokenFetcherCredentials
std::string debug_string() override {
return absl::StrFormat(
"StsTokenFetcherCredentials{Path:%s,Authority:%s,%s}", sts_url_.path(),
sts_url_.authority(),
grpc_oauth2_token_fetcher_credentials::debug_string());
sts_url_.authority(), Oauth2TokenFetcherCredentials::debug_string());
}
private:
void fetch_oauth2(grpc_credentials_metadata_request* metadata_req,
grpc_polling_entity* pollent,
grpc_iomgr_cb_func response_cb,
Timestamp deadline) override {
OrphanablePtr<HttpRequest> StartHttpRequest(
grpc_polling_entity* pollent, Timestamp deadline,
grpc_http_response* response, grpc_closure* on_complete) override {
grpc_http_request request;
memset(&request, 0, sizeof(grpc_http_request));
grpc_error_handle err = FillBody(&request.body, &request.body_length);
if (!err.ok()) {
response_cb(metadata_req, err);
return;
ExecCtx::Run(DEBUG_LOCATION, on_complete, std::move(err));
return nullptr;
}
grpc_http_header header = {
const_cast<char*>("Content-Type"),
@ -594,13 +501,12 @@ class StsTokenFetcherCredentials
} else {
http_request_creds = CreateHttpRequestSSLCredentials();
}
http_request_ = HttpRequest::Post(
sts_url_, nullptr /* channel args */, pollent, &request, deadline,
GRPC_CLOSURE_INIT(&http_post_cb_closure_, response_cb, metadata_req,
grpc_schedule_on_exec_ctx),
&metadata_req->response, std::move(http_request_creds));
http_request_->Start();
auto http_request = HttpRequest::Post(
sts_url_, /*args=*/nullptr, pollent, &request, deadline, on_complete,
response, std::move(http_request_creds));
http_request->Start();
gpr_free(request.body);
return http_request;
}
grpc_error_handle FillBody(char** body, size_t* body_length) {
@ -646,7 +552,6 @@ class StsTokenFetcherCredentials
}
URI sts_url_;
grpc_closure http_post_cb_closure_;
UniquePtr<char> resource_;
UniquePtr<char> audience_;
UniquePtr<char> scope_;

@ -24,6 +24,7 @@
#include <utility>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/credentials.h>
@ -43,6 +44,7 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/lib/uri/uri_parser.h"
@ -80,73 +82,45 @@ grpc_auth_refresh_token grpc_auth_refresh_token_create_from_json(
/// Destructs the object.
void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token* refresh_token);
// -- Credentials Metadata Request. --
struct grpc_credentials_metadata_request {
explicit grpc_credentials_metadata_request(
grpc_core::RefCountedPtr<grpc_call_credentials> creds)
: creds(std::move(creds)) {}
~grpc_credentials_metadata_request() {
grpc_http_response_destroy(&response);
}
grpc_core::RefCountedPtr<grpc_call_credentials> creds;
grpc_http_response response;
};
struct grpc_oauth2_pending_get_request_metadata
: public grpc_core::RefCounted<grpc_oauth2_pending_get_request_metadata> {
std::atomic<bool> done{false};
grpc_core::Waker waker;
grpc_polling_entity* pollent;
grpc_core::ClientMetadataHandle md;
struct grpc_oauth2_pending_get_request_metadata* next;
absl::StatusOr<grpc_core::Slice> result;
};
// -- Oauth2 Token Fetcher credentials --
//
// This object is a base for credentials that need to acquire an oauth2 token
// from an http service.
class grpc_oauth2_token_fetcher_credentials : public grpc_call_credentials {
public:
grpc_oauth2_token_fetcher_credentials();
~grpc_oauth2_token_fetcher_credentials() override;
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;
namespace grpc_core {
void on_http_response(grpc_credentials_metadata_request* r,
grpc_error_handle error);
// A base class for oauth2 token fetching credentials.
// Subclasses must implement StartHttpRequest().
class Oauth2TokenFetcherCredentials : public TokenFetcherCredentials {
public:
std::string debug_string() override;
grpc_core::UniqueTypeName type() const override;
UniqueTypeName type() const override;
OrphanablePtr<FetchRequest> FetchToken(
Timestamp deadline,
absl::AnyInvocable<
void(absl::StatusOr<RefCountedPtr<TokenFetcherCredentials::Token>>)>
on_done) final;
protected:
virtual void fetch_oauth2(grpc_credentials_metadata_request* req,
grpc_polling_entity* pollent, grpc_iomgr_cb_func cb,
grpc_core::Timestamp deadline) = 0;
virtual OrphanablePtr<HttpRequest> StartHttpRequest(
grpc_polling_entity* pollent, Timestamp deadline,
grpc_http_response* response, grpc_closure* on_complete) = 0;
private:
class HttpFetchRequest;
int cmp_impl(const grpc_call_credentials* other) const override {
// TODO(yashykt): Check if we can do something better here
return grpc_core::QsortCompare(
static_cast<const grpc_call_credentials*>(this), other);
return QsortCompare(static_cast<const grpc_call_credentials*>(this), other);
}
gpr_mu mu_;
absl::optional<grpc_core::Slice> access_token_value_;
gpr_timespec token_expiration_;
bool token_fetch_pending_ = false;
grpc_oauth2_pending_get_request_metadata* pending_requests_ = nullptr;
grpc_polling_entity pollent_;
};
} // namespace grpc_core
// Google refresh token credentials.
class grpc_google_refresh_token_credentials final
: public grpc_oauth2_token_fetcher_credentials {
: public grpc_core::Oauth2TokenFetcherCredentials {
public:
explicit grpc_google_refresh_token_credentials(
grpc_auth_refresh_token refresh_token);
@ -160,15 +134,12 @@ class grpc_google_refresh_token_credentials final
grpc_core::UniqueTypeName type() const override;
protected:
void fetch_oauth2(grpc_credentials_metadata_request* req,
grpc_polling_entity* pollent, grpc_iomgr_cb_func cb,
grpc_core::Timestamp deadline) override;
private:
grpc_core::OrphanablePtr<grpc_core::HttpRequest> StartHttpRequest(
grpc_polling_entity* pollent, grpc_core::Timestamp deadline,
grpc_http_response* response, grpc_closure* on_complete) override;
grpc_auth_refresh_token refresh_token_;
grpc_closure http_post_cb_closure_;
grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request_;
};
// Access token credentials.
@ -176,6 +147,8 @@ class grpc_access_token_credentials final : public grpc_call_credentials {
public:
explicit grpc_access_token_credentials(const char* access_token);
void Orphaned() override {}
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;
@ -202,6 +175,11 @@ grpc_core::RefCountedPtr<grpc_call_credentials>
grpc_refresh_token_credentials_create_from_auth_refresh_token(
grpc_auth_refresh_token token);
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response_body(
absl::string_view body, absl::optional<grpc_core::Slice>* token_value,
grpc_core::Duration* token_lifetime);
// Exposed for testing only.
grpc_credentials_status
grpc_oauth2_token_fetcher_credentials_parse_server_response(

@ -57,6 +57,8 @@ struct grpc_plugin_credentials final : public grpc_call_credentials {
grpc_security_level min_security_level);
~grpc_plugin_credentials() override;
void Orphaned() override {}
grpc_core::ArenaPromise<absl::StatusOr<grpc_core::ClientMetadataHandle>>
GetRequestMetadata(grpc_core::ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;

@ -0,0 +1,130 @@
//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
namespace grpc_core {
namespace {
// Amount of time before the token's expiration that we consider it
// invalid and start a new fetch. Also determines the timeout for the
// fetch request.
constexpr Duration kTokenRefreshDuration = Duration::Seconds(60);
} // namespace
//
// TokenFetcherCredentials::Token
//
void TokenFetcherCredentials::Token::AddTokenToClientInitialMetadata(
ClientMetadata& metadata) const {
metadata.Append(GRPC_AUTHORIZATION_METADATA_KEY, token_.Ref(),
[](absl::string_view, const Slice&) { abort(); });
}
//
// TokenFetcherCredentials
//
TokenFetcherCredentials::TokenFetcherCredentials()
: pollent_(grpc_polling_entity_create_from_pollset_set(
grpc_pollset_set_create())) {}
TokenFetcherCredentials::~TokenFetcherCredentials() {
grpc_pollset_set_destroy(grpc_polling_entity_pollset_set(&pollent_));
}
void TokenFetcherCredentials::Orphaned() {
MutexLock lock(&mu_);
auto* fetch_request = absl::get_if<OrphanablePtr<FetchRequest>>(&token_);
if (fetch_request != nullptr) fetch_request->reset();
}
ArenaPromise<absl::StatusOr<ClientMetadataHandle>>
TokenFetcherCredentials::GetRequestMetadata(
ClientMetadataHandle initial_metadata, const GetRequestMetadataArgs*) {
RefCountedPtr<PendingCall> pending_call;
{
MutexLock lock(&mu_);
// Check if we can use the cached token.
auto* cached_token = absl::get_if<RefCountedPtr<Token>>(&token_);
if (cached_token != nullptr && *cached_token != nullptr &&
((*cached_token)->ExpirationTime() - Timestamp::Now()) >
kTokenRefreshDuration) {
(*cached_token)->AddTokenToClientInitialMetadata(*initial_metadata);
return Immediate(std::move(initial_metadata));
}
// Couldn't get the token from the cache.
// Add this call to the pending list.
pending_call = MakeRefCounted<PendingCall>();
pending_call->waker = GetContext<Activity>()->MakeNonOwningWaker();
pending_call->pollent = GetContext<grpc_polling_entity>();
grpc_polling_entity_add_to_pollset_set(
pending_call->pollent, grpc_polling_entity_pollset_set(&pollent_));
pending_call->md = std::move(initial_metadata);
pending_calls_.insert(pending_call);
// Start a new fetch if needed.
if (!absl::holds_alternative<OrphanablePtr<FetchRequest>>(token_)) {
token_ = FetchToken(
/*deadline=*/Timestamp::Now() + kTokenRefreshDuration,
[self = WeakRefAsSubclass<TokenFetcherCredentials>()](
absl::StatusOr<RefCountedPtr<Token>> token) mutable {
self->TokenFetchComplete(std::move(token));
});
}
}
return [pending_call = std::move(
pending_call)]() -> Poll<absl::StatusOr<ClientMetadataHandle>> {
if (!pending_call->done.load(std::memory_order_acquire)) {
return Pending{};
}
if (!pending_call->result.ok()) {
return pending_call->result.status();
}
(*pending_call->result)->AddTokenToClientInitialMetadata(*pending_call->md);
return std::move(pending_call->md);
};
}
void TokenFetcherCredentials::TokenFetchComplete(
absl::StatusOr<RefCountedPtr<Token>> token) {
// Update cache and grab list of pending requests.
absl::flat_hash_set<RefCountedPtr<PendingCall>> pending_calls;
{
MutexLock lock(&mu_);
token_ = token.value_or(nullptr);
pending_calls_.swap(pending_calls);
}
// Invoke callbacks for all pending requests.
for (auto& pending_call : pending_calls) {
pending_call->result = token;
pending_call->done.store(true, std::memory_order_release);
pending_call->waker.Wakeup();
grpc_polling_entity_del_from_pollset_set(
pending_call->pollent, grpc_polling_entity_pollset_set(&pollent_));
}
}
} // namespace grpc_core

@ -0,0 +1,115 @@
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef GRPC_SRC_CORE_LIB_SECURITY_CREDENTIALS_TOKEN_FETCHER_TOKEN_FETCHER_CREDENTIALS_H
#define GRPC_SRC_CORE_LIB_SECURITY_CREDENTIALS_TOKEN_FETCHER_TOKEN_FETCHER_CREDENTIALS_H
#include <atomic>
#include <memory>
#include <utility>
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h"
#include "absl/types/variant.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/util/http_client/httpcli.h"
#include "src/core/util/useful.h"
namespace grpc_core {
// A base class for credentials that fetch tokens via an HTTP request.
// Subclasses must implement FetchToken().
class TokenFetcherCredentials : public grpc_call_credentials {
public:
// Represents a token.
class Token : public RefCounted<Token> {
public:
Token(Slice token, Timestamp expiration)
: token_(std::move(token)), expiration_(expiration) {}
// Returns the token's expiration time.
Timestamp ExpirationTime() const { return expiration_; }
// Adds the token to the call's client initial metadata.
void AddTokenToClientInitialMetadata(ClientMetadata& metadata) const;
private:
Slice token_;
Timestamp expiration_;
};
~TokenFetcherCredentials() override;
void Orphaned() override;
ArenaPromise<absl::StatusOr<ClientMetadataHandle>> GetRequestMetadata(
ClientMetadataHandle initial_metadata,
const GetRequestMetadataArgs* args) override;
protected:
// Base class for fetch requests.
class FetchRequest : public InternallyRefCounted<FetchRequest> {};
TokenFetcherCredentials();
// Fetches a token. The on_done callback will be invoked when complete.
virtual OrphanablePtr<FetchRequest> FetchToken(
Timestamp deadline,
absl::AnyInvocable<void(absl::StatusOr<RefCountedPtr<Token>>)>
on_done) = 0;
grpc_polling_entity* pollent() { return &pollent_; }
private:
// A call that is waiting for a token fetch request to complete.
struct PendingCall : public RefCounted<PendingCall> {
std::atomic<bool> done{false};
Waker waker;
grpc_polling_entity* pollent;
ClientMetadataHandle md;
absl::StatusOr<RefCountedPtr<Token>> result;
};
int cmp_impl(const grpc_call_credentials* other) const override {
// TODO(yashykt): Check if we can do something better here
return QsortCompare(static_cast<const grpc_call_credentials*>(this), other);
}
void TokenFetchComplete(absl::StatusOr<RefCountedPtr<Token>> token);
Mutex mu_;
// Either the cached token or a pending request to fetch the token.
absl::variant<RefCountedPtr<Token>, OrphanablePtr<FetchRequest>> token_
ABSL_GUARDED_BY(&mu_);
// Calls that are queued up waiting for the token.
absl::flat_hash_set<RefCountedPtr<PendingCall>> pending_calls_
ABSL_GUARDED_BY(&mu_);
grpc_polling_entity pollent_ ABSL_GUARDED_BY(&mu_);
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_LIB_SECURITY_CREDENTIALS_TOKEN_FETCHER_TOKEN_FETCHER_CREDENTIALS_H

@ -644,6 +644,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/security/credentials/tls/grpc_tls_crl_provider.cc',
'src/core/lib/security/credentials/tls/tls_credentials.cc',
'src/core/lib/security/credentials/tls/tls_utils.cc',
'src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc',
'src/core/lib/security/credentials/xds/xds_credentials.cc',
'src/core/lib/security/security_connector/alts/alts_security_connector.cc',
'src/core/lib/security/security_connector/fake/fake_security_connector.cc',

@ -58,6 +58,8 @@ class ClientAuthFilterTest : public FilterTest<ClientAuthFilter> {
: grpc_call_credentials(GRPC_SECURITY_NONE),
status_(std::move(status)) {}
void Orphaned() override {}
UniqueTypeName type() const override {
static UniqueTypeName::Factory kFactory("FailCallCreds");
return kFactory.Create();

@ -111,6 +111,8 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//src/core:channel_args",
"//test/core/event_engine:event_engine_test_utils",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/test_util:grpc_test_util",
"//test/core/test_util:grpc_test_util_base",
],

File diff suppressed because it is too large Load Diff

@ -2608,6 +2608,8 @@ src/core/lib/security/credentials/tls/tls_credentials.cc \
src/core/lib/security/credentials/tls/tls_credentials.h \
src/core/lib/security/credentials/tls/tls_utils.cc \
src/core/lib/security/credentials/tls/tls_utils.h \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h \
src/core/lib/security/credentials/xds/xds_credentials.cc \
src/core/lib/security/credentials/xds/xds_credentials.h \
src/core/lib/security/security_connector/alts/alts_security_connector.cc \

@ -2379,6 +2379,8 @@ src/core/lib/security/credentials/tls/tls_credentials.cc \
src/core/lib/security/credentials/tls/tls_credentials.h \
src/core/lib/security/credentials/tls/tls_utils.cc \
src/core/lib/security/credentials/tls/tls_utils.h \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc \
src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.h \
src/core/lib/security/credentials/xds/xds_credentials.cc \
src/core/lib/security/credentials/xds/xds_credentials.h \
src/core/lib/security/security_connector/alts/alts_security_connector.cc \

Loading…
Cancel
Save