mirror of https://github.com/grpc/grpc.git
Merge https://github.com/grpc/grpc into YUPYUPYUP
commit
68e0acd0a2
116 changed files with 2720 additions and 3034 deletions
File diff suppressed because it is too large
Load Diff
@ -1,328 +0,0 @@ |
|||||||
// Copyright 2022 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/http/message_compress/legacy_compression_filter.h" |
|
||||||
|
|
||||||
#include <inttypes.h> |
|
||||||
|
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/strings/str_cat.h" |
|
||||||
#include "absl/strings/str_format.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/compression.h> |
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/impl/channel_arg_names.h> |
|
||||||
#include <grpc/impl/compression_types.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
|
|
||||||
#include "src/core/ext/filters/message_size/message_size_filter.h" |
|
||||||
#include "src/core/lib/channel/call_tracer.h" |
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_stack.h" |
|
||||||
#include "src/core/lib/channel/context.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/compression/compression_internal.h" |
|
||||||
#include "src/core/lib/compression/message_compress.h" |
|
||||||
#include "src/core/lib/debug/trace.h" |
|
||||||
#include "src/core/lib/promise/activity.h" |
|
||||||
#include "src/core/lib/promise/context.h" |
|
||||||
#include "src/core/lib/promise/latch.h" |
|
||||||
#include "src/core/lib/promise/pipe.h" |
|
||||||
#include "src/core/lib/promise/prioritized_race.h" |
|
||||||
#include "src/core/lib/resource_quota/arena.h" |
|
||||||
#include "src/core/lib/slice/slice_buffer.h" |
|
||||||
#include "src/core/lib/surface/call.h" |
|
||||||
#include "src/core/lib/surface/call_trace.h" |
|
||||||
#include "src/core/lib/transport/metadata_batch.h" |
|
||||||
#include "src/core/lib/transport/transport.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
const grpc_channel_filter LegacyClientCompressionFilter::kFilter = |
|
||||||
MakePromiseBasedFilter< |
|
||||||
LegacyClientCompressionFilter, FilterEndpoint::kClient, |
|
||||||
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages | |
|
||||||
kFilterExaminesOutboundMessages>("compression"); |
|
||||||
const grpc_channel_filter LegacyServerCompressionFilter::kFilter = |
|
||||||
MakePromiseBasedFilter< |
|
||||||
LegacyServerCompressionFilter, FilterEndpoint::kServer, |
|
||||||
kFilterExaminesServerInitialMetadata | kFilterExaminesInboundMessages | |
|
||||||
kFilterExaminesOutboundMessages>("compression"); |
|
||||||
|
|
||||||
absl::StatusOr<LegacyClientCompressionFilter> |
|
||||||
LegacyClientCompressionFilter::Create(const ChannelArgs& args, |
|
||||||
ChannelFilter::Args) { |
|
||||||
return LegacyClientCompressionFilter(args); |
|
||||||
} |
|
||||||
|
|
||||||
absl::StatusOr<LegacyServerCompressionFilter> |
|
||||||
LegacyServerCompressionFilter::Create(const ChannelArgs& args, |
|
||||||
ChannelFilter::Args) { |
|
||||||
return LegacyServerCompressionFilter(args); |
|
||||||
} |
|
||||||
|
|
||||||
LegacyCompressionFilter::LegacyCompressionFilter(const ChannelArgs& args) |
|
||||||
: max_recv_size_(GetMaxRecvSizeFromChannelArgs(args)), |
|
||||||
message_size_service_config_parser_index_( |
|
||||||
MessageSizeParser::ParserIndex()), |
|
||||||
default_compression_algorithm_( |
|
||||||
DefaultCompressionAlgorithmFromChannelArgs(args).value_or( |
|
||||||
GRPC_COMPRESS_NONE)), |
|
||||||
enabled_compression_algorithms_( |
|
||||||
CompressionAlgorithmSet::FromChannelArgs(args)), |
|
||||||
enable_compression_( |
|
||||||
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION).value_or(true)), |
|
||||||
enable_decompression_( |
|
||||||
args.GetBool(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION) |
|
||||||
.value_or(true)) { |
|
||||||
// Make sure the default is enabled.
|
|
||||||
if (!enabled_compression_algorithms_.IsSet(default_compression_algorithm_)) { |
|
||||||
const char* name; |
|
||||||
if (!grpc_compression_algorithm_name(default_compression_algorithm_, |
|
||||||
&name)) { |
|
||||||
name = "<unknown>"; |
|
||||||
} |
|
||||||
gpr_log(GPR_ERROR, |
|
||||||
"default compression algorithm %s not enabled: switching to none", |
|
||||||
name); |
|
||||||
default_compression_algorithm_ = GRPC_COMPRESS_NONE; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
MessageHandle LegacyCompressionFilter::CompressMessage( |
|
||||||
MessageHandle message, grpc_compression_algorithm algorithm) const { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { |
|
||||||
gpr_log(GPR_INFO, "CompressMessage: len=%" PRIdPTR " alg=%d flags=%d", |
|
||||||
message->payload()->Length(), algorithm, message->flags()); |
|
||||||
} |
|
||||||
auto* call_context = GetContext<grpc_call_context_element>(); |
|
||||||
auto* call_tracer = static_cast<CallTracerInterface*>( |
|
||||||
call_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
||||||
if (call_tracer != nullptr) { |
|
||||||
call_tracer->RecordSendMessage(*message->payload()); |
|
||||||
} |
|
||||||
// Check if we're allowed to compress this message
|
|
||||||
// (apps might want to disable compression for certain messages to avoid
|
|
||||||
// crime/beast like vulns).
|
|
||||||
uint32_t& flags = message->mutable_flags(); |
|
||||||
if (algorithm == GRPC_COMPRESS_NONE || !enable_compression_ || |
|
||||||
(flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS))) { |
|
||||||
return message; |
|
||||||
} |
|
||||||
// Try to compress the payload.
|
|
||||||
SliceBuffer tmp; |
|
||||||
SliceBuffer* payload = message->payload(); |
|
||||||
bool did_compress = grpc_msg_compress(algorithm, payload->c_slice_buffer(), |
|
||||||
tmp.c_slice_buffer()); |
|
||||||
// If we achieved compression send it as compressed, otherwise send it as (to
|
|
||||||
// avoid spending cycles on the receiver decompressing).
|
|
||||||
if (did_compress) { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { |
|
||||||
const char* algo_name; |
|
||||||
const size_t before_size = payload->Length(); |
|
||||||
const size_t after_size = tmp.Length(); |
|
||||||
const float savings_ratio = 1.0f - static_cast<float>(after_size) / |
|
||||||
static_cast<float>(before_size); |
|
||||||
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); |
|
||||||
gpr_log(GPR_INFO, |
|
||||||
"Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR |
|
||||||
" bytes (%.2f%% savings)", |
|
||||||
algo_name, before_size, after_size, 100 * savings_ratio); |
|
||||||
} |
|
||||||
tmp.Swap(payload); |
|
||||||
flags |= GRPC_WRITE_INTERNAL_COMPRESS; |
|
||||||
if (call_tracer != nullptr) { |
|
||||||
call_tracer->RecordSendCompressedMessage(*message->payload()); |
|
||||||
} |
|
||||||
} else { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { |
|
||||||
const char* algo_name; |
|
||||||
GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algo_name)); |
|
||||||
gpr_log(GPR_INFO, |
|
||||||
"Algorithm '%s' enabled but decided not to compress. Input size: " |
|
||||||
"%" PRIuPTR, |
|
||||||
algo_name, payload->Length()); |
|
||||||
} |
|
||||||
} |
|
||||||
return message; |
|
||||||
} |
|
||||||
|
|
||||||
absl::StatusOr<MessageHandle> LegacyCompressionFilter::DecompressMessage( |
|
||||||
bool is_client, MessageHandle message, DecompressArgs args) const { |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { |
|
||||||
gpr_log(GPR_INFO, "DecompressMessage: len=%" PRIdPTR " max=%d alg=%d", |
|
||||||
message->payload()->Length(), |
|
||||||
args.max_recv_message_length.value_or(-1), args.algorithm); |
|
||||||
} |
|
||||||
auto* call_context = GetContext<grpc_call_context_element>(); |
|
||||||
auto* call_tracer = static_cast<CallTracerInterface*>( |
|
||||||
call_context[GRPC_CONTEXT_CALL_TRACER].value); |
|
||||||
if (call_tracer != nullptr) { |
|
||||||
call_tracer->RecordReceivedMessage(*message->payload()); |
|
||||||
} |
|
||||||
// Check max message length.
|
|
||||||
if (args.max_recv_message_length.has_value() && |
|
||||||
message->payload()->Length() > |
|
||||||
static_cast<size_t>(*args.max_recv_message_length)) { |
|
||||||
return absl::ResourceExhaustedError(absl::StrFormat( |
|
||||||
"%s: Received message larger than max (%u vs. %d)", |
|
||||||
is_client ? "CLIENT" : "SERVER", message->payload()->Length(), |
|
||||||
*args.max_recv_message_length)); |
|
||||||
} |
|
||||||
// Check if decompression is enabled (if not, we can just pass the message
|
|
||||||
// up).
|
|
||||||
if (!enable_decompression_ || |
|
||||||
(message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == 0) { |
|
||||||
return std::move(message); |
|
||||||
} |
|
||||||
// Try to decompress the payload.
|
|
||||||
SliceBuffer decompressed_slices; |
|
||||||
if (grpc_msg_decompress(args.algorithm, message->payload()->c_slice_buffer(), |
|
||||||
decompressed_slices.c_slice_buffer()) == 0) { |
|
||||||
return absl::InternalError( |
|
||||||
absl::StrCat("Unexpected error decompressing data for algorithm ", |
|
||||||
CompressionAlgorithmAsString(args.algorithm))); |
|
||||||
} |
|
||||||
// Swap the decompressed slices into the message.
|
|
||||||
message->payload()->Swap(&decompressed_slices); |
|
||||||
message->mutable_flags() &= ~GRPC_WRITE_INTERNAL_COMPRESS; |
|
||||||
message->mutable_flags() |= GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; |
|
||||||
if (call_tracer != nullptr) { |
|
||||||
call_tracer->RecordReceivedDecompressedMessage(*message->payload()); |
|
||||||
} |
|
||||||
return std::move(message); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_compression_algorithm LegacyCompressionFilter::HandleOutgoingMetadata( |
|
||||||
grpc_metadata_batch& outgoing_metadata) { |
|
||||||
const auto algorithm = outgoing_metadata.Take(GrpcInternalEncodingRequest()) |
|
||||||
.value_or(default_compression_algorithm()); |
|
||||||
// Convey supported compression algorithms.
|
|
||||||
outgoing_metadata.Set(GrpcAcceptEncodingMetadata(), |
|
||||||
enabled_compression_algorithms()); |
|
||||||
if (algorithm != GRPC_COMPRESS_NONE) { |
|
||||||
outgoing_metadata.Set(GrpcEncodingMetadata(), algorithm); |
|
||||||
} |
|
||||||
return algorithm; |
|
||||||
} |
|
||||||
|
|
||||||
LegacyCompressionFilter::DecompressArgs |
|
||||||
LegacyCompressionFilter::HandleIncomingMetadata( |
|
||||||
const grpc_metadata_batch& incoming_metadata) { |
|
||||||
// Configure max receive size.
|
|
||||||
auto max_recv_message_length = max_recv_size_; |
|
||||||
const MessageSizeParsedConfig* limits = |
|
||||||
MessageSizeParsedConfig::GetFromCallContext( |
|
||||||
GetContext<grpc_call_context_element>(), |
|
||||||
message_size_service_config_parser_index_); |
|
||||||
if (limits != nullptr && limits->max_recv_size().has_value() && |
|
||||||
(!max_recv_message_length.has_value() || |
|
||||||
*limits->max_recv_size() < *max_recv_message_length)) { |
|
||||||
max_recv_message_length = *limits->max_recv_size(); |
|
||||||
} |
|
||||||
return DecompressArgs{incoming_metadata.get(GrpcEncodingMetadata()) |
|
||||||
.value_or(GRPC_COMPRESS_NONE), |
|
||||||
max_recv_message_length}; |
|
||||||
} |
|
||||||
|
|
||||||
ArenaPromise<ServerMetadataHandle> |
|
||||||
LegacyClientCompressionFilter::MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
|
||||||
auto compression_algorithm = |
|
||||||
HandleOutgoingMetadata(*call_args.client_initial_metadata); |
|
||||||
call_args.client_to_server_messages->InterceptAndMap( |
|
||||||
[compression_algorithm, |
|
||||||
this](MessageHandle message) -> absl::optional<MessageHandle> { |
|
||||||
return CompressMessage(std::move(message), compression_algorithm); |
|
||||||
}); |
|
||||||
auto* decompress_args = GetContext<Arena>()->New<DecompressArgs>( |
|
||||||
DecompressArgs{GRPC_COMPRESS_ALGORITHMS_COUNT, absl::nullopt}); |
|
||||||
auto* decompress_err = |
|
||||||
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>(); |
|
||||||
call_args.server_initial_metadata->InterceptAndMap( |
|
||||||
[decompress_args, this](ServerMetadataHandle server_initial_metadata) |
|
||||||
-> absl::optional<ServerMetadataHandle> { |
|
||||||
if (server_initial_metadata == nullptr) return absl::nullopt; |
|
||||||
*decompress_args = HandleIncomingMetadata(*server_initial_metadata); |
|
||||||
return std::move(server_initial_metadata); |
|
||||||
}); |
|
||||||
call_args.server_to_client_messages->InterceptAndMap( |
|
||||||
[decompress_err, decompress_args, |
|
||||||
this](MessageHandle message) -> absl::optional<MessageHandle> { |
|
||||||
auto r = DecompressMessage(/*is_client=*/true, std::move(message), |
|
||||||
*decompress_args); |
|
||||||
if (!r.ok()) { |
|
||||||
decompress_err->Set(ServerMetadataFromStatus(r.status())); |
|
||||||
return absl::nullopt; |
|
||||||
} |
|
||||||
return std::move(*r); |
|
||||||
}); |
|
||||||
// Run the next filter, and race it with getting an error from decompression.
|
|
||||||
return PrioritizedRace(decompress_err->Wait(), |
|
||||||
next_promise_factory(std::move(call_args))); |
|
||||||
} |
|
||||||
|
|
||||||
ArenaPromise<ServerMetadataHandle> |
|
||||||
LegacyServerCompressionFilter::MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
|
||||||
auto decompress_args = |
|
||||||
HandleIncomingMetadata(*call_args.client_initial_metadata); |
|
||||||
auto* decompress_err = |
|
||||||
GetContext<Arena>()->New<Latch<ServerMetadataHandle>>(); |
|
||||||
call_args.client_to_server_messages->InterceptAndMap( |
|
||||||
[decompress_err, decompress_args, |
|
||||||
this](MessageHandle message) -> absl::optional<MessageHandle> { |
|
||||||
auto r = DecompressMessage(/*is_client=*/false, std::move(message), |
|
||||||
decompress_args); |
|
||||||
if (grpc_call_trace.enabled()) { |
|
||||||
gpr_log(GPR_DEBUG, "%s[compression] DecompressMessage returned %s", |
|
||||||
GetContext<Activity>()->DebugTag().c_str(), |
|
||||||
r.status().ToString().c_str()); |
|
||||||
} |
|
||||||
if (!r.ok()) { |
|
||||||
decompress_err->Set(ServerMetadataFromStatus(r.status())); |
|
||||||
return absl::nullopt; |
|
||||||
} |
|
||||||
return std::move(*r); |
|
||||||
}); |
|
||||||
auto* compression_algorithm = |
|
||||||
GetContext<Arena>()->New<grpc_compression_algorithm>(); |
|
||||||
call_args.server_initial_metadata->InterceptAndMap( |
|
||||||
[this, compression_algorithm](ServerMetadataHandle md) { |
|
||||||
if (grpc_call_trace.enabled()) { |
|
||||||
gpr_log(GPR_INFO, "%s[compression] Write metadata", |
|
||||||
GetContext<Activity>()->DebugTag().c_str()); |
|
||||||
} |
|
||||||
// Find the compression algorithm.
|
|
||||||
*compression_algorithm = HandleOutgoingMetadata(*md); |
|
||||||
return md; |
|
||||||
}); |
|
||||||
call_args.server_to_client_messages->InterceptAndMap( |
|
||||||
[compression_algorithm, |
|
||||||
this](MessageHandle message) -> absl::optional<MessageHandle> { |
|
||||||
return CompressMessage(std::move(message), *compression_algorithm); |
|
||||||
}); |
|
||||||
// Run the next filter, and race it with getting an error from decompression.
|
|
||||||
return PrioritizedRace(decompress_err->Wait(), |
|
||||||
next_promise_factory(std::move(call_args))); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
@ -1,140 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2020 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H |
|
||||||
#define GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include "absl/status/statusor.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
|
|
||||||
#include <grpc/impl/compression_types.h> |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_fwd.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/compression/compression_internal.h" |
|
||||||
#include "src/core/lib/promise/arena_promise.h" |
|
||||||
#include "src/core/lib/transport/metadata_batch.h" |
|
||||||
#include "src/core/lib/transport/transport.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
/// Compression filter for messages.
|
|
||||||
///
|
|
||||||
/// See <grpc/compression.h> for the available compression settings.
|
|
||||||
///
|
|
||||||
/// Compression settings may come from:
|
|
||||||
/// - Channel configuration, as established at channel creation time.
|
|
||||||
/// - The metadata accompanying the outgoing data to be compressed. This is
|
|
||||||
/// taken as a request only. We may choose not to honor it. The metadata key
|
|
||||||
/// is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
|
|
||||||
///
|
|
||||||
/// Compression can be disabled for concrete messages (for instance in order to
|
|
||||||
/// prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set
|
|
||||||
/// in the MessageHandle flags.
|
|
||||||
///
|
|
||||||
/// The attempted compression mechanism is added to the resulting initial
|
|
||||||
/// metadata under the 'grpc-encoding' key.
|
|
||||||
///
|
|
||||||
/// If compression is actually performed, the MessageHandle's flag is modified
|
|
||||||
/// to incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of
|
|
||||||
/// the aforementioned 'grpc-encoding' metadata value, data will pass through
|
|
||||||
/// uncompressed.
|
|
||||||
|
|
||||||
class LegacyCompressionFilter : public ChannelFilter { |
|
||||||
protected: |
|
||||||
struct DecompressArgs { |
|
||||||
grpc_compression_algorithm algorithm; |
|
||||||
absl::optional<uint32_t> max_recv_message_length; |
|
||||||
}; |
|
||||||
|
|
||||||
explicit LegacyCompressionFilter(const ChannelArgs& args); |
|
||||||
|
|
||||||
grpc_compression_algorithm default_compression_algorithm() const { |
|
||||||
return default_compression_algorithm_; |
|
||||||
} |
|
||||||
|
|
||||||
CompressionAlgorithmSet enabled_compression_algorithms() const { |
|
||||||
return enabled_compression_algorithms_; |
|
||||||
} |
|
||||||
|
|
||||||
grpc_compression_algorithm HandleOutgoingMetadata( |
|
||||||
grpc_metadata_batch& outgoing_metadata); |
|
||||||
DecompressArgs HandleIncomingMetadata( |
|
||||||
const grpc_metadata_batch& incoming_metadata); |
|
||||||
|
|
||||||
// Compress one message synchronously.
|
|
||||||
MessageHandle CompressMessage(MessageHandle message, |
|
||||||
grpc_compression_algorithm algorithm) const; |
|
||||||
// Decompress one message synchronously.
|
|
||||||
absl::StatusOr<MessageHandle> DecompressMessage(bool is_client, |
|
||||||
MessageHandle message, |
|
||||||
DecompressArgs args) const; |
|
||||||
|
|
||||||
private: |
|
||||||
// Max receive message length, if set.
|
|
||||||
absl::optional<uint32_t> max_recv_size_; |
|
||||||
size_t message_size_service_config_parser_index_; |
|
||||||
// The default, channel-level, compression algorithm.
|
|
||||||
grpc_compression_algorithm default_compression_algorithm_; |
|
||||||
// Enabled compression algorithms.
|
|
||||||
CompressionAlgorithmSet enabled_compression_algorithms_; |
|
||||||
// Is compression enabled?
|
|
||||||
bool enable_compression_; |
|
||||||
// Is decompression enabled?
|
|
||||||
bool enable_decompression_; |
|
||||||
}; |
|
||||||
|
|
||||||
class LegacyClientCompressionFilter final : public LegacyCompressionFilter { |
|
||||||
public: |
|
||||||
static const grpc_channel_filter kFilter; |
|
||||||
|
|
||||||
static absl::StatusOr<LegacyClientCompressionFilter> Create( |
|
||||||
const ChannelArgs& args, ChannelFilter::Args filter_args); |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
|
||||||
|
|
||||||
private: |
|
||||||
using LegacyCompressionFilter::LegacyCompressionFilter; |
|
||||||
}; |
|
||||||
|
|
||||||
class LegacyServerCompressionFilter final : public LegacyCompressionFilter { |
|
||||||
public: |
|
||||||
static const grpc_channel_filter kFilter; |
|
||||||
|
|
||||||
static absl::StatusOr<LegacyServerCompressionFilter> Create( |
|
||||||
const ChannelArgs& args, ChannelFilter::Args filter_args); |
|
||||||
|
|
||||||
// Construct a promise for one call.
|
|
||||||
ArenaPromise<ServerMetadataHandle> MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) override; |
|
||||||
|
|
||||||
private: |
|
||||||
using LegacyCompressionFilter::LegacyCompressionFilter; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
||||||
|
|
||||||
#endif // GRPC_SRC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_LEGACY_COMPRESSION_FILTER_H
|
|
@ -0,0 +1,28 @@ |
|||||||
|
// Copyright 2024 The gRPC Authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_FILTER_H |
||||||
|
#define GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_FILTER_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/config/core_configuration.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder); |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_LIB_CHANNEL_SERVER_CALL_TRACER_FILTER_H
|
@ -0,0 +1,31 @@ |
|||||||
|
// Copyright 2024 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_EVENT_ENGINE_CONTEXT_H |
||||||
|
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_EVENT_ENGINE_CONTEXT_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <grpc/event_engine/event_engine.h> |
||||||
|
|
||||||
|
#include "src/core/lib/promise/context.h" |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
template <> |
||||||
|
struct ContextType<grpc_event_engine::experimental::EventEngine> {}; |
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_EVENT_ENGINE_CONTEXT_H
|
@ -1,244 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <algorithm> |
|
||||||
#include <atomic> |
|
||||||
#include <cstddef> |
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/status/statusor.h" |
|
||||||
|
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/grpc_security.h> |
|
||||||
#include <grpc/status.h> |
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_fwd.h" |
|
||||||
#include "src/core/lib/channel/channel_stack.h" |
|
||||||
#include "src/core/lib/channel/context.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/debug/trace.h" |
|
||||||
#include "src/core/lib/gprpp/debug_location.h" |
|
||||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
||||||
#include "src/core/lib/gprpp/status_helper.h" |
|
||||||
#include "src/core/lib/iomgr/error.h" |
|
||||||
#include "src/core/lib/iomgr/exec_ctx.h" |
|
||||||
#include "src/core/lib/promise/activity.h" |
|
||||||
#include "src/core/lib/promise/arena_promise.h" |
|
||||||
#include "src/core/lib/promise/context.h" |
|
||||||
#include "src/core/lib/promise/poll.h" |
|
||||||
#include "src/core/lib/promise/try_seq.h" |
|
||||||
#include "src/core/lib/resource_quota/arena.h" |
|
||||||
#include "src/core/lib/security/context/security_context.h" |
|
||||||
#include "src/core/lib/security/credentials/credentials.h" |
|
||||||
#include "src/core/lib/security/transport/auth_filters.h" // IWYU pragma: keep |
|
||||||
#include "src/core/lib/slice/slice.h" |
|
||||||
#include "src/core/lib/slice/slice_internal.h" |
|
||||||
#include "src/core/lib/surface/call_trace.h" |
|
||||||
#include "src/core/lib/transport/metadata_batch.h" |
|
||||||
#include "src/core/lib/transport/transport.h" |
|
||||||
|
|
||||||
namespace grpc_core { |
|
||||||
|
|
||||||
const grpc_channel_filter LegacyServerAuthFilter::kFilter = |
|
||||||
MakePromiseBasedFilter<LegacyServerAuthFilter, FilterEndpoint::kServer>( |
|
||||||
"server-auth"); |
|
||||||
|
|
||||||
namespace { |
|
||||||
|
|
||||||
class ArrayEncoder { |
|
||||||
public: |
|
||||||
explicit ArrayEncoder(grpc_metadata_array* result) : result_(result) {} |
|
||||||
|
|
||||||
void Encode(const Slice& key, const Slice& value) { |
|
||||||
Append(key.Ref(), value.Ref()); |
|
||||||
} |
|
||||||
|
|
||||||
template <typename Which> |
|
||||||
void Encode(Which, const typename Which::ValueType& value) { |
|
||||||
Append(Slice(StaticSlice::FromStaticString(Which::key())), |
|
||||||
Slice(Which::Encode(value))); |
|
||||||
} |
|
||||||
|
|
||||||
void Encode(HttpMethodMetadata, |
|
||||||
const typename HttpMethodMetadata::ValueType&) {} |
|
||||||
|
|
||||||
private: |
|
||||||
void Append(Slice key, Slice value) { |
|
||||||
if (result_->count == result_->capacity) { |
|
||||||
result_->capacity = |
|
||||||
std::max(result_->capacity + 8, result_->capacity * 2); |
|
||||||
result_->metadata = static_cast<grpc_metadata*>(gpr_realloc( |
|
||||||
result_->metadata, result_->capacity * sizeof(grpc_metadata))); |
|
||||||
} |
|
||||||
auto* usr_md = &result_->metadata[result_->count++]; |
|
||||||
usr_md->key = key.TakeCSlice(); |
|
||||||
usr_md->value = value.TakeCSlice(); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_metadata_array* result_; |
|
||||||
}; |
|
||||||
|
|
||||||
// TODO(ctiller): seek out all users of this functionality and change API so
|
|
||||||
// that this unilateral format conversion IS NOT REQUIRED.
|
|
||||||
grpc_metadata_array MetadataBatchToMetadataArray( |
|
||||||
const grpc_metadata_batch* batch) { |
|
||||||
grpc_metadata_array result; |
|
||||||
grpc_metadata_array_init(&result); |
|
||||||
ArrayEncoder encoder(&result); |
|
||||||
batch->Encode(&encoder); |
|
||||||
return result; |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
class LegacyServerAuthFilter::RunApplicationCode { |
|
||||||
public: |
|
||||||
// TODO(ctiller): Allocate state_ into a pool on the arena to reuse this
|
|
||||||
// memory later
|
|
||||||
RunApplicationCode(LegacyServerAuthFilter* filter, CallArgs call_args) |
|
||||||
: state_(GetContext<Arena>()->ManagedNew<State>(std::move(call_args))) { |
|
||||||
if (grpc_call_trace.enabled()) { |
|
||||||
gpr_log(GPR_ERROR, |
|
||||||
"%s[server-auth]: Delegate to application: filter=%p this=%p " |
|
||||||
"auth_ctx=%p", |
|
||||||
GetContext<Activity>()->DebugTag().c_str(), filter, this, |
|
||||||
filter->auth_context_.get()); |
|
||||||
} |
|
||||||
filter->server_credentials_->auth_metadata_processor().process( |
|
||||||
filter->server_credentials_->auth_metadata_processor().state, |
|
||||||
filter->auth_context_.get(), state_->md.metadata, state_->md.count, |
|
||||||
OnMdProcessingDone, state_); |
|
||||||
} |
|
||||||
|
|
||||||
RunApplicationCode(const RunApplicationCode&) = delete; |
|
||||||
RunApplicationCode& operator=(const RunApplicationCode&) = delete; |
|
||||||
RunApplicationCode(RunApplicationCode&& other) noexcept |
|
||||||
: state_(std::exchange(other.state_, nullptr)) {} |
|
||||||
RunApplicationCode& operator=(RunApplicationCode&& other) noexcept { |
|
||||||
state_ = std::exchange(other.state_, nullptr); |
|
||||||
return *this; |
|
||||||
} |
|
||||||
|
|
||||||
Poll<absl::StatusOr<CallArgs>> operator()() { |
|
||||||
if (state_->done.load(std::memory_order_acquire)) { |
|
||||||
return Poll<absl::StatusOr<CallArgs>>(std::move(state_->call_args)); |
|
||||||
} |
|
||||||
return Pending{}; |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
struct State { |
|
||||||
explicit State(CallArgs call_args) : call_args(std::move(call_args)) {} |
|
||||||
Waker waker{GetContext<Activity>()->MakeOwningWaker()}; |
|
||||||
absl::StatusOr<CallArgs> call_args; |
|
||||||
grpc_metadata_array md = |
|
||||||
MetadataBatchToMetadataArray(call_args->client_initial_metadata.get()); |
|
||||||
std::atomic<bool> done{false}; |
|
||||||
}; |
|
||||||
|
|
||||||
// Called from application code.
|
|
||||||
static void OnMdProcessingDone( |
|
||||||
void* user_data, const grpc_metadata* consumed_md, size_t num_consumed_md, |
|
||||||
const grpc_metadata* response_md, size_t num_response_md, |
|
||||||
grpc_status_code status, const char* error_details) { |
|
||||||
ApplicationCallbackExecCtx callback_exec_ctx; |
|
||||||
ExecCtx exec_ctx; |
|
||||||
|
|
||||||
auto* state = static_cast<State*>(user_data); |
|
||||||
|
|
||||||
// TODO(ZhenLian): Implement support for response_md.
|
|
||||||
if (response_md != nullptr && num_response_md > 0) { |
|
||||||
gpr_log(GPR_ERROR, |
|
||||||
"response_md in auth metadata processing not supported for now. " |
|
||||||
"Ignoring..."); |
|
||||||
} |
|
||||||
|
|
||||||
if (status == GRPC_STATUS_OK) { |
|
||||||
ClientMetadataHandle& md = state->call_args->client_initial_metadata; |
|
||||||
for (size_t i = 0; i < num_consumed_md; i++) { |
|
||||||
md->Remove(StringViewFromSlice(consumed_md[i].key)); |
|
||||||
} |
|
||||||
} else { |
|
||||||
if (error_details == nullptr) { |
|
||||||
error_details = "Authentication metadata processing failed."; |
|
||||||
} |
|
||||||
state->call_args = grpc_error_set_int( |
|
||||||
absl::Status(static_cast<absl::StatusCode>(status), error_details), |
|
||||||
StatusIntProperty::kRpcStatus, status); |
|
||||||
} |
|
||||||
|
|
||||||
// Clean up.
|
|
||||||
for (size_t i = 0; i < state->md.count; i++) { |
|
||||||
CSliceUnref(state->md.metadata[i].key); |
|
||||||
CSliceUnref(state->md.metadata[i].value); |
|
||||||
} |
|
||||||
grpc_metadata_array_destroy(&state->md); |
|
||||||
|
|
||||||
auto waker = std::move(state->waker); |
|
||||||
state->done.store(true, std::memory_order_release); |
|
||||||
waker.Wakeup(); |
|
||||||
} |
|
||||||
|
|
||||||
State* state_; |
|
||||||
}; |
|
||||||
|
|
||||||
ArenaPromise<ServerMetadataHandle> LegacyServerAuthFilter::MakeCallPromise( |
|
||||||
CallArgs call_args, NextPromiseFactory next_promise_factory) { |
|
||||||
// Create server security context. Set its auth context from channel
|
|
||||||
// data and save it in the call context.
|
|
||||||
grpc_server_security_context* server_ctx = |
|
||||||
grpc_server_security_context_create(GetContext<Arena>()); |
|
||||||
server_ctx->auth_context = |
|
||||||
auth_context_->Ref(DEBUG_LOCATION, "server_auth_filter"); |
|
||||||
grpc_call_context_element& context = |
|
||||||
GetContext<grpc_call_context_element>()[GRPC_CONTEXT_SECURITY]; |
|
||||||
if (context.value != nullptr) context.destroy(context.value); |
|
||||||
context.value = server_ctx; |
|
||||||
context.destroy = grpc_server_security_context_destroy; |
|
||||||
|
|
||||||
if (server_credentials_ == nullptr || |
|
||||||
server_credentials_->auth_metadata_processor().process == nullptr) { |
|
||||||
return next_promise_factory(std::move(call_args)); |
|
||||||
} |
|
||||||
|
|
||||||
return TrySeq(RunApplicationCode(this, std::move(call_args)), |
|
||||||
std::move(next_promise_factory)); |
|
||||||
} |
|
||||||
|
|
||||||
LegacyServerAuthFilter::LegacyServerAuthFilter( |
|
||||||
RefCountedPtr<grpc_server_credentials> server_credentials, |
|
||||||
RefCountedPtr<grpc_auth_context> auth_context) |
|
||||||
: server_credentials_(server_credentials), auth_context_(auth_context) {} |
|
||||||
|
|
||||||
absl::StatusOr<LegacyServerAuthFilter> LegacyServerAuthFilter::Create( |
|
||||||
const ChannelArgs& args, ChannelFilter::Args) { |
|
||||||
auto auth_context = args.GetObjectRef<grpc_auth_context>(); |
|
||||||
GPR_ASSERT(auth_context != nullptr); |
|
||||||
auto creds = args.GetObjectRef<grpc_server_credentials>(); |
|
||||||
return LegacyServerAuthFilter(std::move(creds), std::move(auth_context)); |
|
||||||
} |
|
||||||
|
|
||||||
} // namespace grpc_core
|
|
@ -1,239 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/slice/b64.h" |
|
||||||
|
|
||||||
#include <stdint.h> |
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
|
|
||||||
#include "src/core/lib/gpr/useful.h" |
|
||||||
#include "src/core/lib/slice/slice.h" |
|
||||||
|
|
||||||
// --- Constants. ---
|
|
||||||
|
|
||||||
static const int8_t base64_bytes[] = { |
|
||||||
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, |
|
||||||
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, |
|
||||||
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, |
|
||||||
-1, -1, -1, -1, -1, -1, -1, 0x3E, -1, -1, -1, 0x3F, |
|
||||||
0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, -1, -1, |
|
||||||
-1, 0x7F, -1, -1, -1, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, |
|
||||||
0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, |
|
||||||
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, -1, -1, -1, -1, -1, |
|
||||||
-1, 0x1A, 0x1B, 0x1C, 0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x24, |
|
||||||
0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E, 0x2F, 0x30, |
|
||||||
0x31, 0x32, 0x33, -1, -1, -1, -1, -1}; |
|
||||||
|
|
||||||
static const char base64_url_unsafe_chars[] = |
|
||||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; |
|
||||||
static const char base64_url_safe_chars[] = |
|
||||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; |
|
||||||
|
|
||||||
#define GRPC_BASE64_PAD_CHAR '=' |
|
||||||
#define GRPC_BASE64_PAD_BYTE 0x7F |
|
||||||
#define GRPC_BASE64_MULTILINE_LINE_LEN 76 |
|
||||||
#define GRPC_BASE64_MULTILINE_NUM_BLOCKS (GRPC_BASE64_MULTILINE_LINE_LEN / 4) |
|
||||||
|
|
||||||
// --- base64 functions. ---
|
|
||||||
|
|
||||||
char* grpc_base64_encode(const void* vdata, size_t data_size, int url_safe, |
|
||||||
int multiline) { |
|
||||||
size_t result_projected_size = |
|
||||||
grpc_base64_estimate_encoded_size(data_size, multiline); |
|
||||||
char* result = static_cast<char*>(gpr_malloc(result_projected_size)); |
|
||||||
grpc_base64_encode_core(result, vdata, data_size, url_safe, multiline); |
|
||||||
return result; |
|
||||||
} |
|
||||||
|
|
||||||
size_t grpc_base64_estimate_encoded_size(size_t data_size, int multiline) { |
|
||||||
size_t result_projected_size = |
|
||||||
4 * ((data_size + 3) / 3) + |
|
||||||
2 * (multiline ? (data_size / (3 * GRPC_BASE64_MULTILINE_NUM_BLOCKS)) |
|
||||||
: 0) + |
|
||||||
1; |
|
||||||
return result_projected_size; |
|
||||||
} |
|
||||||
|
|
||||||
void grpc_base64_encode_core(char* result, const void* vdata, size_t data_size, |
|
||||||
int url_safe, int multiline) { |
|
||||||
const unsigned char* data = static_cast<const unsigned char*>(vdata); |
|
||||||
const char* base64_chars = |
|
||||||
url_safe ? base64_url_safe_chars : base64_url_unsafe_chars; |
|
||||||
const size_t result_projected_size = |
|
||||||
grpc_base64_estimate_encoded_size(data_size, multiline); |
|
||||||
|
|
||||||
char* current = result; |
|
||||||
size_t num_blocks = 0; |
|
||||||
size_t i = 0; |
|
||||||
|
|
||||||
// Encode each block.
|
|
||||||
while (data_size >= 3) { |
|
||||||
*current++ = base64_chars[(data[i] >> 2) & 0x3F]; |
|
||||||
*current++ = |
|
||||||
base64_chars[((data[i] & 0x03) << 4) | ((data[i + 1] >> 4) & 0x0F)]; |
|
||||||
*current++ = |
|
||||||
base64_chars[((data[i + 1] & 0x0F) << 2) | ((data[i + 2] >> 6) & 0x03)]; |
|
||||||
*current++ = base64_chars[data[i + 2] & 0x3F]; |
|
||||||
|
|
||||||
data_size -= 3; |
|
||||||
i += 3; |
|
||||||
if (multiline && (++num_blocks == GRPC_BASE64_MULTILINE_NUM_BLOCKS)) { |
|
||||||
*current++ = '\r'; |
|
||||||
*current++ = '\n'; |
|
||||||
num_blocks = 0; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Take care of the tail.
|
|
||||||
if (data_size == 2) { |
|
||||||
*current++ = base64_chars[(data[i] >> 2) & 0x3F]; |
|
||||||
*current++ = |
|
||||||
base64_chars[((data[i] & 0x03) << 4) | ((data[i + 1] >> 4) & 0x0F)]; |
|
||||||
*current++ = base64_chars[(data[i + 1] & 0x0F) << 2]; |
|
||||||
*current++ = GRPC_BASE64_PAD_CHAR; |
|
||||||
} else if (data_size == 1) { |
|
||||||
*current++ = base64_chars[(data[i] >> 2) & 0x3F]; |
|
||||||
*current++ = base64_chars[(data[i] & 0x03) << 4]; |
|
||||||
*current++ = GRPC_BASE64_PAD_CHAR; |
|
||||||
*current++ = GRPC_BASE64_PAD_CHAR; |
|
||||||
} |
|
||||||
|
|
||||||
GPR_ASSERT(current >= result); |
|
||||||
GPR_ASSERT((uintptr_t)(current - result) < result_projected_size); |
|
||||||
result[current - result] = '\0'; |
|
||||||
} |
|
||||||
|
|
||||||
grpc_slice grpc_base64_decode(const char* b64, int url_safe) { |
|
||||||
return grpc_base64_decode_with_len(b64, strlen(b64), url_safe); |
|
||||||
} |
|
||||||
|
|
||||||
static void decode_one_char(const unsigned char* codes, unsigned char* result, |
|
||||||
size_t* result_offset) { |
|
||||||
uint32_t packed = (static_cast<uint32_t>(codes[0]) << 2) | |
|
||||||
(static_cast<uint32_t>(codes[1]) >> 4); |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed); |
|
||||||
} |
|
||||||
|
|
||||||
static void decode_two_chars(const unsigned char* codes, unsigned char* result, |
|
||||||
size_t* result_offset) { |
|
||||||
uint32_t packed = (static_cast<uint32_t>(codes[0]) << 10) | |
|
||||||
(static_cast<uint32_t>(codes[1]) << 4) | |
|
||||||
(static_cast<uint32_t>(codes[2]) >> 2); |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed >> 8); |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed); |
|
||||||
} |
|
||||||
|
|
||||||
static int decode_group(const unsigned char* codes, size_t num_codes, |
|
||||||
unsigned char* result, size_t* result_offset) { |
|
||||||
GPR_ASSERT(num_codes <= 4); |
|
||||||
|
|
||||||
// Short end groups that may not have padding.
|
|
||||||
if (num_codes == 1) { |
|
||||||
gpr_log(GPR_ERROR, "Invalid group. Must be at least 2 bytes."); |
|
||||||
return 0; |
|
||||||
} |
|
||||||
if (num_codes == 2) { |
|
||||||
decode_one_char(codes, result, result_offset); |
|
||||||
return 1; |
|
||||||
} |
|
||||||
if (num_codes == 3) { |
|
||||||
decode_two_chars(codes, result, result_offset); |
|
||||||
return 1; |
|
||||||
} |
|
||||||
|
|
||||||
// Regular 4 byte groups with padding or not.
|
|
||||||
GPR_ASSERT(num_codes == 4); |
|
||||||
if (codes[0] == GRPC_BASE64_PAD_BYTE || codes[1] == GRPC_BASE64_PAD_BYTE) { |
|
||||||
gpr_log(GPR_ERROR, "Invalid padding detected."); |
|
||||||
return 0; |
|
||||||
} |
|
||||||
if (codes[2] == GRPC_BASE64_PAD_BYTE) { |
|
||||||
if (codes[3] == GRPC_BASE64_PAD_BYTE) { |
|
||||||
decode_one_char(codes, result, result_offset); |
|
||||||
} else { |
|
||||||
gpr_log(GPR_ERROR, "Invalid padding detected."); |
|
||||||
return 0; |
|
||||||
} |
|
||||||
} else if (codes[3] == GRPC_BASE64_PAD_BYTE) { |
|
||||||
decode_two_chars(codes, result, result_offset); |
|
||||||
} else { |
|
||||||
// No padding.
|
|
||||||
uint32_t packed = (static_cast<uint32_t>(codes[0]) << 18) | |
|
||||||
(static_cast<uint32_t>(codes[1]) << 12) | |
|
||||||
(static_cast<uint32_t>(codes[2]) << 6) | codes[3]; |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed >> 16); |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed >> 8); |
|
||||||
result[(*result_offset)++] = static_cast<unsigned char>(packed); |
|
||||||
} |
|
||||||
return 1; |
|
||||||
} |
|
||||||
|
|
||||||
grpc_slice grpc_base64_decode_with_len(const char* b64, size_t b64_len, |
|
||||||
int url_safe) { |
|
||||||
grpc_slice result = GRPC_SLICE_MALLOC(b64_len); |
|
||||||
unsigned char* current = GRPC_SLICE_START_PTR(result); |
|
||||||
size_t result_size = 0; |
|
||||||
unsigned char codes[4]; |
|
||||||
size_t num_codes = 0; |
|
||||||
|
|
||||||
while (b64_len--) { |
|
||||||
unsigned char c = static_cast<unsigned char>(*b64++); |
|
||||||
signed char code; |
|
||||||
if (c >= GPR_ARRAY_SIZE(base64_bytes)) continue; |
|
||||||
if (url_safe) { |
|
||||||
if (c == '+' || c == '/') { |
|
||||||
gpr_log(GPR_ERROR, "Invalid character for url safe base64 %c", c); |
|
||||||
goto fail; |
|
||||||
} |
|
||||||
if (c == '-') { |
|
||||||
c = '+'; |
|
||||||
} else if (c == '_') { |
|
||||||
c = '/'; |
|
||||||
} |
|
||||||
} |
|
||||||
code = base64_bytes[c]; |
|
||||||
if (code == -1) { |
|
||||||
if (c != '\r' && c != '\n') { |
|
||||||
gpr_log(GPR_ERROR, "Invalid character %c", c); |
|
||||||
goto fail; |
|
||||||
} |
|
||||||
} else { |
|
||||||
codes[num_codes++] = static_cast<unsigned char>(code); |
|
||||||
if (num_codes == 4) { |
|
||||||
if (!decode_group(codes, num_codes, current, &result_size)) goto fail; |
|
||||||
num_codes = 0; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
if (num_codes != 0 && |
|
||||||
!decode_group(codes, num_codes, current, &result_size)) { |
|
||||||
goto fail; |
|
||||||
} |
|
||||||
GRPC_SLICE_SET_LENGTH(result, result_size); |
|
||||||
return result; |
|
||||||
|
|
||||||
fail: |
|
||||||
grpc_core::CSliceUnref(result); |
|
||||||
return grpc_empty_slice(); |
|
||||||
} |
|
@ -1,52 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_SRC_CORE_LIB_SLICE_B64_H |
|
||||||
#define GRPC_SRC_CORE_LIB_SLICE_B64_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
|
|
||||||
#include <grpc/slice.h> |
|
||||||
|
|
||||||
// Encodes data using base64. It is the caller's responsibility to free
|
|
||||||
// the returned char * using gpr_free. Returns NULL on NULL input.
|
|
||||||
// TODO(makdharma) : change the flags to bool from int
|
|
||||||
char* grpc_base64_encode(const void* data, size_t data_size, int url_safe, |
|
||||||
int multiline); |
|
||||||
|
|
||||||
// estimate the upper bound on size of base64 encoded data. The actual size
|
|
||||||
// is guaranteed to be less than or equal to the size returned here.
|
|
||||||
size_t grpc_base64_estimate_encoded_size(size_t data_size, int multiline); |
|
||||||
|
|
||||||
// Encodes data using base64 and write it to memory pointed to by result. It is
|
|
||||||
// the caller's responsibility to allocate enough memory in |result| to fit the
|
|
||||||
// encoded data.
|
|
||||||
void grpc_base64_encode_core(char* result, const void* vdata, size_t data_size, |
|
||||||
int url_safe, int multiline); |
|
||||||
|
|
||||||
// Decodes data according to the base64 specification. Returns an empty
|
|
||||||
// slice in case of failure.
|
|
||||||
grpc_slice grpc_base64_decode(const char* b64, int url_safe); |
|
||||||
|
|
||||||
// Same as above except that the length is provided by the caller.
|
|
||||||
grpc_slice grpc_base64_decode_with_len(const char* b64, size_t b64_len, |
|
||||||
int url_safe); |
|
||||||
|
|
||||||
#endif // GRPC_SRC_CORE_LIB_SLICE_B64_H
|
|
@ -0,0 +1,282 @@ |
|||||||
|
//
|
||||||
|
//
|
||||||
|
// Copyright 2023 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/cpp/ext/otel/otel_client_call_tracer.h" |
||||||
|
|
||||||
|
#include <stdint.h> |
||||||
|
|
||||||
|
#include <array> |
||||||
|
#include <functional> |
||||||
|
#include <memory> |
||||||
|
#include <string> |
||||||
|
#include <utility> |
||||||
|
|
||||||
|
#include "absl/functional/any_invocable.h" |
||||||
|
#include "absl/status/status.h" |
||||||
|
#include "absl/strings/str_format.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
#include "absl/strings/strip.h" |
||||||
|
#include "absl/time/clock.h" |
||||||
|
#include "absl/time/time.h" |
||||||
|
#include "absl/types/optional.h" |
||||||
|
#include "absl/types/span.h" |
||||||
|
#include "opentelemetry/context/context.h" |
||||||
|
#include "opentelemetry/metrics/sync_instruments.h" |
||||||
|
|
||||||
|
#include <grpc/status.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/time.h> |
||||||
|
|
||||||
|
#include "src/core/client_channel/client_channel_filter.h" |
||||||
|
#include "src/core/lib/channel/channel_stack.h" |
||||||
|
#include "src/core/lib/channel/context.h" |
||||||
|
#include "src/core/lib/channel/status_util.h" |
||||||
|
#include "src/core/lib/channel/tcp_tracer.h" |
||||||
|
#include "src/core/lib/gprpp/sync.h" |
||||||
|
#include "src/core/lib/promise/context.h" |
||||||
|
#include "src/core/lib/resource_quota/arena.h" |
||||||
|
#include "src/core/lib/slice/slice.h" |
||||||
|
#include "src/core/lib/slice/slice_buffer.h" |
||||||
|
#include "src/core/lib/transport/metadata_batch.h" |
||||||
|
#include "src/cpp/ext/otel/key_value_iterable.h" |
||||||
|
#include "src/cpp/ext/otel/otel_plugin.h" |
||||||
|
|
||||||
|
namespace grpc { |
||||||
|
namespace internal { |
||||||
|
|
||||||
|
//
|
||||||
|
// OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer
|
||||||
|
//
|
||||||
|
|
||||||
|
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer( |
||||||
|
const OpenTelemetryPlugin::ClientCallTracer* parent, bool arena_allocated) |
||||||
|
: parent_(parent), |
||||||
|
arena_allocated_(arena_allocated), |
||||||
|
start_time_(absl::Now()) { |
||||||
|
if (parent_->otel_plugin_->client_.attempt.started != nullptr) { |
||||||
|
std::array<std::pair<absl::string_view, absl::string_view>, 2> |
||||||
|
additional_labels = { |
||||||
|
{{OpenTelemetryMethodKey(), parent_->MethodForStats()}, |
||||||
|
{OpenTelemetryTargetKey(), |
||||||
|
parent_->scope_config_->filtered_target()}}}; |
||||||
|
// We might not have all the injected labels that we want at this point, so
|
||||||
|
// avoid recording a subset of injected labels here.
|
||||||
|
parent_->otel_plugin_->client_.attempt.started->Add( |
||||||
|
1, KeyValueIterable( |
||||||
|
/*injected_labels_from_plugin_options=*/{}, additional_labels, |
||||||
|
/*active_plugin_options_view=*/nullptr, |
||||||
|
/*optional_labels_span=*/{}, /*is_client=*/true, |
||||||
|
parent_->otel_plugin_)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) { |
||||||
|
parent_->scope_config_->active_plugin_options_view().ForEach( |
||||||
|
[&](const InternalOpenTelemetryPluginOption& plugin_option, |
||||||
|
size_t /*index*/) { |
||||||
|
auto* labels_injector = plugin_option.labels_injector(); |
||||||
|
if (labels_injector != nullptr) { |
||||||
|
injected_labels_from_plugin_options_.push_back( |
||||||
|
labels_injector->GetLabels(recv_initial_metadata)); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}, |
||||||
|
parent_->otel_plugin_); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) { |
||||||
|
parent_->scope_config_->active_plugin_options_view().ForEach( |
||||||
|
[&](const InternalOpenTelemetryPluginOption& plugin_option, |
||||||
|
size_t /*index*/) { |
||||||
|
auto* labels_injector = plugin_option.labels_injector(); |
||||||
|
if (labels_injector != nullptr) { |
||||||
|
labels_injector->AddLabels(send_initial_metadata, nullptr); |
||||||
|
} |
||||||
|
return true; |
||||||
|
}, |
||||||
|
parent_->otel_plugin_); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordSendMessage(const grpc_core::SliceBuffer& send_message) { |
||||||
|
RecordAnnotation( |
||||||
|
absl::StrFormat("Send message: %ld bytes", send_message.Length())); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordSendCompressedMessage( |
||||||
|
const grpc_core::SliceBuffer& send_compressed_message) { |
||||||
|
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", |
||||||
|
send_compressed_message.Length())); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) { |
||||||
|
RecordAnnotation( |
||||||
|
absl::StrFormat("Received message: %ld bytes", recv_message.Length())); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordReceivedDecompressedMessage( |
||||||
|
const grpc_core::SliceBuffer& recv_decompressed_message) { |
||||||
|
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", |
||||||
|
recv_decompressed_message.Length())); |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
RecordReceivedTrailingMetadata( |
||||||
|
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/, |
||||||
|
const grpc_transport_stream_stats* transport_stream_stats) { |
||||||
|
std::array<std::pair<absl::string_view, absl::string_view>, 3> |
||||||
|
additional_labels = { |
||||||
|
{{OpenTelemetryMethodKey(), parent_->MethodForStats()}, |
||||||
|
{OpenTelemetryTargetKey(), |
||||||
|
parent_->scope_config_->filtered_target()}, |
||||||
|
{OpenTelemetryStatusKey(), |
||||||
|
grpc_status_code_to_string( |
||||||
|
static_cast<grpc_status_code>(status.code()))}}}; |
||||||
|
KeyValueIterable labels( |
||||||
|
injected_labels_from_plugin_options_, additional_labels, |
||||||
|
&parent_->scope_config_->active_plugin_options_view(), |
||||||
|
optional_labels_array_, /*is_client=*/true, parent_->otel_plugin_); |
||||||
|
if (parent_->otel_plugin_->client_.attempt.duration != nullptr) { |
||||||
|
parent_->otel_plugin_->client_.attempt.duration->Record( |
||||||
|
absl::ToDoubleSeconds(absl::Now() - start_time_), labels, |
||||||
|
opentelemetry::context::Context{}); |
||||||
|
} |
||||||
|
if (parent_->otel_plugin_->client_.attempt |
||||||
|
.sent_total_compressed_message_size != nullptr) { |
||||||
|
parent_->otel_plugin_->client_.attempt.sent_total_compressed_message_size |
||||||
|
->Record(transport_stream_stats != nullptr |
||||||
|
? transport_stream_stats->outgoing.data_bytes |
||||||
|
: 0, |
||||||
|
labels, opentelemetry::context::Context{}); |
||||||
|
} |
||||||
|
if (parent_->otel_plugin_->client_.attempt |
||||||
|
.rcvd_total_compressed_message_size != nullptr) { |
||||||
|
parent_->otel_plugin_->client_.attempt.rcvd_total_compressed_message_size |
||||||
|
->Record(transport_stream_stats != nullptr |
||||||
|
? transport_stream_stats->incoming.data_bytes |
||||||
|
: 0, |
||||||
|
labels, opentelemetry::context::Context{}); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordCancel( |
||||||
|
absl::Status /*cancel_error*/) {} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordEnd( |
||||||
|
const gpr_timespec& /*latency*/) { |
||||||
|
if (arena_allocated_) { |
||||||
|
this->~CallAttemptTracer(); |
||||||
|
} else { |
||||||
|
delete this; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation( |
||||||
|
absl::string_view /*annotation*/) { |
||||||
|
// Not implemented
|
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::RecordAnnotation( |
||||||
|
const Annotation& /*annotation*/) { |
||||||
|
// Not implemented
|
||||||
|
} |
||||||
|
|
||||||
|
std::shared_ptr<grpc_core::TcpTracerInterface> |
||||||
|
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::StartNewTcpTrace() { |
||||||
|
// No TCP trace.
|
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer:: |
||||||
|
AddOptionalLabels( |
||||||
|
OptionalLabelComponent component, |
||||||
|
std::shared_ptr<std::map<std::string, std::string>> optional_labels) { |
||||||
|
optional_labels_array_[static_cast<std::size_t>(component)] = |
||||||
|
std::move(optional_labels); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// OpenTelemetryPlugin::ClientCallTracer
|
||||||
|
//
|
||||||
|
|
||||||
|
OpenTelemetryPlugin::ClientCallTracer::ClientCallTracer( |
||||||
|
const grpc_core::Slice& path, grpc_core::Arena* arena, |
||||||
|
bool registered_method, OpenTelemetryPlugin* otel_plugin, |
||||||
|
std::shared_ptr<OpenTelemetryPlugin::ClientScopeConfig> scope_config) |
||||||
|
: path_(path.Ref()), |
||||||
|
arena_(arena), |
||||||
|
registered_method_(registered_method), |
||||||
|
otel_plugin_(otel_plugin), |
||||||
|
scope_config_(std::move(scope_config)) {} |
||||||
|
|
||||||
|
OpenTelemetryPlugin::ClientCallTracer::~ClientCallTracer() {} |
||||||
|
|
||||||
|
OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer* |
||||||
|
OpenTelemetryPlugin::ClientCallTracer::StartNewAttempt( |
||||||
|
bool is_transparent_retry) { |
||||||
|
// We allocate the first attempt on the arena and all subsequent attempts
|
||||||
|
// on the heap, so that in the common case we don't require a heap
|
||||||
|
// allocation, nor do we unnecessarily grow the arena.
|
||||||
|
bool is_first_attempt = true; |
||||||
|
{ |
||||||
|
grpc_core::MutexLock lock(&mu_); |
||||||
|
if (transparent_retries_ != 0 || retries_ != 0) { |
||||||
|
is_first_attempt = false; |
||||||
|
} |
||||||
|
if (is_transparent_retry) { |
||||||
|
++transparent_retries_; |
||||||
|
} else { |
||||||
|
++retries_; |
||||||
|
} |
||||||
|
} |
||||||
|
if (is_first_attempt) { |
||||||
|
return arena_->New<CallAttemptTracer>(this, /*arena_allocated=*/true); |
||||||
|
} |
||||||
|
return new CallAttemptTracer(this, /*arena_allocated=*/false); |
||||||
|
} |
||||||
|
|
||||||
|
absl::string_view OpenTelemetryPlugin::ClientCallTracer::MethodForStats() |
||||||
|
const { |
||||||
|
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/"); |
||||||
|
if (registered_method_ || |
||||||
|
(otel_plugin_->generic_method_attribute_filter() != nullptr && |
||||||
|
otel_plugin_->generic_method_attribute_filter()(method))) { |
||||||
|
return method; |
||||||
|
} |
||||||
|
return "other"; |
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation( |
||||||
|
absl::string_view /*annotation*/) { |
||||||
|
// Not implemented
|
||||||
|
} |
||||||
|
|
||||||
|
void OpenTelemetryPlugin::ClientCallTracer::RecordAnnotation( |
||||||
|
const Annotation& /*annotation*/) { |
||||||
|
// Not implemented
|
||||||
|
} |
||||||
|
|
||||||
|
} // namespace internal
|
||||||
|
} // namespace grpc
|
@ -1,328 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2023 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/cpp/ext/otel/otel_client_filter.h" |
|
||||||
|
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include <array> |
|
||||||
#include <functional> |
|
||||||
#include <memory> |
|
||||||
#include <string> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/functional/any_invocable.h" |
|
||||||
#include "absl/status/status.h" |
|
||||||
#include "absl/strings/str_format.h" |
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
#include "absl/strings/strip.h" |
|
||||||
#include "absl/time/clock.h" |
|
||||||
#include "absl/time/time.h" |
|
||||||
#include "absl/types/optional.h" |
|
||||||
#include "absl/types/span.h" |
|
||||||
#include "opentelemetry/context/context.h" |
|
||||||
#include "opentelemetry/metrics/sync_instruments.h" |
|
||||||
|
|
||||||
#include <grpc/status.h> |
|
||||||
#include <grpc/support/log.h> |
|
||||||
#include <grpc/support/time.h> |
|
||||||
|
|
||||||
#include "src/core/client_channel/client_channel_filter.h" |
|
||||||
#include "src/core/lib/channel/channel_stack.h" |
|
||||||
#include "src/core/lib/channel/context.h" |
|
||||||
#include "src/core/lib/channel/status_util.h" |
|
||||||
#include "src/core/lib/channel/tcp_tracer.h" |
|
||||||
#include "src/core/lib/gprpp/sync.h" |
|
||||||
#include "src/core/lib/promise/context.h" |
|
||||||
#include "src/core/lib/resource_quota/arena.h" |
|
||||||
#include "src/core/lib/slice/slice.h" |
|
||||||
#include "src/core/lib/slice/slice_buffer.h" |
|
||||||
#include "src/core/lib/transport/metadata_batch.h" |
|
||||||
#include "src/cpp/ext/otel/key_value_iterable.h" |
|
||||||
#include "src/cpp/ext/otel/otel_call_tracer.h" |
|
||||||
#include "src/cpp/ext/otel/otel_plugin.h" |
|
||||||
|
|
||||||
namespace grpc { |
|
||||||
namespace internal { |
|
||||||
|
|
||||||
//
|
|
||||||
// OpenTelemetryClientFilter
|
|
||||||
//
|
|
||||||
|
|
||||||
const grpc_channel_filter OpenTelemetryClientFilter::kFilter = |
|
||||||
grpc_core::MakePromiseBasedFilter<OpenTelemetryClientFilter, |
|
||||||
grpc_core::FilterEndpoint::kClient>( |
|
||||||
"otel_client"); |
|
||||||
|
|
||||||
absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create( |
|
||||||
const grpc_core::ChannelArgs& args, ChannelFilter::Args /*filter_args*/) { |
|
||||||
return OpenTelemetryClientFilter( |
|
||||||
args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("")); |
|
||||||
} |
|
||||||
|
|
||||||
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> |
|
||||||
OpenTelemetryClientFilter::MakeCallPromise( |
|
||||||
grpc_core::CallArgs call_args, |
|
||||||
grpc_core::NextPromiseFactory next_promise_factory) { |
|
||||||
auto* path = call_args.client_initial_metadata->get_pointer( |
|
||||||
grpc_core::HttpPathMetadata()); |
|
||||||
bool registered_method = reinterpret_cast<uintptr_t>( |
|
||||||
call_args.client_initial_metadata->get(grpc_core::GrpcRegisteredMethod()) |
|
||||||
.value_or(nullptr)); |
|
||||||
auto* call_context = grpc_core::GetContext<grpc_call_context_element>(); |
|
||||||
auto* tracer = |
|
||||||
grpc_core::GetContext<grpc_core::Arena>() |
|
||||||
->ManagedNew<OpenTelemetryCallTracer>( |
|
||||||
this, path != nullptr ? path->Ref() : grpc_core::Slice(), |
|
||||||
grpc_core::GetContext<grpc_core::Arena>(), registered_method); |
|
||||||
GPR_DEBUG_ASSERT( |
|
||||||
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value == |
|
||||||
nullptr); |
|
||||||
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value = tracer; |
|
||||||
call_context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].destroy = nullptr; |
|
||||||
return next_promise_factory(std::move(call_args)); |
|
||||||
} |
|
||||||
|
|
||||||
OpenTelemetryClientFilter::OpenTelemetryClientFilter(std::string target) |
|
||||||
: active_plugin_options_view_( |
|
||||||
ActivePluginOptionsView::MakeForClient(target)) { |
|
||||||
// Use the original target string only if a filter on the attribute is not
|
|
||||||
// registered or if the filter returns true, otherwise use "other".
|
|
||||||
if (OpenTelemetryPluginState().target_attribute_filter == nullptr || |
|
||||||
OpenTelemetryPluginState().target_attribute_filter(target)) { |
|
||||||
filtered_target_ = std::move(target); |
|
||||||
} else { |
|
||||||
filtered_target_ = "other"; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer
|
|
||||||
//
|
|
||||||
|
|
||||||
OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
OpenTelemetryCallAttemptTracer(const OpenTelemetryCallTracer* parent, |
|
||||||
bool arena_allocated) |
|
||||||
: parent_(parent), |
|
||||||
arena_allocated_(arena_allocated), |
|
||||||
start_time_(absl::Now()) { |
|
||||||
if (OpenTelemetryPluginState().client.attempt.started != nullptr) { |
|
||||||
std::array<std::pair<absl::string_view, absl::string_view>, 2> |
|
||||||
additional_labels = { |
|
||||||
{{OpenTelemetryMethodKey(), parent_->MethodForStats()}, |
|
||||||
{OpenTelemetryTargetKey(), parent_->parent_->filtered_target()}}}; |
|
||||||
// We might not have all the injected labels that we want at this point, so
|
|
||||||
// avoid recording a subset of injected labels here.
|
|
||||||
OpenTelemetryPluginState().client.attempt.started->Add( |
|
||||||
1, KeyValueIterable(/*injected_labels_from_plugin_options=*/{}, |
|
||||||
additional_labels, |
|
||||||
/*active_plugin_options_view=*/nullptr, |
|
||||||
/*optional_labels_span=*/{}, /*is_client=*/true)); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) { |
|
||||||
parent_->parent_->active_plugin_options_view().ForEach( |
|
||||||
[&](const InternalOpenTelemetryPluginOption& plugin_option, |
|
||||||
size_t /*index*/) { |
|
||||||
auto* labels_injector = plugin_option.labels_injector(); |
|
||||||
if (labels_injector != nullptr) { |
|
||||||
injected_labels_from_plugin_options_.push_back( |
|
||||||
labels_injector->GetLabels(recv_initial_metadata)); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) { |
|
||||||
parent_->parent_->active_plugin_options_view().ForEach( |
|
||||||
[&](const InternalOpenTelemetryPluginOption& plugin_option, |
|
||||||
size_t /*index*/) { |
|
||||||
auto* labels_injector = plugin_option.labels_injector(); |
|
||||||
if (labels_injector != nullptr) { |
|
||||||
labels_injector->AddLabels(send_initial_metadata, nullptr); |
|
||||||
} |
|
||||||
return true; |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordSendMessage( |
|
||||||
const grpc_core::SliceBuffer& send_message) { |
|
||||||
RecordAnnotation( |
|
||||||
absl::StrFormat("Send message: %ld bytes", send_message.Length())); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordSendCompressedMessage( |
|
||||||
const grpc_core::SliceBuffer& send_compressed_message) { |
|
||||||
RecordAnnotation(absl::StrFormat("Send compressed message: %ld bytes", |
|
||||||
send_compressed_message.Length())); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordReceivedMessage(const grpc_core::SliceBuffer& recv_message) { |
|
||||||
RecordAnnotation( |
|
||||||
absl::StrFormat("Received message: %ld bytes", recv_message.Length())); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordReceivedDecompressedMessage( |
|
||||||
const grpc_core::SliceBuffer& recv_decompressed_message) { |
|
||||||
RecordAnnotation(absl::StrFormat("Received decompressed message: %ld bytes", |
|
||||||
recv_decompressed_message.Length())); |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer:: |
|
||||||
RecordReceivedTrailingMetadata( |
|
||||||
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/, |
|
||||||
const grpc_transport_stream_stats* transport_stream_stats) { |
|
||||||
std::array<std::pair<absl::string_view, absl::string_view>, 3> |
|
||||||
additional_labels = { |
|
||||||
{{OpenTelemetryMethodKey(), parent_->MethodForStats()}, |
|
||||||
{OpenTelemetryTargetKey(), parent_->parent_->filtered_target()}, |
|
||||||
{OpenTelemetryStatusKey(), |
|
||||||
grpc_status_code_to_string( |
|
||||||
static_cast<grpc_status_code>(status.code()))}}}; |
|
||||||
KeyValueIterable labels(injected_labels_from_plugin_options_, |
|
||||||
additional_labels, |
|
||||||
&parent_->parent_->active_plugin_options_view(), |
|
||||||
optional_labels_array_, /*is_client=*/true); |
|
||||||
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) { |
|
||||||
OpenTelemetryPluginState().client.attempt.duration->Record( |
|
||||||
absl::ToDoubleSeconds(absl::Now() - start_time_), labels, |
|
||||||
opentelemetry::context::Context{}); |
|
||||||
} |
|
||||||
if (OpenTelemetryPluginState() |
|
||||||
.client.attempt.sent_total_compressed_message_size != nullptr) { |
|
||||||
OpenTelemetryPluginState() |
|
||||||
.client.attempt.sent_total_compressed_message_size->Record( |
|
||||||
transport_stream_stats != nullptr |
|
||||||
? transport_stream_stats->outgoing.data_bytes |
|
||||||
: 0, |
|
||||||
labels, opentelemetry::context::Context{}); |
|
||||||
} |
|
||||||
if (OpenTelemetryPluginState() |
|
||||||
.client.attempt.rcvd_total_compressed_message_size != nullptr) { |
|
||||||
OpenTelemetryPluginState() |
|
||||||
.client.attempt.rcvd_total_compressed_message_size->Record( |
|
||||||
transport_stream_stats != nullptr |
|
||||||
? transport_stream_stats->incoming.data_bytes |
|
||||||
: 0, |
|
||||||
labels, opentelemetry::context::Context{}); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordCancel( |
|
||||||
absl::Status /*cancel_error*/) {} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordEnd( |
|
||||||
const gpr_timespec& /*latency*/) { |
|
||||||
if (arena_allocated_) { |
|
||||||
this->~OpenTelemetryCallAttemptTracer(); |
|
||||||
} else { |
|
||||||
delete this; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordAnnotation( |
|
||||||
absl::string_view /*annotation*/) { |
|
||||||
// Not implemented
|
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::RecordAnnotation( |
|
||||||
const Annotation& /*annotation*/) { |
|
||||||
// Not implemented
|
|
||||||
} |
|
||||||
|
|
||||||
std::shared_ptr<grpc_core::TcpTracerInterface> |
|
||||||
OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::StartNewTcpTrace() { |
|
||||||
// No TCP trace.
|
|
||||||
return nullptr; |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::AddOptionalLabels( |
|
||||||
OptionalLabelComponent component, |
|
||||||
std::shared_ptr<std::map<std::string, std::string>> optional_labels) { |
|
||||||
optional_labels_array_[static_cast<std::size_t>(component)] = |
|
||||||
std::move(optional_labels); |
|
||||||
} |
|
||||||
|
|
||||||
//
|
|
||||||
// OpenTelemetryCallTracer
|
|
||||||
//
|
|
||||||
|
|
||||||
OpenTelemetryCallTracer::OpenTelemetryCallTracer( |
|
||||||
OpenTelemetryClientFilter* parent, grpc_core::Slice path, |
|
||||||
grpc_core::Arena* arena, bool registered_method) |
|
||||||
: parent_(parent), |
|
||||||
path_(std::move(path)), |
|
||||||
arena_(arena), |
|
||||||
registered_method_(registered_method) {} |
|
||||||
|
|
||||||
OpenTelemetryCallTracer::~OpenTelemetryCallTracer() {} |
|
||||||
|
|
||||||
OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer* |
|
||||||
OpenTelemetryCallTracer::StartNewAttempt(bool is_transparent_retry) { |
|
||||||
// We allocate the first attempt on the arena and all subsequent attempts
|
|
||||||
// on the heap, so that in the common case we don't require a heap
|
|
||||||
// allocation, nor do we unnecessarily grow the arena.
|
|
||||||
bool is_first_attempt = true; |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&mu_); |
|
||||||
if (transparent_retries_ != 0 || retries_ != 0) { |
|
||||||
is_first_attempt = false; |
|
||||||
} |
|
||||||
if (is_transparent_retry) { |
|
||||||
++transparent_retries_; |
|
||||||
} else { |
|
||||||
++retries_; |
|
||||||
} |
|
||||||
} |
|
||||||
if (is_first_attempt) { |
|
||||||
return arena_->New<OpenTelemetryCallAttemptTracer>( |
|
||||||
this, /*arena_allocated=*/true); |
|
||||||
} |
|
||||||
return new OpenTelemetryCallAttemptTracer(this, /*arena_allocated=*/false); |
|
||||||
} |
|
||||||
|
|
||||||
absl::string_view OpenTelemetryCallTracer::MethodForStats() const { |
|
||||||
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/"); |
|
||||||
if (registered_method_ || |
|
||||||
(OpenTelemetryPluginState().generic_method_attribute_filter != nullptr && |
|
||||||
OpenTelemetryPluginState().generic_method_attribute_filter(method))) { |
|
||||||
return method; |
|
||||||
} |
|
||||||
return "other"; |
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::RecordAnnotation( |
|
||||||
absl::string_view /*annotation*/) { |
|
||||||
// Not implemented
|
|
||||||
} |
|
||||||
|
|
||||||
void OpenTelemetryCallTracer::RecordAnnotation( |
|
||||||
const Annotation& /*annotation*/) { |
|
||||||
// Not implemented
|
|
||||||
} |
|
||||||
|
|
||||||
} // namespace internal
|
|
||||||
} // namespace grpc
|
|
@ -1,68 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2023 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H |
|
||||||
#define GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include <string> |
|
||||||
#include <utility> |
|
||||||
|
|
||||||
#include "absl/status/statusor.h" |
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
|
|
||||||
#include "src/core/lib/channel/channel_args.h" |
|
||||||
#include "src/core/lib/channel/channel_fwd.h" |
|
||||||
#include "src/core/lib/channel/promise_based_filter.h" |
|
||||||
#include "src/core/lib/promise/arena_promise.h" |
|
||||||
#include "src/core/lib/transport/transport.h" |
|
||||||
#include "src/cpp/ext/otel/otel_plugin.h" |
|
||||||
|
|
||||||
namespace grpc { |
|
||||||
namespace internal { |
|
||||||
|
|
||||||
class OpenTelemetryClientFilter : public grpc_core::ChannelFilter { |
|
||||||
public: |
|
||||||
static const grpc_channel_filter kFilter; |
|
||||||
|
|
||||||
static absl::StatusOr<OpenTelemetryClientFilter> Create( |
|
||||||
const grpc_core::ChannelArgs& /*args*/, |
|
||||||
ChannelFilter::Args /*filter_args*/); |
|
||||||
|
|
||||||
grpc_core::ArenaPromise<grpc_core::ServerMetadataHandle> MakeCallPromise( |
|
||||||
grpc_core::CallArgs call_args, |
|
||||||
grpc_core::NextPromiseFactory next_promise_factory) override; |
|
||||||
|
|
||||||
absl::string_view filtered_target() const { return filtered_target_; } |
|
||||||
|
|
||||||
const ActivePluginOptionsView& active_plugin_options_view() const { |
|
||||||
return active_plugin_options_view_; |
|
||||||
} |
|
||||||
|
|
||||||
private: |
|
||||||
explicit OpenTelemetryClientFilter(std::string target); |
|
||||||
|
|
||||||
std::string filtered_target_; |
|
||||||
ActivePluginOptionsView active_plugin_options_view_; |
|
||||||
}; |
|
||||||
|
|
||||||
} // namespace internal
|
|
||||||
} // namespace grpc
|
|
||||||
|
|
||||||
#endif // GRPC_SRC_CPP_EXT_OTEL_OTEL_CLIENT_FILTER_H
|
|
@ -1,39 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2019 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include <grpc/grpc.h> |
|
||||||
#include <grpc/slice.h> |
|
||||||
|
|
||||||
#include "src/core/lib/slice/b64.h" |
|
||||||
|
|
||||||
bool squelch = true; |
|
||||||
bool leak_check = true; |
|
||||||
|
|
||||||
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { |
|
||||||
if (size < 1) return 0; |
|
||||||
grpc_init(); |
|
||||||
const bool url_safe = static_cast<uint8_t>(0x100) < data[0]; |
|
||||||
grpc_slice res = grpc_base64_decode_with_len( |
|
||||||
reinterpret_cast<const char*>(data + 1), size - 1, url_safe); |
|
||||||
grpc_slice_unref(res); |
|
||||||
grpc_shutdown(); |
|
||||||
return 0; |
|
||||||
} |
|
@ -1 +0,0 @@ |
|||||||
~ |
|
@ -1,37 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2019 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <stddef.h> |
|
||||||
#include <stdint.h> |
|
||||||
|
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
|
|
||||||
#include "src/core/lib/slice/b64.h" |
|
||||||
|
|
||||||
bool squelch = true; |
|
||||||
bool leak_check = true; |
|
||||||
|
|
||||||
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { |
|
||||||
if (size < 2) return 0; |
|
||||||
const bool url_safe = static_cast<uint8_t>(0x100) < data[0]; |
|
||||||
const bool multiline = static_cast<uint8_t>(0x100) < data[1]; |
|
||||||
char* res = grpc_base64_encode(reinterpret_cast<const char*>(data + 2), |
|
||||||
size - 2, url_safe, multiline); |
|
||||||
gpr_free(res); |
|
||||||
return 0; |
|
||||||
} |
|
@ -1,193 +0,0 @@ |
|||||||
//
|
|
||||||
//
|
|
||||||
// Copyright 2015 gRPC authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
//
|
|
||||||
//
|
|
||||||
|
|
||||||
#include "src/core/lib/slice/b64.h" |
|
||||||
|
|
||||||
#include <stdint.h> |
|
||||||
#include <string.h> |
|
||||||
|
|
||||||
#include <memory> |
|
||||||
|
|
||||||
#include "absl/strings/string_view.h" |
|
||||||
#include "gtest/gtest.h" |
|
||||||
|
|
||||||
#include <grpc/slice.h> |
|
||||||
#include <grpc/support/alloc.h> |
|
||||||
|
|
||||||
#include "src/core/lib/iomgr/exec_ctx.h" |
|
||||||
#include "src/core/lib/slice/slice_internal.h" |
|
||||||
#include "test/core/util/test_config.h" |
|
||||||
|
|
||||||
static void test_simple_encode_decode_b64(int url_safe, int multiline) { |
|
||||||
const char* hello = "hello"; |
|
||||||
char* hello_b64 = |
|
||||||
grpc_base64_encode(hello, strlen(hello), url_safe, multiline); |
|
||||||
grpc_core::ExecCtx exec_ctx; |
|
||||||
grpc_slice hello_slice = grpc_base64_decode(hello_b64, url_safe); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(hello_slice), |
|
||||||
absl::string_view(hello)); |
|
||||||
grpc_slice_unref(hello_slice); |
|
||||||
|
|
||||||
gpr_free(hello_b64); |
|
||||||
} |
|
||||||
|
|
||||||
static void test_full_range_encode_decode_b64(int url_safe, int multiline) { |
|
||||||
unsigned char orig[256]; |
|
||||||
size_t i; |
|
||||||
char* b64; |
|
||||||
grpc_slice orig_decoded; |
|
||||||
for (i = 0; i < sizeof(orig); i++) orig[i] = static_cast<uint8_t>(i); |
|
||||||
|
|
||||||
// Try all the different paddings.
|
|
||||||
for (i = 0; i < 3; i++) { |
|
||||||
grpc_core::ExecCtx exec_ctx; |
|
||||||
b64 = grpc_base64_encode(orig, sizeof(orig) - i, url_safe, multiline); |
|
||||||
orig_decoded = grpc_base64_decode(b64, url_safe); |
|
||||||
ASSERT_EQ( |
|
||||||
grpc_core::StringViewFromSlice(orig_decoded), |
|
||||||
absl::string_view(reinterpret_cast<char*>(orig), sizeof(orig) - i)); |
|
||||||
grpc_slice_unref(orig_decoded); |
|
||||||
gpr_free(b64); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, SimpleEncodeDecodeB64NoMultiline) { |
|
||||||
test_simple_encode_decode_b64(0, 0); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, SimpleEncodeDecodeB64Multiline) { |
|
||||||
test_simple_encode_decode_b64(0, 1); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, SimpleEncodeDecodeB64UrlsafeNoMultiline) { |
|
||||||
test_simple_encode_decode_b64(1, 0); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, SimpleEncodeDecodeB64UrlsafeMultiline) { |
|
||||||
test_simple_encode_decode_b64(1, 1); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, FullRangeEncodeDecodeB64NoMultiline) { |
|
||||||
test_full_range_encode_decode_b64(0, 0); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, FullRangeEncodeDecodeB64Multiline) { |
|
||||||
test_full_range_encode_decode_b64(0, 1); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, FullRangeEncodeDecodeB64UrlsafeNoMultiline) { |
|
||||||
test_full_range_encode_decode_b64(1, 0); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, FullRangeEncodeDecodeB64UrlsafeMultiline) { |
|
||||||
test_full_range_encode_decode_b64(1, 1); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, UrlSafeUnsafeMismatchFailure) { |
|
||||||
unsigned char orig[256]; |
|
||||||
size_t i; |
|
||||||
char* b64; |
|
||||||
grpc_slice orig_decoded; |
|
||||||
int url_safe = 1; |
|
||||||
for (i = 0; i < sizeof(orig); i++) orig[i] = static_cast<uint8_t>(i); |
|
||||||
|
|
||||||
grpc_core::ExecCtx exec_ctx; |
|
||||||
b64 = grpc_base64_encode(orig, sizeof(orig), url_safe, 0); |
|
||||||
orig_decoded = grpc_base64_decode(b64, !url_safe); |
|
||||||
ASSERT_TRUE(GRPC_SLICE_IS_EMPTY(orig_decoded)); |
|
||||||
gpr_free(b64); |
|
||||||
grpc_slice_unref(orig_decoded); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode(orig, sizeof(orig), !url_safe, 0); |
|
||||||
orig_decoded = grpc_base64_decode(b64, url_safe); |
|
||||||
ASSERT_TRUE(GRPC_SLICE_IS_EMPTY(orig_decoded)); |
|
||||||
gpr_free(b64); |
|
||||||
grpc_slice_unref(orig_decoded); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, Rfc4648TestVectors) { |
|
||||||
char* b64; |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("", 0, 0, 0); |
|
||||||
ASSERT_STREQ("", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("f", 1, 0, 0); |
|
||||||
ASSERT_STREQ("Zg==", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("fo", 2, 0, 0); |
|
||||||
ASSERT_STREQ("Zm8=", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("foo", 3, 0, 0); |
|
||||||
ASSERT_STREQ("Zm9v", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("foob", 4, 0, 0); |
|
||||||
ASSERT_STREQ("Zm9vYg==", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("fooba", 5, 0, 0); |
|
||||||
ASSERT_STREQ("Zm9vYmE=", b64); |
|
||||||
gpr_free(b64); |
|
||||||
|
|
||||||
b64 = grpc_base64_encode("foobar", 6, 0, 0); |
|
||||||
ASSERT_STREQ("Zm9vYmFy", b64); |
|
||||||
gpr_free(b64); |
|
||||||
} |
|
||||||
|
|
||||||
TEST(B64Test, UnpaddedDecode) { |
|
||||||
grpc_slice decoded; |
|
||||||
|
|
||||||
grpc_core::ExecCtx exec_ctx; |
|
||||||
decoded = grpc_base64_decode("Zm9vYmFy", 0); |
|
||||||
|
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "foobar"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("Zm9vYmE", 0); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "fooba"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("Zm9vYg", 0); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "foob"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("Zm9v", 0); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "foo"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("Zm8", 0); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "fo"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("Zg", 0); |
|
||||||
ASSERT_EQ(grpc_core::StringViewFromSlice(decoded), "f"); |
|
||||||
grpc_slice_unref(decoded); |
|
||||||
|
|
||||||
decoded = grpc_base64_decode("", 0); |
|
||||||
ASSERT_TRUE(GRPC_SLICE_IS_EMPTY(decoded)); |
|
||||||
} |
|
||||||
|
|
||||||
int main(int argc, char** argv) { |
|
||||||
grpc::testing::TestEnvironment env(&argc, argv); |
|
||||||
::testing::InitGoogleTest(&argc, argv); |
|
||||||
grpc::testing::TestGrpcScope grpc_scope; |
|
||||||
return RUN_ALL_TESTS(); |
|
||||||
} |
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue