Gcp Observability: Make GcpObservabilityInit blocking (#32612)

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/32616/head
Yash Tibrewal 2 years ago committed by GitHub
parent e97f632886
commit 6f960be41b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      BUILD
  2. 49
      src/cpp/ext/filters/census/client_filter.cc
  3. 92
      src/cpp/ext/filters/census/grpc_plugin.h
  4. 65
      src/cpp/ext/filters/census/promise_notification.h
  5. 65
      src/cpp/ext/filters/census/server_filter.cc
  6. 2
      src/cpp/ext/gcp/BUILD
  7. 139
      src/cpp/ext/gcp/observability.cc
  8. 29
      test/cpp/ext/filters/census/BUILD
  9. 113
      test/cpp/ext/filters/census/constant_labels_wait_test.cc

@ -2200,14 +2200,12 @@ grpc_cc_library(
"src/cpp/ext/filters/census/grpc_plugin.h",
"src/cpp/ext/filters/census/measures.h",
"src/cpp/ext/filters/census/open_census_call_tracer.h",
"src/cpp/ext/filters/census/promise_notification.h",
"src/cpp/ext/filters/census/rpc_encoding.h",
"src/cpp/ext/filters/census/server_filter.h",
],
external_deps = [
"absl/base",
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/meta:type_traits",
"absl/status",
"absl/status:statusor",
@ -2231,7 +2229,6 @@ grpc_cc_library(
"grpc++_base",
"grpc_base",
"grpc_public_hdrs",
"//src/core:activity",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:cancel_callback",
@ -2239,13 +2236,11 @@ grpc_cc_library(
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:default_event_engine",
"//src/core:error",
"//src/core:experiments",
"//src/core:map",
"//src/core:pipe",
"//src/core:poll",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
"//src/core:slice_refcount",

@ -27,7 +27,6 @@
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -55,7 +54,6 @@
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@ -65,7 +63,6 @@
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/cpp/ext/filters/census/promise_notification.h"
namespace grpc {
namespace internal {
@ -88,11 +85,6 @@ absl::StatusOr<OpenCensusClientFilter> OpenCensusClientFilter::Create(
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) {
bool observability_enabled =
args.GetInt(GRPC_ARG_ENABLE_OBSERVABILITY).value_or(true);
// Only run the Post-Init Registry if observability is enabled to avoid
// running into a cyclic loop for exporter channels.
if (observability_enabled) {
OpenCensusRegistry::Get().RunFunctionsPostInit();
}
return OpenCensusClientFilter(/*tracing_enabled=*/observability_enabled);
}
@ -100,34 +92,19 @@ grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
OpenCensusClientFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto continue_making_call_promise = [this,
next_promise_factory =
std::move(next_promise_factory),
call_args =
std::move(call_args)]() mutable {
auto* path = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata());
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* tracer = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusCallTracer>(
call_context,
path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>(),
OpenCensusTracingEnabled() && tracing_enabled_);
GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr;
return next_promise_factory(std::move(call_args));
};
// If the OpenCensus plugin is not yet ready, then wait for it to be ready.
if (!grpc::internal::OpenCensusRegistry::Get().Ready()) {
auto notification = std::make_shared<PromiseNotification>();
grpc::internal::OpenCensusRegistry::Get().NotifyOnReady(
[notification]() { notification->Notify(); });
return grpc_core::Seq([notification]() { return notification->Wait(); },
std::move(continue_making_call_promise));
}
return continue_making_call_promise();
auto* path = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata());
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* tracer =
grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusCallTracer>(
call_context, path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>(),
OpenCensusTracingEnabled() && tracing_enabled_);
GPR_DEBUG_ASSERT(call_context[GRPC_CONTEXT_CALL_TRACER].value == nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER].destroy = nullptr;
return next_promise_factory(std::move(call_args));
}
//

@ -22,25 +22,16 @@
#include <grpc/support/port_platform.h>
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "absl/base/call_once.h"
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "opencensus/tags/tag_key.h"
#include "opencensus/tags/tag_map.h"
#include <grpc/event_engine/event_engine.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc {
// The following using declarations have been added to prevent breaking users
@ -160,22 +151,8 @@ class OpenCensusRegistry {
static OpenCensusRegistry& Get();
// Registers the functions to be run post-init.
void RegisterFunctions(std::function<void()> f) {
exporter_registry_.push_back(std::move(f));
}
void RegisterWaitOnReady() { wait_on_ready_ = true; }
// Runs the registry post-init exactly once. Protected with an absl::CallOnce.
void RunFunctionsPostInit() {
absl::call_once(once_, &OpenCensusRegistry::RunFunctionsPostInitHelper,
this);
}
void RegisterConstantLabels(
const std::map<std::string, std::string>& labels) {
grpc_core::MutexLock lock(&mu_);
constant_labels_.reserve(labels.size());
for (const auto& label : labels) {
auto tag_key = opencensus::tags::TagKey::Register(label.first);
@ -184,89 +161,26 @@ class OpenCensusRegistry {
}
void RegisterConstantAttributes(std::vector<Attribute> attributes) {
grpc_core::MutexLock lock(&mu_);
constant_attributes_ = std::move(attributes);
}
void NotifyOnReady(absl::AnyInvocable<void()> callback) {
grpc_core::MutexLock lock(&mu_);
// Environment has already been detected
if (ready_) {
// Execute on the event engine to avoid deadlocks.
return event_engine()->Run(std::move(callback));
}
callbacks_.push_back(std::move(callback));
}
::opencensus::tags::TagMap PopulateTagMapWithConstantLabels(
const ::opencensus::tags::TagMap& tag_map);
void PopulateCensusContextWithConstantAttributes(
grpc::experimental::CensusContext* context);
void SetReady() {
std::vector<absl::AnyInvocable<void()>> callbacks;
{
grpc_core::MutexLock lock(&mu_);
ready_ = true;
callbacks = std::move(callbacks_);
}
for (auto& callback : callbacks) {
callback();
}
}
bool Ready() {
if (!wait_on_ready_) {
return true;
}
grpc_core::MutexLock lock(&mu_);
return ready_;
}
const std::vector<Label>& ConstantLabels() {
grpc_core::MutexLock lock(&mu_);
return constant_labels_;
}
const std::vector<Label>& ConstantLabels() { return constant_labels_; }
const std::vector<Attribute>& ConstantAttributes() {
grpc_core::MutexLock lock(&mu_);
return constant_attributes_;
}
private:
void RunFunctionsPostInitHelper() {
for (const auto& f : exporter_registry_) {
f();
}
}
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
if (event_engine_ == nullptr) {
event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
}
return event_engine_;
}
OpenCensusRegistry() = default;
std::vector<std::function<void()>> exporter_registry_;
absl::once_flag once_;
// Some setups might need to set up the constant labels that are fetched after
// start up. wait_on_ready_ allows implementations to check whether there is
// such a need. This is only set before grpc_init in a single thread, so it
// should not need any protection.
bool wait_on_ready_ = false;
grpc_core::Mutex mu_;
// If wait_on_ready_ is true, ready_ indicates whether the plugin is now ready
// to start serving.
bool ready_ ABSL_GUARDED_BY(mu_) = false;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_
ABSL_GUARDED_BY(mu_);
std::vector<Label> constant_labels_ ABSL_GUARDED_BY(mu_);
std::vector<Attribute> constant_attributes_ ABSL_GUARDED_BY(mu_);
std::vector<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
std::vector<Label> constant_labels_;
std::vector<Attribute> constant_attributes_;
};
} // namespace internal

@ -1,65 +0,0 @@
//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_FILTERS_CENSUS_PROMISE_NOTIFICATION_H
#define GRPC_SRC_CPP_EXT_FILTERS_CENSUS_PROMISE_NOTIFICATION_H
#include <grpc/support/port_platform.h>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
// TODO(yashykt): Make this part of the standard promises library.
// Helper class for creating a promise that waits until it is notified.
class PromiseNotification {
public:
grpc_core::Poll<int> Wait() {
grpc_core::MutexLock lock(&mu_);
if (done_) {
return 42;
}
if (!polled_) {
waker_ = grpc_core::Activity::current()->MakeOwningWaker();
polled_ = true;
}
return grpc_core::Pending{};
}
void Notify() {
grpc_core::Waker waker;
{
grpc_core::MutexLock lock(&mu_);
done_ = true;
waker = std::move(waker_);
}
waker.Wakeup();
}
private:
grpc_core::Mutex mu_;
bool done_ ABSL_GUARDED_BY(mu_) = false;
bool polled_ ABSL_GUARDED_BY(mu_) = false;
grpc_core::Waker waker_ ABSL_GUARDED_BY(mu_);
};
#endif // GRPC_SRC_CPP_EXT_FILTERS_CENSUS_PROMISE_NOTIFICATION_H

@ -25,9 +25,7 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
@ -51,7 +49,6 @@
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -59,7 +56,6 @@
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/promise_notification.h"
namespace grpc {
namespace internal {
@ -212,7 +208,6 @@ const grpc_channel_filter OpenCensusServerFilter::kFilter =
absl::StatusOr<OpenCensusServerFilter> OpenCensusServerFilter::Create(
const grpc_core::ChannelArgs& /*args*/,
grpc_core::ChannelFilter::Args /*filter_args*/) {
OpenCensusRegistry::Get().RunFunctionsPostInit();
return OpenCensusServerFilter();
}
@ -220,43 +215,29 @@ grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
OpenCensusServerFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto continue_making_call_promise = [next_promise_factory =
std::move(next_promise_factory),
call_args =
std::move(call_args)]() mutable {
auto* calld = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusServerCallData>(
call_args.client_initial_metadata.get());
call_args.client_to_server_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnRecvMessage();
return message;
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnSendMessage();
return message;
});
grpc_core::GetContext<grpc_core::CallFinalization>()->Add(
[calld](const grpc_call_final_info* final_info) {
calld->Finalize(final_info);
});
return grpc_core::OnCancel(Map(next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle md) {
calld->OnServerTrailingMetadata(md.get());
return md;
}),
[calld]() { calld->OnCancel(); });
};
// If the OpenCensus plugin is not yet ready, then wait for it to be ready.
if (!grpc::internal::OpenCensusRegistry::Get().Ready()) {
auto notification = std::make_shared<PromiseNotification>();
grpc::internal::OpenCensusRegistry::Get().NotifyOnReady(
[notification]() { notification->Notify(); });
return grpc_core::Seq([notification]() { return notification->Wait(); },
std::move(continue_making_call_promise));
}
return continue_making_call_promise();
auto* calld = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenCensusServerCallData>(
call_args.client_initial_metadata.get());
call_args.client_to_server_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnRecvMessage();
return message;
});
call_args.server_to_client_messages->InterceptAndMap(
[calld](grpc_core::MessageHandle message) {
calld->OnSendMessage();
return message;
});
grpc_core::GetContext<grpc_core::CallFinalization>()->Add(
[calld](const grpc_call_final_info* final_info) {
calld->Finalize(final_info);
});
return grpc_core::OnCancel(Map(next_promise_factory(std::move(call_args)),
[calld](grpc_core::ServerMetadataHandle md) {
calld->OnServerTrailingMetadata(md.get());
return md;
}),
[calld]() { calld->OnCancel(); });
}
} // namespace internal

