[OTel] Basic C++ OTel Stats Functionality (#33650)

Note that the plugin is still under `grpc::internal` namespace and not
under `experimental` intentionally.

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
pull/33735/head
Yash Tibrewal 2 years ago committed by GitHub
parent 2556033de0
commit d2f37b8b45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      bazel/grpc_build_system.bzl
  2. 73
      src/cpp/ext/otel/BUILD
  3. 134
      src/cpp/ext/otel/otel_call_tracer.h
  4. 212
      src/cpp/ext/otel/otel_client_filter.cc
  5. 54
      src/cpp/ext/otel/otel_client_filter.h
  6. 96
      src/cpp/ext/otel/otel_plugin.cc
  7. 67
      src/cpp/ext/otel/otel_plugin.h
  8. 175
      src/cpp/ext/otel/otel_server_call_tracer.cc
  9. 36
      src/cpp/ext/otel/otel_server_call_tracer.h
  10. 9
      test/cpp/ext/otel/BUILD
  11. 283
      test/cpp/ext/otel/otel_plugin_test.cc
  12. 3
      tools/bazel.rc
  13. 2
      tools/buildgen/extract_metadata_from_bazel_xml.py
  14. 10
      tools/distrib/fix_build_deps.py
  15. 1
      tools/distrib/iwyu.sh

@ -197,6 +197,7 @@ def grpc_cc_library(
testonly = testonly,
linkopts = linkopts,
includes = [
"api/include",
"include",
"src/core/ext/upb-generated", # Once upb code-gen issue is resolved, remove this.
"src/core/ext/upbdefs-generated", # Once upb code-gen issue is resolved, remove this.

@ -0,0 +1,73 @@
# gRPC Bazel BUILD file.
#
# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load(
"//bazel:grpc_build_system.bzl",
"grpc_cc_library",
)
licenses(["reciprocal"])
package(
default_visibility = ["//visibility:public"],
features = [
"layering_check",
],
)
grpc_cc_library(
name = "otel_plugin",
srcs = [
"otel_client_filter.cc",
"otel_plugin.cc",
"otel_server_call_tracer.cc",
],
hdrs = [
"otel_call_tracer.h",
"otel_client_filter.h",
"otel_plugin.h",
"otel_server_call_tracer.h",
],
external_deps = [
"absl/base:core_headers",
"absl/container:inlined_vector",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/strings:str_format",
"absl/time",
"otel/api",
],
language = "c++",
visibility = ["//:__subpackages__"],
deps = [
"//:channel_stack_builder",
"//:config",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:legacy_context",
"//src/core:arena",
"//src/core:arena_promise",
"//src/core:channel_args",
"//src/core:channel_fwd",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:error",
"//src/core:slice",
"//src/core:slice_buffer",
],
)

@ -0,0 +1,134 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_CALL_TRACER_H
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_CALL_TRACER_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <string>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include <grpc/support/time.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc {
namespace internal {
class OpenTelemetryCallTracer : public grpc_core::ClientCallTracer {
public:
class OpenTelemetryCallAttemptTracer : public CallAttemptTracer {
public:
OpenTelemetryCallAttemptTracer(OpenTelemetryCallTracer* parent,
bool arena_allocated);
std::string TraceId() override {
// Not implemented
return "";
}
std::string SpanId() override {
// Not implemented
return "";
}
bool IsSampled() override {
// Not implemented
return false;
}
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override;
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override;
void RecordReceivedInitialMetadata(
grpc_metadata_batch* /*recv_initial_metadata*/) override {}
void RecordReceivedMessage(
const grpc_core::SliceBuffer& recv_message) override;
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override;
void RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) override;
void RecordCancel(grpc_error_handle cancel_error) override;
void RecordEnd(const gpr_timespec& /*latency*/) override;
void RecordAnnotation(absl::string_view /*annotation*/) override;
private:
const OpenTelemetryCallTracer* parent_;
const bool arena_allocated_;
// Start time (for measuring latency).
absl::Time start_time_;
};
explicit OpenTelemetryCallTracer(grpc_core::Slice path,
grpc_core::Arena* arena);
~OpenTelemetryCallTracer() override;
std::string TraceId() override {
// Not implemented
return "";
}
std::string SpanId() override {
// Not implemented
return "";
}
bool IsSampled() override {
// Not implemented
return false;
}
OpenTelemetryCallAttemptTracer* StartNewAttempt(
bool is_transparent_retry) override;
void RecordAnnotation(absl::string_view /*annotation*/) override;
private:
// Client method.
grpc_core::Slice path_;
absl::string_view method_;
grpc_core::Arena* arena_;
grpc_core::Mutex mu_;
// Non-transparent attempts per call
uint64_t retries_ ABSL_GUARDED_BY(&mu_) = 0;
// Transparent retries per call
uint64_t transparent_retries_ ABSL_GUARDED_BY(&mu_) = 0;
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_OTEL_OTEL_CALL_TRACER_H

@ -0,0 +1,212 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/otel/otel_client_filter.h"
#include <functional>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/cpp/ext/otel/otel_call_tracer.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
//
// OpenTelemetryClientFilter
//
const grpc_channel_filter OpenTelemetryClientFilter::kFilter =
grpc_core::MakePromiseBasedFilter<OpenTelemetryClientFilter,
grpc_core::FilterEndpoint::kClient>(
"otel_client");
absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
const grpc_core::ChannelArgs& /*args*/,
ChannelFilter::Args /*filter_args*/) {
return OpenTelemetryClientFilter();
}
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle>
OpenTelemetryClientFilter::MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) {
auto* path = call_args.client_initial_metadata->get_pointer(
grpc_core::HttpPathMetadata());
auto* call_context = grpc_core::GetContext<grpc_call_context_element>();
auto* tracer = grpc_core::GetContext<grpc_core::Arena>()
->ManagedNew<OpenTelemetryCallTracer>(
path != nullptr ? path->Ref() : grpc_core::Slice(),
grpc_core::GetContext<grpc_core::Arena>());
GPR_DEBUG_ASSERT(
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value ==
nullptr);
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer;
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr;
return next_promise_factory(std::move(call_args));
}
//
// OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer
//
OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
OpenTelemetryCallAttemptTracer(OpenTelemetryCallTracer* parent,
bool arena_allocated)
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
// TODO(yashykt): Figure out how to get this to work with absl::string_view
OTelPluginState().client.attempt.started->Add(
1, {{std::string(OTelMethodKey()), std::string(parent_->method_)}});
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordSendMessage(
const grpc_core::SliceBuffer& send_message) {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) {
absl::InlinedVector<std::pair<std::string, std::string>, 2> attributes = {
{std::string(OTelMethodKey()), std::string(parent_->method_)},
{std::string(OTelStatusKey()), absl::StatusCodeToString(status.code())}};
OTelPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), attributes,
opentelemetry::context::Context{});
OTelPluginState().client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
attributes, opentelemetry::context::Context{});
OTelPluginState().client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
attributes, opentelemetry::context::Context{});
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordCancel(
absl::Status /*cancel_error*/) {}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordEnd(
const gpr_timespec& /*latency*/) {
if (arena_allocated_) {
this->~OpenTelemetryCallAttemptTracer();
} else {
delete this;
}
}
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordAnnotation(
absl::string_view /*annotation*/) {
// Not implemented
}
//
// OpenTelemetryCallTracer
//
OpenTelemetryCallTracer::OpenTelemetryCallTracer(grpc_core::Slice path,
grpc_core::Arena* arena)
: path_(std::move(path)),
method_(absl::StripPrefix(path_.as_string_view(), "/")),
arena_(arena) {}
OpenTelemetryCallTracer::~OpenTelemetryCallTracer() {}
OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer*
OpenTelemetryCallTracer::StartNewAttempt(bool is_transparent_retry) {
// We allocate the first attempt on the arena and all subsequent attempts
// on the heap, so that in the common case we don't require a heap
// allocation, nor do we unnecessarily grow the arena.
bool is_first_attempt = true;
{
grpc_core::MutexLock lock(&mu_);
if (transparent_retries_ != 0 || retries_ != 0) {
is_first_attempt = false;
}
if (is_transparent_retry) {
++transparent_retries_;
} else {
++retries_;
}
}
if (is_first_attempt) {
return arena_->New<OpenTelemetryCallAttemptTracer>(
this, /*arena_allocated=*/true);
}
return new OpenTelemetryCallAttemptTracer(this, /*arena_allocated=*/false);
}
void OpenTelemetryCallTracer::RecordAnnotation(
absl::string_view /*annotation*/) {
// Not implemented
}
} // namespace internal
} // namespace grpc

