commit
93c85a636c
61 changed files with 2353 additions and 848 deletions
@ -0,0 +1,108 @@ |
|||||||
|
//
|
||||||
|
//
|
||||||
|
// Copyright 2023 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPCPP_EXT_OTEL_PLUGIN_H |
||||||
|
#define GRPCPP_EXT_OTEL_PLUGIN_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <stddef.h> |
||||||
|
#include <stdint.h> |
||||||
|
|
||||||
|
#include <memory> |
||||||
|
|
||||||
|
#include "absl/functional/any_invocable.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
#include "opentelemetry/metrics/meter_provider.h" |
||||||
|
|
||||||
|
namespace grpc { |
||||||
|
|
||||||
|
namespace internal { |
||||||
|
class OpenTelemetryPluginBuilderImpl; |
||||||
|
} // namespace internal
|
||||||
|
|
||||||
|
namespace experimental { |
||||||
|
|
||||||
|
/// The most common way to use this API is -
|
||||||
|
///
|
||||||
|
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
|
||||||
|
///
|
||||||
|
/// The set of instruments available are -
|
||||||
|
/// grpc.client.attempt.started
|
||||||
|
/// grpc.client.attempt.duration
|
||||||
|
/// grpc.client.attempt.sent_total_compressed_message_size
|
||||||
|
/// grpc.client.attempt.rcvd_total_compressed_message_size
|
||||||
|
/// grpc.server.call.started
|
||||||
|
/// grpc.server.call.duration
|
||||||
|
/// grpc.server.call.sent_total_compressed_message_size
|
||||||
|
/// grpc.server.call.rcvd_total_compressed_message_size
|
||||||
|
class OpenTelemetryPluginBuilder { |
||||||
|
public: |
||||||
|
/// Metrics
|
||||||
|
static constexpr absl::string_view kClientAttemptStartedInstrumentName = |
||||||
|
"grpc.client.attempt.started"; |
||||||
|
static constexpr absl::string_view kClientAttemptDurationInstrumentName = |
||||||
|
"grpc.client.attempt.duration"; |
||||||
|
static constexpr absl::string_view |
||||||
|
kClientAttemptSentTotalCompressedMessageSizeInstrumentName = |
||||||
|
"grpc.client.attempt.sent_total_compressed_message_size"; |
||||||
|
static constexpr absl::string_view |
||||||
|
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName = |
||||||
|
"grpc.client.attempt.rcvd_total_compressed_message_size"; |
||||||
|
static constexpr absl::string_view kServerCallStartedInstrumentName = |
||||||
|
"grpc.server.call.started"; |
||||||
|
static constexpr absl::string_view kServerCallDurationInstrumentName = |
||||||
|
"grpc.server.call.duration"; |
||||||
|
static constexpr absl::string_view |
||||||
|
kServerCallSentTotalCompressedMessageSizeInstrumentName = |
||||||
|
"grpc.server.call.sent_total_compressed_message_size"; |
||||||
|
static constexpr absl::string_view |
||||||
|
kServerCallRcvdTotalCompressedMessageSizeInstrumentName = |
||||||
|
"grpc.server.call.rcvd_total_compressed_message_size"; |
||||||
|
|
||||||
|
OpenTelemetryPluginBuilder(); |
||||||
|
/// If `SetMeterProvider()` is not called, no metrics are collected.
|
||||||
|
OpenTelemetryPluginBuilder& SetMeterProvider( |
||||||
|
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider); |
||||||
|
/// If set, \a target_attribute_filter is called per channel to decide whether
|
||||||
|
/// to record the target attribute on client or to replace it with "other".
|
||||||
|
/// This helps reduce the cardinality on metrics in cases where many channels
|
||||||
|
/// are created with different targets in the same binary (which might happen
|
||||||
|
/// for example, if the channel target string uses IP addresses directly).
|
||||||
|
OpenTelemetryPluginBuilder& SetTargetAttributeFilter( |
||||||
|
absl::AnyInvocable<bool(absl::string_view /*target*/) const> |
||||||
|
target_attribute_filter); |
||||||
|
/// If set, \a generic_method_attribute_filter is called per call with a
|
||||||
|
/// generic method type to decide whether to record the method name or to
|
||||||
|
/// replace it with "other". Non-generic or pre-registered methods remain
|
||||||
|
/// unaffected. If not set, by default, generic method names are replaced with
|
||||||
|
/// "other" when recording metrics.
|
||||||
|
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter( |
||||||
|
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const> |
||||||
|
generic_method_attribute_filter); |
||||||
|
/// Registers a global plugin that acts on all channels and servers running on
|
||||||
|
/// the process.
|
||||||
|
void BuildAndRegisterGlobal(); |
||||||
|
|
||||||
|
private: |
||||||
|
std::unique_ptr<internal::OpenTelemetryPluginBuilderImpl> impl_; |
||||||
|
}; |
||||||
|
} // namespace experimental
|
||||||
|
} // namespace grpc
|
||||||
|
|
||||||
|
#endif // GRPCPP_EXT_OTEL_PLUGIN_H
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,94 @@ |
|||||||
|
// Copyright 2023 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H |
||||||
|
#define GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/iomgr/exec_ctx.h" |
||||||
|
#include "src/core/lib/promise/activity.h" |
||||||
|
#include "src/core/lib/surface/completion_queue.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
// Defines a promise that calls grpc_cq_end_op() (on first poll) and then waits
|
||||||
|
// for the callback supplied to grpc_cq_end_op() to be called, before resolving
|
||||||
|
// to Empty{}
|
||||||
|
class WaitForCqEndOp { |
||||||
|
public: |
||||||
|
WaitForCqEndOp(bool is_closure, void* tag, grpc_error_handle error, |
||||||
|
grpc_completion_queue* cq) |
||||||
|
: state_{NotStarted{is_closure, tag, std::move(error), cq}} {} |
||||||
|
|
||||||
|
Poll<Empty> operator()() { |
||||||
|
if (auto* n = absl::get_if<NotStarted>(&state_)) { |
||||||
|
if (n->is_closure) { |
||||||
|
ExecCtx::Run(DEBUG_LOCATION, static_cast<grpc_closure*>(n->tag), |
||||||
|
std::move(n->error)); |
||||||
|
return Empty{}; |
||||||
|
} else { |
||||||
|
auto not_started = std::move(*n); |
||||||
|
auto& started = |
||||||
|
state_.emplace<Started>(Activity::current()->MakeOwningWaker()); |
||||||
|
grpc_cq_end_op( |
||||||
|
not_started.cq, not_started.tag, std::move(not_started.error), |
||||||
|
[](void* p, grpc_cq_completion*) { |
||||||
|
auto started = static_cast<Started*>(p); |
||||||
|
started->done.store(true, std::memory_order_release); |
||||||
|
}, |
||||||
|
&started, &started.completion); |
||||||
|
} |
||||||
|
} |
||||||
|
auto& started = absl::get<Started>(state_); |
||||||
|
if (started.done.load(std::memory_order_acquire)) { |
||||||
|
return Empty{}; |
||||||
|
} else { |
||||||
|
return Pending{}; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
WaitForCqEndOp(const WaitForCqEndOp&) = delete; |
||||||
|
WaitForCqEndOp& operator=(const WaitForCqEndOp&) = delete; |
||||||
|
WaitForCqEndOp(WaitForCqEndOp&& other) noexcept |
||||||
|
: state_(std::move(absl::get<NotStarted>(other.state_))) { |
||||||
|
other.state_.emplace<Invalid>(); |
||||||
|
} |
||||||
|
WaitForCqEndOp& operator=(WaitForCqEndOp&& other) noexcept { |
||||||
|
state_ = std::move(absl::get<NotStarted>(other.state_)); |
||||||
|
other.state_.emplace<Invalid>(); |
||||||
|
return *this; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
struct NotStarted { |
||||||
|
bool is_closure; |
||||||
|
void* tag; |
||||||
|
grpc_error_handle error; |
||||||
|
grpc_completion_queue* cq; |
||||||
|
}; |
||||||
|
struct Started { |
||||||
|
explicit Started(Waker waker) : waker(std::move(waker)) {} |
||||||
|
Waker waker; |
||||||
|
grpc_cq_completion completion; |
||||||
|
std::atomic<bool> done{false}; |
||||||
|
}; |
||||||
|
struct Invalid {}; |
||||||
|
using State = absl::variant<NotStarted, Started, Invalid>; |
||||||
|
State state_{Invalid{}}; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_LIB_SURFACE_WAIT_FOR_CQ_END_OP_H
|
Loading…
Reference in new issue