@ -56,9 +56,11 @@ grpc_cc_library(
"observability_config",
"observability_logging_sink",
"//:gpr",
"//:grpc",
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/core:logging_filter",
"//src/core:notification",
],
)

@ -38,12 +38,14 @@
#include "opencensus/trace/sampler.h"
#include "opencensus/trace/trace_config.h"
#include <grpc/grpc.h>
#include <grpcpp/ext/gcp_observability.h>
#include <grpcpp/opencensus.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
#include "src/core/ext/filters/logging/logging_filter.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"
#include "src/cpp/ext/gcp/environment_autodetect.h"
@ -104,76 +106,89 @@ absl::Status GcpObservabilityInit() {
// Disable OpenCensus stats
grpc::internal::EnableOpenCensusStats(false);
}
// If tracing or monitoring is enabled, we need to get the OpenCensus plugin
// to wait for the environment to be autodetected.
// If tracing or monitoring is enabled, we need to register the OpenCensus
// plugin to wait for the environment to be autodetected.
if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
grpc::RegisterOpenCensusPlugin();
grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady();
grpc::internal::OpenCensusRegistry::Get().RegisterFunctions([config]() {
grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone([config]() {
auto* resource =
grpc::internal::EnvironmentAutoDetect::Get().resource();
if (config->cloud_trace.has_value()) {
// Set up attributes for constant tracing
std::vector<internal::OpenCensusRegistry::Attribute> attributes;
attributes.reserve(resource->labels.size() + config->labels.size());
// First insert in environment labels
for (const auto& resource_label : resource->labels) {
attributes.push_back(internal::OpenCensusRegistry::Attribute{
absl::StrCat(resource->resource_type, ".",
resource_label.first),
resource_label.second});
}
// Then insert in labels from the GCP Observability config.
for (const auto& constant_label : config->labels) {
attributes.push_back(internal::OpenCensusRegistry::Attribute{
constant_label.first, constant_label.second});
}
grpc::internal::OpenCensusRegistry::Get().RegisterConstantAttributes(
std::move(attributes));
// Set up the StackDriver Exporter
opencensus::trace::TraceConfig::SetCurrentTraceParams(
{kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
opencensus::trace::ProbabilitySampler(
config->cloud_trace->sampling_rate)});
opencensus::exporters::trace::StackdriverOptions trace_opts;
trace_opts.project_id = config->project_id;
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
trace_opts.trace_service_stub =
::google::devtools::cloudtrace::v2::TraceService::NewStub(
CreateCustomChannel(kGoogleStackdriverTraceAddress,
GoogleDefaultCredentials(), args));
opencensus::exporters::trace::StackdriverExporter::Register(
std::move(trace_opts));
}
if (config->cloud_monitoring.has_value()) {
grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
config->labels);
opencensus::exporters::stats::StackdriverOptions stats_opts;
stats_opts.project_id = config->project_id;
stats_opts.monitored_resource.set_type(resource->resource_type);
stats_opts.monitored_resource.mutable_labels()->insert(
resource->labels.begin(), resource->labels.end());
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
stats_opts.metric_service_stub =
google::monitoring::v3::MetricService::NewStub(
CreateCustomChannel(kGoogleStackdriverStatsAddress,
GoogleDefaultCredentials(), args));
opencensus::exporters::stats::StackdriverExporter::Register(
std::move(stats_opts));
}
RegisterOpenCensusViewsForGcpObservability();
grpc::internal::OpenCensusRegistry::Get().SetReady();
});
});
}
if (config->cloud_logging.has_value()) {
g_logging_sink = new grpc::internal::ObservabilityLoggingSink(
config->cloud_logging.value(), config->project_id, config->labels);
grpc_core::RegisterLoggingFilter(g_logging_sink);
}
// If tracing or monitoring is enabled, we need to detect the environment for
// OpenCensus, set the labels and attributes and prepare the StackDriver
// exporter.
// Note that this should be the last step of GcpObservabilityInit() since we
// can't register any more filters after grpc_init.
if (config->cloud_trace.has_value() || config->cloud_monitoring.has_value()) {
grpc_init();
grpc_core::Notification notification;
grpc::internal::EnvironmentAutoDetect::Get().NotifyOnDone(
[&]() { notification.Notify(); });
notification.WaitForNotification();
auto* resource = grpc::internal::EnvironmentAutoDetect::Get().resource();
if (config->cloud_trace.has_value()) {
// Set up attributes for constant tracing
std::vector<internal::OpenCensusRegistry::Attribute> attributes;
attributes.reserve(resource->labels.size() + config->labels.size());
// First insert in environment labels
for (const auto& resource_label : resource->labels) {
attributes.push_back(internal::OpenCensusRegistry::Attribute{
absl::StrCat(resource->resource_type, ".", resource_label.first),
resource_label.second});
}
// Then insert in labels from the GCP Observability config.
for (const auto& constant_label : config->labels) {
attributes.push_back(internal::OpenCensusRegistry::Attribute{
constant_label.first, constant_label.second});
}
grpc::internal::OpenCensusRegistry::Get().RegisterConstantAttributes(
std::move(attributes));
}
if (config->cloud_monitoring.has_value()) {
grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
config->labels);
RegisterOpenCensusViewsForGcpObservability();
}
// Note that we are setting up the exporters after registering the
// attributes and labels to avoid a case where the exporters start an RPC
// before we are ready.
if (config->cloud_trace.has_value()) {
// Set up the StackDriver Exporter for tracing.
opencensus::trace::TraceConfig::SetCurrentTraceParams(
{kMaxAttributes, kMaxAnnotations, kMaxMessageEvents, kMaxLinks,
opencensus::trace::ProbabilitySampler(
config->cloud_trace->sampling_rate)});
opencensus::exporters::trace::StackdriverOptions trace_opts;
trace_opts.project_id = config->project_id;
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
trace_opts.trace_service_stub =
::google::devtools::cloudtrace::v2::TraceService::NewStub(
CreateCustomChannel(kGoogleStackdriverTraceAddress,
GoogleDefaultCredentials(), args));
opencensus::exporters::trace::StackdriverExporter::Register(
std::move(trace_opts));
}
if (config->cloud_monitoring.has_value()) {
// Set up the StackDriver Exporter for monitoring.
opencensus::exporters::stats::StackdriverOptions stats_opts;
stats_opts.project_id = config->project_id;
stats_opts.monitored_resource.set_type(resource->resource_type);
stats_opts.monitored_resource.mutable_labels()->insert(
resource->labels.begin(), resource->labels.end());
ChannelArguments args;
args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
stats_opts.metric_service_stub =
google::monitoring::v3::MetricService::NewStub(
CreateCustomChannel(kGoogleStackdriverStatsAddress,
GoogleDefaultCredentials(), args));
opencensus::exporters::stats::StackdriverExporter::Register(
std::move(stats_opts));
}
grpc_shutdown();
}
return absl::OkStatus();
}