@ -0,0 +1,54 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H
#include <grpc/support/port_platform.h>
#include "absl/status/statusor.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/promise_based_filter.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/transport/transport.h"
namespace grpc {
namespace internal {
class OpenTelemetryClientFilter : public grpc_core::ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<OpenTelemetryClientFilter> Create(
const grpc_core::ChannelArgs& /*args*/,
ChannelFilter::Args /*filter_args*/);
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise(
grpc_core::CallArgs call_args,
grpc_core::NextPromiseFactory next_promise_factory) override;
private:
explicit OpenTelemetryClientFilter() {}
};
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H

@ -0,0 +1,96 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/otel/otel_plugin.h"
#include <limits.h>
#include "opentelemetry/metrics/meter.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/provider.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/nostd/unique_ptr.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/cpp/ext/otel/otel_client_filter.h"
#include "src/cpp/ext/otel/otel_server_call_tracer.h"
namespace grpc {
namespace internal {
// TODO(yashykt): Extend this to allow multiple OTel plugins to be registered in
// the same binary.
struct OTelPluginState* g_otel_plugin_state_;
const struct OTelPluginState& OTelPluginState() {
GPR_DEBUG_ASSERT(g_otel_plugin_state_ != nullptr);
return *g_otel_plugin_state_;
}
void RegisterOpenTelemetryPlugin() {
auto meter_provider = opentelemetry::metrics::Provider::GetMeterProvider();
auto meter = meter_provider->GetMeter("grpc");
delete g_otel_plugin_state_;
g_otel_plugin_state_ = new struct OTelPluginState;
g_otel_plugin_state_->client.attempt.started =
meter->CreateUInt64Counter("grpc.client.attempt.started");
g_otel_plugin_state_->client.attempt.duration =
meter->CreateDoubleHistogram("grpc.client.attempt.duration");
g_otel_plugin_state_->client.attempt.sent_total_compressed_message_size =
meter->CreateUInt64Histogram(
"grpc.client.attempt.sent_total_compressed_message_size");
g_otel_plugin_state_->client.attempt.rcvd_total_compressed_message_size =
meter->CreateUInt64Histogram(
"grpc.client.attempt.rcvd_total_compressed_message_size");
g_otel_plugin_state_->server.call.started =
meter->CreateUInt64Counter("grpc.server.call.started");
g_otel_plugin_state_->server.call.duration =
meter->CreateDoubleHistogram("grpc.server.call.duration");
g_otel_plugin_state_->server.call.sent_total_compressed_message_size =
meter->CreateUInt64Histogram(
"grpc.server.call.sent_total_compressed_message_size");
g_otel_plugin_state_->server.call.rcvd_total_compressed_message_size =
meter->CreateUInt64Histogram(
"grpc.server.call.rcvd_total_compressed_message_size");
grpc_core::ServerCallTracerFactory::RegisterGlobal(
new grpc::internal::OpenTelemetryServerCallTracerFactory);
grpc_core::CoreConfiguration::RegisterBuilder(
[](grpc_core::CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_CHANNEL, /*priority=*/INT_MAX,
[](grpc_core::ChannelStackBuilder* builder) {
builder->PrependFilter(
&grpc::internal::OpenTelemetryClientFilter::kFilter);
return true;
});
});
}
absl::string_view OTelMethodKey() { return "grpc.method"; }
absl::string_view OTelStatusKey() { return "grpc.status"; }
} // namespace internal
} // namespace grpc

