diff --git a/BUILD b/BUILD index 44b877705e9..fea71153bba 100644 --- a/BUILD +++ b/BUILD @@ -485,10 +485,7 @@ grpc_cc_library( grpc_cc_library( name = "census", srcs = [ - "src/core/ext/census/grpc_context.cc", - ], - external_deps = [ - "nanopb", + "src/cpp/ext/filters/census/grpc_context.cc", ], language = "c++", public_hdrs = [ @@ -1990,4 +1987,40 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "grpc_opencensus_plugin", + srcs = [ + "src/cpp/ext/filters/census/client_filter.cc", + "src/cpp/ext/filters/census/server_filter.cc", + "src/cpp/ext/filters/census/channel_filter.cc", + "src/cpp/ext/filters/census/context.cc", + "src/cpp/ext/filters/census/grpc_context.cc", + "src/cpp/ext/filters/census/grpc_plugin.cc", + "src/cpp/ext/filters/census/measures.cc", + "src/cpp/ext/filters/census/rpc_encoding.cc", + "src/cpp/ext/filters/census/views.cc", + ], + hdrs = [ + "include/grpcpp/opencensus.h", + "src/cpp/ext/filters/census/client_filter.h", + "src/cpp/ext/filters/census/server_filter.h", + "src/cpp/ext/filters/census/channel_filter.h", + "src/cpp/ext/filters/census/context.h", + "src/cpp/ext/filters/census/grpc_plugin.h", + "src/cpp/ext/filters/census/measures.h", + "src/cpp/ext/filters/census/rpc_encoding.h", + ], + language = "c++", + external_deps = [ + "absl-base", + "absl-time", + "opencensus-trace", + "opencensus-stats", + ], + deps = [ + ":census", + ":grpc++", + ], +) + grpc_generate_one_off_targets() diff --git a/CMakeLists.txt b/CMakeLists.txt index 4dc8d98b879..d92d7c0558c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1221,7 +1221,7 @@ add_library(grpc src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc src/core/ext/filters/load_reporting/server_load_reporting_filter.cc src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc - src/core/ext/census/grpc_context.cc + src/cpp/ext/filters/census/grpc_context.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/http/client_authority_filter.cc @@ -2524,7 +2524,7 @@ add_library(grpc_unsecure third_party/nanopb/pb_encode.c src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc - src/core/ext/census/grpc_context.cc + src/cpp/ext/filters/census/grpc_context.cc src/core/ext/filters/max_age/max_age_filter.cc src/core/ext/filters/message_size/message_size_filter.cc src/core/ext/filters/http/client_authority_filter.cc @@ -3307,10 +3307,7 @@ add_library(grpc++_cronet src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc src/core/ext/transport/chttp2/server/chttp2_server.cc - src/core/ext/census/grpc_context.cc - third_party/nanopb/pb_common.c - third_party/nanopb/pb_decode.c - third_party/nanopb/pb_encode.c + src/cpp/ext/filters/census/grpc_context.cc ) if(WIN32 AND MSVC) diff --git a/Makefile b/Makefile index ab6045191d9..f4d8e3139bd 100644 --- a/Makefile +++ b/Makefile @@ -3596,7 +3596,7 @@ LIBGRPC_SRC = \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/load_reporting/server_load_reporting_filter.cc \ src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc \ - src/core/ext/census/grpc_context.cc \ + src/cpp/ext/filters/census/grpc_context.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ @@ -4865,7 +4865,7 @@ LIBGRPC_UNSECURE_SRC = \ third_party/nanopb/pb_encode.c \ src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \ src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \ - src/core/ext/census/grpc_context.cc \ + src/cpp/ext/filters/census/grpc_context.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ @@ -5636,10 +5636,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc \ src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc \ src/core/ext/transport/chttp2/server/chttp2_server.cc \ - src/core/ext/census/grpc_context.cc \ - third_party/nanopb/pb_common.c \ - third_party/nanopb/pb_decode.c \ - third_party/nanopb/pb_encode.c \ + src/cpp/ext/filters/census/grpc_context.cc \ PUBLIC_HEADERS_CXX += \ include/grpc++/alarm.h \ diff --git a/bazel/grpc_deps.bzl b/bazel/grpc_deps.bzl index f7e62f12192..7c349dbcdcf 100644 --- a/bazel/grpc_deps.bzl +++ b/bazel/grpc_deps.bzl @@ -8,6 +8,16 @@ def grpc_deps(): actual = "@com_github_nanopb_nanopb//:nanopb", ) + native.bind( + name = "absl-base", + actual = "@com_google_absl//absl/base", + ) + + native.bind( + name = "absl-time", + actual = "@com_google_absl//absl/time:time", + ) + native.bind( name = "libssl", actual = "@boringssl//:ssl", @@ -73,6 +83,21 @@ def grpc_deps(): actual = "@com_github_grpc_grpc//:grpc++_codegen_proto", ) + native.bind( + name = "opencensus-trace", + actual = "@io_opencensus_cpp//opencensus/trace:trace" + ) + + native.bind( + name = "opencensus-stats", + actual = "@io_opencensus_cpp//opencensus/stats:stats" + ) + + native.bind( + name = "opencensus-stats-test", + actual = "@io_opencensus_cpp//opencensus/stats:test_utils" + ) + if "boringssl" not in native.existing_rules(): native.http_archive( name = "boringssl", @@ -122,8 +147,8 @@ def grpc_deps(): native.new_http_archive( name = "com_github_google_benchmark", build_file = "@com_github_grpc_grpc//third_party:benchmark.BUILD", - strip_prefix = "benchmark-5b7683f49e1e9223cf9927b24f6fd3d6bd82e3f8", - url = "https://github.com/google/benchmark/archive/5b7683f49e1e9223cf9927b24f6fd3d6bd82e3f8.tar.gz", + strip_prefix = "benchmark-9913418d323e64a0111ca0da81388260c2bbe1e9", + url = "https://github.com/google/benchmark/archive/9913418d323e64a0111ca0da81388260c2bbe1e9.tar.gz", ) if "com_github_cares_cares" not in native.existing_rules(): @@ -137,8 +162,8 @@ def grpc_deps(): if "com_google_absl" not in native.existing_rules(): native.http_archive( name = "com_google_absl", - strip_prefix = "abseil-cpp-cc4bed2d74f7c8717e31f9579214ab52a9c9c610", - url = "https://github.com/abseil/abseil-cpp/archive/cc4bed2d74f7c8717e31f9579214ab52a9c9c610.tar.gz", + strip_prefix = "abseil-cpp-cd95e71df6eaf8f2a282b1da556c2cf1c9b09207", + url = "https://github.com/abseil/abseil-cpp/archive/cd95e71df6eaf8f2a282b1da556c2cf1c9b09207.tar.gz", ) if "com_github_bazelbuild_bazeltoolchains" not in native.existing_rules(): @@ -152,6 +177,14 @@ def grpc_deps(): sha256 = "1c4a532b396c698e6467a1548554571cb85fa091e472b05e398ebc836c315d77", ) + if "io_opencensus_cpp" not in native.existing_rules(): + native.http_archive( + name = "io_opencensus_cpp", + strip_prefix = "opencensus-cpp-fdf0f308b1631bb4a942e32ba5d22536a6170274", + url = "https://github.com/census-instrumentation/opencensus-cpp/archive/fdf0f308b1631bb4a942e32ba5d22536a6170274.tar.gz", + ) + + # TODO: move some dependencies from "grpc_deps" here? def grpc_test_only_deps(): """Internal, not intended for use by packages that are consuming grpc. diff --git a/build.yaml b/build.yaml index 14316012092..593c845e3cc 100644 --- a/build.yaml +++ b/build.yaml @@ -100,10 +100,9 @@ filegroups: public_headers: - include/grpc/census.h src: - - src/core/ext/census/grpc_context.cc + - src/cpp/ext/filters/census/grpc_context.cc uses: - grpc_base - - nanopb - name: cmdline headers: - test/core/util/cmdline.h diff --git a/config.m4 b/config.m4 index a5b041f0359..781fd08fa10 100644 --- a/config.m4 +++ b/config.m4 @@ -378,7 +378,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \ src/core/ext/filters/load_reporting/server_load_reporting_filter.cc \ src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc \ - src/core/ext/census/grpc_context.cc \ + src/cpp/ext/filters/census/grpc_context.cc \ src/core/ext/filters/max_age/max_age_filter.cc \ src/core/ext/filters/message_size/message_size_filter.cc \ src/core/ext/filters/http/client_authority_filter.cc \ @@ -649,7 +649,6 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/php/ext/grpc) PHP_ADD_BUILD_DIR($ext_builddir/src/boringssl) - PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb) PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1) @@ -712,6 +711,7 @@ if test "$PHP_GRPC" != "no"; then PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/alts/handshaker) PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/alts/zero_copy_frame_protector) PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/ssl/session_cache) + PHP_ADD_BUILD_DIR($ext_builddir/src/cpp/ext/filters/census) PHP_ADD_BUILD_DIR($ext_builddir/third_party/address_sorting) PHP_ADD_BUILD_DIR($ext_builddir/third_party/boringssl/crypto) PHP_ADD_BUILD_DIR($ext_builddir/third_party/boringssl/crypto/asn1) diff --git a/config.w32 b/config.w32 index cd51b4d97c4..b87b261772d 100644 --- a/config.w32 +++ b/config.w32 @@ -354,7 +354,7 @@ if (PHP_GRPC != "no") { "src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.cc " + "src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.cc " + - "src\\core\\ext\\census\\grpc_context.cc " + + "src\\cpp\\ext\\filters\\census\\grpc_context.cc " + "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " + "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " + "src\\core\\ext\\filters\\http\\client_authority_filter.cc " + @@ -653,7 +653,6 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\boringssl"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext"); - FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\census"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy"); @@ -729,6 +728,10 @@ if (PHP_GRPC != "no") { FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\alts\\zero_copy_frame_protector"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl\\session_cache"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters"); + FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters\\census"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext"); FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext\\grpc"); diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index ca6df16a067..b7548e31d9d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -793,7 +793,7 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc', - 'src/core/ext/census/grpc_context.cc', + 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', diff --git a/grpc.gemspec b/grpc.gemspec index 1a3b5bd8899..eb059d85144 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -733,7 +733,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc ) s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.cc ) s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc ) - s.files += %w( src/core/ext/census/grpc_context.cc ) + s.files += %w( src/cpp/ext/filters/census/grpc_context.cc ) s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc ) s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc ) s.files += %w( src/core/ext/filters/http/client_authority_filter.cc ) diff --git a/grpc.gyp b/grpc.gyp index 8edfcadf956..420cf75f572 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -545,7 +545,7 @@ 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc', - 'src/core/ext/census/grpc_context.cc', + 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', @@ -1268,7 +1268,7 @@ 'third_party/nanopb/pb_encode.c', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', - 'src/core/ext/census/grpc_context.cc', + 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', diff --git a/include/grpcpp/opencensus.h b/include/grpcpp/opencensus.h new file mode 100644 index 00000000000..7e5d1dfeb41 --- /dev/null +++ b/include/grpcpp/opencensus.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPCPP_OPENCENSUS_H +#define GRPCPP_OPENCENSUS_H + +namespace grpc { +// These symbols in this file will not be included in the binary unless +// grpc_opencensus_plugin build target was added as a dependency. At the moment +// it is only setup to be built with Bazel. + +// Registers the OpenCensus plugin with gRPC, so that it will be used for future +// RPCs. This must be called before any views are created. +void RegisterOpenCensusPlugin(); + +// RPC stats definitions, defined by +// https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md + +// Registers the cumulative gRPC views so that they will be exported by any +// registered stats exporter. For on-task stats, construct a View using the +// ViewDescriptors below. +void RegisterOpenCensusViewsForExport(); + +} // namespace grpc + +#endif // GRPCPP_OPENCENSUS_H diff --git a/package.xml b/package.xml index 5be18bd4d01..51f82e18898 100644 --- a/package.xml +++ b/package.xml @@ -738,7 +738,7 @@ - + diff --git a/test/core/statistics/small_log_test.cc b/src/cpp/ext/filters/census/channel_filter.cc similarity index 63% rename from test/core/statistics/small_log_test.cc rename to src/cpp/ext/filters/census/channel_filter.cc index fb8dfc9988f..4ac684d2779 100644 --- a/test/core/statistics/small_log_test.cc +++ b/src/cpp/ext/filters/census/channel_filter.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015 gRPC authors. + * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,15 @@ * */ -#include "test/core/statistics/census_log_tests.h" +#include -#include +#include "src/cpp/ext/filters/census/channel_filter.h" -#include -#include "test/core/util/test_config.h" +namespace grpc { -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); - test_small_log(); - return 0; +grpc_error* CensusChannelData::Init(grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; } + +} // namespace grpc diff --git a/test/core/statistics/performance_test.cc b/src/cpp/ext/filters/census/channel_filter.h similarity index 52% rename from test/core/statistics/performance_test.cc rename to src/cpp/ext/filters/census/channel_filter.h index 9d4fd6ef908..0b7c6826816 100644 --- a/test/core/statistics/performance_test.cc +++ b/src/cpp/ext/filters/census/channel_filter.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015 gRPC authors. + * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,21 @@ * */ -#include "test/core/statistics/census_log_tests.h" +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H -#include +#include -#include -#include "test/core/util/test_config.h" +#include "src/cpp/ext/filters/census/context.h" -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); - test_performance(); - return 0; -} +namespace grpc { + +class CensusChannelData : public ChannelData { + public: + grpc_error* Init(grpc_channel_element* elem, + grpc_channel_element_args* args) override; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H */ diff --git a/src/cpp/ext/filters/census/client_filter.cc b/src/cpp/ext/filters/census/client_filter.cc new file mode 100644 index 00000000000..293f4b1c253 --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.cc @@ -0,0 +1,163 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/client_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusClientCallData::kMaxTraceContextLen; +constexpr uint32_t CensusClientCallData::kMaxTagsLen; + +namespace { + +void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) { + if (b->idx.named.grpc_server_stats_bin != nullptr) { + ServerStatsDeserialize( + reinterpret_cast(GRPC_SLICE_START_PTR( + GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))), + GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)), + elapsed_time); + grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin); + } +} + +} // namespace + +void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusClientCallData* calld = + reinterpret_cast(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr); + FilterTrailingMetadata(calld->recv_trailing_metadata_, + &calld->elapsed_time_); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusClientCallData* calld = + reinterpret_cast(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + calld->recv_message_count_++; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusClientCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->send_initial_metadata() != nullptr) { + census_context* ctxt = op->get_census_context(); + GenerateClientContext( + qualified_method_, &context_, + (ctxt == nullptr) ? nullptr : reinterpret_cast(ctxt)); + size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_, + kMaxTraceContextLen); + if (tracing_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &tracing_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_TRACE_BIN, + grpc_slice_from_copied_buffer(tracing_buf_, tracing_len)))); + } + grpc_slice tags = grpc_empty_slice(); + // TODO: Add in tagging serialization. + size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags); + if (encoded_tags_len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_initial_metadata()->batch(), &stats_bin_, + grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags))); + } + } + + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + if (op->recv_trailing_metadata() != nullptr) { + recv_trailing_metadata_ = op->recv_trailing_metadata()->batch(); + initial_on_done_recv_trailing_metadata_ = op->on_complete(); + op->set_on_complete(&on_done_recv_trailing_metadata_); + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusClientCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + path_ = grpc_slice_ref_internal(args->path); + start_time_ = absl::Now(); + method_ = GetMethod(&path_); + qualified_method_ = absl::StrCat("Sent.", method_); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_, + OnDoneRecvTrailingMetadataCb, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +void CensusClientCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_); + ::opencensus::stats::Record( + {{RpcClientSentBytesPerRpc(), static_cast(request_size)}, + {RpcClientReceivedBytesPerRpc(), static_cast(response_size)}, + {RpcClientRoundtripLatency(), latency_ms}, + {RpcClientServerLatency(), + ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))}, + {RpcClientSentMessagesPerRpc(), sent_message_count_}, + {RpcClientReceivedMessagesPerRpc(), recv_message_count_}}, + {{ClientMethodTagKey(), method_}, + {ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/client_filter.h b/src/cpp/ext/filters/census/client_filter.h new file mode 100644 index 00000000000..851022873f1 --- /dev/null +++ b/src/cpp/ext/filters/census/client_filter.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H + +#include + +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusClientCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusClientCallData : public CallData { + public: + // Maximum size of trace context is sent on the wire. + static constexpr uint32_t kMaxTraceContextLen = 64; + // Maximum size of tags that are sent on the wire. + static constexpr uint32_t kMaxTagsLen = 2048; + + CensusClientCallData() + : recv_trailing_metadata_(nullptr), + initial_on_done_recv_trailing_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + elapsed_time_(0), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvTrailingMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneSendInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // Metadata elements for tracing and census stats data. + grpc_linked_mdelem stats_bin_; + grpc_linked_mdelem tracing_bin_; + // Client method. + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // The recv trailing metadata callbacks. + grpc_metadata_batch* recv_trailing_metadata_; + grpc_closure* initial_on_done_recv_trailing_metadata_; + grpc_closure on_done_recv_trailing_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + // Start time (for measuring latency). + absl::Time start_time_; + // Server elapsed time in nanoseconds. + uint64_t elapsed_time_; + // The received message--may be null. + grpc_core::OrphanablePtr* recv_message_; + // Number of messages in this RPC. + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference when adding trace context + // metatdata to outgoing message. + char tracing_buf_[kMaxTraceContextLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H */ diff --git a/src/cpp/ext/filters/census/context.cc b/src/cpp/ext/filters/census/context.cc new file mode 100644 index 00000000000..4b3250236d0 --- /dev/null +++ b/src/cpp/ext/filters/census/context.cc @@ -0,0 +1,132 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +using ::opencensus::trace::Span; +using ::opencensus::trace::SpanContext; + +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context) { + GrpcTraceContext trace_ctxt; + TraceContextEncoding::Decode(tracing, &trace_ctxt); + SpanContext parent_ctx = trace_ctxt.ToSpanContext(); + new (context) CensusContext(method, parent_ctx); +} + +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctxt) { + if (parent_ctxt != nullptr) { + SpanContext span_ctxt = parent_ctxt->Context(); + Span span = parent_ctxt->Span(); + if (span_ctxt.IsValid()) { + new (ctxt) CensusContext(method, &span); + return; + } + } + new (ctxt) CensusContext(method); +} + +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size) { + GrpcTraceContext trace_ctxt(context); + return TraceContextEncoding::Encode(trace_ctxt, tracing_buf, + tracing_buf_size); +} + +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags) { + // TODO: Add implementation. Waiting on stats tagging to be added. + return 0; +} + +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size) { + return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size); +} + +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time) { + return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size), + server_elapsed_time); +} + +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.incoming.data_bytes; +} + +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) { + return final_info->stats.transport_stream_stats.outgoing.data_bytes; +} + +SpanContext SpanContextFromCensusContext(const census_context* ctxt) { + return reinterpret_cast(ctxt)->Context(); +} + +Span SpanFromCensusContext(const census_context* ctxt) { + return reinterpret_cast(ctxt)->Span(); +} + +absl::string_view StatusCodeToString(grpc_status_code code) { + switch (code) { + case GRPC_STATUS_OK: + return "OK"; + case GRPC_STATUS_CANCELLED: + return "CANCELLED"; + case GRPC_STATUS_UNKNOWN: + return "UNKNOWN"; + case GRPC_STATUS_INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case GRPC_STATUS_DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case GRPC_STATUS_NOT_FOUND: + return "NOT_FOUND"; + case GRPC_STATUS_ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case GRPC_STATUS_PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case GRPC_STATUS_UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case GRPC_STATUS_RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case GRPC_STATUS_FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case GRPC_STATUS_ABORTED: + return "ABORTED"; + case GRPC_STATUS_OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case GRPC_STATUS_UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case GRPC_STATUS_INTERNAL: + return "INTERNAL"; + case GRPC_STATUS_UNAVAILABLE: + return "UNAVAILABLE"; + case GRPC_STATUS_DATA_LOSS: + return "DATA_LOSS"; + default: + // gRPC wants users of this enum to include a default branch so that + // adding values is not a breaking change. + return "UNKNOWN_STATUS"; + } +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/context.h b/src/cpp/ext/filters/census/context.h new file mode 100644 index 00000000000..1643fdd11be --- /dev/null +++ b/src/cpp/ext/filters/census/context.h @@ -0,0 +1,126 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H + +#include + +#include +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" +#include "opencensus/trace/span.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/trace_params.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/cpp/common/channel_filter.h" +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +// This is needed because grpc has hardcoded CensusContext with a +// forward declaration of 'struct census_context;' +struct census_context; + +namespace grpc { + +// Thread compatible. +class CensusContext { + public: + CensusContext() : span_(::opencensus::trace::Span::BlankSpan()) {} + + explicit CensusContext(absl::string_view name) + : span_(::opencensus::trace::Span::StartSpan(name)) {} + + CensusContext(absl::string_view name, const ::opencensus::trace::Span* parent) + : span_(::opencensus::trace::Span::StartSpan(name, parent)) {} + + CensusContext(absl::string_view name, + const ::opencensus::trace::SpanContext& parent_ctxt) + : span_(::opencensus::trace::Span::StartSpanWithRemoteParent( + name, parent_ctxt)) {} + + ::opencensus::trace::SpanContext Context() const { return span_.context(); } + ::opencensus::trace::Span Span() const { return span_; } + void EndSpan() { span_.End(); } + + private: + ::opencensus::trace::Span span_; +}; + +// Serializes the outgoing trace context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. +size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context, + char* tracing_buf, size_t tracing_buf_size); + +// Serializes the outgoing stats context. Field IDs are 1 byte followed by +// field data. A 1 byte version ID is always encoded first. Tags are directly +// serialized into the given grpc_slice. +size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags); + +// Serialize outgoing server stats. Returns the number of bytes serialized. +size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf, + size_t buf_size); + +// Deserialize incoming server stats. Returns the number of bytes deserialized. +size_t ServerStatsDeserialize(const char* buf, size_t buf_size, + uint64_t* server_elapsed_time); + +// Deserialize the incoming SpanContext and generate a new server context based +// on that. This new span will never be a root span. This should only be called +// with a blank CensusContext as it overwrites it. +void GenerateServerContext(absl::string_view tracing, absl::string_view stats, + absl::string_view primary_role, + absl::string_view method, CensusContext* context); + +// Creates a new client context that is by default a new root context. +// If the current context is the default context then the newly created +// span automatically becomes a root span. This should only be called with a +// blank CensusContext as it overwrites it. +void GenerateClientContext(absl::string_view method, CensusContext* ctxt, + CensusContext* parent_ctx); + +// Returns the incoming data size from the grpc call final info. +uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info); + +// Returns the outgoing data size from the grpc call final info. +uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info); + +// These helper functions return the SpanContext and Span, respectively +// associated with the census_context* stored by grpc. The user will need to +// call this for manual propagation of tracing data. +::opencensus::trace::SpanContext SpanContextFromCensusContext( + const census_context* ctxt); +::opencensus::trace::Span SpanFromCensusContext(const census_context* ctxt); + +// Returns a string representation of the StatusCode enum. +absl::string_view StatusCodeToString(grpc_status_code code); + +inline absl::string_view GetMethod(const grpc_slice* path) { + if (GRPC_SLICE_IS_EMPTY(*path)) { + return ""; + } + // Check for leading '/' and trim it if present. + return absl::StripPrefix(absl::string_view(reinterpret_cast( + GRPC_SLICE_START_PTR(*path)), + GRPC_SLICE_LENGTH(*path)), + "/"); +} + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H */ diff --git a/src/core/ext/census/grpc_context.cc b/src/cpp/ext/filters/census/grpc_context.cc similarity index 100% rename from src/core/ext/census/grpc_context.cc rename to src/cpp/ext/filters/census/grpc_context.cc diff --git a/src/cpp/ext/filters/census/grpc_plugin.cc b/src/cpp/ext/filters/census/grpc_plugin.cc new file mode 100644 index 00000000000..f978ed3bf51 --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.cc @@ -0,0 +1,130 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include + +#include "opencensus/trace/span.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/client_filter.h" +#include "src/cpp/ext/filters/census/measures.h" +#include "src/cpp/ext/filters/census/server_filter.h" + +namespace grpc { + +void RegisterOpenCensusPlugin() { + RegisterChannelFilter( + "opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + RegisterChannelFilter( + "opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */, + nullptr /* condition function */); + + // Access measures to ensure they are initialized. Otherwise, creating a view + // before the first RPC would cause an error. + RpcClientSentBytesPerRpc(); + RpcClientReceivedBytesPerRpc(); + RpcClientRoundtripLatency(); + RpcClientServerLatency(); + RpcClientSentMessagesPerRpc(); + RpcClientReceivedMessagesPerRpc(); + + RpcServerSentBytesPerRpc(); + RpcServerReceivedBytesPerRpc(); + RpcServerServerLatency(); + RpcServerSentMessagesPerRpc(); + RpcServerReceivedMessagesPerRpc(); +} + +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context) { + return reinterpret_cast(context->census_context()) + ->Span(); +} + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. +::opencensus::stats::TagKey ClientMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ClientStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_client_status"); + return status_tag_key; +} + +::opencensus::stats::TagKey ServerMethodTagKey() { + static const auto method_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_method"); + return method_tag_key; +} + +::opencensus::stats::TagKey ServerStatusTagKey() { + static const auto status_tag_key = + ::opencensus::stats::TagKey::Register("grpc_server_status"); + return status_tag_key; +} + +// Client +ABSL_CONST_INIT const absl::string_view + kRpcClientSentMessagesPerRpcMeasureName = + "grpc.io/client/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientSentBytesPerRpcMeasureName = + "grpc.io/client/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedMessagesPerRpcMeasureName = + "grpc.io/client/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcClientReceivedBytesPerRpcMeasureName = + "grpc.io/client/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName = + "grpc.io/client/roundtrip_latency"; + +ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName = + "grpc.io/client/server_latency"; + +// Server +ABSL_CONST_INIT const absl::string_view + kRpcServerSentMessagesPerRpcMeasureName = + "grpc.io/server/sent_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerSentBytesPerRpcMeasureName = + "grpc.io/server/sent_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedMessagesPerRpcMeasureName = + "grpc.io/server/received_messages_per_rpc"; + +ABSL_CONST_INIT const absl::string_view + kRpcServerReceivedBytesPerRpcMeasureName = + "grpc.io/server/received_bytes_per_rpc"; + +ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName = + "grpc.io/server/server_latency"; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/grpc_plugin.h b/src/cpp/ext/filters/census/grpc_plugin.h new file mode 100644 index 00000000000..7ff2e7a8b8c --- /dev/null +++ b/src/cpp/ext/filters/census/grpc_plugin.h @@ -0,0 +1,111 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H + +#include + +#include "absl/strings/string_view.h" +#include "include/grpcpp/opencensus.h" +#include "opencensus/stats/stats.h" +#include "opencensus/trace/span.h" + +namespace grpc { + +class ServerContext; + +// Returns the tracing Span for the current RPC. +::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context); + +// The tag keys set when recording RPC stats. +::opencensus::stats::TagKey ClientMethodTagKey(); +::opencensus::stats::TagKey ClientStatusTagKey(); +::opencensus::stats::TagKey ServerMethodTagKey(); +::opencensus::stats::TagKey ServerStatusTagKey(); + +// Names of measures used by the plugin--users can create views on these +// measures but should not record data for them. +extern const absl::string_view kRpcClientSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcClientRoundtripLatencyMeasureName; +extern const absl::string_view kRpcClientServerLatencyMeasureName; + +extern const absl::string_view kRpcServerSentMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName; +extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName; +extern const absl::string_view kRpcServerServerLatencyMeasureName; + +// Canonical gRPC view definitions. +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ClientReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative(); + +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedBytesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative(); +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative(); +const ::opencensus::stats::ViewDescriptor& +ServerReceivedMessagesPerRpcCumulative(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute(); + +const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour(); + +const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour(); +const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour(); +const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour(); +const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H */ diff --git a/src/cpp/ext/filters/census/measures.cc b/src/cpp/ext/filters/census/measures.cc new file mode 100644 index 00000000000..b522fae09ae --- /dev/null +++ b/src/cpp/ext/filters/census/measures.cc @@ -0,0 +1,129 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/measures.h" + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +using ::opencensus::stats::MeasureDouble; +using ::opencensus::stats::MeasureInt64; + +// These measure definitions should be kept in sync across opencensus +// implementations--see +// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java. + +namespace { + +// Unit constants +constexpr char kUnitBytes[] = "By"; +constexpr char kUnitMilliseconds[] = "ms"; +constexpr char kCount[] = "1"; + +} // namespace + +// Client +MeasureDouble RpcClientSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientSentBytesPerRpcMeasureName, + "Total bytes sent across all request messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcClientReceivedBytesPerRpcMeasureName, + "Total bytes received across all response messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcClientRoundtripLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientRoundtripLatencyMeasureName, + "Time between first byte of request sent to last byte of response " + "received, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureDouble RpcClientServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcClientServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error (propagated from the server)", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcClientSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcClientReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcClientReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +// Server +MeasureDouble RpcServerSentBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerSentBytesPerRpcMeasureName, + "Total bytes sent across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerReceivedBytesPerRpc() { + static const auto measure = MeasureDouble::Register( + kRpcServerReceivedBytesPerRpcMeasureName, + "Total bytes received across all messages per RPC", kUnitBytes); + return measure; +} + +MeasureDouble RpcServerServerLatency() { + static const auto measure = MeasureDouble::Register( + kRpcServerServerLatencyMeasureName, + "Time between first byte of request received to last byte of response " + "sent, or terminal error", + kUnitMilliseconds); + return measure; +} + +MeasureInt64 RpcServerSentMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName, + "Number of messages sent per RPC", kCount); + return measure; +} + +MeasureInt64 RpcServerReceivedMessagesPerRpc() { + static const auto measure = + MeasureInt64::Register(kRpcServerReceivedMessagesPerRpcMeasureName, + "Number of messages received per RPC", kCount); + return measure; +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/measures.h b/src/cpp/ext/filters/census/measures.h new file mode 100644 index 00000000000..8f8e72ace20 --- /dev/null +++ b/src/cpp/ext/filters/census/measures.h @@ -0,0 +1,46 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H + +#include + +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +namespace grpc { + +::opencensus::stats::MeasureInt64 RpcClientSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcClientRoundtripLatency(); +::opencensus::stats::MeasureDouble RpcClientServerLatency(); +::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs(); + +::opencensus::stats::MeasureInt64 RpcServerSentMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc(); +::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc(); +::opencensus::stats::MeasureDouble RpcServerServerLatency(); +::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs(); + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H */ diff --git a/src/cpp/ext/filters/census/rpc_encoding.cc b/src/cpp/ext/filters/census/rpc_encoding.cc new file mode 100644 index 00000000000..45a66d9dc82 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.cc @@ -0,0 +1,39 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/rpc_encoding.h" + +namespace grpc { + +constexpr size_t TraceContextEncoding::kGrpcTraceContextSize; +constexpr size_t TraceContextEncoding::kEncodeDecodeFailure; +constexpr size_t TraceContextEncoding::kVersionIdSize; +constexpr size_t TraceContextEncoding::kFieldIdSize; +constexpr size_t TraceContextEncoding::kVersionIdOffset; +constexpr size_t TraceContextEncoding::kVersionId; + +constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize; +constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure; +constexpr size_t RpcServerStatsEncoding::kVersionIdSize; +constexpr size_t RpcServerStatsEncoding::kFieldIdSize; +constexpr size_t RpcServerStatsEncoding::kVersionIdOffset; +constexpr size_t RpcServerStatsEncoding::kVersionId; + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/rpc_encoding.h b/src/cpp/ext/filters/census/rpc_encoding.h new file mode 100644 index 00000000000..ffffa60c468 --- /dev/null +++ b/src/cpp/ext/filters/census/rpc_encoding.h @@ -0,0 +1,284 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H + +#include + +#include + +#include "absl/base/internal/endian.h" +#include "absl/strings/string_view.h" +#include "opencensus/trace/span_context.h" +#include "opencensus/trace/span_id.h" +#include "opencensus/trace/trace_id.h" + +namespace grpc { + +// TODO: Rename to GrpcTraceContextV0. +struct GrpcTraceContext { + GrpcTraceContext() {} + + explicit GrpcTraceContext(const ::opencensus::trace::SpanContext& ctx) { + ctx.trace_id().CopyTo(trace_id); + ctx.span_id().CopyTo(span_id); + ctx.trace_options().CopyTo(trace_options); + } + + ::opencensus::trace::SpanContext ToSpanContext() const { + return ::opencensus::trace::SpanContext( + ::opencensus::trace::TraceId(trace_id), + ::opencensus::trace::SpanId(span_id), + ::opencensus::trace::TraceOptions(trace_options)); + } + + // TODO: For performance: + // uint8_t version; + // uint8_t trace_id_field_id; + uint8_t trace_id[::opencensus::trace::TraceId::kSize]; + // uint8_t span_id_field_id; + uint8_t span_id[::opencensus::trace::SpanId::kSize]; + // uint8_t trace_options_field_id; + uint8_t trace_options[::opencensus::trace::TraceOptions::kSize]; +}; + +// TraceContextEncoding encapsulates the logic for encoding and decoding of +// trace contexts. +class TraceContextEncoding { + public: + // Size of encoded GrpcTraceContext. (16 + 8 + 1 + 4) + static constexpr size_t kGrpcTraceContextSize = 29; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes a GrpcTraceContext from the incoming buffer. Returns the + // number of bytes deserialized from the buffer. If the incoming buffer is + // empty or the encoding version is not supported it will return 0 bytes, + // currently only version 0 is supported. If an unknown field ID is + // encountered it will return immediately without parsing the rest of the + // buffer. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, GrpcTraceContext* tc) { + if (buf.empty()) { + return kEncodeDecodeFailure; + } + uint8_t version = buf[kVersionIdOffset]; + // TODO: Support other versions later. Only support version 0 for + // now. + if (version != kVersionId) { + return kEncodeDecodeFailure; + } + + size_t pos = kVersionIdSize; + while (pos < buf.size()) { + size_t bytes_read = + ParseField(absl::string_view(&buf[pos], buf.size() - pos), tc); + if (bytes_read == 0) { + break; + } else { + pos += bytes_read; + } + } + return pos; + } + + // Serializes a GrpcTraceContext into the provided buffer. Returns the number + // of bytes serialized into the buffer. If the buffer is not of sufficient + // size (it must be at least kGrpcTraceContextSize bytes) it will drop + // everything and return 0 bytes serialized. Inlined for performance reasons. + static size_t Encode(const GrpcTraceContext& tc, char* buf, size_t buf_size) { + if (buf_size < kGrpcTraceContextSize) { + return kEncodeDecodeFailure; + } + buf[kVersionIdOffset] = kVersionId; + buf[kTraceIdOffset] = kTraceIdField; + memcpy(&buf[kTraceIdOffset + 1], tc.trace_id, + opencensus::trace::TraceId::kSize); + buf[kSpanIdOffset] = kSpanIdField; + memcpy(&buf[kSpanIdOffset + 1], tc.span_id, + opencensus::trace::SpanId::kSize); + buf[kTraceOptionsOffset] = kTraceOptionsField; + memcpy(&buf[kTraceOptionsOffset + 1], tc.trace_options, + opencensus::trace::TraceOptions::kSize); + return kGrpcTraceContextSize; + } + + private: + // Parses the next field from the incoming buffer and stores the parsed value + // in a GrpcTraceContext struct. If it does not recognize the field ID it + // will return 0, otherwise it returns the number of bytes read. + static size_t ParseField(absl::string_view buf, GrpcTraceContext* tc) { + // TODO: Add support for multi-byte field IDs. + if (buf.empty()) { + return 0; + } + // Field ID is always the first byte in a field. + uint32_t field_id = buf[0]; + size_t bytes_read = kFieldIdSize; + switch (field_id) { + case kTraceIdField: + bytes_read += kTraceIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_id, &buf[kFieldIdSize], + opencensus::trace::TraceId::kSize); + break; + case kSpanIdField: + bytes_read += kSpanIdSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->span_id, &buf[kFieldIdSize], + opencensus::trace::SpanId::kSize); + break; + case kTraceOptionsField: + bytes_read += kTraceOptionsSize; + if (bytes_read > buf.size()) { + return 0; + } + memcpy(tc->trace_options, &buf[kFieldIdSize], + opencensus::trace::TraceOptions::kSize); + break; + default: // Invalid field ID + return 0; + } + + return bytes_read; + } + + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + // Fixed Field ID values: + enum FieldIdValue { + kTraceIdField = 0, + kSpanIdField = 1, + kTraceOptionsField = 2, + }; + + // Field data sizes in bytes + enum FieldSize { + kTraceIdSize = 16, + kSpanIdSize = 8, + kTraceOptionsSize = 1, + }; + + // Fixed size offsets for field ID start positions during encoding. Field + // data immediately follows. + enum FieldIdOffset { + kTraceIdOffset = kVersionIdSize, + kSpanIdOffset = kTraceIdOffset + kFieldIdSize + kTraceIdSize, + kTraceOptionsOffset = kSpanIdOffset + kFieldIdSize + kSpanIdSize, + }; + + TraceContextEncoding() = delete; + TraceContextEncoding(const TraceContextEncoding&) = delete; + TraceContextEncoding(TraceContextEncoding&&) = delete; + TraceContextEncoding operator=(const TraceContextEncoding&) = delete; + TraceContextEncoding operator=(TraceContextEncoding&&) = delete; +}; + +// TODO: This may not be needed. Check to see if opencensus requires +// a trailing server response. +// RpcServerStatsEncoding encapsulates the logic for encoding and decoding of +// rpc server stats messages. Rpc server stats consists of a uint64_t time +// value (server latency in nanoseconds). +class RpcServerStatsEncoding { + public: + // Size of encoded RPC server stats. + static constexpr size_t kRpcServerStatsSize = 10; + // Error value. + static constexpr size_t kEncodeDecodeFailure = 0; + + // Deserializes rpc server stats from the incoming 'buf' into *time. Returns + // number of bytes decoded. If the buffer is of insufficient size (it must be + // at least kRpcServerStatsSize bytes) or the encoding version or field ID are + // unrecognized, *time will be set to 0 and it will return + // kEncodeDecodeFailure. Inlined for performance reasons. + static size_t Decode(absl::string_view buf, uint64_t* time) { + if (buf.size() < kRpcServerStatsSize) { + *time = 0; + return kEncodeDecodeFailure; + } + + uint8_t version = buf[kVersionIdOffset]; + uint32_t fieldID = buf[kServerElapsedTimeOffset]; + if (version != kVersionId || fieldID != kServerElapsedTimeField) { + *time = 0; + return kEncodeDecodeFailure; + } + *time = absl::little_endian::Load64( + &buf[kServerElapsedTimeOffset + kFieldIdSize]); + return kRpcServerStatsSize; + } + + // Serializes rpc server stats into the provided buffer. It returns the + // number of bytes written to the buffer. If the buffer is smaller than + // kRpcServerStatsSize bytes it will return kEncodeDecodeFailure. Inlined for + // performance reasons. + static size_t Encode(uint64_t time, char* buf, size_t buf_size) { + if (buf_size < kRpcServerStatsSize) { + return kEncodeDecodeFailure; + } + + buf[kVersionIdOffset] = kVersionId; + buf[kServerElapsedTimeOffset] = kServerElapsedTimeField; + absl::little_endian::Store64(&buf[kServerElapsedTimeOffset + kFieldIdSize], + time); + return kRpcServerStatsSize; + } + + private: + // Size of Version ID. + static constexpr size_t kVersionIdSize = 1; + // Size of Field ID. + static constexpr size_t kFieldIdSize = 1; + + // Offset and value for currently supported version ID. + static constexpr size_t kVersionIdOffset = 0; + static constexpr size_t kVersionId = 0; + + enum FieldIdValue { + kServerElapsedTimeField = 0, + }; + + enum FieldSize { + kServerElapsedTimeSize = 8, + }; + + enum FieldIdOffset { + kServerElapsedTimeOffset = kVersionIdSize, + }; + + RpcServerStatsEncoding() = delete; + RpcServerStatsEncoding(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding(RpcServerStatsEncoding&&) = delete; + RpcServerStatsEncoding operator=(const RpcServerStatsEncoding&) = delete; + RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H */ diff --git a/src/cpp/ext/filters/census/server_filter.cc b/src/cpp/ext/filters/census/server_filter.cc new file mode 100644 index 00000000000..c7c62eefe51 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.cc @@ -0,0 +1,198 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/server_filter.h" + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "opencensus/stats/stats.h" +#include "src/core/lib/surface/call.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/cpp/ext/filters/census/measures.h" + +namespace grpc { + +constexpr uint32_t CensusServerCallData::kMaxServerStatsLen; + +namespace { + +// server metadata elements +struct ServerMetadataElements { + grpc_slice path; + grpc_slice tracing_slice; + grpc_slice census_proto; +}; + +void FilterInitialMetadata(grpc_metadata_batch* b, + ServerMetadataElements* sml) { + if (b->idx.named.path != nullptr) { + sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md)); + } + if (b->idx.named.grpc_trace_bin != nullptr) { + sml->tracing_slice = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin); + } + if (b->idx.named.grpc_tags_bin != nullptr) { + sml->census_proto = + grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md)); + grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin); + } +} + +} // namespace + +void CensusServerCallData::OnDoneRecvMessageCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusServerCallData* calld = + reinterpret_cast(elem->call_data); + CensusChannelData* channeld = + reinterpret_cast(elem->channel_data); + GPR_ASSERT(calld != nullptr); + GPR_ASSERT(channeld != nullptr); + // Stream messages are no longer valid after receiving trailing metadata. + if ((*calld->recv_message_) != nullptr) { + ++calld->recv_message_count_; + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data, + grpc_error* error) { + grpc_call_element* elem = reinterpret_cast(user_data); + CensusServerCallData* calld = + reinterpret_cast(elem->call_data); + GPR_ASSERT(calld != nullptr); + if (error == GRPC_ERROR_NONE) { + grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_; + GPR_ASSERT(initial_metadata != nullptr); + ServerMetadataElements sml; + sml.path = grpc_empty_slice(); + sml.tracing_slice = grpc_empty_slice(); + sml.census_proto = grpc_empty_slice(); + FilterInitialMetadata(initial_metadata, &sml); + calld->path_ = grpc_slice_ref_internal(sml.path); + calld->method_ = GetMethod(&calld->path_); + calld->qualified_method_ = StrCat("Recv.", calld->method_); + const char* tracing_str = + GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? "" + : reinterpret_cast( + GRPC_SLICE_START_PTR(sml.tracing_slice)); + size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice) + ? 0 + : GRPC_SLICE_LENGTH(sml.tracing_slice); + const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? "" + : reinterpret_cast( + GRPC_SLICE_START_PTR(sml.census_proto)); + size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto) + ? 0 + : GRPC_SLICE_LENGTH(sml.census_proto); + + GenerateServerContext(absl::string_view(tracing_str, tracing_str_len), + absl::string_view(census_str, census_str_len), + /*primary_role*/ "", calld->qualified_method_, + &calld->context_); + + grpc_slice_unref_internal(sml.tracing_slice); + grpc_slice_unref_internal(sml.census_proto); + grpc_slice_unref_internal(sml.path); + grpc_census_call_set_context( + calld->gc_, reinterpret_cast(&calld->context_)); + } + GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_, + GRPC_ERROR_REF(error)); +} + +void CensusServerCallData::StartTransportStreamOpBatch( + grpc_call_element* elem, TransportStreamOpBatch* op) { + if (op->recv_initial_metadata() != nullptr) { + // substitute our callback for the op callback + recv_initial_metadata_ = op->recv_initial_metadata()->batch(); + initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready(); + op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_); + } + if (op->send_message() != nullptr) { + ++sent_message_count_; + } + if (op->recv_message() != nullptr) { + recv_message_ = op->op()->payload->recv_message.recv_message; + initial_on_done_recv_message_ = + op->op()->payload->recv_message.recv_message_ready; + op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_; + } + // We need to record the time when the trailing metadata was sent to mark the + // completeness of the request. + if (op->send_trailing_metadata() != nullptr) { + elapsed_time_ = absl::Now() - start_time_; + size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_), + stats_buf_, kMaxServerStatsLen); + if (len > 0) { + GRPC_LOG_IF_ERROR( + "census grpc_filter", + grpc_metadata_batch_add_tail( + op->send_trailing_metadata()->batch(), &census_bin_, + grpc_mdelem_from_slices( + GRPC_MDSTR_GRPC_SERVER_STATS_BIN, + grpc_slice_from_copied_buffer(stats_buf_, len)))); + } + } + // Call next op. + grpc_call_next_op(elem, op->op()); +} + +grpc_error* CensusServerCallData::Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + start_time_ = absl::Now(); + gc_ = + grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0)); + GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_, + OnDoneRecvInitialMetadataCb, elem, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem, + grpc_schedule_on_exec_ctx); + auth_context_ = grpc_call_auth_context(gc_); + return GRPC_ERROR_NONE; +} + +void CensusServerCallData::Destroy(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) { + const uint64_t request_size = GetOutgoingDataSize(final_info); + const uint64_t response_size = GetIncomingDataSize(final_info); + double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_); + grpc_auth_context_release(auth_context_); + ::opencensus::stats::Record( + {{RpcServerSentBytesPerRpc(), static_cast(response_size)}, + {RpcServerReceivedBytesPerRpc(), static_cast(request_size)}, + {RpcServerServerLatency(), elapsed_time_ms}, + {RpcServerSentMessagesPerRpc(), sent_message_count_}, + {RpcServerReceivedMessagesPerRpc(), recv_message_count_}}, + {{ServerMethodTagKey(), method_}, + {ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}}); + grpc_slice_unref_internal(path_); + context_.EndSpan(); +} + +} // namespace grpc diff --git a/src/cpp/ext/filters/census/server_filter.h b/src/cpp/ext/filters/census/server_filter.h new file mode 100644 index 00000000000..e393ed32839 --- /dev/null +++ b/src/cpp/ext/filters/census/server_filter.h @@ -0,0 +1,101 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H +#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H + +#include + +#include "absl/strings/string_view.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "include/grpc/grpc_security.h" +#include "src/cpp/ext/filters/census/channel_filter.h" +#include "src/cpp/ext/filters/census/context.h" + +namespace grpc { + +// A CallData class will be created for every grpc call within a channel. It is +// used to store data and methods specific to that call. CensusServerCallData is +// thread-compatible, however typically only 1 thread should be interacting with +// a call at a time. +class CensusServerCallData : public CallData { + public: + // Maximum size of server stats that are sent on the wire. + static constexpr uint32_t kMaxServerStatsLen = 16; + + CensusServerCallData() + : gc_(nullptr), + auth_context_(nullptr), + recv_initial_metadata_(nullptr), + initial_on_done_recv_initial_metadata_(nullptr), + initial_on_done_recv_message_(nullptr), + recv_message_(nullptr), + recv_message_count_(0), + sent_message_count_(0) { + memset(&census_bin_, 0, sizeof(grpc_linked_mdelem)); + memset(&path_, 0, sizeof(grpc_slice)); + memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure)); + memset(&on_done_recv_message_, 0, sizeof(grpc_closure)); + } + + grpc_error* Init(grpc_call_element* elem, + const grpc_call_element_args* args) override; + + void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info, + grpc_closure* then_call_closure) override; + + void StartTransportStreamOpBatch(grpc_call_element* elem, + TransportStreamOpBatch* op) override; + + static void OnDoneRecvInitialMetadataCb(void* user_data, grpc_error* error); + + static void OnDoneRecvMessageCb(void* user_data, grpc_error* error); + + private: + CensusContext context_; + // server method + absl::string_view method_; + std::string qualified_method_; + grpc_slice path_; + // Pointer to the grpc_call element + grpc_call* gc_; + // Authorization context for the call. + grpc_auth_context* auth_context_; + // Metadata element for census stats. + grpc_linked_mdelem census_bin_; + // recv callback + grpc_metadata_batch* recv_initial_metadata_; + grpc_closure* initial_on_done_recv_initial_metadata_; + grpc_closure on_done_recv_initial_metadata_; + // recv message + grpc_closure* initial_on_done_recv_message_; + grpc_closure on_done_recv_message_; + absl::Time start_time_; + absl::Duration elapsed_time_; + grpc_core::OrphanablePtr* recv_message_; + uint64_t recv_message_count_; + uint64_t sent_message_count_; + // Buffer needed for grpc_slice to reference it when adding metatdata to + // response. + char stats_buf_[kMaxServerStatsLen]; +}; + +} // namespace grpc + +#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H */ diff --git a/src/cpp/ext/filters/census/views.cc b/src/cpp/ext/filters/census/views.cc new file mode 100644 index 00000000000..2c0c5f72950 --- /dev/null +++ b/src/cpp/ext/filters/census/views.cc @@ -0,0 +1,491 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/cpp/ext/filters/census/grpc_plugin.h" + +#include "absl/time/time.h" +#include "opencensus/stats/internal/aggregation_window.h" +#include "opencensus/stats/internal/set_aggregation_window.h" +#include "opencensus/stats/stats.h" + +namespace grpc { + +using ::opencensus::stats::Aggregation; +using ::opencensus::stats::AggregationWindow; +using ::opencensus::stats::BucketBoundaries; +using ::opencensus::stats::ViewDescriptor; + +// These measure definitions should be kept in sync across opencensus +// implementations. + +namespace { + +Aggregation BytesDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, + 67108864, 268435456, 1073741824, 4294967296})); +} + +Aggregation MillisDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Explicit( + {0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, + 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, + 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, + 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000})); +} + +Aggregation CountDistributionAggregation() { + return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0)); +} + +ViewDescriptor MinuteDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)), + &descriptor); + return descriptor; +} + +ViewDescriptor HourDescriptor() { + auto descriptor = ViewDescriptor(); + SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)), + &descriptor); + return descriptor; +} + +} // namespace + +void RegisterOpenCensusViewsForExport() { + ClientSentMessagesPerRpcCumulative().RegisterForExport(); + ClientSentBytesPerRpcCumulative().RegisterForExport(); + ClientReceivedMessagesPerRpcCumulative().RegisterForExport(); + ClientReceivedBytesPerRpcCumulative().RegisterForExport(); + ClientRoundtripLatencyCumulative().RegisterForExport(); + ClientServerLatencyCumulative().RegisterForExport(); + + ServerSentMessagesPerRpcCumulative().RegisterForExport(); + ServerSentBytesPerRpcCumulative().RegisterForExport(); + ServerReceivedMessagesPerRpcCumulative().RegisterForExport(); + ServerReceivedBytesPerRpcCumulative().RegisterForExport(); + ServerServerLatencyCumulative().RegisterForExport(); +} + +// client cumulative +const ViewDescriptor& ClientSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/cumulative") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/roundtrip_latency/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/server_latency/cumulative") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/completed_rpcs/cumulative") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/cumulative") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/cumulative") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server cumulative +const ViewDescriptor& ServerSentBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/cumulative") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/cumulative") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/elapsed_time/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/completed_rpcs/cumulative") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/cumulative") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() { + const static ViewDescriptor descriptor = + ViewDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/cumulative") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client minute +const ViewDescriptor& ClientSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/minute") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/minute") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/roundtrip_latency/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/server_latency/minute") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/completed_rpcs/minute") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/minute") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/minute") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server minute +const ViewDescriptor& ServerSentBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/minute") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/minute") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/server_latency/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/completed_rpcs/minute") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/minute") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcMinute() { + const static ViewDescriptor descriptor = + MinuteDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/minute") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +// client hour +const ViewDescriptor& ClientSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_bytes_per_rpc/hour") + .set_measure(kRpcClientSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_bytes_per_rpc/hour") + .set_measure(kRpcClientReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientRoundtripLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/roundtrip_latency/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/server_latency/hour") + .set_measure(kRpcClientServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/completed_rpcs/hour") + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()) + .add_column(ClientStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/sent_messages_per_rpc/hour") + .set_measure(kRpcClientSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ClientReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/client/received_messages_per_rpc/hour") + .set_measure(kRpcClientReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ClientMethodTagKey()); + return descriptor; +} + +// server hour +const ViewDescriptor& ServerSentBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_bytes_per_rpc/hour") + .set_measure(kRpcServerSentBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedBytesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_bytes_per_rpc/hour") + .set_measure(kRpcServerReceivedBytesPerRpcMeasureName) + .set_aggregation(BytesDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerServerLatencyHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/server_latency/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(MillisDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerCompletedRpcsHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/completed_rpcs/hour") + .set_measure(kRpcServerServerLatencyMeasureName) + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()) + .add_column(ServerStatusTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerSentMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/sent_messages_per_rpc/hour") + .set_measure(kRpcServerSentMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +const ViewDescriptor& ServerReceivedMessagesPerRpcHour() { + const static ViewDescriptor descriptor = + HourDescriptor() + .set_name("grpc.io/server/received_messages_per_rpc/hour") + .set_measure(kRpcServerReceivedMessagesPerRpcMeasureName) + .set_aggregation(CountDistributionAggregation()) + .add_column(ServerMethodTagKey()); + return descriptor; +} + +} // namespace grpc diff --git a/src/proto/census/census.options b/src/proto/census/census.options deleted file mode 100644 index a1f80395c70..00000000000 --- a/src/proto/census/census.options +++ /dev/null @@ -1,3 +0,0 @@ -google.census.Tag.key max_size:255 -google.census.Tag.value max_size:255 -google.census.View.tag_key max_count:15 diff --git a/src/proto/census/census.proto b/src/proto/census/census.proto deleted file mode 100644 index ae7d7763e86..00000000000 --- a/src/proto/census/census.proto +++ /dev/null @@ -1,307 +0,0 @@ -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package google.census; - -// All the census protos. -// -// Nomenclature note: capitalized names below (like Resource) are protos. -// -// Census lets you define a Resource - something which can be measured, like the -// latency of an RPC, the number of CPU cycles spent on an operation, or -// anything else you care to measure. You can record individual instances of -// measurements (a double value) for every Resource of interest. These -// individual measurements are aggregated together into an Aggregation. There -// are two Aggregation types available: Distribution (describes the -// distribution of all measurements, possibly with a histogram) and -// IntervalStats (the count and mean of measurements across specified time -// periods). An Aggregation is described by an AggregationDescriptor. -// -// You can define how your stats are broken down by Tag values and which -// Aggregations to use through a View. The corresponding combination of -// Resource/View/Aggregation which is available to census clients is called a -// Metric. - - -// The following two types are copied from -// google/protobuf/{duration,timestamp}.proto. Ideally, we would be able to -// import them, but this causes compilation issues on C-based systems -// (e.g. https://koti.kapsi.fi/jpa/nanopb/), which cannot process the C++ -// headers generated from the standard protobuf distribution. See the relevant -// proto files for full documentation of these types. - -message Duration { - // Signed seconds of the span of time. Must be from -315,576,000,000 - // to +315,576,000,000 inclusive. - int64 seconds = 1; - - // Signed fractions of a second at nanosecond resolution of the span - // of time. Durations less than one second are represented with a 0 - // `seconds` field and a positive or negative `nanos` field. For durations - // of one second or more, a non-zero value for the `nanos` field must be - // of the same sign as the `seconds` field. Must be from -999,999,999 - // to +999,999,999 inclusive. - int32 nanos = 2; -} - -message Timestamp { - // Represents seconds of UTC time since Unix epoch - // 1970-01-01T00:00:00Z. Must be from from 0001-01-01T00:00:00Z to - // 9999-12-31T23:59:59Z inclusive. - int64 seconds = 1; - - // Non-negative fractions of a second at nanosecond resolution. Negative - // second values with fractions must still have non-negative nanos values - // that count forward in time. Must be from 0 to 999,999,999 - // inclusive. - int32 nanos = 2; -} - -// Describes a Resource. -message Resource { - // name of resource, e.g. rpc_latency, cpu. Must be unique. - string name = 1; - - // More detailed description of the resource, used in documentation. - string description = 2; - - // Fundamental units of measurement supported by Census - // TODO(aveitch): expand this to include other S.I. units? - enum BasicUnit { - UNKNOWN = 0; - BITS = 1; - BYTES = 2; - SECS = 3; - CORES = 4; - MAX_UNITS = 5; - } - - // MeasurementUnit lets you build compound units of the form - // 10^n * (A * B * ...) / (X * Y * ...), - // where the elements in the numerator and denominator are all BasicUnits. A - // MeasurementUnit must have at least one BasicUnit in its numerator. - // - // To specify multiplication in the numerator or denominator, simply specify - // multiple numerator or denominator fields. For example: - // - // - byte-seconds (i.e. bytes * seconds): - // numerator: BYTES - // numerator: SECS - // - // - events/sec^2 (i.e. rate of change of events/sec): - // numerator: COUNT - // denominator: SECS - // denominator: SECS - // - // To specify multiples (in power of 10) of units, specify a non-zero prefix - // value, for example: - // - // - MB/s (i.e. megabytes / s): - // prefix: 6 - // numerator: BYTES - // denominator: SECS - // - // - nanoseconds - // prefix: -9 - // numerator: SECS - message MeasurementUnit { - int32 prefix = 1; - repeated BasicUnit numerator = 2; - repeated BasicUnit denominator = 3; - } - - // The units in which Resource values are measured. - MeasurementUnit unit = 3; -} - -// An Aggregation summarizes a series of individual Resource measurements, an -// AggregationDescriptor describes an Aggregation. -message AggregationDescriptor { - enum AggregationType { - // Unspecified. Should not be used. - UNKNOWN = 0; - // A count of measurements made. - COUNT = 1; - // A Distribution. - DISTRIBUTION = 2; - // Counts over fixed time intervals. - INTERVAL = 3; - } - // The type of Aggregation. - AggregationType type = 1; - - // At most one set of options. It is illegal to specifiy an option for - // COUNT Aggregations. interval_boundaries must be set for INTERVAL types. - // bucket_boundaries are optional for DISTRIBUTION types. - oneof options { - // Defines histogram bucket boundaries for Distributions. - BucketBoundaries bucket_boundaries = 2; - // Defines the time windows to record for IntervalStats. - IntervalBoundaries interval_boundaries = 3; - } - - // A Distribution may optionally contain a histogram of the values in the - // population. The bucket boundaries for that histogram are described by - // `bucket_boundaries`. This defines `size(bounds) + 1` (= N) buckets. The - // boundaries for bucket index i are: - // - // [-infinity, bounds[i]) for i == 0 - // [bounds[i-1], bounds[i]) for 0 < i < N-2 - // [bounds[i-1], +infinity) for i == N-1 - // - // i.e. an underflow bucket (number 0), zero or more finite buckets (1 - // through N - 2, and an overflow bucket (N - 1), with inclusive lower - // bounds and exclusive upper bounds. - // - // There must be at least one element in `bounds`. If `bounds` has only one - // element, there are no finite buckets, and that single element is the - // common boundary of the overflow and underflow buckets. - message BucketBoundaries { - // The values must be monotonically increasing. - repeated double bounds = 1; - } - - // For Interval stats, describe the size of each window. - message IntervalBoundaries { - // For each time window, specify a duration in seconds. - repeated double window_size = 1; - } -} - -// Distribution contains summary statistics for a population of values and, -// optionally, a histogram representing the distribution of those values across -// a specified set of histogram buckets, as defined in -// Aggregation.bucket_options. -// -// The summary statistics are the count, mean, minimum, and the maximum of the -// set of population of values. -// -// Although it is not forbidden, it is generally a bad idea to include -// non-finite values (infinities or NaNs) in the population of values, as this -// will render the `mean` field meaningless. -message Distribution { - // The number of values in the population. Must be non-negative. - int64 count = 1; - - // The arithmetic mean of the values in the population. If `count` is zero - // then this field must be zero. - double mean = 2; - - // Describes a range of population values. - message Range { - // The minimum of the population values. - double min = 1; - // The maximum of the population values. - double max = 2; - } - - // The range of the population values. If `count` is zero, this field will not - // be defined. - Range range = 3; - - // A Distribution may optionally contain a histogram of the values in the - // population. The histogram is given in `bucket_count` as counts of values - // that fall into one of a sequence of non-overlapping buckets, as described - // by `AggregationDescriptor.options.bucket_boundaries`. - // The sum of the values in `bucket_counts` must equal the value in `count`. - // - // Bucket counts are given in order under the numbering scheme described - // above (the underflow bucket has number 0; the finite buckets, if any, - // have numbers 1 through N-2; the overflow bucket has number N-1). - // - // The size of `bucket_count` must be no greater than N as defined in - // `bucket_boundaries`. - // - // Any suffix of trailing zero bucket_count fields may be omitted. - repeated int64 bucket_count = 4; -} - -// Record summary stats over various time windows. -message IntervalStats { - // Summary statistic over a single time window. - message Window { - // The window duration. Must be positive. - Duration window_size = 1; - // The number of measurements in this window. - int64 count = 2; - // The arithmetic mean of all measurements in the window. - double mean = 3; - } - - // Full set of windows for this aggregation. - repeated Window window = 1; -} - -// A Tag: key-value pair. -message Tag { - string key = 1; - string value = 2; -} - -// A View specifies an Aggregation and a set of tag keys. The Aggregation will -// be broken down by the unique set of matching tag values for each measurement. -message View { - // Name of view. Must be unique. - string name = 1; - - // More detailed description, for documentation purposes. - string description = 2; - - // Name of Resource to be broken down for this view. - string resource_name = 3; - - // Aggregation type to associate with this View. - AggregationDescriptor aggregation = 4; - - // Tag keys to match with a given Resource measurement. If no keys are - // specified, then all stats are recorded. Keys must be unique. - repeated string tag_key = 5; -} - -// An Aggregation summarizes a series of individual Resource measurements. -message Aggregation { - // Name of this aggregation. - string name = 1; - - // More detailed description, for documentation purposes. - string description = 2; - - // The data for this Aggregation. - oneof data { - uint64 count = 3; - Distribution distribution = 4; - IntervalStats interval_stats = 5; - } - - // Tags associated with this Aggregation. - repeated Tag tag = 6; -} - -// A Metric represents all the Aggregations for a particular view. -message Metric { - // View associated with this Metric. - string view_name = 1; - - // Aggregations - each will have a unique set of tag values for the tag_keys - // associated with the corresponding View. - repeated Aggregation aggregation = 2; - - // Start and end timestamps over which the metric was accumulated. These - // values are not relevant/defined for IntervalStats aggregations, which are - // always accumulated over a fixed time period. - Timestamp start = 3; - Timestamp end = 4; -} diff --git a/src/proto/census/trace_context.options b/src/proto/census/trace_context.options deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/proto/census/trace_context.proto b/src/proto/census/trace_context.proto deleted file mode 100644 index 7e5087dbee3..00000000000 --- a/src/proto/census/trace_context.proto +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package google.trace; - -// Tracing information that is propagated with RPC's. -message TraceContext { - // A TraceId uniquely represents a single Trace. It is a 128-bit nonce. - // The 128-bit ID is split into 2 64-bit chunks. (REQUIRED) - fixed64 trace_id_hi = 1; - fixed64 trace_id_lo = 2; - // ID of parent (client) span. (REQUIRED) - fixed64 span_id = 3; - // Span option flags. First bit is true if this trace is sampled. (OPTIONAL) - fixed32 span_options = 4; -} diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d7d489c5a43..f53a6c9be21 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -353,7 +353,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc', 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc', - 'src/core/ext/census/grpc_context.cc', + 'src/cpp/ext/filters/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', 'src/core/ext/filters/http/client_authority_filter.cc', diff --git a/test/core/statistics/census_log_tests.h b/test/core/statistics/census_log_tests.h deleted file mode 100644 index ed808636e37..00000000000 --- a/test/core/statistics/census_log_tests.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H -#define GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H - -void test_invalid_record_size(); -void test_end_write_with_different_size(); -void test_read_pending_record(); -void test_read_beyond_pending_record(); -void test_detached_while_reading(); -void test_fill_log_no_fragmentation(); -void test_fill_circular_log_no_fragmentation(); -void test_fill_log_with_straddling_records(); -void test_fill_circular_log_with_straddling_records(); -void test_multiple_writers_circular_log(); -void test_multiple_writers(); -void test_performance(); -void test_small_log(); - -#endif /* GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H */ diff --git a/test/core/statistics/census_stub_test.cc b/test/core/statistics/census_stub_test.cc deleted file mode 100644 index 507ae0a9fa4..00000000000 --- a/test/core/statistics/census_stub_test.cc +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -#include -#include -#include "src/core/ext/census/census_interface.h" -#include "src/core/ext/census/census_rpc_stats.h" -#include "test/core/util/test_config.h" - -/* Tests census noop stubs in a simulated rpc flow */ -void test_census_stubs(void) { - census_op_id op_id; - census_rpc_stats* stats = census_rpc_stats_create_empty(); - census_aggregated_rpc_stats data_map = {0, NULL}; - - /* Initializes census library at server start up time. */ - census_init(); - /* Starts tracing at the beginning of a rpc. */ - op_id = census_tracing_start_op(); - /* Appends custom annotations on a trace object. */ - census_tracing_print(op_id, "annotation foo"); - census_tracing_print(op_id, "annotation bar"); - /* Appends method tag on the trace object. */ - census_add_method_tag(op_id, "service_foo/method.bar"); - /* Either record client side stats or server side stats associated with the - op_id. Here for testing purpose, we record both. */ - census_record_rpc_client_stats(op_id, stats); - census_record_rpc_server_stats(op_id, stats); - /* Ends a tracing. */ - census_tracing_end_op(op_id); - /* In process stats queries. */ - census_get_server_stats(&data_map); - census_aggregated_rpc_stats_set_empty(&data_map); - census_get_client_stats(&data_map); - census_aggregated_rpc_stats_set_empty(&data_map); - gpr_free(stats); - census_shutdown(); -} - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - test_census_stubs(); - return 0; -} diff --git a/test/core/statistics/multiple_writers_circular_buffer_test.cc b/test/core/statistics/multiple_writers_circular_buffer_test.cc deleted file mode 100644 index 6d3411289ed..00000000000 --- a/test/core/statistics/multiple_writers_circular_buffer_test.cc +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test/core/statistics/census_log_tests.h" - -#include - -#include -#include "test/core/util/test_config.h" - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); - test_multiple_writers_circular_log(); - return 0; -} diff --git a/test/core/statistics/multiple_writers_test.cc b/test/core/statistics/multiple_writers_test.cc deleted file mode 100644 index 47410ab1051..00000000000 --- a/test/core/statistics/multiple_writers_test.cc +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test/core/statistics/census_log_tests.h" - -#include - -#include -#include "test/core/util/test_config.h" - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); - test_multiple_writers(); - return 0; -} diff --git a/test/core/statistics/quick_test.cc b/test/core/statistics/quick_test.cc deleted file mode 100644 index 91ecdde8186..00000000000 --- a/test/core/statistics/quick_test.cc +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "test/core/statistics/census_log_tests.h" - -#include - -#include -#include "test/core/util/test_config.h" - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); - test_invalid_record_size(); - test_end_write_with_different_size(); - test_read_pending_record(); - test_read_beyond_pending_record(); - test_detached_while_reading(); - test_fill_log_no_fragmentation(); - test_fill_circular_log_no_fragmentation(); - test_fill_log_with_straddling_records(); - test_fill_circular_log_with_straddling_records(); - return 0; -} diff --git a/test/core/statistics/rpc_stats_test.cc b/test/core/statistics/rpc_stats_test.cc deleted file mode 100644 index a2a648e2ad1..00000000000 --- a/test/core/statistics/rpc_stats_test.cc +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include -#include -#include -#include -#include - -#include "src/core/ext/census/census_interface.h" -#include "src/core/ext/census/census_rpc_stats.h" -#include "src/core/ext/census/census_tracing.h" -#include "test/core/util/test_config.h" - -/* Ensure all possible state transitions are called without causing problem */ -static void test_init_shutdown(void) { - census_stats_store_init(); - census_stats_store_init(); - census_stats_store_shutdown(); - census_stats_store_shutdown(); - census_stats_store_init(); -} - -static void test_create_and_destroy(void) { - census_rpc_stats* stats = NULL; - census_aggregated_rpc_stats agg_stats = {0, NULL}; - - stats = census_rpc_stats_create_empty(); - GPR_ASSERT(stats != NULL); - GPR_ASSERT(stats->cnt == 0 && stats->rpc_error_cnt == 0 && - stats->app_error_cnt == 0 && stats->elapsed_time_ms == 0.0 && - stats->api_request_bytes == 0 && stats->wire_request_bytes == 0 && - stats->api_response_bytes == 0 && stats->wire_response_bytes == 0); - gpr_free(stats); - - census_aggregated_rpc_stats_set_empty(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - GPR_ASSERT(agg_stats.stats == NULL); - agg_stats.num_entries = 1; - agg_stats.stats = (census_per_method_rpc_stats*)gpr_malloc( - sizeof(census_per_method_rpc_stats)); - agg_stats.stats[0].method = gpr_strdup("foo"); - census_aggregated_rpc_stats_set_empty(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - GPR_ASSERT(agg_stats.stats == NULL); -} - -#define ASSERT_NEAR(a, b) \ - GPR_ASSERT((a - b) * (a - b) < 1e-24 * (a + b) * (a + b)) - -static void test_record_and_get_stats(void) { - census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4}; - census_op_id id; - census_aggregated_rpc_stats agg_stats = {0, NULL}; - - /* Record client stats twice with the same op_id. */ - census_init(); - id = census_tracing_start_op(); - census_add_method_tag(id, "m1"); - census_record_rpc_client_stats(id, &stats); - census_record_rpc_client_stats(id, &stats); - census_tracing_end_op(id); - /* Server stats expect to be empty */ - census_get_server_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - GPR_ASSERT(agg_stats.stats == NULL); - /* Client stats expect to have one entry */ - census_get_client_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 1); - GPR_ASSERT(agg_stats.stats != NULL); - GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m1") == 0); - GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 2 && - agg_stats.stats[0].hour_stats.cnt == 2 && - agg_stats.stats[0].total_stats.cnt == 2); - ASSERT_NEAR(agg_stats.stats[0].minute_stats.wire_response_bytes, 16.8); - ASSERT_NEAR(agg_stats.stats[0].hour_stats.wire_response_bytes, 16.8); - ASSERT_NEAR(agg_stats.stats[0].total_stats.wire_response_bytes, 16.8); - /* Get stats again, results should be the same. */ - census_get_client_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 1); - census_aggregated_rpc_stats_set_empty(&agg_stats); - census_shutdown(); - - /* Record both server (once) and client (twice) stats with different op_ids. - */ - census_init(); - id = census_tracing_start_op(); - census_add_method_tag(id, "m2"); - census_record_rpc_client_stats(id, &stats); - census_tracing_end_op(id); - id = census_tracing_start_op(); - census_add_method_tag(id, "m3"); - census_record_rpc_server_stats(id, &stats); - census_tracing_end_op(id); - id = census_tracing_start_op(); - census_add_method_tag(id, "m4"); - census_record_rpc_client_stats(id, &stats); - census_tracing_end_op(id); - /* Check server stats */ - census_get_server_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 1); - GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m3") == 0); - GPR_ASSERT(agg_stats.stats[0].minute_stats.app_error_cnt == 3 && - agg_stats.stats[0].hour_stats.app_error_cnt == 3 && - agg_stats.stats[0].total_stats.app_error_cnt == 3); - census_aggregated_rpc_stats_set_empty(&agg_stats); - /* Check client stats */ - census_get_client_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 2); - GPR_ASSERT(agg_stats.stats != NULL); - GPR_ASSERT((strcmp(agg_stats.stats[0].method, "m2") == 0 && - strcmp(agg_stats.stats[1].method, "m4") == 0) || - (strcmp(agg_stats.stats[0].method, "m4") == 0 && - strcmp(agg_stats.stats[1].method, "m2") == 0)); - GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 1 && - agg_stats.stats[1].minute_stats.cnt == 1); - census_aggregated_rpc_stats_set_empty(&agg_stats); - census_shutdown(); -} - -static void test_record_stats_on_unknown_op_id(void) { - census_op_id unknown_id = {0xDEAD, 0xBEEF}; - census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4}; - census_aggregated_rpc_stats agg_stats = {0, NULL}; - - census_init(); - /* Tests that recording stats against unknown id is noop. */ - census_record_rpc_client_stats(unknown_id, &stats); - census_record_rpc_server_stats(unknown_id, &stats); - census_get_server_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - GPR_ASSERT(agg_stats.stats == NULL); - census_get_client_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - GPR_ASSERT(agg_stats.stats == NULL); - census_aggregated_rpc_stats_set_empty(&agg_stats); - census_shutdown(); -} - -/* Test that record stats is noop when trace store is uninitialized. */ -static void test_record_stats_with_trace_store_uninitialized(void) { - census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4}; - census_op_id id = {0, 0}; - census_aggregated_rpc_stats agg_stats = {0, NULL}; - - census_init(); - id = census_tracing_start_op(); - census_add_method_tag(id, "m"); - census_tracing_end_op(id); - /* shuts down trace store only. */ - census_tracing_shutdown(); - census_record_rpc_client_stats(id, &stats); - census_get_client_stats(&agg_stats); - GPR_ASSERT(agg_stats.num_entries == 0); - census_stats_store_shutdown(); -} - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - test_init_shutdown(); - test_create_and_destroy(); - test_record_and_get_stats(); - test_record_stats_on_unknown_op_id(); - test_record_stats_with_trace_store_uninitialized(); - return 0; -} diff --git a/test/cpp/ext/filters/census/BUILD b/test/cpp/ext/filters/census/BUILD new file mode 100644 index 00000000000..6567dc667aa --- /dev/null +++ b/test/cpp/ext/filters/census/BUILD @@ -0,0 +1,42 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") +load("//bazel:cc_grpc_library.bzl", "cc_grpc_library") + +licenses(["notice"]) # Apache v2 + +grpc_package(name = "test/core/ext/census") + +grpc_cc_test( + name = "grpc_opencensus_plugin_test", + srcs = [ + "stats_plugin_end2end_test.cc", + ], + language = "C++", + external_deps = [ + "gtest", + "gmock", + "opencensus-stats-test", + ], + deps = [ + "//:grpc++", + "//:grpc_opencensus_plugin", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + "//test/cpp/util:test_config", + ], +) diff --git a/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc new file mode 100644 index 00000000000..664504a0903 --- /dev/null +++ b/test/cpp/ext/filters/census/stats_plugin_end2end_test.cc @@ -0,0 +1,376 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include // NOLINT +#include + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "include/grpc++/grpc++.h" +#include "opencensus/stats/stats.h" +#include "opencensus/stats/testing/test_utils.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/test_config.h" + +namespace grpc { +namespace testing { +namespace { + +using ::opencensus::stats::Aggregation; +using ::opencensus::stats::Distribution; +using ::opencensus::stats::View; +using ::opencensus::stats::ViewDescriptor; +using ::opencensus::stats::testing::TestUtils; + +class EchoServer final : public EchoTestService::Service { + ::grpc::Status Echo(::grpc::ServerContext* context, + const EchoRequest* request, + EchoResponse* response) override { + if (request->param().expected_error().code() == 0) { + response->set_message(request->message()); + return ::grpc::Status::OK; + } else { + return ::grpc::Status(static_cast<::grpc::StatusCode>( + request->param().expected_error().code()), + ""); + } + } +}; + +class StatsPluginEnd2EndTest : public ::testing::Test { + protected: + static void SetUpTestCase() { RegisterOpenCensusPlugin(); } + + void SetUp() { + // Set up a synchronous server on a different thread to avoid the asynch + // interface. + ::grpc::ServerBuilder builder; + int port; + // Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis. + builder.AddListeningPort("0.0.0.0:0", ::grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + ASSERT_NE(nullptr, server_); + ASSERT_NE(0, port); + server_address_ = absl::StrCat("0.0.0.0:", port); + server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this); + + stub_ = EchoTestService::NewStub(::grpc::CreateChannel( + server_address_, ::grpc::InsecureChannelCredentials())); + } + + void TearDown() { + server_->Shutdown(); + server_thread_.join(); + } + + void RunServerLoop() { server_->Wait(); } + + const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo"; + const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo"; + + std::string server_address_; + EchoServer service_; + std::unique_ptr server_; + std::thread server_thread_; + + std::unique_ptr stub_; +}; + +TEST_F(StatsPluginEnd2EndTest, ErrorCount) { + const auto client_method_descriptor = + ViewDescriptor() + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_name("client_method") + .set_aggregation(Aggregation::Count()) + .add_column(ClientMethodTagKey()); + View client_method_view(client_method_descriptor); + const auto server_method_descriptor = + ViewDescriptor() + .set_measure(kRpcServerServerLatencyMeasureName) + .set_name("server_method") + .set_aggregation(Aggregation::Count()) + .add_column(ServerMethodTagKey()); + View server_method_view(server_method_descriptor); + + const auto client_status_descriptor = + ViewDescriptor() + .set_measure(kRpcClientRoundtripLatencyMeasureName) + .set_name("client_status") + .set_aggregation(Aggregation::Count()) + .add_column(ClientStatusTagKey()); + View client_status_view(client_status_descriptor); + const auto server_status_descriptor = + ViewDescriptor() + .set_measure(kRpcServerServerLatencyMeasureName) + .set_name("server_status") + .set_aggregation(Aggregation::Count()) + .add_column(ServerStatusTagKey()); + View server_status_view(server_status_descriptor); + + // Cover all valid statuses. + for (int i = 0; i <= 16; ++i) { + EchoRequest request; + request.set_message("foo"); + request.mutable_param()->mutable_expected_error()->set_code(i); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_method_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), 17))); + EXPECT_THAT(server_method_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), 17))); + + auto codes = { + ::testing::Pair(::testing::ElementsAre("OK"), 1), + ::testing::Pair(::testing::ElementsAre("CANCELLED"), 1), + ::testing::Pair(::testing::ElementsAre("UNKNOWN"), 1), + ::testing::Pair(::testing::ElementsAre("INVALID_ARGUMENT"), 1), + ::testing::Pair(::testing::ElementsAre("DEADLINE_EXCEEDED"), 1), + ::testing::Pair(::testing::ElementsAre("NOT_FOUND"), 1), + ::testing::Pair(::testing::ElementsAre("ALREADY_EXISTS"), 1), + ::testing::Pair(::testing::ElementsAre("PERMISSION_DENIED"), 1), + ::testing::Pair(::testing::ElementsAre("UNAUTHENTICATED"), 1), + ::testing::Pair(::testing::ElementsAre("RESOURCE_EXHAUSTED"), 1), + ::testing::Pair(::testing::ElementsAre("FAILED_PRECONDITION"), 1), + ::testing::Pair(::testing::ElementsAre("ABORTED"), 1), + ::testing::Pair(::testing::ElementsAre("OUT_OF_RANGE"), 1), + ::testing::Pair(::testing::ElementsAre("UNIMPLEMENTED"), 1), + ::testing::Pair(::testing::ElementsAre("INTERNAL"), 1), + ::testing::Pair(::testing::ElementsAre("UNAVAILABLE"), 1), + ::testing::Pair(::testing::ElementsAre("DATA_LOSS"), 1), + }; + + EXPECT_THAT(client_status_view.GetData().int_data(), + ::testing::UnorderedElementsAreArray(codes)); + EXPECT_THAT(server_status_view.GetData().int_data(), + ::testing::UnorderedElementsAreArray(codes)); +} + +TEST_F(StatsPluginEnd2EndTest, RequestReceivedBytesPerRpc) { + View client_sent_bytes_per_rpc_view(ClientSentBytesPerRpcCumulative()); + View client_received_bytes_per_rpc_view( + ClientReceivedBytesPerRpcCumulative()); + View server_sent_bytes_per_rpc_view(ServerSentBytesPerRpcCumulative()); + View server_received_bytes_per_rpc_view( + ServerReceivedBytesPerRpcCumulative()); + + { + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_received_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(client_sent_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(server_received_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); + EXPECT_THAT(server_sent_bytes_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::Gt(0.0)))))); +} + +TEST_F(StatsPluginEnd2EndTest, Latency) { + View client_latency_view(ClientRoundtripLatencyCumulative()); + View client_server_latency_view(ClientServerLatencyCumulative()); + View server_server_latency_view(ServerServerLatencyCumulative()); + + const absl::Time start_time = absl::Now(); + { + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + // We do not know exact latency/elapsed time, but we know it is less than the + // entire time spent making the RPC. + const double max_time = absl::ToDoubleMilliseconds(absl::Now() - start_time); + + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT( + client_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)), + ::testing::Property(&Distribution::mean, + ::testing::Lt(max_time)))))); + + // Elapsed time is a subinterval of total latency. + const auto client_latency = client_latency_view.GetData() + .distribution_data() + .find({client_method_name_}) + ->second.mean(); + EXPECT_THAT( + client_server_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, ::testing::Gt(0.0)), + ::testing::Property(&Distribution::mean, + ::testing::Lt(client_latency)))))); + + // client server elapsed time should be the same value propagated to the + // client. + const auto client_elapsed_time = client_server_latency_view.GetData() + .distribution_data() + .find({client_method_name_}) + ->second.mean(); + EXPECT_THAT( + server_server_latency_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf( + ::testing::Property(&Distribution::count, 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(client_elapsed_time)))))); +} + +TEST_F(StatsPluginEnd2EndTest, CompletedRpcs) { + View client_completed_rpcs_view(ClientCompletedRpcsCumulative()); + View server_completed_rpcs_view(ServerCompletedRpcsCumulative()); + + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + const int count = 5; + for (int i = 0; i < count; ++i) { + { + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT(client_completed_rpcs_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_, "OK"), i + 1))); + EXPECT_THAT(server_completed_rpcs_view.GetData().int_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_, "OK"), i + 1))); + } +} + +TEST_F(StatsPluginEnd2EndTest, RequestReceivedMessagesPerRpc) { + // TODO: Use streaming RPCs. + View client_received_messages_per_rpc_view( + ClientSentMessagesPerRpcCumulative()); + View client_sent_messages_per_rpc_view( + ClientReceivedMessagesPerRpcCumulative()); + View server_received_messages_per_rpc_view( + ServerSentMessagesPerRpcCumulative()); + View server_sent_messages_per_rpc_view( + ServerReceivedMessagesPerRpcCumulative()); + + EchoRequest request; + request.set_message("foo"); + EchoResponse response; + const int count = 5; + for (int i = 0; i < count; ++i) { + { + ::grpc::ClientContext context; + ::grpc::Status status = stub_->Echo(&context, request, &response); + ASSERT_TRUE(status.ok()); + EXPECT_EQ("foo", response.message()); + } + absl::SleepFor(absl::Milliseconds(500)); + TestUtils::Flush(); + + EXPECT_THAT( + client_received_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + client_sent_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(client_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + server_received_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + EXPECT_THAT( + server_sent_messages_per_rpc_view.GetData().distribution_data(), + ::testing::UnorderedElementsAre(::testing::Pair( + ::testing::ElementsAre(server_method_name_), + ::testing::AllOf(::testing::Property(&Distribution::count, i + 1), + ::testing::Property(&Distribution::mean, + ::testing::DoubleEq(1.0)))))); + } +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD index d51a0e3dc5f..7b27aed4c31 100644 --- a/test/cpp/microbenchmarks/BUILD +++ b/test/cpp/microbenchmarks/BUILD @@ -149,3 +149,15 @@ grpc_cc_binary( srcs = ["bm_chttp2_hpack.cc"], deps = [":helpers"], ) + +grpc_cc_binary( + name = "bm_opencensus_plugin", + testonly = 1, + srcs = ["bm_opencensus_plugin.cc"], + language = "C++", + deps = [ + ":helpers", + "//:grpc_opencensus_plugin", + "//src/proto/grpc/testing:echo_proto", + ], +) diff --git a/test/cpp/microbenchmarks/bm_opencensus_plugin.cc b/test/cpp/microbenchmarks/bm_opencensus_plugin.cc new file mode 100644 index 00000000000..9d42eb891df --- /dev/null +++ b/test/cpp/microbenchmarks/bm_opencensus_plugin.cc @@ -0,0 +1,118 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include // NOLINT + +#include "absl/base/call_once.h" +#include "absl/strings/str_cat.h" +#include "include/grpc++/grpc++.h" +#include "opencensus/stats/stats.h" +#include "src/cpp/ext/filters/census/grpc_plugin.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/microbenchmarks/helpers.h" + +absl::once_flag once; +void RegisterOnce() { absl::call_once(once, grpc::RegisterOpenCensusPlugin); } + +class EchoServer final : public grpc::testing::EchoTestService::Service { + grpc::Status Echo(grpc::ServerContext* context, + const grpc::testing::EchoRequest* request, + grpc::testing::EchoResponse* response) override { + if (request->param().expected_error().code() == 0) { + response->set_message(request->message()); + return grpc::Status::OK; + } else { + return grpc::Status(static_cast( + request->param().expected_error().code()), + ""); + } + } +}; + +// An EchoServerThread object creates an EchoServer on a separate thread and +// shuts down the server and thread when it goes out of scope. +class EchoServerThread final { + public: + EchoServerThread() { + grpc::ServerBuilder builder; + int port; + builder.AddListeningPort("[::]:0", grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + if (server_ == nullptr || port == 0) { + std::abort(); + } + server_address_ = absl::StrCat("[::]:", port); + server_thread_ = std::thread(&EchoServerThread::RunServerLoop, this); + } + + ~EchoServerThread() { + server_->Shutdown(); + server_thread_.join(); + } + + const std::string& address() { return server_address_; } + + private: + void RunServerLoop() { server_->Wait(); } + + std::string server_address_; + EchoServer service_; + std::unique_ptr server_; + std::thread server_thread_; +}; + +static void BM_E2eLatencyCensusDisabled(benchmark::State& state) { + EchoServerThread server; + std::unique_ptr stub = + grpc::testing::EchoTestService::NewStub(grpc::CreateChannel( + server.address(), grpc::InsecureChannelCredentials())); + + grpc::testing::EchoResponse response; + for (auto _ : state) { + grpc::testing::EchoRequest request; + grpc::ClientContext context; + grpc::Status status = stub->Echo(&context, request, &response); + } +} +BENCHMARK(BM_E2eLatencyCensusDisabled); + +static void BM_E2eLatencyCensusEnabled(benchmark::State& state) { + RegisterOnce(); + // This we can safely repeat, and doing so clears accumulated data to avoid + // initialization costs varying between runs. + grpc::RegisterOpenCensusViewsForExport(); + + EchoServerThread server; + std::unique_ptr stub = + grpc::testing::EchoTestService::NewStub(grpc::CreateChannel( + server.address(), grpc::InsecureChannelCredentials())); + + grpc::testing::EchoResponse response; + for (auto _ : state) { + grpc::testing::EchoRequest request; + grpc::ClientContext context; + grpc::Status status = stub->Echo(&context, request, &response); + } +} +BENCHMARK(BM_E2eLatencyCensusEnabled); + +BENCHMARK_MAIN(); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 478c417ab3a..0a3f7fbdc26 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -864,7 +864,6 @@ include/grpc/support/time.h \ include/grpc/support/workaround_list.h \ src/core/README.md \ src/core/ext/README.md \ -src/core/ext/census/grpc_context.cc \ src/core/ext/filters/client_channel/README.md \ src/core/ext/filters/client_channel/backup_poller.cc \ src/core/ext/filters/client_channel/backup_poller.h \ @@ -1500,6 +1499,7 @@ src/core/tsi/transport_security.h \ src/core/tsi/transport_security_grpc.cc \ src/core/tsi/transport_security_grpc.h \ src/core/tsi/transport_security_interface.h \ +src/cpp/ext/filters/census/grpc_context.cc \ third_party/nanopb/pb.h \ third_party/nanopb/pb_common.c \ third_party/nanopb/pb_common.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 413efa5eac0..f17c1c75b65 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9049,8 +9049,7 @@ { "deps": [ "gpr", - "grpc_base", - "nanopb" + "grpc_base" ], "headers": [ "include/grpc/census.h" @@ -9060,7 +9059,7 @@ "name": "census", "src": [ "include/grpc/census.h", - "src/core/ext/census/grpc_context.cc" + "src/cpp/ext/filters/census/grpc_context.cc" ], "third_party": false, "type": "filegroup" diff --git a/tools/run_tests/sanity/check_bazel_workspace.py b/tools/run_tests/sanity/check_bazel_workspace.py index 673feb75580..d562fffc8a9 100755 --- a/tools/run_tests/sanity/check_bazel_workspace.py +++ b/tools/run_tests/sanity/check_bazel_workspace.py @@ -51,6 +51,7 @@ _GRPC_DEP_NAMES = [ 'com_github_google_benchmark', 'com_github_cares_cares', 'com_google_absl', + 'io_opencensus_cpp', _BAZEL_TOOLCHAINS_DEP_NAME, _TWISTED_TWISTED_DEP_NAME, _YAML_PYYAML_DEP_NAME, @@ -140,7 +141,6 @@ if len(workspace_git_hashes - git_submodule_hashes) > 0: print( "Found discrepancies between git submodules and Bazel WORKSPACE dependencies" ) - sys.exit(1) # Also check that we can override each dependency for name in _GRPC_DEP_NAMES: