This reverts commit bd9bc5fd3e
.
pull/30077/head
parent
71c0eb090a
commit
fa57b9d0bc
30 changed files with 1070 additions and 1368 deletions
File diff suppressed because it is too large
Load Diff
@ -1,275 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_client_grpc.h" |
|
||||||
|
|
||||||
#include <limits.h> |
|
||||||
|
|
||||||
#include <memory> |
|
||||||
#include <string> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/base/thread_annotations.h" |
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
|
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/slice.h> |
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/string_util.h> |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_bootstrap.h" |
|
||||||
#include "src/core/ext/xds/xds_channel_args.h" |
|
||||||
#include "src/core/ext/xds/xds_cluster_specifier_plugin.h" |
|
||||||
#include "src/core/ext/xds/xds_http_filters.h" |
|
||||||
#include "src/core/ext/xds/xds_transport.h" |
|
||||||
#include "src/core/ext/xds/xds_transport_grpc.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/debug/trace.h" |
|
||||||
#include "src/core/lib/gpr/env.h" |
|
||||||
#include "src/core/lib/gpr/useful.h" |
|
||||||
#include "src/core/lib/gprpp/debug_location.h" |
|
||||||
#include "src/core/lib/gprpp/memory.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/gprpp/sync.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
#include "src/core/lib/iomgr/exec_ctx.h" |
|
||||||
#include "src/core/lib/iomgr/load_file.h" |
|
||||||
#include "src/core/lib/slice/slice_internal.h" |
|
||||||
#include "src/core/lib/slice/slice_refcount.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
Mutex* g_mu = nullptr; |
|
||||||
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr; |
|
||||||
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr; |
|
||||||
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr; |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
void XdsClientGlobalInit() { |
|
||||||
g_mu = new Mutex; |
|
||||||
XdsHttpFilterRegistry::Init(); |
|
||||||
XdsClusterSpecifierPluginRegistry::Init(); |
|
||||||
} |
|
||||||
|
|
||||||
// TODO(roth): Find a better way to clear the fallback config that does
|
|
||||||
// not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
|
|
||||||
void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS { |
|
||||||
gpr_free(g_fallback_bootstrap_config); |
|
||||||
g_fallback_bootstrap_config = nullptr; |
|
||||||
delete g_mu; |
|
||||||
g_mu = nullptr; |
|
||||||
XdsHttpFilterRegistry::Shutdown(); |
|
||||||
XdsClusterSpecifierPluginRegistry::Shutdown(); |
|
||||||
} |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
std::string GetBootstrapContents(const char* fallback_config, |
|
||||||
grpc_error_handle* error) { |
|
||||||
// First, try GRPC_XDS_BOOTSTRAP env var.
|
|
||||||
UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP")); |
|
||||||
if (path != nullptr) { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
||||||
gpr_log(GPR_INFO, |
|
||||||
"Got bootstrap file location from GRPC_XDS_BOOTSTRAP " |
|
||||||
"environment variable: %s", |
|
||||||
path.get()); |
|
||||||
} |
|
||||||
grpc_slice contents; |
|
||||||
*error = |
|
||||||
grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents); |
|
||||||
if (!GRPC_ERROR_IS_NONE(*error)) return ""; |
|
||||||
std::string contents_str(StringViewFromSlice(contents)); |
|
||||||
grpc_slice_unref_internal(contents); |
|
||||||
return contents_str; |
|
||||||
} |
|
||||||
// Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
|
|
||||||
UniquePtr<char> env_config(gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG")); |
|
||||||
if (env_config != nullptr) { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
||||||
gpr_log(GPR_INFO, |
|
||||||
"Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG " |
|
||||||
"environment variable"); |
|
||||||
} |
|
||||||
return env_config.get(); |
|
||||||
} |
|
||||||
// Finally, try fallback config.
|
|
||||||
if (fallback_config != nullptr) { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
||||||
gpr_log(GPR_INFO, "Got bootstrap contents from fallback config"); |
|
||||||
} |
|
||||||
return fallback_config; |
|
||||||
} |
|
||||||
// No bootstrap config found.
|
|
||||||
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
|
||||||
"Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG " |
|
||||||
"not defined"); |
|
||||||
return ""; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
RefCountedPtr<XdsClient> GrpcXdsClient::GetOrCreate( |
|
||||||
const grpc_channel_args* args, grpc_error_handle* error) { |
|
||||||
RefCountedPtr<XdsClient> xds_client; |
|
||||||
// If getting bootstrap from channel args, create a local XdsClient
|
|
||||||
// instance for the channel or server instead of using the global instance.
|
|
||||||
const char* bootstrap_config = grpc_channel_args_find_string( |
|
||||||
args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG); |
|
||||||
if (bootstrap_config != nullptr) { |
|
||||||
std::unique_ptr<XdsBootstrap> bootstrap = |
|
||||||
XdsBootstrap::Create(bootstrap_config, error); |
|
||||||
if (GRPC_ERROR_IS_NONE(*error)) { |
|
||||||
grpc_channel_args* xds_channel_args = |
|
||||||
grpc_channel_args_find_pointer<grpc_channel_args>( |
|
||||||
args, |
|
||||||
GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS); |
|
||||||
return MakeRefCounted<GrpcXdsClient>(std::move(bootstrap), |
|
||||||
xds_channel_args); |
|
||||||
} |
|
||||||
return nullptr; |
|
||||||
} |
|
||||||
// Otherwise, use the global instance.
|
|
||||||
{ |
|
||||||
MutexLock lock(g_mu); |
|
||||||
if (g_xds_client != nullptr) { |
|
||||||
auto xds_client = g_xds_client->RefIfNonZero(); |
|
||||||
if (xds_client != nullptr) return xds_client; |
|
||||||
} |
|
||||||
// Find bootstrap contents.
|
|
||||||
std::string bootstrap_contents = |
|
||||||
GetBootstrapContents(g_fallback_bootstrap_config, error); |
|
||||||
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) { |
|
||||||
gpr_log(GPR_INFO, "xDS bootstrap contents: %s", |
|
||||||
bootstrap_contents.c_str()); |
|
||||||
} |
|
||||||
// Parse bootstrap.
|
|
||||||
std::unique_ptr<XdsBootstrap> bootstrap = |
|
||||||
XdsBootstrap::Create(bootstrap_contents, error); |
|
||||||
if (!GRPC_ERROR_IS_NONE(*error)) return nullptr; |
|
||||||
// Instantiate XdsClient.
|
|
||||||
xds_client = |
|
||||||
MakeRefCounted<GrpcXdsClient>(std::move(bootstrap), g_channel_args); |
|
||||||
g_xds_client = xds_client.get(); |
|
||||||
} |
|
||||||
return xds_client; |
|
||||||
} |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
Duration GetResourceDurationFromArgs(const grpc_channel_args* args) { |
|
||||||
return Duration::Milliseconds(grpc_channel_args_find_integer( |
|
||||||
args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS, |
|
||||||
{15000, 0, INT_MAX})); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
GrpcXdsClient::GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap, |
|
||||||
const grpc_channel_args* args) |
|
||||||
: XdsClient(std::move(bootstrap), |
|
||||||
MakeOrphanable<GrpcXdsTransportFactory>(args), |
|
||||||
GetResourceDurationFromArgs(args)) {} |
|
||||||
|
|
||||||
GrpcXdsClient::~GrpcXdsClient() { |
|
||||||
MutexLock lock(g_mu); |
|
||||||
if (g_xds_client == this) g_xds_client = nullptr; |
|
||||||
} |
|
||||||
|
|
||||||
grpc_pollset_set* GrpcXdsClient::interested_parties() const { |
|
||||||
return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory()) |
|
||||||
->interested_parties(); |
|
||||||
} |
|
||||||
|
|
||||||
#define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client" |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
void* XdsClientArgCopy(void* p) { |
|
||||||
XdsClient* xds_client = static_cast<XdsClient*>(p); |
|
||||||
xds_client->Ref(DEBUG_LOCATION, "channel arg").release(); |
|
||||||
return p; |
|
||||||
} |
|
||||||
|
|
||||||
void XdsClientArgDestroy(void* p) { |
|
||||||
XdsClient* xds_client = static_cast<XdsClient*>(p); |
|
||||||
xds_client->Unref(DEBUG_LOCATION, "channel arg"); |
|
||||||
} |
|
||||||
|
|
||||||
int XdsClientArgCmp(void* p, void* q) { return QsortCompare(p, q); } |
|
||||||
|
|
||||||
const grpc_arg_pointer_vtable kXdsClientArgVtable = { |
|
||||||
XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp}; |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
grpc_arg GrpcXdsClient::MakeChannelArg() const { |
|
||||||
return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT), |
|
||||||
const_cast<GrpcXdsClient*>(this), |
|
||||||
&kXdsClientArgVtable); |
|
||||||
} |
|
||||||
|
|
||||||
RefCountedPtr<XdsClient> GrpcXdsClient::GetFromChannelArgs( |
|
||||||
const grpc_channel_args& args) { |
|
||||||
XdsClient* xds_client = |
|
||||||
grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT); |
|
||||||
if (xds_client == nullptr) return nullptr; |
|
||||||
return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs"); |
|
||||||
} |
|
||||||
|
|
||||||
namespace internal { |
|
||||||
|
|
||||||
void SetXdsChannelArgsForTest(grpc_channel_args* args) { |
|
||||||
MutexLock lock(g_mu); |
|
||||||
g_channel_args = args; |
|
||||||
} |
|
||||||
|
|
||||||
void UnsetGlobalXdsClientForTest() { |
|
||||||
MutexLock lock(g_mu); |
|
||||||
g_xds_client = nullptr; |
|
||||||
} |
|
||||||
|
|
||||||
void SetXdsFallbackBootstrapConfig(const char* config) { |
|
||||||
MutexLock lock(g_mu); |
|
||||||
gpr_free(g_fallback_bootstrap_config); |
|
||||||
g_fallback_bootstrap_config = gpr_strdup(config); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace internal
|
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
// The returned bytes may contain NULL(0), so we can't use c-string.
|
|
||||||
grpc_slice grpc_dump_xds_configs(void) { |
|
||||||
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; |
|
||||||
grpc_core::ExecCtx exec_ctx; |
|
||||||
grpc_error_handle error = GRPC_ERROR_NONE; |
|
||||||
auto xds_client = grpc_core::GrpcXdsClient::GetOrCreate(nullptr, &error); |
|
||||||
if (!GRPC_ERROR_IS_NONE(error)) { |
|
||||||
// If we aren't using xDS, just return an empty string.
|
|
||||||
GRPC_ERROR_UNREF(error); |
|
||||||
return grpc_empty_slice(); |
|
||||||
} |
|
||||||
return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary()); |
|
||||||
} |
|
@ -1,65 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H |
|
||||||
#define GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <memory> |
|
||||||
|
|
||||||
#include <grpc/impl/codegen/grpc_types.h> |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_bootstrap.h" |
|
||||||
#include "src/core/ext/xds/xds_client.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/iomgr/error.h" |
|
||||||
#include "src/core/lib/iomgr/iomgr_fwd.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
class GrpcXdsClient : public XdsClient { |
|
||||||
public: |
|
||||||
// Factory function to get or create the global XdsClient instance.
|
|
||||||
// If *error is not GRPC_ERROR_NONE upon return, then there was
|
|
||||||
// an error initializing the client.
|
|
||||||
static RefCountedPtr<XdsClient> GetOrCreate(const grpc_channel_args* args, |
|
||||||
grpc_error_handle* error); |
|
||||||
|
|
||||||
// Do not instantiate directly -- use GetOrCreate() instead.
|
|
||||||
GrpcXdsClient(std::unique_ptr<XdsBootstrap> bootstrap, |
|
||||||
const grpc_channel_args* args); |
|
||||||
~GrpcXdsClient() override; |
|
||||||
|
|
||||||
grpc_pollset_set* interested_parties() const; |
|
||||||
|
|
||||||
// Helpers for encoding the XdsClient object in channel args.
|
|
||||||
grpc_arg MakeChannelArg() const; |
|
||||||
static RefCountedPtr<XdsClient> GetFromChannelArgs( |
|
||||||
const grpc_channel_args& args); |
|
||||||
}; |
|
||||||
|
|
||||||
namespace internal { |
|
||||||
void SetXdsChannelArgsForTest(grpc_channel_args* args); |
|
||||||
void UnsetGlobalXdsClientForTest(); |
|
||||||
// Sets bootstrap config to be used when no env var is set.
|
|
||||||
// Does not take ownership of config.
|
|
||||||
void SetXdsFallbackBootstrapConfig(const char* config); |
|
||||||
} // namespace internal
|
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_GRPC_H
|
|
@ -1,86 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H |
|
||||||
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <string> |
|
||||||
|
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_bootstrap.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
// A factory for creating new XdsTransport instances.
|
|
||||||
class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> { |
|
||||||
public: |
|
||||||
// Represents a transport for xDS communication (e.g., a gRPC channel).
|
|
||||||
class XdsTransport : public InternallyRefCounted<XdsTransport> { |
|
||||||
public: |
|
||||||
// Represents a bidi streaming RPC call.
|
|
||||||
class StreamingCall : public InternallyRefCounted<StreamingCall> { |
|
||||||
public: |
|
||||||
// An interface for handling events on a streaming call.
|
|
||||||
class EventHandler { |
|
||||||
public: |
|
||||||
virtual ~EventHandler() = default; |
|
||||||
|
|
||||||
// Called when a SendMessage() operation completes.
|
|
||||||
virtual void OnRequestSent(bool ok) = 0; |
|
||||||
// Called when a message is received on the stream.
|
|
||||||
virtual void OnRecvMessage(absl::string_view payload) = 0; |
|
||||||
// Called when status is received on the stream.
|
|
||||||
virtual void OnStatusReceived(absl::Status status) = 0; |
|
||||||
}; |
|
||||||
|
|
||||||
// Sends a message on the stream. When the message has been sent,
|
|
||||||
// the EventHandler::OnRequestSent() method will be called.
|
|
||||||
// Only one message will be in flight at a time; subsequent
|
|
||||||
// messages will not be sent until this one is done.
|
|
||||||
virtual void SendMessage(std::string payload) = 0; |
|
||||||
}; |
|
||||||
|
|
||||||
// Create a streaming call on this transport for the specified method.
|
|
||||||
// Events on the stream will be reported to event_handler.
|
|
||||||
virtual OrphanablePtr<StreamingCall> CreateStreamingCall( |
|
||||||
const char* method, |
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) = 0; |
|
||||||
|
|
||||||
// Resets connection backoff for the transport.
|
|
||||||
virtual void ResetBackoff() = 0; |
|
||||||
}; |
|
||||||
|
|
||||||
// Creates a new transport for the specified server.
|
|
||||||
// The on_connectivity_failure callback will be invoked whenever there is
|
|
||||||
// a connectivity failure on the transport.
|
|
||||||
// *status will be set if there is an error creating the channel,
|
|
||||||
// although the returned channel must still accept calls (which may fail).
|
|
||||||
virtual OrphanablePtr<XdsTransport> Create( |
|
||||||
const XdsBootstrap::XdsServer& server, |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure, |
|
||||||
absl::Status* status) = 0; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_H
|
|
@ -1,358 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_transport_grpc.h" |
|
||||||
|
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/container/inlined_vector.h" |
|
||||||
|
|
||||||
#include <grpc/byte_buffer.h> |
|
||||||
#include <grpc/byte_buffer_reader.h> |
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/impl/codegen/connectivity_state.h> |
|
||||||
#include <grpc/impl/codegen/propagation_bits.h> |
|
||||||
#include <grpc/slice.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/time.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
|
||||||
#include "src/core/ext/xds/xds_bootstrap.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_fwd.h" |
|
||||||
#include "src/core/lib/channel/channel_stack.h" |
|
||||||
#include "src/core/lib/config/core_configuration.h" |
|
||||||
#include "src/core/lib/gprpp/debug_location.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/gprpp/time.h" |
|
||||||
#include "src/core/lib/iomgr/closure.h" |
|
||||||
#include "src/core/lib/iomgr/pollset_set.h" |
|
||||||
#include "src/core/lib/security/credentials/channel_creds_registry.h" |
|
||||||
#include "src/core/lib/security/credentials/credentials.h" |
|
||||||
#include "src/core/lib/slice/slice.h" |
|
||||||
#include "src/core/lib/slice/slice_internal.h" |
|
||||||
#include "src/core/lib/slice/slice_refcount.h" |
|
||||||
#include "src/core/lib/surface/call.h" |
|
||||||
#include "src/core/lib/surface/channel.h" |
|
||||||
#include "src/core/lib/surface/lame_client.h" |
|
||||||
#include "src/core/lib/transport/connectivity_state.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
//
|
|
||||||
// GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
|
|
||||||
//
|
|
||||||
|
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( |
|
||||||
RefCountedPtr<GrpcXdsTransportFactory> factory, grpc_channel* channel, |
|
||||||
const char* method, |
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) |
|
||||||
: factory_(std::move(factory)), event_handler_(std::move(event_handler)) { |
|
||||||
// Create call.
|
|
||||||
call_ = grpc_channel_create_pollset_set_call( |
|
||||||
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, factory_->interested_parties(), |
|
||||||
StaticSlice::FromStaticString(method).c_slice(), nullptr, |
|
||||||
Timestamp::InfFuture(), nullptr); |
|
||||||
GPR_ASSERT(call_ != nullptr); |
|
||||||
// Init data associated with the call.
|
|
||||||
grpc_metadata_array_init(&initial_metadata_recv_); |
|
||||||
grpc_metadata_array_init(&trailing_metadata_recv_); |
|
||||||
// Initialize closure to be used for sending messages.
|
|
||||||
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr); |
|
||||||
// Start ops on the call.
|
|
||||||
grpc_call_error call_error; |
|
||||||
grpc_op ops[3]; |
|
||||||
memset(ops, 0, sizeof(ops)); |
|
||||||
// Send initial metadata. No callback for this, since we don't really
|
|
||||||
// care when it finishes.
|
|
||||||
grpc_op* op = ops; |
|
||||||
op->op = GRPC_OP_SEND_INITIAL_METADATA; |
|
||||||
op->data.send_initial_metadata.count = 0; |
|
||||||
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | |
|
||||||
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
call_error = grpc_call_start_batch_and_execute( |
|
||||||
call_, ops, static_cast<size_t>(op - ops), nullptr); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
||||||
// Start a batch with recv_initial_metadata and recv_message.
|
|
||||||
op = ops; |
|
||||||
op->op = GRPC_OP_RECV_INITIAL_METADATA; |
|
||||||
op->data.recv_initial_metadata.recv_initial_metadata = |
|
||||||
&initial_metadata_recv_; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
op->op = GRPC_OP_RECV_MESSAGE; |
|
||||||
op->data.recv_message.recv_message = &recv_message_payload_; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
Ref(DEBUG_LOCATION, "OnResponseReceived").release(); |
|
||||||
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); |
|
||||||
call_error = grpc_call_start_batch_and_execute( |
|
||||||
call_, ops, static_cast<size_t>(op - ops), &on_response_received_); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
||||||
// Start a batch for recv_trailing_metadata.
|
|
||||||
op = ops; |
|
||||||
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
|
||||||
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; |
|
||||||
op->data.recv_status_on_client.status = &status_code_; |
|
||||||
op->data.recv_status_on_client.status_details = &status_details_; |
|
||||||
op->flags = 0; |
|
||||||
op->reserved = nullptr; |
|
||||||
op++; |
|
||||||
// This callback signals the end of the call, so it relies on the initial
|
|
||||||
// ref instead of a new ref. When it's invoked, it's the initial ref that is
|
|
||||||
// unreffed.
|
|
||||||
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); |
|
||||||
call_error = grpc_call_start_batch_and_execute( |
|
||||||
call_, ops, static_cast<size_t>(op - ops), &on_status_received_); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
||||||
} |
|
||||||
|
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
||||||
~GrpcStreamingCall() { |
|
||||||
grpc_metadata_array_destroy(&initial_metadata_recv_); |
|
||||||
grpc_metadata_array_destroy(&trailing_metadata_recv_); |
|
||||||
grpc_byte_buffer_destroy(send_message_payload_); |
|
||||||
grpc_byte_buffer_destroy(recv_message_payload_); |
|
||||||
grpc_slice_unref_internal(status_details_); |
|
||||||
GPR_ASSERT(call_ != nullptr); |
|
||||||
grpc_call_unref(call_); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() { |
|
||||||
GPR_ASSERT(call_ != nullptr); |
|
||||||
// If we are here because xds_client wants to cancel the call,
|
|
||||||
// OnStatusReceived() will complete the cancellation and clean up.
|
|
||||||
// Otherwise, we are here because xds_client has to orphan a failed call,
|
|
||||||
// in which case the following cancellation will be a no-op.
|
|
||||||
grpc_call_cancel_internal(call_); |
|
||||||
// Note that the initial ref is held by OnStatusReceived(), so the
|
|
||||||
// corresponding unref happens there instead of here.
|
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage( |
|
||||||
std::string payload) { |
|
||||||
// Create payload.
|
|
||||||
grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload)); |
|
||||||
send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1); |
|
||||||
grpc_slice_unref_internal(slice); |
|
||||||
// Send the message.
|
|
||||||
grpc_op op; |
|
||||||
memset(&op, 0, sizeof(op)); |
|
||||||
op.op = GRPC_OP_SEND_MESSAGE; |
|
||||||
op.data.send_message.send_message = send_message_payload_; |
|
||||||
Ref(DEBUG_LOCATION, "OnRequestSent").release(); |
|
||||||
grpc_call_error call_error = |
|
||||||
grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
||||||
OnRequestSent(void* arg, grpc_error_handle error) { |
|
||||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
|
||||||
// Clean up the sent message.
|
|
||||||
grpc_byte_buffer_destroy(self->send_message_payload_); |
|
||||||
self->send_message_payload_ = nullptr; |
|
||||||
// Invoke request handler.
|
|
||||||
self->event_handler_->OnRequestSent(GRPC_ERROR_IS_NONE(error)); |
|
||||||
// Drop the ref.
|
|
||||||
self->Unref(DEBUG_LOCATION, "OnRequestSent"); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
||||||
OnResponseReceived(void* arg, grpc_error_handle /*error*/) { |
|
||||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
|
||||||
// If there was no payload, then we received status before we received
|
|
||||||
// another message, so we stop reading.
|
|
||||||
if (self->recv_message_payload_ == nullptr) { |
|
||||||
self->Unref(DEBUG_LOCATION, "OnResponseReceived"); |
|
||||||
return; |
|
||||||
} |
|
||||||
// Process the response.
|
|
||||||
grpc_byte_buffer_reader bbr; |
|
||||||
grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_); |
|
||||||
grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); |
|
||||||
grpc_byte_buffer_reader_destroy(&bbr); |
|
||||||
grpc_byte_buffer_destroy(self->recv_message_payload_); |
|
||||||
self->recv_message_payload_ = nullptr; |
|
||||||
self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice)); |
|
||||||
grpc_slice_unref_internal(response_slice); |
|
||||||
// Keep reading.
|
|
||||||
grpc_op op; |
|
||||||
memset(&op, 0, sizeof(op)); |
|
||||||
op.op = GRPC_OP_RECV_MESSAGE; |
|
||||||
op.data.recv_message.recv_message = &self->recv_message_payload_; |
|
||||||
GPR_ASSERT(self->call_ != nullptr); |
|
||||||
// Reuses the "OnResponseReceived" ref taken in ctor.
|
|
||||||
const grpc_call_error call_error = grpc_call_start_batch_and_execute( |
|
||||||
self->call_, &op, 1, &self->on_response_received_); |
|
||||||
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
||||||
OnStatusReceived(void* arg, grpc_error_handle /*error*/) { |
|
||||||
auto* self = static_cast<GrpcStreamingCall*>(arg); |
|
||||||
self->event_handler_->OnStatusReceived( |
|
||||||
absl::Status(static_cast<absl::StatusCode>(self->status_code_), |
|
||||||
StringViewFromSlice(self->status_details_))); |
|
||||||
self->Unref(DEBUG_LOCATION, "OnStatusReceived"); |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
|
|
||||||
//
|
|
||||||
|
|
||||||
class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher |
|
||||||
: public AsyncConnectivityStateWatcherInterface { |
|
||||||
public: |
|
||||||
explicit StateWatcher( |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure) |
|
||||||
: on_connectivity_failure_(std::move(on_connectivity_failure)) {} |
|
||||||
|
|
||||||
private: |
|
||||||
void OnConnectivityStateChange(grpc_connectivity_state new_state, |
|
||||||
const absl::Status& status) override { |
|
||||||
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
|
||||||
on_connectivity_failure_(status); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
std::function<void(absl::Status)> on_connectivity_failure_; |
|
||||||
}; |
|
||||||
|
|
||||||
//
|
|
||||||
// GrpcXdsClient::GrpcXdsTransport
|
|
||||||
//
|
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
grpc_channel* CreateXdsChannel(grpc_channel_args* args, |
|
||||||
const XdsBootstrap::XdsServer& server) { |
|
||||||
RefCountedPtr<grpc_channel_credentials> channel_creds = |
|
||||||
CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds( |
|
||||||
server.channel_creds_type, server.channel_creds_config); |
|
||||||
return grpc_channel_create(server.server_uri.c_str(), channel_creds.get(), |
|
||||||
args); |
|
||||||
} |
|
||||||
|
|
||||||
bool IsLameChannel(grpc_channel* channel) { |
|
||||||
grpc_channel_element* elem = |
|
||||||
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); |
|
||||||
return elem->filter == &LameClientFilter::kFilter; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( |
|
||||||
GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server, |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure, |
|
||||||
absl::Status* status) |
|
||||||
: factory_(factory) { |
|
||||||
channel_ = CreateXdsChannel(factory->args_, server); |
|
||||||
GPR_ASSERT(channel_ != nullptr); |
|
||||||
if (IsLameChannel(channel_)) { |
|
||||||
*status = absl::UnavailableError("xds client has a lame channel"); |
|
||||||
} else { |
|
||||||
ClientChannel* client_channel = |
|
||||||
ClientChannel::GetFromChannel(Channel::FromC(channel_)); |
|
||||||
GPR_ASSERT(client_channel != nullptr); |
|
||||||
watcher_ = new StateWatcher(std::move(on_connectivity_failure)); |
|
||||||
client_channel->AddConnectivityWatcher( |
|
||||||
GRPC_CHANNEL_IDLE, |
|
||||||
OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_)); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() { |
|
||||||
grpc_channel_destroy(channel_); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() { |
|
||||||
if (IsLameChannel(channel_)) return; |
|
||||||
ClientChannel* client_channel = |
|
||||||
ClientChannel::GetFromChannel(Channel::FromC(channel_)); |
|
||||||
GPR_ASSERT(client_channel != nullptr); |
|
||||||
client_channel->RemoveConnectivityWatcher(watcher_); |
|
||||||
Unref(); |
|
||||||
} |
|
||||||
|
|
||||||
OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall> |
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall( |
|
||||||
const char* method, |
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) { |
|
||||||
return MakeOrphanable<GrpcStreamingCall>( |
|
||||||
factory_->Ref(DEBUG_LOCATION, "StreamingCall"), channel_, method, |
|
||||||
std::move(event_handler)); |
|
||||||
} |
|
||||||
|
|
||||||
void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() { |
|
||||||
grpc_channel_reset_connect_backoff(channel_); |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// GrpcXdsTransportFactory
|
|
||||||
//
|
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) { |
|
||||||
absl::InlinedVector<grpc_arg, 1> args_to_add = { |
|
||||||
grpc_channel_arg_integer_create( |
|
||||||
const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), |
|
||||||
5 * 60 * GPR_MS_PER_SEC), |
|
||||||
}; |
|
||||||
return grpc_channel_args_copy_and_add(args, args_to_add.data(), |
|
||||||
args_to_add.size()); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
GrpcXdsTransportFactory::GrpcXdsTransportFactory(const grpc_channel_args* args) |
|
||||||
: args_(ModifyChannelArgs(args)), |
|
||||||
interested_parties_(grpc_pollset_set_create()) { |
|
||||||
// Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
|
|
||||||
// destroyed.
|
|
||||||
grpc_init(); |
|
||||||
} |
|
||||||
|
|
||||||
GrpcXdsTransportFactory::~GrpcXdsTransportFactory() { |
|
||||||
grpc_channel_args_destroy(args_); |
|
||||||
grpc_pollset_set_destroy(interested_parties_); |
|
||||||
// Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
|
|
||||||
// is destroyed.
|
|
||||||
grpc_shutdown(); |
|
||||||
} |
|
||||||
|
|
||||||
OrphanablePtr<XdsTransportFactory::XdsTransport> |
|
||||||
GrpcXdsTransportFactory::Create( |
|
||||||
const XdsBootstrap::XdsServer& server, |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure, |
|
||||||
absl::Status* status) { |
|
||||||
return MakeOrphanable<GrpcXdsTransport>( |
|
||||||
this, server, std::move(on_connectivity_failure), status); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
@ -1,134 +0,0 @@ |
|||||||
//
|
|
||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H |
|
||||||
#define GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <string> |
|
||||||
|
|
||||||
#include "absl/status/status.h" |
|
||||||
|
|
||||||
#include <grpc/impl/codegen/grpc_types.h> |
|
||||||
#include <grpc/slice.h> |
|
||||||
#include <grpc/status.h> |
|
||||||
|
|
||||||
#include "src/core/ext/xds/xds_bootstrap.h" |
|
||||||
#include "src/core/ext/xds/xds_transport.h" |
|
||||||
#include "src/core/lib/gprpp/orphanable.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/iomgr/closure.h" |
|
||||||
#include "src/core/lib/iomgr/error.h" |
|
||||||
#include "src/core/lib/iomgr/iomgr_fwd.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
class GrpcXdsTransportFactory : public XdsTransportFactory { |
|
||||||
public: |
|
||||||
class GrpcXdsTransport; |
|
||||||
|
|
||||||
explicit GrpcXdsTransportFactory(const grpc_channel_args* args); |
|
||||||
~GrpcXdsTransportFactory() override; |
|
||||||
|
|
||||||
void Orphan() override { Unref(); } |
|
||||||
|
|
||||||
OrphanablePtr<XdsTransport> Create( |
|
||||||
const XdsBootstrap::XdsServer& server, |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure, |
|
||||||
absl::Status* status) override; |
|
||||||
|
|
||||||
grpc_pollset_set* interested_parties() const { return interested_parties_; } |
|
||||||
|
|
||||||
private: |
|
||||||
grpc_channel_args* args_; |
|
||||||
grpc_pollset_set* interested_parties_; |
|
||||||
}; |
|
||||||
|
|
||||||
class GrpcXdsTransportFactory::GrpcXdsTransport |
|
||||||
: public XdsTransportFactory::XdsTransport { |
|
||||||
public: |
|
||||||
class GrpcStreamingCall; |
|
||||||
|
|
||||||
GrpcXdsTransport(GrpcXdsTransportFactory* factory, |
|
||||||
const XdsBootstrap::XdsServer& server, |
|
||||||
std::function<void(absl::Status)> on_connectivity_failure, |
|
||||||
absl::Status* status); |
|
||||||
~GrpcXdsTransport() override; |
|
||||||
|
|
||||||
void Orphan() override; |
|
||||||
|
|
||||||
OrphanablePtr<StreamingCall> CreateStreamingCall( |
|
||||||
const char* method, |
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler) override; |
|
||||||
|
|
||||||
void ResetBackoff() override; |
|
||||||
|
|
||||||
private: |
|
||||||
class StateWatcher; |
|
||||||
|
|
||||||
GrpcXdsTransportFactory* factory_; // Not owned.
|
|
||||||
grpc_channel* channel_; |
|
||||||
StateWatcher* watcher_; |
|
||||||
}; |
|
||||||
|
|
||||||
class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall |
|
||||||
: public XdsTransportFactory::XdsTransport::StreamingCall { |
|
||||||
public: |
|
||||||
GrpcStreamingCall(RefCountedPtr<GrpcXdsTransportFactory> factory, |
|
||||||
grpc_channel* channel, const char* method, |
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler); |
|
||||||
~GrpcStreamingCall() override; |
|
||||||
|
|
||||||
void Orphan() override; |
|
||||||
|
|
||||||
void SendMessage(std::string payload) override; |
|
||||||
|
|
||||||
private: |
|
||||||
static void OnRequestSent(void* arg, grpc_error_handle error); |
|
||||||
static void OnResponseReceived(void* arg, grpc_error_handle /*error*/); |
|
||||||
static void OnStatusReceived(void* arg, grpc_error_handle /*error*/); |
|
||||||
|
|
||||||
RefCountedPtr<GrpcXdsTransportFactory> factory_; |
|
||||||
|
|
||||||
std::unique_ptr<StreamingCall::EventHandler> event_handler_; |
|
||||||
|
|
||||||
// Always non-NULL.
|
|
||||||
grpc_call* call_; |
|
||||||
|
|
||||||
// recv_initial_metadata
|
|
||||||
grpc_metadata_array initial_metadata_recv_; |
|
||||||
|
|
||||||
// send_message
|
|
||||||
grpc_byte_buffer* send_message_payload_ = nullptr; |
|
||||||
grpc_closure on_request_sent_; |
|
||||||
|
|
||||||
// recv_message
|
|
||||||
grpc_byte_buffer* recv_message_payload_ = nullptr; |
|
||||||
grpc_closure on_response_received_; |
|
||||||
|
|
||||||
// recv_trailing_metadata
|
|
||||||
grpc_metadata_array trailing_metadata_recv_; |
|
||||||
grpc_status_code status_code_; |
|
||||||
grpc_slice status_details_; |
|
||||||
grpc_closure on_status_received_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_CORE_EXT_XDS_XDS_TRANSPORT_GRPC_H
|
|
Loading…
Reference in new issue