Merge pull request #15070 from Vizerai/filter_port

Adding opencensus grpc plugin with bazel support.
reviewable/pr15196/r14^2
Jim King 7 years ago committed by GitHub
commit 01cbab60f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      BUILD
  2. 9
      CMakeLists.txt
  3. 9
      Makefile
  4. 41
      bazel/grpc_deps.bzl
  5. 3
      build.yaml
  6. 4
      config.m4
  7. 7
      config.w32
  8. 2
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 4
      grpc.gyp
  11. 41
      include/grpcpp/opencensus.h
  12. 2
      package.xml
  13. 19
      src/cpp/ext/filters/census/channel_filter.cc
  14. 27
      src/cpp/ext/filters/census/channel_filter.h
  15. 163
      src/cpp/ext/filters/census/client_filter.cc
  16. 104
      src/cpp/ext/filters/census/client_filter.h
  17. 132
      src/cpp/ext/filters/census/context.cc
  18. 126
      src/cpp/ext/filters/census/context.h
  19. 0
      src/cpp/ext/filters/census/grpc_context.cc
  20. 130
      src/cpp/ext/filters/census/grpc_plugin.cc
  21. 111
      src/cpp/ext/filters/census/grpc_plugin.h
  22. 129
      src/cpp/ext/filters/census/measures.cc
  23. 46
      src/cpp/ext/filters/census/measures.h
  24. 39
      src/cpp/ext/filters/census/rpc_encoding.cc
  25. 284
      src/cpp/ext/filters/census/rpc_encoding.h
  26. 198
      src/cpp/ext/filters/census/server_filter.cc
  27. 101
      src/cpp/ext/filters/census/server_filter.h
  28. 491
      src/cpp/ext/filters/census/views.cc
  29. 3
      src/proto/census/census.options
  30. 307
      src/proto/census/census.proto
  31. 0
      src/proto/census/trace_context.options
  32. 29
      src/proto/census/trace_context.proto
  33. 2
      src/python/grpcio/grpc_core_dependencies.py
  34. 36
      test/core/statistics/census_log_tests.h
  35. 62
      test/core/statistics/census_stub_test.cc
  36. 31
      test/core/statistics/multiple_writers_circular_buffer_test.cc
  37. 31
      test/core/statistics/multiple_writers_test.cc
  38. 39
      test/core/statistics/quick_test.cc
  39. 183
      test/core/statistics/rpc_stats_test.cc
  40. 42
      test/cpp/ext/filters/census/BUILD
  41. 376
      test/cpp/ext/filters/census/stats_plugin_end2end_test.cc
  42. 12
      test/cpp/microbenchmarks/BUILD
  43. 118
      test/cpp/microbenchmarks/bm_opencensus_plugin.cc
  44. 2
      tools/doxygen/Doxyfile.core.internal
  45. 5
      tools/run_tests/generated/sources_and_headers.json
  46. 2
      tools/run_tests/sanity/check_bazel_workspace.py

41
BUILD