@ -0,0 +1,67 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <memory>
#include "absl/strings/string_view.h"
#include "opentelemetry/metrics/sync_instruments.h"
namespace grpc {
namespace internal {
struct OTelPluginState {
struct Client {
struct Attempt {
std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
std::unique_ptr<opentelemetry::metrics::Histogram<double>> duration;
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
sent_total_compressed_message_size;
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
rcvd_total_compressed_message_size;
} attempt;
} client;
struct Server {
struct Call {
std::unique_ptr<opentelemetry::metrics::Counter<uint64_t>> started;
std::unique_ptr<opentelemetry::metrics::Histogram<double>> duration;
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
sent_total_compressed_message_size;
std::unique_ptr<opentelemetry::metrics::Histogram<uint64_t>>
rcvd_total_compressed_message_size;
} call;
} server;
};
const struct OTelPluginState& OTelPluginState();
void RegisterOpenTelemetryPlugin();
absl::string_view OTelMethodKey();
absl::string_view OTelStatusKey();
} // namespace internal
} // namespace grpc
#endif // GRPC_SRC_CPP_EXT_OTEL_OTEL_PLUGIN_H

@ -0,0 +1,175 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/otel/otel_server_call_tracer.h"
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "opentelemetry/context/context.h"
#include "opentelemetry/metrics/sync_instruments.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/cpp/ext/otel/otel_plugin.h"
namespace grpc {
namespace internal {
namespace {
// OpenTelemetryServerCallTracer implementation
class OpenTelemetryServerCallTracer : public grpc_core::ServerCallTracer {
public:
OpenTelemetryServerCallTracer() : start_time_(absl::Now()) {}
std::string TraceId() override {
// Not implemented
return "";
}
std::string SpanId() override {
// Not implemented
return "";
}
bool IsSampled() override {
// Not implemented
return false;
}
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
void RecordSendInitialMetadata(
grpc_metadata_batch* /*send_initial_metadata*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override;
void RecordSendMessage(const grpc_core::SliceBuffer& send_message) override {
RecordAnnotation(
absl::StrFormat("Send message: %ld bytes", send_message.Length()));
}
void RecordSendCompressedMessage(
const grpc_core::SliceBuffer& send_compressed_message) override {
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes",
send_compressed_message.Length()));
}
void RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) override;
void RecordReceivedMessage(
const grpc_core::SliceBuffer& recv_message) override {
RecordAnnotation(
absl::StrFormat("Received message: %ld bytes", recv_message.Length()));
}
void RecordReceivedDecompressedMessage(
const grpc_core::SliceBuffer& recv_decompressed_message) override {
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes",
recv_decompressed_message.Length()));
}
void RecordReceivedTrailingMetadata(
grpc_metadata_batch* /*recv_trailing_metadata*/) override {}
void RecordCancel(grpc_error_handle /*cancel_error*/) override {
elapsed_time_ = absl::Now() - start_time_;
}
void RecordEnd(const grpc_call_final_info* final_info) override;
void RecordAnnotation(absl::string_view /*annotation*/) override {
// Not implemented
}
private:
grpc_core::Slice path_;
absl::string_view method_;
absl::Time start_time_;
absl::Duration elapsed_time_;
};
void OpenTelemetryServerCallTracer::RecordReceivedInitialMetadata(
grpc_metadata_batch* recv_initial_metadata) {
const auto* path =
recv_initial_metadata->get_pointer(grpc_core::HttpPathMetadata());
if (path != nullptr) {
path_ = path->Ref();
}
method_ = absl::StripPrefix(path_.as_string_view(), "/");
// TODO(yashykt): Figure out how to get this to work with absl::string_view
OTelPluginState().server.call.started->Add(
1, {{std::string(OTelMethodKey()), std::string(method_)}});
}
void OpenTelemetryServerCallTracer::RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) {
// We need to record the time when the trailing metadata was sent to
// mark the completeness of the request.
elapsed_time_ = absl::Now() - start_time_;
}
void OpenTelemetryServerCallTracer::RecordEnd(
const grpc_call_final_info* final_info) {
absl::InlinedVector<std::pair<std::string, std::string>, 2> attributes = {
{std::string(OTelMethodKey()), std::string(method_)},
{std::string(OTelStatusKey()),
absl::StatusCodeToString(
static_cast<absl::StatusCode>(final_info->final_status))}};
OTelPluginState().server.call.duration->Record(
absl::ToDoubleSeconds(elapsed_time_), attributes,
opentelemetry::context::Context{});
OTelPluginState().server.call.sent_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.outgoing.data_bytes, attributes,
opentelemetry::context::Context{});
OTelPluginState().server.call.rcvd_total_compressed_message_size->Record(
final_info->stats.transport_stream_stats.incoming.data_bytes, attributes,
opentelemetry::context::Context{});
}
} // namespace
//
// OpenTelemetryServerCallTracerFactory
//
grpc_core::ServerCallTracer*
OpenTelemetryServerCallTracerFactory::CreateNewServerCallTracer(
grpc_core::Arena* arena) {
return arena->ManagedNew<OpenTelemetryServerCallTracer>();
}
} // namespace internal
} // namespace grpc

