mirror of https://github.com/grpc/grpc.git
Second attempt: XdsClient: refactor transport code to make it injectable (#30225)
* Revert "Revert "XdsClient: refactor transport code to make it injectable (#30183)" (#30223)"
This reverts commit fa57b9d0bc
.
* fix deadlock seen internally
* Automated change: Fix sanity tests
* fix memory leak
Co-authored-by: markdroth <markdroth@users.noreply.github.com>
pull/30189/head
parent
70a311b736
commit
a3afb81274
30 changed files with 1443 additions and 1125 deletions
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,275 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/xds/xds_client_grpc.h" |
||||
|
||||
#include <limits.h> |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/string_util.h> |
||||
|
||||
#include "src/core/ext/xds/xds_bootstrap.h" |
||||
#include "src/core/ext/xds/xds_channel_args.h" |
||||
#include "src/core/ext/xds/xds_cluster_specifier_plugin.h" |
||||
#include "src/core/ext/xds/xds_http_filters.h" |
||||
#include "src/core/ext/xds/xds_transport.h" |
||||
#include "src/core/ext/xds/xds_transport_grpc.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/debug/trace.h" |
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/iomgr/load_file.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
namespace { |
||||
|
||||
Mutex* g_mu = nullptr; |
||||
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; |
||||
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; |
||||
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; |
||||
|
||||
} // namespace
|
||||
|
||||
void XdsClientGlobalInit() { |
||||
g_mu = new Mutex; |
||||
XdsHttpFilterRegistry::Init(); |
||||
XdsClusterSpecifierPluginRegistry::Init(); |
||||
} |
||||
|
||||
// TODO(roth): Find a better way to clear the fallback config that does
|
||||
// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
|
||||
void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS { |
||||
gpr_free(g_fallback_bootstrap_config); |
||||
g_fallback_bootstrap_config = nullptr; |
||||
delete g_mu; |
||||
g_mu = nullptr; |
||||
XdsHttpFilterRegistry::Shutdown(); |
||||
XdsClusterSpecifierPluginRegistry::Shutdown(); |
||||
} |
||||
|
||||
namespace { |
||||
|
||||
std::string GetBootstrapContents(const char* fallback_config, |
||||
grpc_error_handle* error) { |
||||
// First, try GRPC_XDS_BOOTSTRAP env var.
|
||||
UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP")); |
||||
if (path != nullptr) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"Got bootstrap file location from GRPC_XDS_BOOTSTRAP " |
||||
"environment variable: %s", |
||||
path.get()); |
||||
} |
||||
grpc_slice contents; |
||||
*error = |
||||
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents); |
||||
if (!GRPC_ERROR_IS_NONE(*error)) return ""; |
||||
std::string contents_str(StringViewFromSlice(contents)); |
||||
grpc_slice_unref_internal(contents); |
||||
return contents_str; |
||||
} |
||||
// Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
|
||||
UniquePtr<char> env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG")); |
||||
if (env_config != nullptr) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG " |
||||
"environment variable"); |
||||
} |
||||
return env_config.get(); |
||||
} |
||||
// Finally, try fallback config.
|
||||
if (fallback_config != nullptr) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
||||
gpr_log(GPR_INFO, "Got bootstrap contents from fallback config"); |
||||
} |
||||
return fallback_config; |
||||
} |
||||
// No bootstrap config found.
|
||||
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG " |
||||
"not defined"); |
||||
return ""; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
RefCountedPtr<XdsClient> GrpcXdsClient::GetOrCreate( |
||||
const grpc_channel_args* args, grpc_error_handle* error) { |
||||
RefCountedPtr<XdsClient> xds_client; |
||||
// If getting bootstrap from channel args, create a local XdsClient
|
||||
// instance for the channel or server instead of using the global instance.
|
||||
const char* bootstrap_config = grpc_channel_args_find_string( |
||||
args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG); |
||||
if (bootstrap_config != nullptr) { |
||||
std::unique_ptr<XdsBootstrap> bootstrap = |
||||
XdsBootstrap::Create(bootstrap_config, error); |
||||
if (GRPC_ERROR_IS_NONE(*error)) { |
||||
grpc_channel_args* xds_channel_args = |
||||
grpc_channel_args_find_pointer<grpc_channel_args>( |
||||
args, |
||||
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); |
||||
return MakeRefCounted<GrpcXdsClient>(std::move(bootstrap), |
||||
xds_channel_args); |
||||
} |
||||
return nullptr; |
||||
} |
||||
// Otherwise, use the global instance.
|
||||
{ |
||||
MutexLock lock(g_mu); |
||||
if (g_xds_client != nullptr) { |
||||
auto xds_client = g_xds_client->RefIfNonZero(); |
||||
if (xds_client != nullptr) return xds_client; |
||||
} |
||||
// Find bootstrap contents.
|
||||
std::string bootstrap_contents = |
||||
GetBootstrapContents(g_fallback_bootstrap_config, error); |
||||
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
||||
gpr_log(GPR_INFO, "xDS bootstrap contents: %s", |
||||
bootstrap_contents.c_str()); |
||||
} |
||||
// Parse bootstrap.
|
||||
std::unique_ptr<XdsBootstrap> bootstrap = |
||||
XdsBootstrap::Create(bootstrap_contents, error); |
||||
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; |
||||
// Instantiate XdsClient.
|
||||
xds_client = |
||||
MakeRefCounted<GrpcXdsClient>(std::move(bootstrap), g_channel_args); |
||||
g_xds_client = xds_client.get(); |
||||
} |
||||
return xds_client; |
||||
} |
||||
|
||||
namespace { |
||||
|
||||
Duration GetResourceDurationFromArgs(const grpc_channel_args* args) { |
||||
return Duration::Milliseconds(grpc_channel_args_find_integer( |
||||
args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, |
||||
{15000, 0, INT_MAX})); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
GrpcXdsClient::GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap, |
||||
const grpc_channel_args* args) |
||||
: XdsClient(std::move(bootstrap), |
||||
MakeOrphanable<GrpcXdsTransportFactory>(args), |
||||
GetResourceDurationFromArgs(args)) {} |
||||
|
||||
GrpcXdsClient::~GrpcXdsClient() { |
||||
MutexLock lock(g_mu); |
||||
if (g_xds_client == this) g_xds_client = nullptr; |
||||
} |
||||
|
||||
grpc_pollset_set* GrpcXdsClient::interested_parties() const { |
||||
return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory()) |
||||
->interested_parties(); |
||||
} |
||||
|
||||
#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client" |
||||
|
||||
namespace { |
||||
|
||||
void* XdsClientArgCopy(void* p) { |
||||
XdsClient* xds_client = static_cast<XdsClient*>(p); |
||||
xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); |
||||
return p; |
||||
} |
||||
|
||||
void XdsClientArgDestroy(void* p) { |
||||
XdsClient* xds_client = static_cast<XdsClient*>(p); |
||||
xds_client->Unref(DEBUG_LOCATION, "channel arg"); |
||||
} |
||||
|
||||
int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); } |
||||
|
||||
const grpc_arg_pointer_vtable kXdsClientArgVtable = { |
||||
XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp}; |
||||
|
||||
} // namespace
|
||||
|
||||
grpc_arg GrpcXdsClient::MakeChannelArg() const { |
||||
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT), |
||||
const_cast<GrpcXdsClient*>(this), |
||||
&kXdsClientArgVtable); |
||||
} |
||||
|
||||
RefCountedPtr<XdsClient> GrpcXdsClient::GetFromChannelArgs( |
||||
const grpc_channel_args& args) { |
||||
XdsClient* xds_client = |
||||
grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT); |
||||
if (xds_client == nullptr) return nullptr; |
||||
return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); |
||||
} |
||||
|
||||
namespace internal { |
||||
|
||||
void SetXdsChannelArgsForTest(grpc_channel_args* args) { |
||||
MutexLock lock(g_mu); |
||||
g_channel_args = args; |
||||
} |
||||
|
||||
void UnsetGlobalXdsClientForTest() { |
||||
MutexLock lock(g_mu); |
||||
g_xds_client = nullptr; |
||||
} |
||||
|
||||
void SetXdsFallbackBootstrapConfig(const char* config) { |
||||
MutexLock lock(g_mu); |
||||
gpr_free(g_fallback_bootstrap_config); |
||||
g_fallback_bootstrap_config = gpr_strdup(config); |
||||
} |
||||
|
||||
} // namespace internal
|
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
// The returned bytes may contain NULL(0), so we can't use c-string.
|
||||
grpc_slice grpc_dump_xds_configs(void) { |
||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
||||
grpc_core::ExecCtx exec_ctx; |
||||
grpc_error_handle error = GRPC_ERROR_NONE; |
||||
auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(nullptr, &error); |
||||
if (!GRPC_ERROR_IS_NONE(error)) { |
||||
// If we aren't using xDS, just return an empty string.
|
||||
GRPC_ERROR_UNREF(error); |
||||
return grpc_empty_slice(); |
||||
} |
||||
return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary()); |
||||
} |
@ -0,0 +1,65 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H |
||||
#define GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <memory> |
||||
|
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
|
||||
#include "src/core/ext/xds/xds_bootstrap.h" |
||||
#include "src/core/ext/xds/xds_client.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/iomgr_fwd.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class GrpcXdsClient : public XdsClient { |
||||
public: |
||||
// Factory function to get or create the global XdsClient instance.
|
||||
// If *error is not GRPC_ERROR_NONE upon return, then there was
|
||||
// an error initializing the client.
|
||||
static RefCountedPtr<XdsClient> GetOrCreate(const grpc_channel_args* args, |
||||
grpc_error_handle* error); |
||||
|
||||
// Do not instantiate directly -- use GetOrCreate() instead.
|
||||
GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap, |
||||
const grpc_channel_args* args); |
||||
~GrpcXdsClient() override; |
||||
|
||||
grpc_pollset_set* interested_parties() const; |
||||
|
||||
// Helpers for encoding the XdsClient object in channel args.
|
||||
grpc_arg MakeChannelArg() const; |
||||
static RefCountedPtr<XdsClient> GetFromChannelArgs( |
||||
const grpc_channel_args& args); |
||||
}; |
||||
|
||||
namespace internal { |
||||
void SetXdsChannelArgsForTest(grpc_channel_args* args); |
||||
void UnsetGlobalXdsClientForTest(); |
||||
// Sets bootstrap config to be used when no env var is set.
|
||||
// Does not take ownership of config.
|
||||
void SetXdsFallbackBootstrapConfig(const char* config); |
||||
} // namespace internal
|
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H
|
@ -0,0 +1,86 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H |
||||
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <functional> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/ext/xds/xds_bootstrap.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// A factory for creating new XdsTransport instances.
|
||||
class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> { |
||||
public: |
||||
// Represents a transport for xDS communication (e.g., a gRPC channel).
|
||||
class XdsTransport : public InternallyRefCounted<XdsTransport> { |
||||
public: |
||||
// Represents a bidi streaming RPC call.
|
||||
class StreamingCall : public InternallyRefCounted<StreamingCall> { |
||||
public: |
||||
// An interface for handling events on a streaming call.
|
||||
class EventHandler { |
||||
public: |
||||
virtual ~EventHandler() = default; |
||||
|
||||
// Called when a SendMessage() operation completes.
|
||||
virtual void OnRequestSent(bool ok) = 0; |
||||
// Called when a message is received on the stream.
|
||||
virtual void OnRecvMessage(absl::string_view payload) = 0; |
||||
// Called when status is received on the stream.
|
||||
virtual void OnStatusReceived(absl::Status status) = 0; |
||||
}; |
||||
|
||||
// Sends a message on the stream. When the message has been sent,
|
||||
// the EventHandler::OnRequestSent() method will be called.
|
||||
// Only one message will be in flight at a time; subsequent
|
||||
// messages will not be sent until this one is done.
|
||||
virtual void SendMessage(std::string payload) = 0; |
||||
}; |
||||
|
||||
// Create a streaming call on this transport for the specified method.
|
||||
// Events on the stream will be reported to event_handler.
|
||||
virtual OrphanablePtr<StreamingCall> CreateStreamingCall( |
||||
const char* method, |
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) = 0; |
||||
|
||||
// Resets connection backoff for the transport.
|
||||
virtual void ResetBackoff() = 0; |
||||
}; |
||||
|
||||
// Creates a new transport for the specified server.
|
||||
// The on_connectivity_failure callback will be invoked whenever there is
|
||||
// a connectivity failure on the transport.
|
||||
// *status will be set if there is an error creating the channel,
|
||||
// although the returned channel must still accept calls (which may fail).
|
||||
virtual OrphanablePtr<XdsTransport> Create( |
||||
const XdsBootstrap::XdsServer& server, |
||||
std::function<void(absl::Status)> on_connectivity_failure, |
||||
absl::Status* status) = 0; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H
|
@ -0,0 +1,359 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/xds/xds_transport_grpc.h" |
||||
|
||||
#include <string.h> |
||||
|
||||
#include <functional> |
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/container/inlined_vector.h" |
||||
|
||||
#include <grpc/byte_buffer.h> |
||||
#include <grpc/byte_buffer_reader.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/impl/codegen/connectivity_state.h> |
||||
#include <grpc/impl/codegen/propagation_bits.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/time.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/ext/xds/xds_bootstrap.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/pollset_set.h" |
||||
#include "src/core/lib/security/credentials/channel_creds_registry.h" |
||||
#include "src/core/lib/security/credentials/credentials.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/slice/slice_refcount.h" |
||||
#include "src/core/lib/surface/call.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/lame_client.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
//
|
||||
// GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
|
||||
//
|
||||
|
||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( |
||||
RefCountedPtr<GrpcXdsTransportFactory> factory, grpc_channel* channel, |
||||
const char* method, |
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) |
||||
: factory_(std::move(factory)), event_handler_(std::move(event_handler)) { |
||||
// Create call.
|
||||
call_ = grpc_channel_create_pollset_set_call( |
||||
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, factory_->interested_parties(), |
||||
StaticSlice::FromStaticString(method).c_slice(), nullptr, |
||||
Timestamp::InfFuture(), nullptr); |
||||
GPR_ASSERT(call_ != nullptr); |
||||
// Init data associated with the call.
|
||||
grpc_metadata_array_init(&initial_metadata_recv_); |
||||
grpc_metadata_array_init(&trailing_metadata_recv_); |
||||
// Initialize closure to be used for sending messages.
|
||||
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr); |
||||
// Start ops on the call.
|
||||
grpc_call_error call_error; |
||||
grpc_op ops[3]; |
||||
memset(ops, 0, sizeof(ops)); |
||||
// Send initial metadata. No callback for this, since we don't really
|
||||
// care when it finishes.
|
||||
grpc_op* op = ops; |
||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
||||
op->data.send_initial_metadata.count = 0; |
||||
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | |
||||
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
call_error = grpc_call_start_batch_and_execute( |
||||
call_, ops, static_cast<size_t>(op - ops), nullptr); |
||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
||||
// Start a batch with recv_initial_metadata and recv_message.
|
||||
op = ops; |
||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
||||
op->data.recv_initial_metadata.recv_initial_metadata = |
||||
&initial_metadata_recv_; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
op->op = GRPC_OP_RECV_MESSAGE; |
||||
op->data.recv_message.recv_message = &recv_message_payload_; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
Ref(DEBUG_LOCATION, "OnResponseReceived").release(); |
||||
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); |
||||
call_error = grpc_call_start_batch_and_execute( |
||||
call_, ops, static_cast<size_t>(op - ops), &on_response_received_); |
||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
||||
// Start a batch for recv_trailing_metadata.
|
||||
op = ops; |
||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; |
||||
op->data.recv_status_on_client.status = &status_code_; |
||||
op->data.recv_status_on_client.status_details = &status_details_; |
||||
op->flags = 0; |
||||
op->reserved = nullptr; |
||||
op++; |
||||
// This callback signals the end of the call, so it relies on the initial
|
||||
// ref instead of a new ref. When it's invoked, it's the initial ref that is
|
||||
// unreffed.
|
||||
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); |
||||
call_error = grpc_call_start_batch_and_execute( |
||||
call_, ops, static_cast<size_t>(op - ops), &on_status_received_); |
||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
||||
} |
||||
|
||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
||||
~GrpcStreamingCall() { |
||||
grpc_metadata_array_destroy(&initial_metadata_recv_); |
||||
grpc_metadata_array_destroy(&trailing_metadata_recv_); |
||||
grpc_byte_buffer_destroy(send_message_payload_); |
||||
grpc_byte_buffer_destroy(recv_message_payload_); |
||||
grpc_slice_unref_internal(status_details_); |
||||
GPR_ASSERT(call_ != nullptr); |
||||
grpc_call_unref(call_); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() { |
||||
GPR_ASSERT(call_ != nullptr); |
||||
// If we are here because xds_client wants to cancel the call,
|
||||
// OnStatusReceived() will complete the cancellation and clean up.
|
||||
// Otherwise, we are here because xds_client has to orphan a failed call,
|
||||
// in which case the following cancellation will be a no-op.
|
||||
grpc_call_cancel_internal(call_); |
||||
// Note that the initial ref is held by OnStatusReceived(), so the
|
||||
// corresponding unref happens there instead of here.
|
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage( |
||||
std::string payload) { |
||||
// Create payload.
|
||||
grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload)); |
||||
send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1); |
||||
grpc_slice_unref_internal(slice); |
||||
// Send the message.
|
||||
grpc_op op; |
||||
memset(&op, 0, sizeof(op)); |
||||
op.op = GRPC_OP_SEND_MESSAGE; |
||||
op.data.send_message.send_message = send_message_payload_; |
||||
Ref(DEBUG_LOCATION, "OnRequestSent").release(); |
||||
grpc_call_error call_error = |
||||
grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); |
||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
||||
OnRequestSent(void* arg, grpc_error_handle error) { |
||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
||||
// Clean up the sent message.
|
||||
grpc_byte_buffer_destroy(self->send_message_payload_); |
||||
self->send_message_payload_ = nullptr; |
||||
// Invoke request handler.
|
||||
self->event_handler_->OnRequestSent(GRPC_ERROR_IS_NONE(error)); |
||||
// Drop the ref.
|
||||
self->Unref(DEBUG_LOCATION, "OnRequestSent"); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
||||
OnResponseReceived(void* arg, grpc_error_handle /*error*/) { |
||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
||||
// If there was no payload, then we received status before we received
|
||||
// another message, so we stop reading.
|
||||
if (self->recv_message_payload_ == nullptr) { |
||||
self->Unref(DEBUG_LOCATION, "OnResponseReceived"); |
||||
return; |
||||
} |
||||
// Process the response.
|
||||
grpc_byte_buffer_reader bbr; |
||||
grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_); |
||||
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
||||
grpc_byte_buffer_reader_destroy(&bbr); |
||||
grpc_byte_buffer_destroy(self->recv_message_payload_); |
||||
self->recv_message_payload_ = nullptr; |
||||
self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice)); |
||||
grpc_slice_unref_internal(response_slice); |
||||
// Keep reading.
|
||||
grpc_op op; |
||||
memset(&op, 0, sizeof(op)); |
||||
op.op = GRPC_OP_RECV_MESSAGE; |
||||
op.data.recv_message.recv_message = &self->recv_message_payload_; |
||||
GPR_ASSERT(self->call_ != nullptr); |
||||
// Reuses the "OnResponseReceived" ref taken in ctor.
|
||||
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
||||
self->call_, &op, 1, &self->on_response_received_); |
||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
||||
OnStatusReceived(void* arg, grpc_error_handle /*error*/) { |
||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
||||
self->event_handler_->OnStatusReceived( |
||||
absl::Status(static_cast<absl::StatusCode>(self->status_code_), |
||||
StringViewFromSlice(self->status_details_))); |
||||
self->Unref(DEBUG_LOCATION, "OnStatusReceived"); |
||||
} |
||||
|
||||
//
|
||||
// GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
|
||||
//
|
||||
|
||||
class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher |
||||
: public AsyncConnectivityStateWatcherInterface { |
||||
public: |
||||
explicit StateWatcher( |
||||
std::function<void(absl::Status)> on_connectivity_failure) |
||||
: on_connectivity_failure_(std::move(on_connectivity_failure)) {} |
||||
|
||||
private: |
||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
||||
const absl::Status& status) override { |
||||
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||
on_connectivity_failure_(status); |
||||
} |
||||
} |
||||
|
||||
std::function<void(absl::Status)> on_connectivity_failure_; |
||||
}; |
||||
|
||||
//
|
||||
// GrpcXdsClient::GrpcXdsTransport
|
||||
//
|
||||
|
||||
namespace { |
||||
|
||||
grpc_channel* CreateXdsChannel(grpc_channel_args* args, |
||||
const XdsBootstrap::XdsServer& server) { |
||||
RefCountedPtr<grpc_channel_credentials> channel_creds = |
||||
CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds( |
||||
server.channel_creds_type, server.channel_creds_config); |
||||
return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(), |
||||
args); |
||||
} |
||||
|
||||
bool IsLameChannel(grpc_channel* channel) { |
||||
grpc_channel_element* elem = |
||||
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); |
||||
return elem->filter == &LameClientFilter::kFilter; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( |
||||
GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server, |
||||
std::function<void(absl::Status)> on_connectivity_failure, |
||||
absl::Status* status) |
||||
: factory_(factory) { |
||||
channel_ = CreateXdsChannel(factory->args_, server); |
||||
GPR_ASSERT(channel_ != nullptr); |
||||
if (IsLameChannel(channel_)) { |
||||
*status = absl::UnavailableError("xds client has a lame channel"); |
||||
} else { |
||||
ClientChannel* client_channel = |
||||
ClientChannel::GetFromChannel(Channel::FromC(channel_)); |
||||
GPR_ASSERT(client_channel != nullptr); |
||||
watcher_ = new StateWatcher(std::move(on_connectivity_failure)); |
||||
client_channel->AddConnectivityWatcher( |
||||
GRPC_CHANNEL_IDLE, |
||||
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_)); |
||||
} |
||||
} |
||||
|
||||
GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() { |
||||
grpc_channel_destroy(channel_); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { |
||||
if (!IsLameChannel(channel_)) { |
||||
ClientChannel* client_channel = |
||||
ClientChannel::GetFromChannel(Channel::FromC(channel_)); |
||||
GPR_ASSERT(client_channel != nullptr); |
||||
client_channel->RemoveConnectivityWatcher(watcher_); |
||||
} |
||||
Unref(); |
||||
} |
||||
|
||||
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> |
||||
GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall( |
||||
const char* method, |
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) { |
||||
return MakeOrphanable<GrpcStreamingCall>( |
||||
factory_->Ref(DEBUG_LOCATION, "StreamingCall"), channel_, method, |
||||
std::move(event_handler)); |
||||
} |
||||
|
||||
void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() { |
||||
grpc_channel_reset_connect_backoff(channel_); |
||||
} |
||||
|
||||
//
|
||||
// GrpcXdsTransportFactory
|
||||
//
|
||||
|
||||
namespace { |
||||
|
||||
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) { |
||||
absl::InlinedVector<grpc_arg, 1> args_to_add = { |
||||
grpc_channel_arg_integer_create( |
||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), |
||||
5 * 60 * GPR_MS_PER_SEC), |
||||
}; |
||||
return grpc_channel_args_copy_and_add(args, args_to_add.data(), |
||||
args_to_add.size()); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
GrpcXdsTransportFactory::GrpcXdsTransportFactory(const grpc_channel_args* args) |
||||
: args_(ModifyChannelArgs(args)), |
||||
interested_parties_(grpc_pollset_set_create()) { |
||||
// Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
|
||||
// destroyed.
|
||||
grpc_init(); |
||||
} |
||||
|
||||
GrpcXdsTransportFactory::~GrpcXdsTransportFactory() { |
||||
grpc_channel_args_destroy(args_); |
||||
grpc_pollset_set_destroy(interested_parties_); |
||||
// Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
|
||||
// is destroyed.
|
||||
grpc_shutdown(); |
||||
} |
||||
|
||||
OrphanablePtr<XdsTransportFactory::XdsTransport> |
||||
GrpcXdsTransportFactory::Create( |
||||
const XdsBootstrap::XdsServer& server, |
||||
std::function<void(absl::Status)> on_connectivity_failure, |
||||
absl::Status* status) { |
||||
return MakeOrphanable<GrpcXdsTransport>( |
||||
this, server, std::move(on_connectivity_failure), status); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,134 @@ |
||||
//
|
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H |
||||
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <functional> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include <grpc/impl/codegen/grpc_types.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/status.h> |
||||
|
||||
#include "src/core/ext/xds/xds_bootstrap.h" |
||||
#include "src/core/ext/xds/xds_transport.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/iomgr_fwd.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class GrpcXdsTransportFactory : public XdsTransportFactory { |
||||
public: |
||||
class GrpcXdsTransport; |
||||
|
||||
explicit GrpcXdsTransportFactory(const grpc_channel_args* args); |
||||
~GrpcXdsTransportFactory() override; |
||||
|
||||
void Orphan() override { Unref(); } |
||||
|
||||
OrphanablePtr<XdsTransport> Create( |
||||
const XdsBootstrap::XdsServer& server, |
||||
std::function<void(absl::Status)> on_connectivity_failure, |
||||
absl::Status* status) override; |
||||
|
||||
grpc_pollset_set* interested_parties() const { return interested_parties_; } |
||||
|
||||
private: |
||||
grpc_channel_args* args_; |
||||
grpc_pollset_set* interested_parties_; |
||||
}; |
||||
|
||||
class GrpcXdsTransportFactory::GrpcXdsTransport |
||||
: public XdsTransportFactory::XdsTransport { |
||||
public: |
||||
class GrpcStreamingCall; |
||||
|
||||
GrpcXdsTransport(GrpcXdsTransportFactory* factory, |
||||
const XdsBootstrap::XdsServer& server, |
||||
std::function<void(absl::Status)> on_connectivity_failure, |
||||
absl::Status* status); |
||||
~GrpcXdsTransport() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
OrphanablePtr<StreamingCall> CreateStreamingCall( |
||||
const char* method, |
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) override; |
||||
|
||||
void ResetBackoff() override; |
||||
|
||||
private: |
||||
class StateWatcher; |
||||
|
||||
GrpcXdsTransportFactory* factory_; // Not owned.
|
||||
grpc_channel* channel_; |
||||
StateWatcher* watcher_; |
||||
}; |
||||
|
||||
class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall |
||||
: public XdsTransportFactory::XdsTransport::StreamingCall { |
||||
public: |
||||
GrpcStreamingCall(RefCountedPtr<GrpcXdsTransportFactory> factory, |
||||
grpc_channel* channel, const char* method, |
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler); |
||||
~GrpcStreamingCall() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
void SendMessage(std::string payload) override; |
||||
|
||||
private: |
||||
static void OnRequestSent(void* arg, grpc_error_handle error); |
||||
static void OnResponseReceived(void* arg, grpc_error_handle /*error*/); |
||||
static void OnStatusReceived(void* arg, grpc_error_handle /*error*/); |
||||
|
||||
RefCountedPtr<GrpcXdsTransportFactory> factory_; |
||||
|
||||
std::unique_ptr<StreamingCall::EventHandler> event_handler_; |
||||
|
||||
// Always non-NULL.
|
||||
grpc_call* call_; |
||||
|
||||
// recv_initial_metadata
|
||||
grpc_metadata_array initial_metadata_recv_; |
||||
|
||||
// send_message
|
||||
grpc_byte_buffer* send_message_payload_ = nullptr; |
||||
grpc_closure on_request_sent_; |
||||
|
||||
// recv_message
|
||||
grpc_byte_buffer* recv_message_payload_ = nullptr; |
||||
grpc_closure on_response_received_; |
||||
|
||||
// recv_trailing_metadata
|
||||
grpc_metadata_array trailing_metadata_recv_; |
||||
grpc_status_code status_code_; |
||||
grpc_slice status_details_; |
||||
grpc_closure on_status_received_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H
|
Loading…
Reference in new issue