@ -99,32 +99,3 @@ grpc_cc_test(
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "constant_labels_wait_test",
srcs = [
"constant_labels_wait_test.cc",
],
external_deps = [
"gtest",
"opencensus-stats-test",
"opencensus-tags",
"opencensus-with-tag-map",
],
flaky = True,
language = "C++",
linkstatic = True,
tags = [
"census_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
"library",
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_config",
"//test/cpp/util:test_util",
],
)

@ -1,113 +0,0 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opencensus/stats/stats.h"
#include "opencensus/stats/testing/test_utils.h"
#include "opencensus/tags/tag_map.h"
#include <grpc++/grpc++.h>
#include <grpcpp/opencensus.h>
#include "src/core/lib/gprpp/notification.h"
#include "src/cpp/ext/filters/census/context.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/ext/filters/census/library.h"
namespace grpc {
namespace testing {
namespace {
using ::opencensus::stats::View;
using ::opencensus::stats::testing::TestUtils;
class ConstantLabelsWaitTest : public StatsPluginEnd2EndTest {
protected:
static void SetUpTestSuite() {
grpc::internal::OpenCensusRegistry::Get().RegisterWaitOnReady();
grpc::internal::EnableOpenCensusTracing(false);
StatsPluginEnd2EndTest::SetUpTestSuite();
}
};
// Check that RPCs wait for labels to be registered to OpenCensus.
TEST_F(ConstantLabelsWaitTest, RpcWaitsForLabelsRegistration) {
View* client_completed_rpcs_view = nullptr;
View* server_completed_rpcs_view = nullptr;
EchoRequest request;
request.set_message("foo");
EchoResponse response;
{
grpc::ClientContext context;
grpc_core::Notification notification;
stub_->async()->Echo(&context, &request, &response,
[&notification, &response](Status s) mutable {
EXPECT_TRUE(s.ok());
notification.Notify();
EXPECT_EQ("foo", response.message());
});
// Introduce a sleep to check that the RPC waits for labels to be
// registered.
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
grpc::internal::OpenCensusRegistry::Get().RegisterConstantLabels(
{{"key", "value"}});
client_completed_rpcs_view = new View(ClientCompletedRpcsCumulative());
server_completed_rpcs_view = new View(ServerCompletedRpcsCumulative());
{
grpc_core::ExecCtx exec_ctx;
grpc::internal::OpenCensusRegistry::Get().SetReady();
}
notification.WaitForNotification();
}
absl::SleepFor(absl::Milliseconds(500 * grpc_test_slowdown_factor()));
TestUtils::Flush();
EXPECT_THAT(
client_completed_rpcs_view->GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre("value", client_method_name_, "OK"), 1)));
EXPECT_THAT(
server_completed_rpcs_view->GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre("value", server_method_name_, "OK"), 1)));
delete client_completed_rpcs_view;
delete server_completed_rpcs_view;
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save