@ -16,31 +16,25 @@
//
//
#include "api/include/opentelemetry/metrics/provider.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "sdk/include/opentelemetry/sdk/metrics/meter_provider.h"
#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H
#include "test/core/util/test_config.h"
#include <grpc/support/port_platform.h>
namespace grpc {
namespace testing {
namespace {
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/resource_quota/arena.h"
TEST(OTelPluginTest, ApiDependency) {
opentelemetry::metrics::Provider::GetMeterProvider();
}
namespace grpc {
namespace internal {
TEST(OTelPluginTest, SdkDependency) {
opentelemetry::sdk::metrics::MeterProvider();
}
class OpenTelemetryServerCallTracerFactory
: public grpc_core::ServerCallTracerFactory {
public:
grpc_core::ServerCallTracer* CreateNewServerCallTracer(
grpc_core::Arena* arena) override;
};
} // namespace
} // namespace testing
} // namespace internal
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // GRPC_SRC_CPP_EXT_OTEL_OTEL_SERVER_CALL_TRACER_H

@ -17,7 +17,7 @@ load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
licenses(["notice"])
grpc_package(
name = "test/cpp/ext/filters/otel",
name = "test/cpp/ext/otel",
visibility = "tests",
)
@ -34,5 +34,10 @@ grpc_cc_test(
language = "C++",
tags = [
],
deps = ["//test/core/util:grpc_test_util"],
deps = [
"//:grpc++",
"//src/cpp/ext/otel:otel_plugin",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:test_service_impl",
],
)

@ -0,0 +1,283 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
#include "src/cpp/ext/otel/otel_plugin.h"
#include "absl/functional/any_invocable.h"
#include "api/include/opentelemetry/metrics/provider.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"
#include "opentelemetry/sdk/metrics/metric_reader.h"
#include <grpcpp/grpcpp.h>
#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/config/core_configuration.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace grpc {
namespace testing {
namespace {
TEST(OTelPluginBuildTest, ApiDependency) {
opentelemetry::metrics::Provider::GetMeterProvider();
}
TEST(OTelPluginBuildTest, SdkDependency) {
opentelemetry::sdk::metrics::MeterProvider();
}
class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader {
public:
opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
opentelemetry::sdk::metrics::InstrumentType) const noexcept override {
return opentelemetry::sdk::metrics::AggregationTemporality::kDelta;
}
bool OnForceFlush(std::chrono::microseconds) noexcept override {
return true;
}
bool OnShutDown(std::chrono::microseconds) noexcept override { return true; }
void OnInitialized() noexcept override {}
};
class OTelPluginEnd2EndTest : public ::testing::Test {
protected:
void SetUp() override {
// We are resetting the MeterProvider and OpenTelemetry plugin at the start
// of each test to avoid test results from one test carrying over to another
// test. (Some measurements can get arbitrarily delayed.)
auto meter_provider = new opentelemetry::sdk::metrics::MeterProvider;
opentelemetry::metrics::Provider::SetMeterProvider(
opentelemetry::nostd::shared_ptr<opentelemetry::metrics::MeterProvider>(
meter_provider));
reader_.reset(new grpc::testing::MockMetricReader);
meter_provider->AddMetricReader(reader_);
grpc_core::CoreConfiguration::Reset();
grpc::internal::RegisterOpenTelemetryPlugin();
grpc_init();
grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("localhost:", port);
stub_ = EchoTestService::NewStub(grpc::CreateChannel(
server_address_, grpc::InsecureChannelCredentials()));
}
void TearDown() override {
server_->Shutdown();
grpc_shutdown_blocking();
delete grpc_core::ServerCallTracerFactory::Get(grpc_core::ChannelArgs());
grpc_core::ServerCallTracerFactory::RegisterGlobal(nullptr);
}
void ResetStub(std::shared_ptr<Channel> channel) {
stub_ = EchoTestService::NewStub(channel);
}
void SendRPC() {
EchoRequest request;
request.set_message("foo");
EchoResponse response;
grpc::ClientContext context;
grpc::Status status = stub_->Echo(&context, request, &response);
}
absl::flat_hash_map<std::string,
std::vector<opentelemetry::sdk::metrics::PointType>>
ReadCurrentMetricsData(
absl::AnyInvocable<
bool(const absl::flat_hash_map<
std::string,
std::vector<opentelemetry::sdk::metrics::PointType>>&)>
continue_predicate) {
absl::flat_hash_map<std::string,
std::vector<opentelemetry::sdk::metrics::PointType>>
data;
auto deadline = absl::Now() + absl::Seconds(5);
do {
reader_->Collect([&](opentelemetry::sdk::metrics::ResourceMetrics& rm) {
for (const opentelemetry::sdk::metrics::ScopeMetrics& smd :
rm.scope_metric_data_) {
for (const opentelemetry::sdk::metrics::MetricData& md :
smd.metric_data_) {
for (const opentelemetry::sdk::metrics::PointDataAttributes& dp :
md.point_data_attr_) {
data[md.instrument_descriptor.name_].push_back(dp.point_data);
}
}
}
return true;
});
} while (continue_predicate(data) && deadline > absl::Now());
return data;
}
std::shared_ptr<opentelemetry::sdk::metrics::MetricReader> reader_;
std::string server_address_;
CallbackTestServiceImpl service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
TEST_F(OTelPluginEnd2EndTest, ClientAttemptStarted) {
SendRPC();
const char* kMetricName = "grpc.client.attempt.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
auto client_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(client_started_value, nullptr);
ASSERT_EQ(*client_started_value, 1);
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptDuration) {
SendRPC();
const char* kMetricName = "grpc.client.attempt.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptSentTotalCompressedMessageSize) {
SendRPC();
const char* kMetricName =
"grpc.client.attempt.sent_total_compressed_message_size";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
}
TEST_F(OTelPluginEnd2EndTest, ClientAttemptRcvdTotalCompressedMessageSize) {
SendRPC();
const char* kMetricName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
}
TEST_F(OTelPluginEnd2EndTest, ServerCallStarted) {
SendRPC();
const char* kMetricName = "grpc.server.call.started";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data = absl::get_if<opentelemetry::sdk::metrics::SumPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
auto server_started_value = absl::get_if<int64_t>(&point_data->value_);
ASSERT_NE(server_started_value, nullptr);
ASSERT_EQ(*server_started_value, 1);
}
TEST_F(OTelPluginEnd2EndTest, ServerCallDuration) {
SendRPC();
const char* kMetricName = "grpc.server.call.duration";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
}
TEST_F(OTelPluginEnd2EndTest, ServerCallSentTotalCompressedMessageSize) {
SendRPC();
const char* kMetricName =
"grpc.server.call.sent_total_compressed_message_size";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
EXPECT_EQ(point_data->count_, 1);
}
TEST_F(OTelPluginEnd2EndTest, ServerCallRcvdTotalCompressedMessageSize) {
SendRPC();
const char* kMetricName =
"grpc.server.call.rcvd_total_compressed_message_size";
auto data = ReadCurrentMetricsData(
[&](const absl::flat_hash_map<
std::string, std::vector<opentelemetry::sdk::metrics::PointType>>&
data) { return data.size() != 8; });
ASSERT_EQ(data[kMetricName].size(), 1);
auto point_data =
absl::get_if<opentelemetry::sdk::metrics::HistogramPointData>(
&data[kMetricName][0]);
ASSERT_NE(point_data, nullptr);
ASSERT_EQ(point_data->count_, 1);
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(&argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -157,3 +157,6 @@ try-import %workspace%/tools/fuzztest.bazelrc
build:fuzztest --cxxopt=-std=c++17
build:fuzztest_test --cxxopt=-std=c++17
# OpenTelemetry C++ needs this option to build with absl
build --@io_opentelemetry_cpp//api:with_abseil

@ -740,7 +740,7 @@ def _exclude_unwanted_cc_tests(tests: List[str]) -> List[str]:
tests = [
test
for test in tests
if not test.startswith("test/cpp/ext/filters/otel:")
if not test.startswith("test/cpp/ext/otel:")
]
# missing opencensus/stats/stats.h

@ -93,7 +93,13 @@ EXTERNAL_DEPS = {
"absl/types/variant.h": "absl/types:variant",
"absl/utility/utility.h": "absl/utility",
"address_sorting/address_sorting.h": "address_sorting",
"api/include/opentelemetry/metrics/provider.h": "otel/api",
"opentelemetry/context/context.h": "otel/api",
"opentelemetry/metrics/meter.h": "otel/api",
"opentelemetry/metrics/meter_provider.h": "otel/api",
"opentelemetry/metrics/provider.h": "otel/api",
"opentelemetry/metrics/sync_instruments.h": "otel/api",
"opentelemetry/nostd/shared_ptr.h": "otel/api",
"opentelemetry/nostd/unique_ptr.h": "otel/api",
"sdk/include/opentelemetry/sdk/metrics/meter_provider.h": "otel/sdk/src/metrics",
"ares.h": "cares",
"fuzztest/fuzztest.h": ["fuzztest", "fuzztest_main"],
@ -373,6 +379,7 @@ for dirname in [
"",
"src/core",
"src/cpp/ext/gcp",
"src/cpp/ext/otel",
"test/core/backoff",
"test/core/uri",
"test/core/util",
@ -382,7 +389,6 @@ for dirname in [
"test/core/promise",
"test/core/resource_quota",
"test/core/transport/chaotic_good",
"test/cpp/ext/filters/otel",
"fuzztest",
"fuzztest/core/channel",
]:

@ -32,6 +32,7 @@ tools/distrib/gen_compilation_database.py \
--dedup_targets \
"//:*" \
"//src/core/..." \
"//src/cpp/ext/otel/..." \
"//src/compiler/..." \
"//test/core/..." \
"//test/cpp/..." \

Loading…
Cancel
Save