@ -485,10 +485,7 @@ grpc_cc_library(
grpc_cc_library(
name = "census",
srcs = [
"src/core/ext/census/grpc_context.cc",
],
external_deps = [
"nanopb",
"src/cpp/ext/filters/census/grpc_context.cc",
],
language = "c++",
public_hdrs = [
@ -1990,4 +1987,40 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "grpc_opencensus_plugin",
srcs = [
"src/cpp/ext/filters/census/client_filter.cc",
"src/cpp/ext/filters/census/server_filter.cc",
"src/cpp/ext/filters/census/channel_filter.cc",
"src/cpp/ext/filters/census/context.cc",
"src/cpp/ext/filters/census/grpc_context.cc",
"src/cpp/ext/filters/census/grpc_plugin.cc",
"src/cpp/ext/filters/census/measures.cc",
"src/cpp/ext/filters/census/rpc_encoding.cc",
"src/cpp/ext/filters/census/views.cc",
],
hdrs = [
"include/grpcpp/opencensus.h",
"src/cpp/ext/filters/census/client_filter.h",
"src/cpp/ext/filters/census/server_filter.h",
"src/cpp/ext/filters/census/channel_filter.h",
"src/cpp/ext/filters/census/context.h",
"src/cpp/ext/filters/census/grpc_plugin.h",
"src/cpp/ext/filters/census/measures.h",
"src/cpp/ext/filters/census/rpc_encoding.h",
],
language = "c++",
external_deps = [
"absl-base",
"absl-time",
"opencensus-trace",
"opencensus-stats",
],
deps = [
":census",
":grpc++",
],
)
grpc_generate_one_off_targets()

@ -1221,7 +1221,7 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/load_reporting/server_load_reporting_filter.cc
src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc
src/core/ext/census/grpc_context.cc
src/cpp/ext/filters/census/grpc_context.cc
src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/http/client_authority_filter.cc
@ -2524,7 +2524,7 @@ add_library(grpc_unsecure
third_party/nanopb/pb_encode.c
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
src/core/ext/census/grpc_context.cc
src/cpp/ext/filters/census/grpc_context.cc
src/core/ext/filters/max_age/max_age_filter.cc
src/core/ext/filters/message_size/message_size_filter.cc
src/core/ext/filters/http/client_authority_filter.cc
@ -3307,10 +3307,7 @@ add_library(grpc++_cronet
src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
src/core/ext/transport/chttp2/server/chttp2_server.cc
src/core/ext/census/grpc_context.cc
third_party/nanopb/pb_common.c
third_party/nanopb/pb_decode.c
third_party/nanopb/pb_encode.c
src/cpp/ext/filters/census/grpc_context.cc
)
if(WIN32 AND MSVC)

@ -3596,7 +3596,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/load_reporting/server_load_reporting_filter.cc \
src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc \
src/core/ext/census/grpc_context.cc \
src/cpp/ext/filters/census/grpc_context.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
@ -4865,7 +4865,7 @@ LIBGRPC_UNSECURE_SRC = \
third_party/nanopb/pb_encode.c \
src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc \
src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc \
src/core/ext/census/grpc_context.cc \
src/cpp/ext/filters/census/grpc_context.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
@ -5636,10 +5636,7 @@ LIBGRPC++_CRONET_SRC = \
src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc \
src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc \
src/core/ext/transport/chttp2/server/chttp2_server.cc \
src/core/ext/census/grpc_context.cc \
third_party/nanopb/pb_common.c \
third_party/nanopb/pb_decode.c \
third_party/nanopb/pb_encode.c \
src/cpp/ext/filters/census/grpc_context.cc \
PUBLIC_HEADERS_CXX += \
include/grpc++/alarm.h \

@ -8,6 +8,16 @@ def grpc_deps():
actual = "@com_github_nanopb_nanopb//:nanopb",
)
native.bind(
name = "absl-base",
actual = "@com_google_absl//absl/base",
)
native.bind(
name = "absl-time",
actual = "@com_google_absl//absl/time:time",
)
native.bind(
name = "libssl",
actual = "@boringssl//:ssl",
@ -73,6 +83,21 @@ def grpc_deps():
actual = "@com_github_grpc_grpc//:grpc++_codegen_proto",
)
native.bind(
name = "opencensus-trace",
actual = "@io_opencensus_cpp//opencensus/trace:trace"
)
native.bind(
name = "opencensus-stats",
actual = "@io_opencensus_cpp//opencensus/stats:stats"
)
native.bind(
name = "opencensus-stats-test",
actual = "@io_opencensus_cpp//opencensus/stats:test_utils"
)
if "boringssl" not in native.existing_rules():
native.http_archive(
name = "boringssl",
@ -122,8 +147,8 @@ def grpc_deps():
native.new_http_archive(
name = "com_github_google_benchmark",
build_file = "@com_github_grpc_grpc//third_party:benchmark.BUILD",
strip_prefix = "benchmark-5b7683f49e1e9223cf9927b24f6fd3d6bd82e3f8",
url = "https://github.com/google/benchmark/archive/5b7683f49e1e9223cf9927b24f6fd3d6bd82e3f8.tar.gz",
strip_prefix = "benchmark-9913418d323e64a0111ca0da81388260c2bbe1e9",
url = "https://github.com/google/benchmark/archive/9913418d323e64a0111ca0da81388260c2bbe1e9.tar.gz",
)
if "com_github_cares_cares" not in native.existing_rules():
@ -137,8 +162,8 @@ def grpc_deps():
if "com_google_absl" not in native.existing_rules():
native.http_archive(
name = "com_google_absl",
strip_prefix = "abseil-cpp-cc4bed2d74f7c8717e31f9579214ab52a9c9c610",
url = "https://github.com/abseil/abseil-cpp/archive/cc4bed2d74f7c8717e31f9579214ab52a9c9c610.tar.gz",
strip_prefix = "abseil-cpp-cd95e71df6eaf8f2a282b1da556c2cf1c9b09207",
url = "https://github.com/abseil/abseil-cpp/archive/cd95e71df6eaf8f2a282b1da556c2cf1c9b09207.tar.gz",
)
if "com_github_bazelbuild_bazeltoolchains" not in native.existing_rules():
@ -152,6 +177,14 @@ def grpc_deps():
sha256 = "1c4a532b396c698e6467a1548554571cb85fa091e472b05e398ebc836c315d77",
)
if "io_opencensus_cpp" not in native.existing_rules():
native.http_archive(
name = "io_opencensus_cpp",
strip_prefix = "opencensus-cpp-fdf0f308b1631bb4a942e32ba5d22536a6170274",
url = "https://github.com/census-instrumentation/opencensus-cpp/archive/fdf0f308b1631bb4a942e32ba5d22536a6170274.tar.gz",
)
# TODO: move some dependencies from "grpc_deps" here?
def grpc_test_only_deps():
"""Internal, not intended for use by packages that are consuming grpc.

@ -100,10 +100,9 @@ filegroups:
public_headers:
- include/grpc/census.h
src:
- src/core/ext/census/grpc_context.cc
- src/cpp/ext/filters/census/grpc_context.cc
uses:
- grpc_base
- nanopb
- name: cmdline
headers:
- test/core/util/cmdline.h

@ -378,7 +378,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/load_reporting/server_load_reporting_filter.cc \
src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc \
src/core/ext/census/grpc_context.cc \
src/cpp/ext/filters/census/grpc_context.cc \
src/core/ext/filters/max_age/max_age_filter.cc \
src/core/ext/filters/message_size/message_size_filter.cc \
src/core/ext/filters/http/client_authority_filter.cc \
@ -649,7 +649,6 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/php/ext/grpc)
PHP_ADD_BUILD_DIR($ext_builddir/src/boringssl)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/census)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1)
@ -712,6 +711,7 @@ if test "$PHP_GRPC" != "no"; then
PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/alts/handshaker)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/alts/zero_copy_frame_protector)
PHP_ADD_BUILD_DIR($ext_builddir/src/core/tsi/ssl/session_cache)
PHP_ADD_BUILD_DIR($ext_builddir/src/cpp/ext/filters/census)
PHP_ADD_BUILD_DIR($ext_builddir/third_party/address_sorting)
PHP_ADD_BUILD_DIR($ext_builddir/third_party/boringssl/crypto)
PHP_ADD_BUILD_DIR($ext_builddir/third_party/boringssl/crypto/asn1)

@ -354,7 +354,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " +
"src\\core\\ext\\filters\\load_reporting\\server_load_reporting_filter.cc " +
"src\\core\\ext\\filters\\load_reporting\\server_load_reporting_plugin.cc " +
"src\\core\\ext\\census\\grpc_context.cc " +
"src\\cpp\\ext\\filters\\census\\grpc_context.cc " +
"src\\core\\ext\\filters\\max_age\\max_age_filter.cc " +
"src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +
"src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
@ -653,7 +653,6 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\boringssl");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\census");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\ext\\filters\\client_channel\\lb_policy");
@ -729,6 +728,10 @@ if (PHP_GRPC != "no") {
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\alts\\zero_copy_frame_protector");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\core\\tsi\\ssl\\session_cache");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\cpp\\ext\\filters\\census");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext");
FSO.CreateFolder(base_dir+"\\ext\\grpc\\src\\php\\ext\\grpc");

@ -793,7 +793,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc',
'src/core/ext/census/grpc_context.cc',
'src/cpp/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',

@ -733,7 +733,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc )
s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_filter.cc )
s.files += %w( src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc )
s.files += %w( src/core/ext/census/grpc_context.cc )
s.files += %w( src/cpp/ext/filters/census/grpc_context.cc )
s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc )
s.files += %w( src/core/ext/filters/message_size/message_size_filter.cc )
s.files += %w( src/core/ext/filters/http/client_authority_filter.cc )

@ -545,7 +545,7 @@
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc',
'src/core/ext/census/grpc_context.cc',
'src/cpp/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',
@ -1268,7 +1268,7 @@
'third_party/nanopb/pb_encode.c',
'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc',
'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc',
'src/core/ext/census/grpc_context.cc',
'src/cpp/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',

@ -0,0 +1,41 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPCPP_OPENCENSUS_H
#define GRPCPP_OPENCENSUS_H
namespace grpc {
// These symbols in this file will not be included in the binary unless
// grpc_opencensus_plugin build target was added as a dependency. At the moment
// it is only setup to be built with Bazel.
// Registers the OpenCensus plugin with gRPC, so that it will be used for future
// RPCs. This must be called before any views are created.
void RegisterOpenCensusPlugin();
// RPC stats definitions, defined by
// https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md
// Registers the cumulative gRPC views so that they will be exported by any
// registered stats exporter. For on-task stats, construct a View using the
// ViewDescriptors below.
void RegisterOpenCensusViewsForExport();
} // namespace grpc
#endif // GRPCPP_OPENCENSUS_H

@ -738,7 +738,7 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/census/grpc_context.cc" role="src" />
<file baseinstalldir="/" name="src/cpp/ext/filters/census/grpc_context.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/max_age/max_age_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/message_size/message_size_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/http/client_authority_filter.cc" role="src" />

@ -1,6 +1,6 @@
/*
*
* Copyright 2015 gRPC authors.
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,16 +16,15 @@
*
*/
#include "test/core/statistics/census_log_tests.h"
#include <grpc/support/port_platform.h>
#include <stdlib.h>
#include "src/cpp/ext/filters/census/channel_filter.h"
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
namespace grpc {
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
test_small_log();
return 0;
grpc_error* CensusChannelData::Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
return GRPC_ERROR_NONE;
}
} // namespace grpc

@ -1,6 +1,6 @@
/*
*
* Copyright 2015 gRPC authors.
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,16 +16,21 @@
*
*/
#include "test/core/statistics/census_log_tests.h"
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H
#include <stdlib.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
#include "src/cpp/ext/filters/census/context.h"
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
test_performance();
return 0;
}
namespace grpc {
class CensusChannelData : public ChannelData {
public:
grpc_error* Init(grpc_channel_element* elem,
grpc_channel_element_args* args) override;
};
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CHANNEL_FILTER_H */

@ -0,0 +1,163 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/client_filter.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "opencensus/stats/stats.h"
#include "src/core/lib/surface/call.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
namespace grpc {
constexpr uint32_t CensusClientCallData::kMaxTraceContextLen;
constexpr uint32_t CensusClientCallData::kMaxTagsLen;
namespace {
void FilterTrailingMetadata(grpc_metadata_batch* b, uint64_t* elapsed_time) {
if (b->idx.named.grpc_server_stats_bin != nullptr) {
ServerStatsDeserialize(
reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(
GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md))),
GRPC_SLICE_LENGTH(GRPC_MDVALUE(b->idx.named.grpc_server_stats_bin->md)),
elapsed_time);
grpc_metadata_batch_remove(b, b->idx.named.grpc_server_stats_bin);
}
}
} // namespace
void CensusClientCallData::OnDoneRecvTrailingMetadataCb(void* user_data,
grpc_error* error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusClientCallData* calld =
reinterpret_cast<CensusClientCallData*>(elem->call_data);
GPR_ASSERT(calld != nullptr);
if (error == GRPC_ERROR_NONE) {
GPR_ASSERT(calld->recv_trailing_metadata_ != nullptr);
FilterTrailingMetadata(calld->recv_trailing_metadata_,
&calld->elapsed_time_);
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_trailing_metadata_,
GRPC_ERROR_REF(error));
}
void CensusClientCallData::OnDoneRecvMessageCb(void* user_data,
grpc_error* error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusClientCallData* calld =
reinterpret_cast<CensusClientCallData*>(elem->call_data);
CensusChannelData* channeld =
reinterpret_cast<CensusChannelData*>(elem->channel_data);
GPR_ASSERT(calld != nullptr);
GPR_ASSERT(channeld != nullptr);
// Stream messages are no longer valid after receiving trailing metadata.
if ((*calld->recv_message_) != nullptr) {
calld->recv_message_count_++;
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
}
void CensusClientCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
if (op->send_initial_metadata() != nullptr) {
census_context* ctxt = op->get_census_context();
GenerateClientContext(
qualified_method_, &context_,
(ctxt == nullptr) ? nullptr : reinterpret_cast<CensusContext*>(ctxt));
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf_,
kMaxTraceContextLen);
if (tracing_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
op->send_initial_metadata()->batch(), &tracing_bin_,
grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_TRACE_BIN,
grpc_slice_from_copied_buffer(tracing_buf_, tracing_len))));
}
grpc_slice tags = grpc_empty_slice();
// TODO: Add in tagging serialization.
size_t encoded_tags_len = StatsContextSerialize(kMaxTagsLen, &tags);
if (encoded_tags_len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
op->send_initial_metadata()->batch(), &stats_bin_,
grpc_mdelem_from_slices(GRPC_MDSTR_GRPC_TAGS_BIN, tags)));
}
}
if (op->send_message() != nullptr) {
++sent_message_count_;
}
if (op->recv_message() != nullptr) {
recv_message_ = op->op()->payload->recv_message.recv_message;
initial_on_done_recv_message_ =
op->op()->payload->recv_message.recv_message_ready;
op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
}
if (op->recv_trailing_metadata() != nullptr) {
recv_trailing_metadata_ = op->recv_trailing_metadata()->batch();
initial_on_done_recv_trailing_metadata_ = op->on_complete();
op->set_on_complete(&on_done_recv_trailing_metadata_);
}
// Call next op.
grpc_call_next_op(elem, op->op());
}
grpc_error* CensusClientCallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
path_ = grpc_slice_ref_internal(args->path);
start_time_ = absl::Now();
method_ = GetMethod(&path_);
qualified_method_ = absl::StrCat("Sent.", method_);
GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_done_recv_trailing_metadata_,
OnDoneRecvTrailingMetadataCb, elem,
grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
void CensusClientCallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double latency_ms = absl::ToDoubleMilliseconds(absl::Now() - start_time_);
::opencensus::stats::Record(
{{RpcClientSentBytesPerRpc(), static_cast<double>(request_size)},
{RpcClientReceivedBytesPerRpc(), static_cast<double>(response_size)},
{RpcClientRoundtripLatency(), latency_ms},
{RpcClientServerLatency(),
ToDoubleMilliseconds(absl::Nanoseconds(elapsed_time_))},
{RpcClientSentMessagesPerRpc(), sent_message_count_},
{RpcClientReceivedMessagesPerRpc(), recv_message_count_}},
{{ClientMethodTagKey(), method_},
{ClientStatusTagKey(), StatusCodeToString(final_info->final_status)}});
grpc_slice_unref_internal(path_);
context_.EndSpan();
}
} // namespace grpc

@ -0,0 +1,104 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "src/cpp/ext/filters/census/context.h"
namespace grpc {
// A CallData class will be created for every grpc call within a channel. It is
// used to store data and methods specific to that call. CensusClientCallData is
// thread-compatible, however typically only 1 thread should be interacting with
// a call at a time.
class CensusClientCallData : public CallData {
public:
// Maximum size of trace context is sent on the wire.
static constexpr uint32_t kMaxTraceContextLen = 64;
// Maximum size of tags that are sent on the wire.
static constexpr uint32_t kMaxTagsLen = 2048;
CensusClientCallData()
: recv_trailing_metadata_(nullptr),
initial_on_done_recv_trailing_metadata_(nullptr),
initial_on_done_recv_message_(nullptr),
elapsed_time_(0),
recv_message_(nullptr),
recv_message_count_(0),
sent_message_count_(0) {
memset(&stats_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&tracing_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&path_, 0, sizeof(grpc_slice));
memset(&on_done_recv_trailing_metadata_, 0, sizeof(grpc_closure));
memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
}
grpc_error* Init(grpc_call_element* elem,
const grpc_call_element_args* args) override;
void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
static void OnDoneRecvTrailingMetadataCb(void* user_data, grpc_error* error);
static void OnDoneSendInitialMetadataCb(void* user_data, grpc_error* error);
static void OnDoneRecvMessageCb(void* user_data, grpc_error* error);
private:
CensusContext context_;
// Metadata elements for tracing and census stats data.
grpc_linked_mdelem stats_bin_;
grpc_linked_mdelem tracing_bin_;
// Client method.
absl::string_view method_;
std::string qualified_method_;
grpc_slice path_;
// The recv trailing metadata callbacks.
grpc_metadata_batch* recv_trailing_metadata_;
grpc_closure* initial_on_done_recv_trailing_metadata_;
grpc_closure on_done_recv_trailing_metadata_;
// recv message
grpc_closure* initial_on_done_recv_message_;
grpc_closure on_done_recv_message_;
// Start time (for measuring latency).
absl::Time start_time_;
// Server elapsed time in nanoseconds.
uint64_t elapsed_time_;
// The received message--may be null.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_;
// Number of messages in this RPC.
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference when adding trace context
// metatdata to outgoing message.
char tracing_buf_[kMaxTraceContextLen];
};
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CLIENT_FILTER_H */

@ -0,0 +1,132 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/context.h"
namespace grpc {
using ::opencensus::trace::Span;
using ::opencensus::trace::SpanContext;
void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
absl::string_view primary_role,
absl::string_view method, CensusContext* context) {
GrpcTraceContext trace_ctxt;
TraceContextEncoding::Decode(tracing, &trace_ctxt);
SpanContext parent_ctx = trace_ctxt.ToSpanContext();
new (context) CensusContext(method, parent_ctx);
}
void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
CensusContext* parent_ctxt) {
if (parent_ctxt != nullptr) {
SpanContext span_ctxt = parent_ctxt->Context();
Span span = parent_ctxt->Span();
if (span_ctxt.IsValid()) {
new (ctxt) CensusContext(method, &span);
return;
}
}
new (ctxt) CensusContext(method);
}
size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,
char* tracing_buf, size_t tracing_buf_size) {
GrpcTraceContext trace_ctxt(context);
return TraceContextEncoding::Encode(trace_ctxt, tracing_buf,
tracing_buf_size);
}
size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags) {
// TODO: Add implementation. Waiting on stats tagging to be added.
return 0;
}
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
size_t buf_size) {
return RpcServerStatsEncoding::Encode(server_elapsed_time, buf, buf_size);
}
size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
uint64_t* server_elapsed_time) {
return RpcServerStatsEncoding::Decode(absl::string_view(buf, buf_size),
server_elapsed_time);
}
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info) {
return final_info->stats.transport_stream_stats.incoming.data_bytes;
}
uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info) {
return final_info->stats.transport_stream_stats.outgoing.data_bytes;
}
SpanContext SpanContextFromCensusContext(const census_context* ctxt) {
return reinterpret_cast<const CensusContext*>(ctxt)->Context();
}
Span SpanFromCensusContext(const census_context* ctxt) {
return reinterpret_cast<const CensusContext*>(ctxt)->Span();
}
absl::string_view StatusCodeToString(grpc_status_code code) {
switch (code) {
case GRPC_STATUS_OK:
return "OK";
case GRPC_STATUS_CANCELLED:
return "CANCELLED";
case GRPC_STATUS_UNKNOWN:
return "UNKNOWN";
case GRPC_STATUS_INVALID_ARGUMENT:
return "INVALID_ARGUMENT";
case GRPC_STATUS_DEADLINE_EXCEEDED:
return "DEADLINE_EXCEEDED";
case GRPC_STATUS_NOT_FOUND:
return "NOT_FOUND";
case GRPC_STATUS_ALREADY_EXISTS:
return "ALREADY_EXISTS";
case GRPC_STATUS_PERMISSION_DENIED:
return "PERMISSION_DENIED";
case GRPC_STATUS_UNAUTHENTICATED:
return "UNAUTHENTICATED";
case GRPC_STATUS_RESOURCE_EXHAUSTED:
return "RESOURCE_EXHAUSTED";
case GRPC_STATUS_FAILED_PRECONDITION:
return "FAILED_PRECONDITION";
case GRPC_STATUS_ABORTED:
return "ABORTED";
case GRPC_STATUS_OUT_OF_RANGE:
return "OUT_OF_RANGE";
case GRPC_STATUS_UNIMPLEMENTED:
return "UNIMPLEMENTED";
case GRPC_STATUS_INTERNAL:
return "INTERNAL";
case GRPC_STATUS_UNAVAILABLE:
return "UNAVAILABLE";
case GRPC_STATUS_DATA_LOSS:
return "DATA_LOSS";
default:
// gRPC wants users of this enum to include a default branch so that
// adding values is not a breaking change.
return "UNKNOWN_STATUS";
}
}
} // namespace grpc

@ -0,0 +1,126 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H
#include <grpc/support/port_platform.h>
#include <grpc/status.h>
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include "opencensus/trace/span.h"
#include "opencensus/trace/span_context.h"
#include "opencensus/trace/trace_params.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/cpp/common/channel_filter.h"
#include "src/cpp/ext/filters/census/rpc_encoding.h"
// This is needed because grpc has hardcoded CensusContext with a
// forward declaration of 'struct census_context;'
struct census_context;
namespace grpc {
// Thread compatible.
class CensusContext {
public:
CensusContext() : span_(::opencensus::trace::Span::BlankSpan()) {}
explicit CensusContext(absl::string_view name)
: span_(::opencensus::trace::Span::StartSpan(name)) {}
CensusContext(absl::string_view name, const ::opencensus::trace::Span* parent)
: span_(::opencensus::trace::Span::StartSpan(name, parent)) {}
CensusContext(absl::string_view name,
const ::opencensus::trace::SpanContext& parent_ctxt)
: span_(::opencensus::trace::Span::StartSpanWithRemoteParent(
name, parent_ctxt)) {}
::opencensus::trace::SpanContext Context() const { return span_.context(); }
::opencensus::trace::Span Span() const { return span_; }
void EndSpan() { span_.End(); }
private:
::opencensus::trace::Span span_;
};
// Serializes the outgoing trace context. Field IDs are 1 byte followed by
// field data. A 1 byte version ID is always encoded first.
size_t TraceContextSerialize(const ::opencensus::trace::SpanContext& context,
char* tracing_buf, size_t tracing_buf_size);
// Serializes the outgoing stats context. Field IDs are 1 byte followed by
// field data. A 1 byte version ID is always encoded first. Tags are directly
// serialized into the given grpc_slice.
size_t StatsContextSerialize(size_t max_tags_len, grpc_slice* tags);
// Serialize outgoing server stats. Returns the number of bytes serialized.
size_t ServerStatsSerialize(uint64_t server_elapsed_time, char* buf,
size_t buf_size);
// Deserialize incoming server stats. Returns the number of bytes deserialized.
size_t ServerStatsDeserialize(const char* buf, size_t buf_size,
uint64_t* server_elapsed_time);
// Deserialize the incoming SpanContext and generate a new server context based
// on that. This new span will never be a root span. This should only be called
// with a blank CensusContext as it overwrites it.
void GenerateServerContext(absl::string_view tracing, absl::string_view stats,
absl::string_view primary_role,
absl::string_view method, CensusContext* context);
// Creates a new client context that is by default a new root context.
// If the current context is the default context then the newly created
// span automatically becomes a root span. This should only be called with a
// blank CensusContext as it overwrites it.
void GenerateClientContext(absl::string_view method, CensusContext* ctxt,
CensusContext* parent_ctx);
// Returns the incoming data size from the grpc call final info.
uint64_t GetIncomingDataSize(const grpc_call_final_info* final_info);
// Returns the outgoing data size from the grpc call final info.
uint64_t GetOutgoingDataSize(const grpc_call_final_info* final_info);
// These helper functions return the SpanContext and Span, respectively
// associated with the census_context* stored by grpc. The user will need to
// call this for manual propagation of tracing data.
::opencensus::trace::SpanContext SpanContextFromCensusContext(
const census_context* ctxt);
::opencensus::trace::Span SpanFromCensusContext(const census_context* ctxt);
// Returns a string representation of the StatusCode enum.
absl::string_view StatusCodeToString(grpc_status_code code);
inline absl::string_view GetMethod(const grpc_slice* path) {
if (GRPC_SLICE_IS_EMPTY(*path)) {
return "";
}
// Check for leading '/' and trim it if present.
return absl::StripPrefix(absl::string_view(reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(*path)),
GRPC_SLICE_LENGTH(*path)),
"/");
}
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_CONTEXT_H */

@ -0,0 +1,130 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include <grpcpp/server_context.h>
#include "opencensus/trace/span.h"
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "src/cpp/ext/filters/census/client_filter.h"
#include "src/cpp/ext/filters/census/measures.h"
#include "src/cpp/ext/filters/census/server_filter.h"
namespace grpc {
void RegisterOpenCensusPlugin() {
RegisterChannelFilter<CensusChannelData, CensusClientCallData>(
"opencensus_client", GRPC_CLIENT_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
RegisterChannelFilter<CensusChannelData, CensusServerCallData>(
"opencensus_server", GRPC_SERVER_CHANNEL, INT_MAX /* priority */,
nullptr /* condition function */);
// Access measures to ensure they are initialized. Otherwise, creating a view
// before the first RPC would cause an error.
RpcClientSentBytesPerRpc();
RpcClientReceivedBytesPerRpc();
RpcClientRoundtripLatency();
RpcClientServerLatency();
RpcClientSentMessagesPerRpc();
RpcClientReceivedMessagesPerRpc();
RpcServerSentBytesPerRpc();
RpcServerReceivedBytesPerRpc();
RpcServerServerLatency();
RpcServerSentMessagesPerRpc();
RpcServerReceivedMessagesPerRpc();
}
::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context) {
return reinterpret_cast<const CensusContext*>(context->census_context())
->Span();
}
// These measure definitions should be kept in sync across opencensus
// implementations--see
// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
::opencensus::stats::TagKey ClientMethodTagKey() {
static const auto method_tag_key =
::opencensus::stats::TagKey::Register("grpc_client_method");
return method_tag_key;
}
::opencensus::stats::TagKey ClientStatusTagKey() {
static const auto status_tag_key =
::opencensus::stats::TagKey::Register("grpc_client_status");
return status_tag_key;
}
::opencensus::stats::TagKey ServerMethodTagKey() {
static const auto method_tag_key =
::opencensus::stats::TagKey::Register("grpc_server_method");
return method_tag_key;
}
::opencensus::stats::TagKey ServerStatusTagKey() {
static const auto status_tag_key =
::opencensus::stats::TagKey::Register("grpc_server_status");
return status_tag_key;
}
// Client
ABSL_CONST_INIT const absl::string_view
kRpcClientSentMessagesPerRpcMeasureName =
"grpc.io/client/sent_messages_per_rpc";
ABSL_CONST_INIT const absl::string_view kRpcClientSentBytesPerRpcMeasureName =
"grpc.io/client/sent_bytes_per_rpc";
ABSL_CONST_INIT const absl::string_view
kRpcClientReceivedMessagesPerRpcMeasureName =
"grpc.io/client/received_messages_per_rpc";
ABSL_CONST_INIT const absl::string_view
kRpcClientReceivedBytesPerRpcMeasureName =
"grpc.io/client/received_bytes_per_rpc";
ABSL_CONST_INIT const absl::string_view kRpcClientRoundtripLatencyMeasureName =
"grpc.io/client/roundtrip_latency";
ABSL_CONST_INIT const absl::string_view kRpcClientServerLatencyMeasureName =
"grpc.io/client/server_latency";
// Server
ABSL_CONST_INIT const absl::string_view
kRpcServerSentMessagesPerRpcMeasureName =
"grpc.io/server/sent_messages_per_rpc";
ABSL_CONST_INIT const absl::string_view kRpcServerSentBytesPerRpcMeasureName =
"grpc.io/server/sent_bytes_per_rpc";
ABSL_CONST_INIT const absl::string_view
kRpcServerReceivedMessagesPerRpcMeasureName =
"grpc.io/server/received_messages_per_rpc";
ABSL_CONST_INIT const absl::string_view
kRpcServerReceivedBytesPerRpcMeasureName =
"grpc.io/server/received_bytes_per_rpc";
ABSL_CONST_INIT const absl::string_view kRpcServerServerLatencyMeasureName =
"grpc.io/server/server_latency";
} // namespace grpc

@ -0,0 +1,111 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "include/grpcpp/opencensus.h"
#include "opencensus/stats/stats.h"
#include "opencensus/trace/span.h"
namespace grpc {
class ServerContext;
// Returns the tracing Span for the current RPC.
::opencensus::trace::Span GetSpanFromServerContext(ServerContext* context);
// The tag keys set when recording RPC stats.
::opencensus::stats::TagKey ClientMethodTagKey();
::opencensus::stats::TagKey ClientStatusTagKey();
::opencensus::stats::TagKey ServerMethodTagKey();
::opencensus::stats::TagKey ServerStatusTagKey();
// Names of measures used by the plugin--users can create views on these
// measures but should not record data for them.
extern const absl::string_view kRpcClientSentMessagesPerRpcMeasureName;
extern const absl::string_view kRpcClientSentBytesPerRpcMeasureName;
extern const absl::string_view kRpcClientReceivedMessagesPerRpcMeasureName;
extern const absl::string_view kRpcClientReceivedBytesPerRpcMeasureName;
extern const absl::string_view kRpcClientRoundtripLatencyMeasureName;
extern const absl::string_view kRpcClientServerLatencyMeasureName;
extern const absl::string_view kRpcServerSentMessagesPerRpcMeasureName;
extern const absl::string_view kRpcServerSentBytesPerRpcMeasureName;
extern const absl::string_view kRpcServerReceivedMessagesPerRpcMeasureName;
extern const absl::string_view kRpcServerReceivedBytesPerRpcMeasureName;
extern const absl::string_view kRpcServerServerLatencyMeasureName;
// Canonical gRPC view definitions.
const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor&
ClientReceivedMessagesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor&
ClientReceivedBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor&
ServerReceivedBytesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyCumulative();
const ::opencensus::stats::ViewDescriptor& ServerStartedCountCumulative();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsCumulative();
const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor&
ServerReceivedMessagesPerRpcCumulative();
const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsMinute();
const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcMinute();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyMinute();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsMinute();
const ::opencensus::stats::ViewDescriptor& ClientSentMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientSentBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientReceivedMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientReceivedBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ClientRoundtripLatencyHour();
const ::opencensus::stats::ViewDescriptor& ClientServerLatencyHour();
const ::opencensus::stats::ViewDescriptor& ClientCompletedRpcsHour();
const ::opencensus::stats::ViewDescriptor& ServerSentMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerSentBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerReceivedMessagesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerReceivedBytesPerRpcHour();
const ::opencensus::stats::ViewDescriptor& ServerServerLatencyHour();
const ::opencensus::stats::ViewDescriptor& ServerStartedCountHour();
const ::opencensus::stats::ViewDescriptor& ServerCompletedRpcsHour();
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_GRPC_PLUGIN_H */

@ -0,0 +1,129 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/measures.h"
#include "opencensus/stats/stats.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
namespace grpc {
using ::opencensus::stats::MeasureDouble;
using ::opencensus::stats::MeasureInt64;
// These measure definitions should be kept in sync across opencensus
// implementations--see
// https://github.com/census-instrumentation/opencensus-java/blob/master/contrib/grpc_metrics/src/main/java/io/opencensus/contrib/grpc/metrics/RpcMeasureConstants.java.
namespace {
// Unit constants
constexpr char kUnitBytes[] = "By";
constexpr char kUnitMilliseconds[] = "ms";
constexpr char kCount[] = "1";
} // namespace
// Client
MeasureDouble RpcClientSentBytesPerRpc() {
static const auto measure = MeasureDouble::Register(
kRpcClientSentBytesPerRpcMeasureName,
"Total bytes sent across all request messages per RPC", kUnitBytes);
return measure;
}
MeasureDouble RpcClientReceivedBytesPerRpc() {
static const auto measure = MeasureDouble::Register(
kRpcClientReceivedBytesPerRpcMeasureName,
"Total bytes received across all response messages per RPC", kUnitBytes);
return measure;
}
MeasureDouble RpcClientRoundtripLatency() {
static const auto measure = MeasureDouble::Register(
kRpcClientRoundtripLatencyMeasureName,
"Time between first byte of request sent to last byte of response "
"received, or terminal error",
kUnitMilliseconds);
return measure;
}
MeasureDouble RpcClientServerLatency() {
static const auto measure = MeasureDouble::Register(
kRpcClientServerLatencyMeasureName,
"Time between first byte of request received to last byte of response "
"sent, or terminal error (propagated from the server)",
kUnitMilliseconds);
return measure;
}
MeasureInt64 RpcClientSentMessagesPerRpc() {
static const auto measure =
MeasureInt64::Register(kRpcClientSentMessagesPerRpcMeasureName,
"Number of messages sent per RPC", kCount);
return measure;
}
MeasureInt64 RpcClientReceivedMessagesPerRpc() {
static const auto measure =
MeasureInt64::Register(kRpcClientReceivedMessagesPerRpcMeasureName,
"Number of messages received per RPC", kCount);
return measure;
}
// Server
MeasureDouble RpcServerSentBytesPerRpc() {
static const auto measure = MeasureDouble::Register(
kRpcServerSentBytesPerRpcMeasureName,
"Total bytes sent across all messages per RPC", kUnitBytes);
return measure;
}
MeasureDouble RpcServerReceivedBytesPerRpc() {
static const auto measure = MeasureDouble::Register(
kRpcServerReceivedBytesPerRpcMeasureName,
"Total bytes received across all messages per RPC", kUnitBytes);
return measure;
}
MeasureDouble RpcServerServerLatency() {
static const auto measure = MeasureDouble::Register(
kRpcServerServerLatencyMeasureName,
"Time between first byte of request received to last byte of response "
"sent, or terminal error",
kUnitMilliseconds);
return measure;
}
MeasureInt64 RpcServerSentMessagesPerRpc() {
static const auto measure =
MeasureInt64::Register(kRpcServerSentMessagesPerRpcMeasureName,
"Number of messages sent per RPC", kCount);
return measure;
}
MeasureInt64 RpcServerReceivedMessagesPerRpc() {
static const auto measure =
MeasureInt64::Register(kRpcServerReceivedMessagesPerRpcMeasureName,
"Number of messages received per RPC", kCount);
return measure;
}
} // namespace grpc

@ -0,0 +1,46 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H
#include <grpc/support/port_platform.h>
#include "opencensus/stats/stats.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
namespace grpc {
::opencensus::stats::MeasureInt64 RpcClientSentMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcClientSentBytesPerRpc();
::opencensus::stats::MeasureInt64 RpcClientReceivedMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcClientReceivedBytesPerRpc();
::opencensus::stats::MeasureDouble RpcClientRoundtripLatency();
::opencensus::stats::MeasureDouble RpcClientServerLatency();
::opencensus::stats::MeasureInt64 RpcClientCompletedRpcs();
::opencensus::stats::MeasureInt64 RpcServerSentMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcServerSentBytesPerRpc();
::opencensus::stats::MeasureInt64 RpcServerReceivedMessagesPerRpc();
::opencensus::stats::MeasureDouble RpcServerReceivedBytesPerRpc();
::opencensus::stats::MeasureDouble RpcServerServerLatency();
::opencensus::stats::MeasureInt64 RpcServerCompletedRpcs();
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_MEASURES_H */

@ -0,0 +1,39 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/rpc_encoding.h"
namespace grpc {
constexpr size_t TraceContextEncoding::kGrpcTraceContextSize;
constexpr size_t TraceContextEncoding::kEncodeDecodeFailure;
constexpr size_t TraceContextEncoding::kVersionIdSize;
constexpr size_t TraceContextEncoding::kFieldIdSize;
constexpr size_t TraceContextEncoding::kVersionIdOffset;
constexpr size_t TraceContextEncoding::kVersionId;
constexpr size_t RpcServerStatsEncoding::kRpcServerStatsSize;
constexpr size_t RpcServerStatsEncoding::kEncodeDecodeFailure;
constexpr size_t RpcServerStatsEncoding::kVersionIdSize;
constexpr size_t RpcServerStatsEncoding::kFieldIdSize;
constexpr size_t RpcServerStatsEncoding::kVersionIdOffset;
constexpr size_t RpcServerStatsEncoding::kVersionId;
} // namespace grpc

@ -0,0 +1,284 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H
#include <grpc/support/port_platform.h>
#include <string.h>
#include "absl/base/internal/endian.h"
#include "absl/strings/string_view.h"
#include "opencensus/trace/span_context.h"
#include "opencensus/trace/span_id.h"
#include "opencensus/trace/trace_id.h"
namespace grpc {
// TODO: Rename to GrpcTraceContextV0.
struct GrpcTraceContext {
GrpcTraceContext() {}
explicit GrpcTraceContext(const ::opencensus::trace::SpanContext& ctx) {
ctx.trace_id().CopyTo(trace_id);
ctx.span_id().CopyTo(span_id);
ctx.trace_options().CopyTo(trace_options);
}
::opencensus::trace::SpanContext ToSpanContext() const {
return ::opencensus::trace::SpanContext(
::opencensus::trace::TraceId(trace_id),
::opencensus::trace::SpanId(span_id),
::opencensus::trace::TraceOptions(trace_options));
}
// TODO: For performance:
// uint8_t version;
// uint8_t trace_id_field_id;
uint8_t trace_id[::opencensus::trace::TraceId::kSize];
// uint8_t span_id_field_id;
uint8_t span_id[::opencensus::trace::SpanId::kSize];
// uint8_t trace_options_field_id;
uint8_t trace_options[::opencensus::trace::TraceOptions::kSize];
};
// TraceContextEncoding encapsulates the logic for encoding and decoding of
// trace contexts.
class TraceContextEncoding {
public:
// Size of encoded GrpcTraceContext. (16 + 8 + 1 + 4)
static constexpr size_t kGrpcTraceContextSize = 29;
// Error value.
static constexpr size_t kEncodeDecodeFailure = 0;
// Deserializes a GrpcTraceContext from the incoming buffer. Returns the
// number of bytes deserialized from the buffer. If the incoming buffer is
// empty or the encoding version is not supported it will return 0 bytes,
// currently only version 0 is supported. If an unknown field ID is
// encountered it will return immediately without parsing the rest of the
// buffer. Inlined for performance reasons.
static size_t Decode(absl::string_view buf, GrpcTraceContext* tc) {
if (buf.empty()) {
return kEncodeDecodeFailure;
}
uint8_t version = buf[kVersionIdOffset];
// TODO: Support other versions later. Only support version 0 for
// now.
if (version != kVersionId) {
return kEncodeDecodeFailure;
}
size_t pos = kVersionIdSize;
while (pos < buf.size()) {
size_t bytes_read =
ParseField(absl::string_view(&buf[pos], buf.size() - pos), tc);
if (bytes_read == 0) {
break;
} else {
pos += bytes_read;
}
}
return pos;
}
// Serializes a GrpcTraceContext into the provided buffer. Returns the number
// of bytes serialized into the buffer. If the buffer is not of sufficient
// size (it must be at least kGrpcTraceContextSize bytes) it will drop
// everything and return 0 bytes serialized. Inlined for performance reasons.
static size_t Encode(const GrpcTraceContext& tc, char* buf, size_t buf_size) {
if (buf_size < kGrpcTraceContextSize) {
return kEncodeDecodeFailure;
}
buf[kVersionIdOffset] = kVersionId;
buf[kTraceIdOffset] = kTraceIdField;
memcpy(&buf[kTraceIdOffset + 1], tc.trace_id,
opencensus::trace::TraceId::kSize);
buf[kSpanIdOffset] = kSpanIdField;
memcpy(&buf[kSpanIdOffset + 1], tc.span_id,
opencensus::trace::SpanId::kSize);
buf[kTraceOptionsOffset] = kTraceOptionsField;
memcpy(&buf[kTraceOptionsOffset + 1], tc.trace_options,
opencensus::trace::TraceOptions::kSize);
return kGrpcTraceContextSize;
}
private:
// Parses the next field from the incoming buffer and stores the parsed value
// in a GrpcTraceContext struct. If it does not recognize the field ID it
// will return 0, otherwise it returns the number of bytes read.
static size_t ParseField(absl::string_view buf, GrpcTraceContext* tc) {
// TODO: Add support for multi-byte field IDs.
if (buf.empty()) {
return 0;
}
// Field ID is always the first byte in a field.
uint32_t field_id = buf[0];
size_t bytes_read = kFieldIdSize;
switch (field_id) {
case kTraceIdField:
bytes_read += kTraceIdSize;
if (bytes_read > buf.size()) {
return 0;
}
memcpy(tc->trace_id, &buf[kFieldIdSize],
opencensus::trace::TraceId::kSize);
break;
case kSpanIdField:
bytes_read += kSpanIdSize;
if (bytes_read > buf.size()) {
return 0;
}
memcpy(tc->span_id, &buf[kFieldIdSize],
opencensus::trace::SpanId::kSize);
break;
case kTraceOptionsField:
bytes_read += kTraceOptionsSize;
if (bytes_read > buf.size()) {
return 0;
}
memcpy(tc->trace_options, &buf[kFieldIdSize],
opencensus::trace::TraceOptions::kSize);
break;
default: // Invalid field ID
return 0;
}
return bytes_read;
}
// Size of Version ID.
static constexpr size_t kVersionIdSize = 1;
// Size of Field ID.
static constexpr size_t kFieldIdSize = 1;
// Offset and value for currently supported version ID.
static constexpr size_t kVersionIdOffset = 0;
static constexpr size_t kVersionId = 0;
// Fixed Field ID values:
enum FieldIdValue {
kTraceIdField = 0,
kSpanIdField = 1,
kTraceOptionsField = 2,
};
// Field data sizes in bytes
enum FieldSize {
kTraceIdSize = 16,
kSpanIdSize = 8,
kTraceOptionsSize = 1,
};
// Fixed size offsets for field ID start positions during encoding. Field
// data immediately follows.
enum FieldIdOffset {
kTraceIdOffset = kVersionIdSize,
kSpanIdOffset = kTraceIdOffset + kFieldIdSize + kTraceIdSize,
kTraceOptionsOffset = kSpanIdOffset + kFieldIdSize + kSpanIdSize,
};
TraceContextEncoding() = delete;
TraceContextEncoding(const TraceContextEncoding&) = delete;
TraceContextEncoding(TraceContextEncoding&&) = delete;
TraceContextEncoding operator=(const TraceContextEncoding&) = delete;
TraceContextEncoding operator=(TraceContextEncoding&&) = delete;
};
// TODO: This may not be needed. Check to see if opencensus requires
// a trailing server response.
// RpcServerStatsEncoding encapsulates the logic for encoding and decoding of
// rpc server stats messages. Rpc server stats consists of a uint64_t time
// value (server latency in nanoseconds).
class RpcServerStatsEncoding {
public:
// Size of encoded RPC server stats.
static constexpr size_t kRpcServerStatsSize = 10;
// Error value.
static constexpr size_t kEncodeDecodeFailure = 0;
// Deserializes rpc server stats from the incoming 'buf' into *time. Returns
// number of bytes decoded. If the buffer is of insufficient size (it must be
// at least kRpcServerStatsSize bytes) or the encoding version or field ID are
// unrecognized, *time will be set to 0 and it will return
// kEncodeDecodeFailure. Inlined for performance reasons.
static size_t Decode(absl::string_view buf, uint64_t* time) {
if (buf.size() < kRpcServerStatsSize) {
*time = 0;
return kEncodeDecodeFailure;
}
uint8_t version = buf[kVersionIdOffset];
uint32_t fieldID = buf[kServerElapsedTimeOffset];
if (version != kVersionId || fieldID != kServerElapsedTimeField) {
*time = 0;
return kEncodeDecodeFailure;
}
*time = absl::little_endian::Load64(
&buf[kServerElapsedTimeOffset + kFieldIdSize]);
return kRpcServerStatsSize;
}
// Serializes rpc server stats into the provided buffer. It returns the
// number of bytes written to the buffer. If the buffer is smaller than
// kRpcServerStatsSize bytes it will return kEncodeDecodeFailure. Inlined for
// performance reasons.
static size_t Encode(uint64_t time, char* buf, size_t buf_size) {
if (buf_size < kRpcServerStatsSize) {
return kEncodeDecodeFailure;
}
buf[kVersionIdOffset] = kVersionId;
buf[kServerElapsedTimeOffset] = kServerElapsedTimeField;
absl::little_endian::Store64(&buf[kServerElapsedTimeOffset + kFieldIdSize],
time);
return kRpcServerStatsSize;
}
private:
// Size of Version ID.
static constexpr size_t kVersionIdSize = 1;
// Size of Field ID.
static constexpr size_t kFieldIdSize = 1;
// Offset and value for currently supported version ID.
static constexpr size_t kVersionIdOffset = 0;
static constexpr size_t kVersionId = 0;
enum FieldIdValue {
kServerElapsedTimeField = 0,
};
enum FieldSize {
kServerElapsedTimeSize = 8,
};
enum FieldIdOffset {
kServerElapsedTimeOffset = kVersionIdSize,
};
RpcServerStatsEncoding() = delete;
RpcServerStatsEncoding(const RpcServerStatsEncoding&) = delete;
RpcServerStatsEncoding(RpcServerStatsEncoding&&) = delete;
RpcServerStatsEncoding operator=(const RpcServerStatsEncoding&) = delete;
RpcServerStatsEncoding operator=(RpcServerStatsEncoding&&) = delete;
};
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_RPC_ENCODING_H */

@ -0,0 +1,198 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/server_filter.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "opencensus/stats/stats.h"
#include "src/core/lib/surface/call.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/cpp/ext/filters/census/measures.h"
namespace grpc {
constexpr uint32_t CensusServerCallData::kMaxServerStatsLen;
namespace {
// server metadata elements
struct ServerMetadataElements {
grpc_slice path;
grpc_slice tracing_slice;
grpc_slice census_proto;
};
void FilterInitialMetadata(grpc_metadata_batch* b,
ServerMetadataElements* sml) {
if (b->idx.named.path != nullptr) {
sml->path = grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.path->md));
}
if (b->idx.named.grpc_trace_bin != nullptr) {
sml->tracing_slice =
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_trace_bin->md));
grpc_metadata_batch_remove(b, b->idx.named.grpc_trace_bin);
}
if (b->idx.named.grpc_tags_bin != nullptr) {
sml->census_proto =
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_tags_bin->md));
grpc_metadata_batch_remove(b, b->idx.named.grpc_tags_bin);
}
}
} // namespace
void CensusServerCallData::OnDoneRecvMessageCb(void* user_data,
grpc_error* error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusServerCallData* calld =
reinterpret_cast<CensusServerCallData*>(elem->call_data);
CensusChannelData* channeld =
reinterpret_cast<CensusChannelData*>(elem->channel_data);
GPR_ASSERT(calld != nullptr);
GPR_ASSERT(channeld != nullptr);
// Stream messages are no longer valid after receiving trailing metadata.
if ((*calld->recv_message_) != nullptr) {
++calld->recv_message_count_;
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_message_, GRPC_ERROR_REF(error));
}
void CensusServerCallData::OnDoneRecvInitialMetadataCb(void* user_data,
grpc_error* error) {
grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
CensusServerCallData* calld =
reinterpret_cast<CensusServerCallData*>(elem->call_data);
GPR_ASSERT(calld != nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* initial_metadata = calld->recv_initial_metadata_;
GPR_ASSERT(initial_metadata != nullptr);
ServerMetadataElements sml;
sml.path = grpc_empty_slice();
sml.tracing_slice = grpc_empty_slice();
sml.census_proto = grpc_empty_slice();
FilterInitialMetadata(initial_metadata, &sml);
calld->path_ = grpc_slice_ref_internal(sml.path);
calld->method_ = GetMethod(&calld->path_);
calld->qualified_method_ = StrCat("Recv.", calld->method_);
const char* tracing_str =
GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
? ""
: reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(sml.tracing_slice));
size_t tracing_str_len = GRPC_SLICE_IS_EMPTY(sml.tracing_slice)
? 0
: GRPC_SLICE_LENGTH(sml.tracing_slice);
const char* census_str = GRPC_SLICE_IS_EMPTY(sml.census_proto)
? ""
: reinterpret_cast<const char*>(
GRPC_SLICE_START_PTR(sml.census_proto));
size_t census_str_len = GRPC_SLICE_IS_EMPTY(sml.census_proto)
? 0
: GRPC_SLICE_LENGTH(sml.census_proto);
GenerateServerContext(absl::string_view(tracing_str, tracing_str_len),
absl::string_view(census_str, census_str_len),
/*primary_role*/ "", calld->qualified_method_,
&calld->context_);
grpc_slice_unref_internal(sml.tracing_slice);
grpc_slice_unref_internal(sml.census_proto);
grpc_slice_unref_internal(sml.path);
grpc_census_call_set_context(
calld->gc_, reinterpret_cast<census_context*>(&calld->context_));
}
GRPC_CLOSURE_RUN(calld->initial_on_done_recv_initial_metadata_,
GRPC_ERROR_REF(error));
}
void CensusServerCallData::StartTransportStreamOpBatch(
grpc_call_element* elem, TransportStreamOpBatch* op) {
if (op->recv_initial_metadata() != nullptr) {
// substitute our callback for the op callback
recv_initial_metadata_ = op->recv_initial_metadata()->batch();
initial_on_done_recv_initial_metadata_ = op->recv_initial_metadata_ready();
op->set_recv_initial_metadata_ready(&on_done_recv_initial_metadata_);
}
if (op->send_message() != nullptr) {
++sent_message_count_;
}
if (op->recv_message() != nullptr) {
recv_message_ = op->op()->payload->recv_message.recv_message;
initial_on_done_recv_message_ =
op->op()->payload->recv_message.recv_message_ready;
op->op()->payload->recv_message.recv_message_ready = &on_done_recv_message_;
}
// We need to record the time when the trailing metadata was sent to mark the
// completeness of the request.
if (op->send_trailing_metadata() != nullptr) {
elapsed_time_ = absl::Now() - start_time_;
size_t len = ServerStatsSerialize(absl::ToInt64Nanoseconds(elapsed_time_),
stats_buf_, kMaxServerStatsLen);
if (len > 0) {
GRPC_LOG_IF_ERROR(
"census grpc_filter",
grpc_metadata_batch_add_tail(
op->send_trailing_metadata()->batch(), &census_bin_,
grpc_mdelem_from_slices(
GRPC_MDSTR_GRPC_SERVER_STATS_BIN,
grpc_slice_from_copied_buffer(stats_buf_, len))));
}
}
// Call next op.
grpc_call_next_op(elem, op->op());
}
grpc_error* CensusServerCallData::Init(grpc_call_element* elem,
const grpc_call_element_args* args) {
start_time_ = absl::Now();
gc_ =
grpc_call_from_top_element(grpc_call_stack_element(args->call_stack, 0));
GRPC_CLOSURE_INIT(&on_done_recv_initial_metadata_,
OnDoneRecvInitialMetadataCb, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_done_recv_message_, OnDoneRecvMessageCb, elem,
grpc_schedule_on_exec_ctx);
auth_context_ = grpc_call_auth_context(gc_);
return GRPC_ERROR_NONE;
}
void CensusServerCallData::Destroy(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) {
const uint64_t request_size = GetOutgoingDataSize(final_info);
const uint64_t response_size = GetIncomingDataSize(final_info);
double elapsed_time_ms = absl::ToDoubleMilliseconds(elapsed_time_);
grpc_auth_context_release(auth_context_);
::opencensus::stats::Record(
{{RpcServerSentBytesPerRpc(), static_cast<double>(response_size)},
{RpcServerReceivedBytesPerRpc(), static_cast<double>(request_size)},
{RpcServerServerLatency(), elapsed_time_ms},
{RpcServerSentMessagesPerRpc(), sent_message_count_},
{RpcServerReceivedMessagesPerRpc(), recv_message_count_}},
{{ServerMethodTagKey(), method_},
{ServerStatusTagKey(), StatusCodeToString(final_info->final_status)}});
grpc_slice_unref_internal(path_);
context_.EndSpan();
}
} // namespace grpc

@ -0,0 +1,101 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
#define GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H
#include <grpc/support/port_platform.h>
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "include/grpc/grpc_security.h"
#include "src/cpp/ext/filters/census/channel_filter.h"
#include "src/cpp/ext/filters/census/context.h"
namespace grpc {
// A CallData class will be created for every grpc call within a channel. It is
// used to store data and methods specific to that call. CensusServerCallData is
// thread-compatible, however typically only 1 thread should be interacting with
// a call at a time.
class CensusServerCallData : public CallData {
public:
// Maximum size of server stats that are sent on the wire.
static constexpr uint32_t kMaxServerStatsLen = 16;
CensusServerCallData()
: gc_(nullptr),
auth_context_(nullptr),
recv_initial_metadata_(nullptr),
initial_on_done_recv_initial_metadata_(nullptr),
initial_on_done_recv_message_(nullptr),
recv_message_(nullptr),
recv_message_count_(0),
sent_message_count_(0) {
memset(&census_bin_, 0, sizeof(grpc_linked_mdelem));
memset(&path_, 0, sizeof(grpc_slice));
memset(&on_done_recv_initial_metadata_, 0, sizeof(grpc_closure));
memset(&on_done_recv_message_, 0, sizeof(grpc_closure));
}
grpc_error* Init(grpc_call_element* elem,
const grpc_call_element_args* args) override;
void Destroy(grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure* then_call_closure) override;
void StartTransportStreamOpBatch(grpc_call_element* elem,
TransportStreamOpBatch* op) override;
static void OnDoneRecvInitialMetadataCb(void* user_data, grpc_error* error);
static void OnDoneRecvMessageCb(void* user_data, grpc_error* error);
private:
CensusContext context_;
// server method
absl::string_view method_;
std::string qualified_method_;
grpc_slice path_;
// Pointer to the grpc_call element
grpc_call* gc_;
// Authorization context for the call.
grpc_auth_context* auth_context_;
// Metadata element for census stats.
grpc_linked_mdelem census_bin_;
// recv callback
grpc_metadata_batch* recv_initial_metadata_;
grpc_closure* initial_on_done_recv_initial_metadata_;
grpc_closure on_done_recv_initial_metadata_;
// recv message
grpc_closure* initial_on_done_recv_message_;
grpc_closure on_done_recv_message_;
absl::Time start_time_;
absl::Duration elapsed_time_;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_;
uint64_t recv_message_count_;
uint64_t sent_message_count_;
// Buffer needed for grpc_slice to reference it when adding metatdata to
// response.
char stats_buf_[kMaxServerStatsLen];
};
} // namespace grpc
#endif /* GRPC_INTERNAL_CPP_EXT_FILTERS_CENSUS_SERVER_FILTER_H */

@ -0,0 +1,491 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "absl/time/time.h"
#include "opencensus/stats/internal/aggregation_window.h"
#include "opencensus/stats/internal/set_aggregation_window.h"
#include "opencensus/stats/stats.h"
namespace grpc {
using ::opencensus::stats::Aggregation;
using ::opencensus::stats::AggregationWindow;
using ::opencensus::stats::BucketBoundaries;
using ::opencensus::stats::ViewDescriptor;
// These measure definitions should be kept in sync across opencensus
// implementations.
namespace {
Aggregation BytesDistributionAggregation() {
return Aggregation::Distribution(BucketBoundaries::Explicit(
{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216,
67108864, 268435456, 1073741824, 4294967296}));
}
Aggregation MillisDistributionAggregation() {
return Aggregation::Distribution(BucketBoundaries::Explicit(
{0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4,
5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50,
65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650,
800, 1000, 2000, 5000, 10000, 20000, 50000, 100000}));
}
Aggregation CountDistributionAggregation() {
return Aggregation::Distribution(BucketBoundaries::Exponential(17, 1.0, 2.0));
}
ViewDescriptor MinuteDescriptor() {
auto descriptor = ViewDescriptor();
SetAggregationWindow(AggregationWindow::Interval(absl::Minutes(1)),
&descriptor);
return descriptor;
}
ViewDescriptor HourDescriptor() {
auto descriptor = ViewDescriptor();
SetAggregationWindow(AggregationWindow::Interval(absl::Hours(1)),
&descriptor);
return descriptor;
}
} // namespace
void RegisterOpenCensusViewsForExport() {
ClientSentMessagesPerRpcCumulative().RegisterForExport();
ClientSentBytesPerRpcCumulative().RegisterForExport();
ClientReceivedMessagesPerRpcCumulative().RegisterForExport();
ClientReceivedBytesPerRpcCumulative().RegisterForExport();
ClientRoundtripLatencyCumulative().RegisterForExport();
ClientServerLatencyCumulative().RegisterForExport();
ServerSentMessagesPerRpcCumulative().RegisterForExport();
ServerSentBytesPerRpcCumulative().RegisterForExport();
ServerReceivedMessagesPerRpcCumulative().RegisterForExport();
ServerReceivedBytesPerRpcCumulative().RegisterForExport();
ServerServerLatencyCumulative().RegisterForExport();
}
// client cumulative
const ViewDescriptor& ClientSentBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/sent_bytes_per_rpc/cumulative")
.set_measure(kRpcClientSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/received_bytes_per_rpc/cumulative")
.set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientRoundtripLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/roundtrip_latency/cumulative")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientServerLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/server_latency/cumulative")
.set_measure(kRpcClientServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/completed_rpcs/cumulative")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey())
.add_column(ClientStatusTagKey());
return descriptor;
}
const ViewDescriptor& ClientSentMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/received_messages_per_rpc/cumulative")
.set_measure(kRpcClientSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/client/sent_messages_per_rpc/cumulative")
.set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
// server cumulative
const ViewDescriptor& ServerSentBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/received_bytes_per_rpc/cumulative")
.set_measure(kRpcServerSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedBytesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/sent_bytes_per_rpc/cumulative")
.set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerServerLatencyCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/elapsed_time/cumulative")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerCompletedRpcsCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/completed_rpcs/cumulative")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey())
.add_column(ServerStatusTagKey());
return descriptor;
}
const ViewDescriptor& ServerSentMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/received_messages_per_rpc/cumulative")
.set_measure(kRpcServerSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedMessagesPerRpcCumulative() {
const static ViewDescriptor descriptor =
ViewDescriptor()
.set_name("grpc.io/server/sent_messages_per_rpc/cumulative")
.set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
// client minute
const ViewDescriptor& ClientSentBytesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/sent_bytes_per_rpc/minute")
.set_measure(kRpcClientSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedBytesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/received_bytes_per_rpc/minute")
.set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientRoundtripLatencyMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/roundtrip_latency/minute")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientServerLatencyMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/server_latency/minute")
.set_measure(kRpcClientServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientCompletedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/completed_rpcs/minute")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey())
.add_column(ClientStatusTagKey());
return descriptor;
}
const ViewDescriptor& ClientSentMessagesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/sent_messages_per_rpc/minute")
.set_measure(kRpcClientSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedMessagesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/client/received_messages_per_rpc/minute")
.set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
// server minute
const ViewDescriptor& ServerSentBytesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/sent_bytes_per_rpc/minute")
.set_measure(kRpcServerSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedBytesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/received_bytes_per_rpc/minute")
.set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerServerLatencyMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/server_latency/minute")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerCompletedRpcsMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/completed_rpcs/minute")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey())
.add_column(ServerStatusTagKey());
return descriptor;
}
const ViewDescriptor& ServerSentMessagesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/sent_messages_per_rpc/minute")
.set_measure(kRpcServerSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedMessagesPerRpcMinute() {
const static ViewDescriptor descriptor =
MinuteDescriptor()
.set_name("grpc.io/server/received_messages_per_rpc/minute")
.set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
// client hour
const ViewDescriptor& ClientSentBytesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/sent_bytes_per_rpc/hour")
.set_measure(kRpcClientSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedBytesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/received_bytes_per_rpc/hour")
.set_measure(kRpcClientReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientRoundtripLatencyHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/roundtrip_latency/hour")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientServerLatencyHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/server_latency/hour")
.set_measure(kRpcClientServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientCompletedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/completed_rpcs/hour")
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey())
.add_column(ClientStatusTagKey());
return descriptor;
}
const ViewDescriptor& ClientSentMessagesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/sent_messages_per_rpc/hour")
.set_measure(kRpcClientSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
const ViewDescriptor& ClientReceivedMessagesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/client/received_messages_per_rpc/hour")
.set_measure(kRpcClientReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ClientMethodTagKey());
return descriptor;
}
// server hour
const ViewDescriptor& ServerSentBytesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/sent_bytes_per_rpc/hour")
.set_measure(kRpcServerSentBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedBytesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/received_bytes_per_rpc/hour")
.set_measure(kRpcServerReceivedBytesPerRpcMeasureName)
.set_aggregation(BytesDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerServerLatencyHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/server_latency/hour")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(MillisDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerCompletedRpcsHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/completed_rpcs/hour")
.set_measure(kRpcServerServerLatencyMeasureName)
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey())
.add_column(ServerStatusTagKey());
return descriptor;
}
const ViewDescriptor& ServerSentMessagesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/sent_messages_per_rpc/hour")
.set_measure(kRpcServerSentMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
const ViewDescriptor& ServerReceivedMessagesPerRpcHour() {
const static ViewDescriptor descriptor =
HourDescriptor()
.set_name("grpc.io/server/received_messages_per_rpc/hour")
.set_measure(kRpcServerReceivedMessagesPerRpcMeasureName)
.set_aggregation(CountDistributionAggregation())
.add_column(ServerMethodTagKey());
return descriptor;
}
} // namespace grpc

@ -1,3 +0,0 @@
google.census.Tag.key max_size:255
google.census.Tag.value max_size:255
google.census.View.tag_key max_count:15

@ -1,307 +0,0 @@
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.census;
// All the census protos.
//
// Nomenclature note: capitalized names below (like Resource) are protos.
//
// Census lets you define a Resource - something which can be measured, like the
// latency of an RPC, the number of CPU cycles spent on an operation, or
// anything else you care to measure. You can record individual instances of
// measurements (a double value) for every Resource of interest. These
// individual measurements are aggregated together into an Aggregation. There
// are two Aggregation types available: Distribution (describes the
// distribution of all measurements, possibly with a histogram) and
// IntervalStats (the count and mean of measurements across specified time
// periods). An Aggregation is described by an AggregationDescriptor.
//
// You can define how your stats are broken down by Tag values and which
// Aggregations to use through a View. The corresponding combination of
// Resource/View/Aggregation which is available to census clients is called a
// Metric.
// The following two types are copied from
// google/protobuf/{duration,timestamp}.proto. Ideally, we would be able to
// import them, but this causes compilation issues on C-based systems
// (e.g. https://koti.kapsi.fi/jpa/nanopb/), which cannot process the C++
// headers generated from the standard protobuf distribution. See the relevant
// proto files for full documentation of these types.
message Duration {
// Signed seconds of the span of time. Must be from -315,576,000,000
// to +315,576,000,000 inclusive.
int64 seconds = 1;
// Signed fractions of a second at nanosecond resolution of the span
// of time. Durations less than one second are represented with a 0
// `seconds` field and a positive or negative `nanos` field. For durations
// of one second or more, a non-zero value for the `nanos` field must be
// of the same sign as the `seconds` field. Must be from -999,999,999
// to +999,999,999 inclusive.
int32 nanos = 2;
}
message Timestamp {
// Represents seconds of UTC time since Unix epoch
// 1970-01-01T00:00:00Z. Must be from from 0001-01-01T00:00:00Z to
// 9999-12-31T23:59:59Z inclusive.
int64 seconds = 1;
// Non-negative fractions of a second at nanosecond resolution. Negative
// second values with fractions must still have non-negative nanos values
// that count forward in time. Must be from 0 to 999,999,999
// inclusive.
int32 nanos = 2;
}
// Describes a Resource.
message Resource {
// name of resource, e.g. rpc_latency, cpu. Must be unique.
string name = 1;
// More detailed description of the resource, used in documentation.
string description = 2;
// Fundamental units of measurement supported by Census
// TODO(aveitch): expand this to include other S.I. units?
enum BasicUnit {
UNKNOWN = 0;
BITS = 1;
BYTES = 2;
SECS = 3;
CORES = 4;
MAX_UNITS = 5;
}
// MeasurementUnit lets you build compound units of the form
// 10^n * (A * B * ...) / (X * Y * ...),
// where the elements in the numerator and denominator are all BasicUnits. A
// MeasurementUnit must have at least one BasicUnit in its numerator.
//
// To specify multiplication in the numerator or denominator, simply specify
// multiple numerator or denominator fields. For example:
//
// - byte-seconds (i.e. bytes * seconds):
// numerator: BYTES
// numerator: SECS
//
// - events/sec^2 (i.e. rate of change of events/sec):
// numerator: COUNT
// denominator: SECS
// denominator: SECS
//
// To specify multiples (in power of 10) of units, specify a non-zero prefix
// value, for example:
//
// - MB/s (i.e. megabytes / s):
// prefix: 6
// numerator: BYTES
// denominator: SECS
//
// - nanoseconds
// prefix: -9
// numerator: SECS
message MeasurementUnit {
int32 prefix = 1;
repeated BasicUnit numerator = 2;
repeated BasicUnit denominator = 3;
}
// The units in which Resource values are measured.
MeasurementUnit unit = 3;
}
// An Aggregation summarizes a series of individual Resource measurements, an
// AggregationDescriptor describes an Aggregation.
message AggregationDescriptor {
enum AggregationType {
// Unspecified. Should not be used.
UNKNOWN = 0;
// A count of measurements made.
COUNT = 1;
// A Distribution.
DISTRIBUTION = 2;
// Counts over fixed time intervals.
INTERVAL = 3;
}
// The type of Aggregation.
AggregationType type = 1;
// At most one set of options. It is illegal to specifiy an option for
// COUNT Aggregations. interval_boundaries must be set for INTERVAL types.
// bucket_boundaries are optional for DISTRIBUTION types.
oneof options {
// Defines histogram bucket boundaries for Distributions.
BucketBoundaries bucket_boundaries = 2;
// Defines the time windows to record for IntervalStats.
IntervalBoundaries interval_boundaries = 3;
}
// A Distribution may optionally contain a histogram of the values in the
// population. The bucket boundaries for that histogram are described by
// `bucket_boundaries`. This defines `size(bounds) + 1` (= N) buckets. The
// boundaries for bucket index i are:
//
// [-infinity, bounds[i]) for i == 0
// [bounds[i-1], bounds[i]) for 0 < i < N-2
// [bounds[i-1], +infinity) for i == N-1
//
// i.e. an underflow bucket (number 0), zero or more finite buckets (1
// through N - 2, and an overflow bucket (N - 1), with inclusive lower
// bounds and exclusive upper bounds.
//
// There must be at least one element in `bounds`. If `bounds` has only one
// element, there are no finite buckets, and that single element is the
// common boundary of the overflow and underflow buckets.
message BucketBoundaries {
// The values must be monotonically increasing.
repeated double bounds = 1;
}
// For Interval stats, describe the size of each window.
message IntervalBoundaries {
// For each time window, specify a duration in seconds.
repeated double window_size = 1;
}
}
// Distribution contains summary statistics for a population of values and,
// optionally, a histogram representing the distribution of those values across
// a specified set of histogram buckets, as defined in
// Aggregation.bucket_options.
//
// The summary statistics are the count, mean, minimum, and the maximum of the
// set of population of values.
//
// Although it is not forbidden, it is generally a bad idea to include
// non-finite values (infinities or NaNs) in the population of values, as this
// will render the `mean` field meaningless.
message Distribution {
// The number of values in the population. Must be non-negative.
int64 count = 1;
// The arithmetic mean of the values in the population. If `count` is zero
// then this field must be zero.
double mean = 2;
// Describes a range of population values.
message Range {
// The minimum of the population values.
double min = 1;
// The maximum of the population values.
double max = 2;
}
// The range of the population values. If `count` is zero, this field will not
// be defined.
Range range = 3;
// A Distribution may optionally contain a histogram of the values in the
// population. The histogram is given in `bucket_count` as counts of values
// that fall into one of a sequence of non-overlapping buckets, as described
// by `AggregationDescriptor.options.bucket_boundaries`.
// The sum of the values in `bucket_counts` must equal the value in `count`.
//
// Bucket counts are given in order under the numbering scheme described
// above (the underflow bucket has number 0; the finite buckets, if any,
// have numbers 1 through N-2; the overflow bucket has number N-1).
//
// The size of `bucket_count` must be no greater than N as defined in
// `bucket_boundaries`.
//
// Any suffix of trailing zero bucket_count fields may be omitted.
repeated int64 bucket_count = 4;
}
// Record summary stats over various time windows.
message IntervalStats {
// Summary statistic over a single time window.
message Window {
// The window duration. Must be positive.
Duration window_size = 1;
// The number of measurements in this window.
int64 count = 2;
// The arithmetic mean of all measurements in the window.
double mean = 3;
}
// Full set of windows for this aggregation.
repeated Window window = 1;
}
// A Tag: key-value pair.
message Tag {
string key = 1;
string value = 2;
}
// A View specifies an Aggregation and a set of tag keys. The Aggregation will
// be broken down by the unique set of matching tag values for each measurement.
message View {
// Name of view. Must be unique.
string name = 1;
// More detailed description, for documentation purposes.
string description = 2;
// Name of Resource to be broken down for this view.
string resource_name = 3;
// Aggregation type to associate with this View.
AggregationDescriptor aggregation = 4;
// Tag keys to match with a given Resource measurement. If no keys are
// specified, then all stats are recorded. Keys must be unique.
repeated string tag_key = 5;
}
// An Aggregation summarizes a series of individual Resource measurements.
message Aggregation {
// Name of this aggregation.
string name = 1;
// More detailed description, for documentation purposes.
string description = 2;
// The data for this Aggregation.
oneof data {
uint64 count = 3;
Distribution distribution = 4;
IntervalStats interval_stats = 5;
}
// Tags associated with this Aggregation.
repeated Tag tag = 6;
}
// A Metric represents all the Aggregations for a particular view.
message Metric {
// View associated with this Metric.
string view_name = 1;
// Aggregations - each will have a unique set of tag values for the tag_keys
// associated with the corresponding View.
repeated Aggregation aggregation = 2;
// Start and end timestamps over which the metric was accumulated. These
// values are not relevant/defined for IntervalStats aggregations, which are
// always accumulated over a fixed time period.
Timestamp start = 3;
Timestamp end = 4;
}

@ -1,29 +0,0 @@
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.trace;
// Tracing information that is propagated with RPC's.
message TraceContext {
// A TraceId uniquely represents a single Trace. It is a 128-bit nonce.
// The 128-bit ID is split into 2 64-bit chunks. (REQUIRED)
fixed64 trace_id_hi = 1;
fixed64 trace_id_lo = 2;
// ID of parent (client) span. (REQUIRED)
fixed64 span_id = 3;
// Span option flags. First bit is true if this trace is sampled. (OPTIONAL)
fixed32 span_options = 4;
}

@ -353,7 +353,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_filter.cc',
'src/core/ext/filters/load_reporting/server_load_reporting_plugin.cc',
'src/core/ext/census/grpc_context.cc',
'src/cpp/ext/filters/census/grpc_context.cc',
'src/core/ext/filters/max_age/max_age_filter.cc',
'src/core/ext/filters/message_size/message_size_filter.cc',
'src/core/ext/filters/http/client_authority_filter.cc',

@ -1,36 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H
#define GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H
void test_invalid_record_size();
void test_end_write_with_different_size();
void test_read_pending_record();
void test_read_beyond_pending_record();
void test_detached_while_reading();
void test_fill_log_no_fragmentation();
void test_fill_circular_log_no_fragmentation();
void test_fill_log_with_straddling_records();
void test_fill_circular_log_with_straddling_records();
void test_multiple_writers_circular_log();
void test_multiple_writers();
void test_performance();
void test_small_log();
#endif /* GRPC_TEST_CORE_STATISTICS_CENSUS_LOG_TESTS_H */

@ -1,62 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/ext/census/census_interface.h"
#include "src/core/ext/census/census_rpc_stats.h"
#include "test/core/util/test_config.h"
/* Tests census noop stubs in a simulated rpc flow */
void test_census_stubs(void) {
census_op_id op_id;
census_rpc_stats* stats = census_rpc_stats_create_empty();
census_aggregated_rpc_stats data_map = {0, NULL};
/* Initializes census library at server start up time. */
census_init();
/* Starts tracing at the beginning of a rpc. */
op_id = census_tracing_start_op();
/* Appends custom annotations on a trace object. */
census_tracing_print(op_id, "annotation foo");
census_tracing_print(op_id, "annotation bar");
/* Appends method tag on the trace object. */
census_add_method_tag(op_id, "service_foo/method.bar");
/* Either record client side stats or server side stats associated with the
op_id. Here for testing purpose, we record both. */
census_record_rpc_client_stats(op_id, stats);
census_record_rpc_server_stats(op_id, stats);
/* Ends a tracing. */
census_tracing_end_op(op_id);
/* In process stats queries. */
census_get_server_stats(&data_map);
census_aggregated_rpc_stats_set_empty(&data_map);
census_get_client_stats(&data_map);
census_aggregated_rpc_stats_set_empty(&data_map);
gpr_free(stats);
census_shutdown();
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_census_stubs();
return 0;
}

@ -1,31 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/statistics/census_log_tests.h"
#include <stdlib.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
test_multiple_writers_circular_log();
return 0;
}

@ -1,31 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/statistics/census_log_tests.h"
#include <stdlib.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
test_multiple_writers();
return 0;
}

@ -1,39 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "test/core/statistics/census_log_tests.h"
#include <stdlib.h>
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
srand(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
test_invalid_record_size();
test_end_write_with_different_size();
test_read_pending_record();
test_read_beyond_pending_record();
test_detached_while_reading();
test_fill_log_no_fragmentation();
test_fill_circular_log_no_fragmentation();
test_fill_log_with_straddling_records();
test_fill_circular_log_with_straddling_records();
return 0;
}

@ -1,183 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string.h>
#include <grpc/support/time.h>
#include "src/core/ext/census/census_interface.h"
#include "src/core/ext/census/census_rpc_stats.h"
#include "src/core/ext/census/census_tracing.h"
#include "test/core/util/test_config.h"
/* Ensure all possible state transitions are called without causing problem */
static void test_init_shutdown(void) {
census_stats_store_init();
census_stats_store_init();
census_stats_store_shutdown();
census_stats_store_shutdown();
census_stats_store_init();
}
static void test_create_and_destroy(void) {
census_rpc_stats* stats = NULL;
census_aggregated_rpc_stats agg_stats = {0, NULL};
stats = census_rpc_stats_create_empty();
GPR_ASSERT(stats != NULL);
GPR_ASSERT(stats->cnt == 0 && stats->rpc_error_cnt == 0 &&
stats->app_error_cnt == 0 && stats->elapsed_time_ms == 0.0 &&
stats->api_request_bytes == 0 && stats->wire_request_bytes == 0 &&
stats->api_response_bytes == 0 && stats->wire_response_bytes == 0);
gpr_free(stats);
census_aggregated_rpc_stats_set_empty(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
agg_stats.num_entries = 1;
agg_stats.stats = (census_per_method_rpc_stats*)gpr_malloc(
sizeof(census_per_method_rpc_stats));
agg_stats.stats[0].method = gpr_strdup("foo");
census_aggregated_rpc_stats_set_empty(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
}
#define ASSERT_NEAR(a, b) \
GPR_ASSERT((a - b) * (a - b) < 1e-24 * (a + b) * (a + b))
static void test_record_and_get_stats(void) {
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_op_id id;
census_aggregated_rpc_stats agg_stats = {0, NULL};
/* Record client stats twice with the same op_id. */
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m1");
census_record_rpc_client_stats(id, &stats);
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
/* Server stats expect to be empty */
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
/* Client stats expect to have one entry */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
GPR_ASSERT(agg_stats.stats != NULL);
GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m1") == 0);
GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 2 &&
agg_stats.stats[0].hour_stats.cnt == 2 &&
agg_stats.stats[0].total_stats.cnt == 2);
ASSERT_NEAR(agg_stats.stats[0].minute_stats.wire_response_bytes, 16.8);
ASSERT_NEAR(agg_stats.stats[0].hour_stats.wire_response_bytes, 16.8);
ASSERT_NEAR(agg_stats.stats[0].total_stats.wire_response_bytes, 16.8);
/* Get stats again, results should be the same. */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
/* Record both server (once) and client (twice) stats with different op_ids.
*/
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m2");
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
id = census_tracing_start_op();
census_add_method_tag(id, "m3");
census_record_rpc_server_stats(id, &stats);
census_tracing_end_op(id);
id = census_tracing_start_op();
census_add_method_tag(id, "m4");
census_record_rpc_client_stats(id, &stats);
census_tracing_end_op(id);
/* Check server stats */
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 1);
GPR_ASSERT(strcmp(agg_stats.stats[0].method, "m3") == 0);
GPR_ASSERT(agg_stats.stats[0].minute_stats.app_error_cnt == 3 &&
agg_stats.stats[0].hour_stats.app_error_cnt == 3 &&
agg_stats.stats[0].total_stats.app_error_cnt == 3);
census_aggregated_rpc_stats_set_empty(&agg_stats);
/* Check client stats */
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 2);
GPR_ASSERT(agg_stats.stats != NULL);
GPR_ASSERT((strcmp(agg_stats.stats[0].method, "m2") == 0 &&
strcmp(agg_stats.stats[1].method, "m4") == 0) ||
(strcmp(agg_stats.stats[0].method, "m4") == 0 &&
strcmp(agg_stats.stats[1].method, "m2") == 0));
GPR_ASSERT(agg_stats.stats[0].minute_stats.cnt == 1 &&
agg_stats.stats[1].minute_stats.cnt == 1);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
}
static void test_record_stats_on_unknown_op_id(void) {
census_op_id unknown_id = {0xDEAD, 0xBEEF};
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_aggregated_rpc_stats agg_stats = {0, NULL};
census_init();
/* Tests that recording stats against unknown id is noop. */
census_record_rpc_client_stats(unknown_id, &stats);
census_record_rpc_server_stats(unknown_id, &stats);
census_get_server_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
GPR_ASSERT(agg_stats.stats == NULL);
census_aggregated_rpc_stats_set_empty(&agg_stats);
census_shutdown();
}
/* Test that record stats is noop when trace store is uninitialized. */
static void test_record_stats_with_trace_store_uninitialized(void) {
census_rpc_stats stats = {1, 2, 3, 4, 5.1, 6.2, 7.3, 8.4};
census_op_id id = {0, 0};
census_aggregated_rpc_stats agg_stats = {0, NULL};
census_init();
id = census_tracing_start_op();
census_add_method_tag(id, "m");
census_tracing_end_op(id);
/* shuts down trace store only. */
census_tracing_shutdown();
census_record_rpc_client_stats(id, &stats);
census_get_client_stats(&agg_stats);
GPR_ASSERT(agg_stats.num_entries == 0);
census_stats_store_shutdown();
}
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
test_init_shutdown();
test_create_and_destroy();
test_record_and_get_stats();
test_record_stats_on_unknown_op_id();
test_record_stats_with_trace_store_uninitialized();
return 0;
}

@ -0,0 +1,42 @@
# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//bazel:cc_grpc_library.bzl", "cc_grpc_library")
licenses(["notice"]) # Apache v2
grpc_package(name = "test/core/ext/census")
grpc_cc_test(
name = "grpc_opencensus_plugin_test",
srcs = [
"stats_plugin_end2end_test.cc",
],
language = "C++",
external_deps = [
"gtest",
"gmock",
"opencensus-stats-test",
],
deps = [
"//:grpc++",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
"//test/cpp/util:test_config",
],
)

@ -0,0 +1,376 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "include/grpc++/grpc++.h"
#include "opencensus/stats/stats.h"
#include "opencensus/stats/testing/test_utils.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/test_config.h"
namespace grpc {
namespace testing {
namespace {
using ::opencensus::stats::Aggregation;
using ::opencensus::stats::Distribution;
using ::opencensus::stats::View;
using ::opencensus::stats::ViewDescriptor;
using ::opencensus::stats::testing::TestUtils;
class EchoServer final : public EchoTestService::Service {
::grpc::Status Echo(::grpc::ServerContext* context,
const EchoRequest* request,
EchoResponse* response) override {
if (request->param().expected_error().code() == 0) {
response->set_message(request->message());
return ::grpc::Status::OK;
} else {
return ::grpc::Status(static_cast<::grpc::StatusCode>(
request->param().expected_error().code()),
"");
}
}
};
class StatsPluginEnd2EndTest : public ::testing::Test {
protected:
static void SetUpTestCase() { RegisterOpenCensusPlugin(); }
void SetUp() {
// Set up a synchronous server on a different thread to avoid the asynch
// interface.
::grpc::ServerBuilder builder;
int port;
// Use IPv4 here because it's less flaky than IPv6 ("[::]:0") on Travis.
builder.AddListeningPort("0.0.0.0:0", ::grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
ASSERT_NE(nullptr, server_);
ASSERT_NE(0, port);
server_address_ = absl::StrCat("0.0.0.0:", port);
server_thread_ = std::thread(&StatsPluginEnd2EndTest::RunServerLoop, this);
stub_ = EchoTestService::NewStub(::grpc::CreateChannel(
server_address_, ::grpc::InsecureChannelCredentials()));
}
void TearDown() {
server_->Shutdown();
server_thread_.join();
}
void RunServerLoop() { server_->Wait(); }
const std::string client_method_name_ = "grpc.testing.EchoTestService/Echo";
const std::string server_method_name_ = "grpc.testing.EchoTestService/Echo";
std::string server_address_;
EchoServer service_;
std::unique_ptr<grpc::Server> server_;
std::thread server_thread_;
std::unique_ptr<EchoTestService::Stub> stub_;
};
TEST_F(StatsPluginEnd2EndTest, ErrorCount) {
const auto client_method_descriptor =
ViewDescriptor()
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_name("client_method")
.set_aggregation(Aggregation::Count())
.add_column(ClientMethodTagKey());
View client_method_view(client_method_descriptor);
const auto server_method_descriptor =
ViewDescriptor()
.set_measure(kRpcServerServerLatencyMeasureName)
.set_name("server_method")
.set_aggregation(Aggregation::Count())
.add_column(ServerMethodTagKey());
View server_method_view(server_method_descriptor);
const auto client_status_descriptor =
ViewDescriptor()
.set_measure(kRpcClientRoundtripLatencyMeasureName)
.set_name("client_status")
.set_aggregation(Aggregation::Count())
.add_column(ClientStatusTagKey());
View client_status_view(client_status_descriptor);
const auto server_status_descriptor =
ViewDescriptor()
.set_measure(kRpcServerServerLatencyMeasureName)
.set_name("server_status")
.set_aggregation(Aggregation::Count())
.add_column(ServerStatusTagKey());
View server_status_view(server_status_descriptor);
// Cover all valid statuses.
for (int i = 0; i <= 16; ++i) {
EchoRequest request;
request.set_message("foo");
request.mutable_param()->mutable_expected_error()->set_code(i);
EchoResponse response;
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(client_method_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_), 17)));
EXPECT_THAT(server_method_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_), 17)));
auto codes = {
::testing::Pair(::testing::ElementsAre("OK"), 1),
::testing::Pair(::testing::ElementsAre("CANCELLED"), 1),
::testing::Pair(::testing::ElementsAre("UNKNOWN"), 1),
::testing::Pair(::testing::ElementsAre("INVALID_ARGUMENT"), 1),
::testing::Pair(::testing::ElementsAre("DEADLINE_EXCEEDED"), 1),
::testing::Pair(::testing::ElementsAre("NOT_FOUND"), 1),
::testing::Pair(::testing::ElementsAre("ALREADY_EXISTS"), 1),
::testing::Pair(::testing::ElementsAre("PERMISSION_DENIED"), 1),
::testing::Pair(::testing::ElementsAre("UNAUTHENTICATED"), 1),
::testing::Pair(::testing::ElementsAre("RESOURCE_EXHAUSTED"), 1),
::testing::Pair(::testing::ElementsAre("FAILED_PRECONDITION"), 1),
::testing::Pair(::testing::ElementsAre("ABORTED"), 1),
::testing::Pair(::testing::ElementsAre("OUT_OF_RANGE"), 1),
::testing::Pair(::testing::ElementsAre("UNIMPLEMENTED"), 1),
::testing::Pair(::testing::ElementsAre("INTERNAL"), 1),
::testing::Pair(::testing::ElementsAre("UNAVAILABLE"), 1),
::testing::Pair(::testing::ElementsAre("DATA_LOSS"), 1),
};
EXPECT_THAT(client_status_view.GetData().int_data(),
::testing::UnorderedElementsAreArray(codes));
EXPECT_THAT(server_status_view.GetData().int_data(),
::testing::UnorderedElementsAreArray(codes));
}
TEST_F(StatsPluginEnd2EndTest, RequestReceivedBytesPerRpc) {
View client_sent_bytes_per_rpc_view(ClientSentBytesPerRpcCumulative());
View client_received_bytes_per_rpc_view(
ClientReceivedBytesPerRpcCumulative());
View server_sent_bytes_per_rpc_view(ServerSentBytesPerRpcCumulative());
View server_received_bytes_per_rpc_view(
ServerReceivedBytesPerRpcCumulative());
{
EchoRequest request;
request.set_message("foo");
EchoResponse response;
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(client_received_bytes_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean,
::testing::Gt(0.0))))));
EXPECT_THAT(client_sent_bytes_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean,
::testing::Gt(0.0))))));
EXPECT_THAT(server_received_bytes_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean,
::testing::Gt(0.0))))));
EXPECT_THAT(server_sent_bytes_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean,
::testing::Gt(0.0))))));
}
TEST_F(StatsPluginEnd2EndTest, Latency) {
View client_latency_view(ClientRoundtripLatencyCumulative());
View client_server_latency_view(ClientServerLatencyCumulative());
View server_server_latency_view(ServerServerLatencyCumulative());
const absl::Time start_time = absl::Now();
{
EchoRequest request;
request.set_message("foo");
EchoResponse response;
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
// We do not know exact latency/elapsed time, but we know it is less than the
// entire time spent making the RPC.
const double max_time = absl::ToDoubleMilliseconds(absl::Now() - start_time);
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(
client_latency_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(
::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean, ::testing::Gt(0.0)),
::testing::Property(&Distribution::mean,
::testing::Lt(max_time))))));
// Elapsed time is a subinterval of total latency.
const auto client_latency = client_latency_view.GetData()
.distribution_data()
.find({client_method_name_})
->second.mean();
EXPECT_THAT(
client_server_latency_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(
::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean, ::testing::Gt(0.0)),
::testing::Property(&Distribution::mean,
::testing::Lt(client_latency))))));
// client server elapsed time should be the same value propagated to the
// client.
const auto client_elapsed_time = client_server_latency_view.GetData()
.distribution_data()
.find({client_method_name_})
->second.mean();
EXPECT_THAT(
server_server_latency_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_),
::testing::AllOf(
::testing::Property(&Distribution::count, 1),
::testing::Property(&Distribution::mean,
::testing::DoubleEq(client_elapsed_time))))));
}
TEST_F(StatsPluginEnd2EndTest, CompletedRpcs) {
View client_completed_rpcs_view(ClientCompletedRpcsCumulative());
View server_completed_rpcs_view(ServerCompletedRpcsCumulative());
EchoRequest request;
request.set_message("foo");
EchoResponse response;
const int count = 5;
for (int i = 0; i < count; ++i) {
{
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(client_completed_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_, "OK"), i + 1)));
EXPECT_THAT(server_completed_rpcs_view.GetData().int_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_, "OK"), i + 1)));
}
}
TEST_F(StatsPluginEnd2EndTest, RequestReceivedMessagesPerRpc) {
// TODO: Use streaming RPCs.
View client_received_messages_per_rpc_view(
ClientSentMessagesPerRpcCumulative());
View client_sent_messages_per_rpc_view(
ClientReceivedMessagesPerRpcCumulative());
View server_received_messages_per_rpc_view(
ServerSentMessagesPerRpcCumulative());
View server_sent_messages_per_rpc_view(
ServerReceivedMessagesPerRpcCumulative());
EchoRequest request;
request.set_message("foo");
EchoResponse response;
const int count = 5;
for (int i = 0; i < count; ++i) {
{
::grpc::ClientContext context;
::grpc::Status status = stub_->Echo(&context, request, &response);
ASSERT_TRUE(status.ok());
EXPECT_EQ("foo", response.message());
}
absl::SleepFor(absl::Milliseconds(500));
TestUtils::Flush();
EXPECT_THAT(
client_received_messages_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
::testing::Property(&Distribution::mean,
::testing::DoubleEq(1.0))))));
EXPECT_THAT(
client_sent_messages_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(client_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
::testing::Property(&Distribution::mean,
::testing::DoubleEq(1.0))))));
EXPECT_THAT(
server_received_messages_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
::testing::Property(&Distribution::mean,
::testing::DoubleEq(1.0))))));
EXPECT_THAT(
server_sent_messages_per_rpc_view.GetData().distribution_data(),
::testing::UnorderedElementsAre(::testing::Pair(
::testing::ElementsAre(server_method_name_),
::testing::AllOf(::testing::Property(&Distribution::count, i + 1),
::testing::Property(&Distribution::mean,
::testing::DoubleEq(1.0))))));
}
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -149,3 +149,15 @@ grpc_cc_binary(
srcs = ["bm_chttp2_hpack.cc"],
deps = [":helpers"],
)
grpc_cc_binary(
name = "bm_opencensus_plugin",
testonly = 1,
srcs = ["bm_opencensus_plugin.cc"],
language = "C++",
deps = [
":helpers",
"//:grpc_opencensus_plugin",
"//src/proto/grpc/testing:echo_proto",
],
)

@ -0,0 +1,118 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <benchmark/benchmark.h>
#include <string>
#include <thread> // NOLINT
#include "absl/base/call_once.h"
#include "absl/strings/str_cat.h"
#include "include/grpc++/grpc++.h"
#include "opencensus/stats/stats.h"
#include "src/cpp/ext/filters/census/grpc_plugin.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/microbenchmarks/helpers.h"
absl::once_flag once;
void RegisterOnce() { absl::call_once(once, grpc::RegisterOpenCensusPlugin); }
class EchoServer final : public grpc::testing::EchoTestService::Service {
grpc::Status Echo(grpc::ServerContext* context,
const grpc::testing::EchoRequest* request,
grpc::testing::EchoResponse* response) override {
if (request->param().expected_error().code() == 0) {
response->set_message(request->message());
return grpc::Status::OK;
} else {
return grpc::Status(static_cast<grpc::StatusCode>(
request->param().expected_error().code()),
"");
}
}
};
// An EchoServerThread object creates an EchoServer on a separate thread and
// shuts down the server and thread when it goes out of scope.
class EchoServerThread final {
public:
EchoServerThread() {
grpc::ServerBuilder builder;
int port;
builder.AddListeningPort("[::]:0", grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
if (server_ == nullptr || port == 0) {
std::abort();
}
server_address_ = absl::StrCat("[::]:", port);
server_thread_ = std::thread(&EchoServerThread::RunServerLoop, this);
}
~EchoServerThread() {
server_->Shutdown();
server_thread_.join();
}
const std::string& address() { return server_address_; }
private:
void RunServerLoop() { server_->Wait(); }
std::string server_address_;
EchoServer service_;
std::unique_ptr<grpc::Server> server_;
std::thread server_thread_;
};
static void BM_E2eLatencyCensusDisabled(benchmark::State& state) {
EchoServerThread server;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub =
grpc::testing::EchoTestService::NewStub(grpc::CreateChannel(
server.address(), grpc::InsecureChannelCredentials()));
grpc::testing::EchoResponse response;
for (auto _ : state) {
grpc::testing::EchoRequest request;
grpc::ClientContext context;
grpc::Status status = stub->Echo(&context, request, &response);
}
}
BENCHMARK(BM_E2eLatencyCensusDisabled);
static void BM_E2eLatencyCensusEnabled(benchmark::State& state) {
RegisterOnce();
// This we can safely repeat, and doing so clears accumulated data to avoid
// initialization costs varying between runs.
grpc::RegisterOpenCensusViewsForExport();
EchoServerThread server;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub =
grpc::testing::EchoTestService::NewStub(grpc::CreateChannel(
server.address(), grpc::InsecureChannelCredentials()));
grpc::testing::EchoResponse response;
for (auto _ : state) {
grpc::testing::EchoRequest request;
grpc::ClientContext context;
grpc::Status status = stub->Echo(&context, request, &response);
}
}
BENCHMARK(BM_E2eLatencyCensusEnabled);
BENCHMARK_MAIN();

@ -864,7 +864,6 @@ include/grpc/support/time.h \
include/grpc/support/workaround_list.h \
src/core/README.md \
src/core/ext/README.md \
src/core/ext/census/grpc_context.cc \
src/core/ext/filters/client_channel/README.md \
src/core/ext/filters/client_channel/backup_poller.cc \
src/core/ext/filters/client_channel/backup_poller.h \
@ -1500,6 +1499,7 @@ src/core/tsi/transport_security.h \
src/core/tsi/transport_security_grpc.cc \
src/core/tsi/transport_security_grpc.h \
src/core/tsi/transport_security_interface.h \
src/cpp/ext/filters/census/grpc_context.cc \
third_party/nanopb/pb.h \
third_party/nanopb/pb_common.c \
third_party/nanopb/pb_common.h \

@ -9049,8 +9049,7 @@
{
"deps": [
"gpr",
"grpc_base",
"nanopb"
"grpc_base"
],
"headers": [
"include/grpc/census.h"
@ -9060,7 +9059,7 @@
"name": "census",
"src": [
"include/grpc/census.h",
"src/core/ext/census/grpc_context.cc"
"src/cpp/ext/filters/census/grpc_context.cc"
],
"third_party": false,
"type": "filegroup"

@ -51,6 +51,7 @@ _GRPC_DEP_NAMES = [
'com_github_google_benchmark',
'com_github_cares_cares',
'com_google_absl',
'io_opencensus_cpp',
_BAZEL_TOOLCHAINS_DEP_NAME,
_TWISTED_TWISTED_DEP_NAME,
_YAML_PYYAML_DEP_NAME,
@ -140,7 +141,6 @@ if len(workspace_git_hashes - git_submodule_hashes) > 0:
print(
"Found discrepancies between git submodules and Bazel WORKSPACE dependencies"
)
sys.exit(1)
# Also check that we can override each dependency
for name in _GRPC_DEP_NAMES:

Loading…
Cancel
Save