mirror of https://github.com/grpc/grpc.git
commit
a1bb3605d4
287 changed files with 13443 additions and 3667 deletions
@ -0,0 +1,124 @@ |
||||
#
|
||||
# Copyright 2015 gRPC authors.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# TODO(jtattermusch): Remove the hack to workaround protobuf bug. See https://github.com/protocolbuffers/protobuf/issues/12439
|
||||
# Hack: protobuf currently doesn't declare it's absl dependencies when protobuf.pc pkgconfig file is used.
|
||||
PROTOBUF_ABSL_DEPS = absl_absl_check absl_absl_log absl_algorithm absl_base absl_bind_front absl_bits absl_btree absl_cleanup absl_cord absl_core_headers absl_debugging absl_die_if_null absl_dynamic_annotations absl_flags absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_layout absl_log_initialize absl_log_severity absl_memory absl_node_hash_map absl_node_hash_set absl_optional absl_span absl_status absl_statusor absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant
|
||||
# TODO(jtattermusch): Remove the hack to workaround protobuf/utf8_range bug. See https://github.com/protocolbuffers/utf8_range/issues/20
|
||||
# Hack: utf8_range (which is protobuf's dependency) currently doesn't have a pkgconfig file, so we need to explicitly
|
||||
# tweak the list of libraries to link against to fix the build.
|
||||
PROTOBUF_UTF8_RANGE_LINK_LIBS = -lutf8_validity
|
||||
OPENTELEMETRY_LINK_LIBS = -lopentelemetry_metrics -lopentelemetry_exporter_prometheus -lopentelemetry_common -lopentelemetry_resources -lprometheus-cpp-pull -lprometheus-cpp-core
|
||||
|
||||
HOST_SYSTEM = $(shell uname | cut -f 1 -d_)
|
||||
SYSTEM ?= $(HOST_SYSTEM)
|
||||
CXX = g++
|
||||
CPPFLAGS += `pkg-config --cflags protobuf grpc absl_flags absl_flags_parse`
|
||||
ifeq ($(SYSTEM),Darwin) |
||||
LDFLAGS += -L/usr/local/lib `pkg-config --libs --static protobuf grpc++ grpcpp_otel_plugin absl_flags absl_flags_parse $(PROTOBUF_ABSL_DEPS)` \
|
||||
$(PROTOBUF_UTF8_RANGE_LINK_LIBS) \
|
||||
$(OPENTELEMETRY_LINK_LIBS) \
|
||||
-pthread \
|
||||
-lgrpc++_reflection \
|
||||
-ldl
|
||||
else |
||||
LDFLAGS += -L/usr/local/lib `pkg-config --libs --static protobuf grpc++ grpcpp_otel_plugin absl_flags absl_flags_parse $(PROTOBUF_ABSL_DEPS)` \
|
||||
$(PROTOBUF_UTF8_RANGE_LINK_LIBS) \
|
||||
$(OPENTELEMETRY_LINK_LIBS) \
|
||||
-pthread \
|
||||
-Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed \
|
||||
-ldl
|
||||
endif |
||||
PROTOC = protoc
|
||||
GRPC_CPP_PLUGIN = grpc_cpp_plugin
|
||||
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`
|
||||
|
||||
PROTOS_PATH = ../../protos
|
||||
|
||||
vpath %.proto $(PROTOS_PATH) |
||||
|
||||
all: system-check greeter_callback_client greeter_callback_server |
||||
|
||||
greeter_callback_client: util.o helloworld.pb.o helloworld.grpc.pb.o greeter_callback_client.o |
||||
$(CXX) $^ $(LDFLAGS) -o $@
|
||||
|
||||
greeter_callback_server: util.o helloworld.pb.o helloworld.grpc.pb.o greeter_callback_server.o |
||||
$(CXX) $^ $(LDFLAGS) -o $@
|
||||
|
||||
.PRECIOUS: %.grpc.pb.cc |
||||
%.grpc.pb.cc: %.proto |
||||
$(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $<
|
||||
|
||||
.PRECIOUS: %.pb.cc |
||||
%.pb.cc: %.proto |
||||
$(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $<
|
||||
|
||||
clean: |
||||
rm -f *.o *.pb.cc *.pb.h greeter_callback_client greeter_callback_server
|
||||
|
||||
|
||||
# The following is to test your system and ensure a smoother experience.
|
||||
# They are by no means necessary to actually compile a grpc-enabled software.
|
||||
|
||||
PROTOC_CMD = which $(PROTOC)
|
||||
PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q 'libprotoc.3\|libprotoc [0-9][0-9]\.'
|
||||
PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN)
|
||||
HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false)
|
||||
ifeq ($(HAS_PROTOC),true) |
||||
HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false)
|
||||
endif |
||||
HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false)
|
||||
|
||||
SYSTEM_OK = false
|
||||
ifeq ($(HAS_VALID_PROTOC),true) |
||||
ifeq ($(HAS_PLUGIN),true) |
||||
SYSTEM_OK = true
|
||||
endif |
||||
endif |
||||
|
||||
system-check: |
||||
ifneq ($(HAS_VALID_PROTOC),true) |
||||
@echo " DEPENDENCY ERROR"
|
||||
@echo
|
||||
@echo "You don't have protoc 3.0.0 or newer installed in your path."
|
||||
@echo "Please install an up-to-date version of Google protocol buffers."
|
||||
@echo "You can find it here:"
|
||||
@echo
|
||||
@echo " https://github.com/protocolbuffers/protobuf/releases"
|
||||
@echo
|
||||
@echo "Here is what I get when trying to evaluate your version of protoc:"
|
||||
@echo
|
||||
-$(PROTOC) --version
|
||||
@echo
|
||||
@echo
|
||||
endif |
||||
ifneq ($(HAS_PLUGIN),true) |
||||
@echo " DEPENDENCY ERROR"
|
||||
@echo
|
||||
@echo "You don't have the grpc c++ protobuf plugin installed in your path."
|
||||
@echo "Please install grpc. You can find it here:"
|
||||
@echo
|
||||
@echo " https://github.com/grpc/grpc"
|
||||
@echo
|
||||
@echo "Here is what I get when trying to detect if you have the plugin:"
|
||||
@echo
|
||||
-which $(GRPC_CPP_PLUGIN)
|
||||
@echo
|
||||
@echo
|
||||
endif |
||||
ifneq ($(SYSTEM_OK),true) |
||||
@false
|
||||
endif |
@ -0,0 +1,44 @@ |
||||
# Copyright 2023 the gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
licenses(["notice"]) |
||||
|
||||
cc_binary( |
||||
name = "greeter_callback_client", |
||||
srcs = ["greeter_callback_client.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpcpp_otel_plugin", |
||||
"//examples/cpp/otel:util", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
"@io_opentelemetry_cpp//exporters/ostream:ostream_metric_exporter", |
||||
"@io_opentelemetry_cpp//sdk/src/metrics", |
||||
], |
||||
) |
||||
|
||||
cc_binary( |
||||
name = "greeter_callback_server", |
||||
srcs = ["greeter_callback_server.cc"], |
||||
defines = ["BAZEL_BUILD"], |
||||
deps = [ |
||||
"//:grpcpp_otel_plugin", |
||||
"//examples/cpp/otel:util", |
||||
"@com_google_absl//absl/flags:flag", |
||||
"@com_google_absl//absl/flags:parse", |
||||
"@com_google_absl//absl/strings:str_format", |
||||
"@io_opentelemetry_cpp//exporters/ostream:ostream_metric_exporter", |
||||
"@io_opentelemetry_cpp//sdk/src/metrics", |
||||
], |
||||
) |
@ -0,0 +1,85 @@ |
||||
# Copyright 2018 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
# |
||||
# cmake build file for C++ gRPC OpenTelemetry example. |
||||
# Assumes absl, protobuf, prometheus-cpp, opentelemetry-cpp and gRPC (with -DgRPC_BUILD_OPENTELEMETRY_PLUGIN=ON) have been installed using cmake. |
||||
# See cmake_externalproject/CMakeLists.txt for all-in-one cmake build |
||||
# that automatically builds all the dependencies before building helloworld. |
||||
|
||||
cmake_minimum_required(VERSION 3.13) |
||||
|
||||
project(grpc_opentelemetry_example C CXX) |
||||
|
||||
include(../../cmake/common.cmake) |
||||
|
||||
# Find opentelemetry-cpp package |
||||
find_package(opentelemetry-cpp CONFIG REQUIRED) |
||||
|
||||
# Proto file |
||||
get_filename_component(hw_proto "../../../protos/helloworld.proto" ABSOLUTE) |
||||
get_filename_component(hw_proto_path "${hw_proto}" PATH) |
||||
|
||||
# Generated sources |
||||
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc") |
||||
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h") |
||||
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc") |
||||
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h") |
||||
add_custom_command( |
||||
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}" |
||||
COMMAND ${_PROTOBUF_PROTOC} |
||||
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}" |
||||
-I "${hw_proto_path}" |
||||
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}" |
||||
"${hw_proto}" |
||||
DEPENDS "${hw_proto}") |
||||
|
||||
# Include generated *.pb.h files |
||||
include_directories("${CMAKE_CURRENT_BINARY_DIR}") |
||||
include_directories("${CMAKE_SOURCE_DIR}") |
||||
|
||||
# hw_grpc_proto |
||||
add_library(hw_grpc_proto |
||||
${hw_grpc_srcs} |
||||
${hw_grpc_hdrs} |
||||
${hw_proto_srcs} |
||||
${hw_proto_hdrs}) |
||||
target_link_libraries(hw_grpc_proto |
||||
${_REFLECTION} |
||||
${_GRPC_GRPCPP} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
|
||||
# util |
||||
add_library(util |
||||
"../util.cc") |
||||
target_link_libraries(util |
||||
hw_grpc_proto |
||||
opentelemetry-cpp::metrics |
||||
${_GRPC_GRPCPP} |
||||
${_REFLECTION} |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
|
||||
# Targets greeter_callback_(client|server) |
||||
foreach(_target |
||||
greeter_callback_client greeter_callback_server) |
||||
add_executable(${_target} "${_target}.cc") |
||||
target_link_libraries(${_target} |
||||
absl::flags |
||||
absl::flags_parse |
||||
opentelemetry-cpp::metrics |
||||
opentelemetry-cpp::ostream_metrics_exporter |
||||
gRPC::grpcpp_otel_plugin |
||||
util |
||||
${_PROTOBUF_LIBPROTOBUF}) |
||||
endforeach() |
@ -0,0 +1,127 @@ |
||||
#
|
||||
# Copyright 2015 gRPC authors.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
# TODO(jtattermusch): Remove the hack to workaround protobuf bug. See https://github.com/protocolbuffers/protobuf/issues/12439
|
||||
# Hack: protobuf currently doesn't declare it's absl dependencies when protobuf.pc pkgconfig file is used.
|
||||
PROTOBUF_ABSL_DEPS = absl_absl_check absl_absl_log absl_algorithm absl_base absl_bind_front absl_bits absl_btree absl_cleanup absl_cord absl_core_headers absl_debugging absl_die_if_null absl_dynamic_annotations absl_flags absl_flat_hash_map absl_flat_hash_set absl_function_ref absl_hash absl_layout absl_log_initialize absl_log_severity absl_memory absl_node_hash_map absl_node_hash_set absl_optional absl_span absl_status absl_statusor absl_strings absl_synchronization absl_time absl_type_traits absl_utility absl_variant
|
||||
# TODO(jtattermusch): Remove the hack to workaround protobuf/utf8_range bug. See https://github.com/protocolbuffers/utf8_range/issues/20
|
||||
# Hack: utf8_range (which is protobuf's dependency) currently doesn't have a pkgconfig file, so we need to explicitly
|
||||
# tweak the list of libraries to link against to fix the build.
|
||||
PROTOBUF_UTF8_RANGE_LINK_LIBS = -lutf8_validity
|
||||
OPENTELEMETRY_LINK_LIBS = -lopentelemetry_metrics -lopentelemetry_exporter_ostream_metrics -lopentelemetry_resources -lopentelemetry_common
|
||||
|
||||
HOST_SYSTEM = $(shell uname | cut -f 1 -d_)
|
||||
SYSTEM ?= $(HOST_SYSTEM)
|
||||
CXX = g++
|
||||
CPPFLAGS += `pkg-config --cflags protobuf grpc absl_flags absl_flags_parse`
|
||||
ifeq ($(SYSTEM),Darwin) |
||||
LDFLAGS += -L/usr/local/lib `pkg-config --libs --static protobuf grpc++ grpcpp_otel_plugin absl_flags absl_flags_parse $(PROTOBUF_ABSL_DEPS)` \
|
||||
$(PROTOBUF_UTF8_RANGE_LINK_LIBS) \
|
||||
$(OPENTELEMETRY_LINK_LIBS) \
|
||||
-pthread \
|
||||
-lgrpc++_reflection \
|
||||
-ldl
|
||||
else |
||||
LDFLAGS += -L/usr/local/lib `pkg-config --libs --static protobuf grpc++ grpcpp_otel_plugin absl_flags absl_flags_parse $(PROTOBUF_ABSL_DEPS)` \
|
||||
$(PROTOBUF_UTF8_RANGE_LINK_LIBS) \
|
||||
$(OPENTELEMETRY_LINK_LIBS) \
|
||||
-pthread \
|
||||
-Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed \
|
||||
-ldl
|
||||
endif |
||||
PROTOC = protoc
|
||||
GRPC_CPP_PLUGIN = grpc_cpp_plugin
|
||||
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`
|
||||
|
||||
PROTOS_PATH = ../../../protos
|
||||
|
||||
vpath %.proto $(PROTOS_PATH) |
||||
|
||||
all: system-check greeter_callback_client greeter_callback_server |
||||
|
||||
greeter_callback_client: util.o helloworld.pb.o helloworld.grpc.pb.o greeter_callback_client.o |
||||
$(CXX) $^ $(LDFLAGS) -o $@
|
||||
|
||||
greeter_callback_server: util.o helloworld.pb.o helloworld.grpc.pb.o greeter_callback_server.o |
||||
$(CXX) $^ $(LDFLAGS) -o $@
|
||||
|
||||
.PRECIOUS: %.grpc.pb.cc |
||||
%.grpc.pb.cc: %.proto |
||||
$(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $<
|
||||
|
||||
.PRECIOUS: %.pb.cc |
||||
%.pb.cc: %.proto |
||||
$(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $<
|
||||
|
||||
util.o: ../util.cc |
||||
$(CXX) $^ -I . -I $(CPPFLAGS) -c -o $@
|
||||
|
||||
clean: |
||||
rm -f *.o *.pb.cc *.pb.h greeter_callback_client greeter_callback_server
|
||||
|
||||
|
||||
# The following is to test your system and ensure a smoother experience.
|
||||
# They are by no means necessary to actually compile a grpc-enabled software.
|
||||
|
||||
PROTOC_CMD = which $(PROTOC)
|
||||
PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q 'libprotoc.3\|libprotoc [0-9][0-9]\.'
|
||||
PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN)
|
||||
HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false)
|
||||
ifeq ($(HAS_PROTOC),true) |
||||
HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false)
|
||||
endif |
||||
HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false)
|
||||
|
||||
SYSTEM_OK = false
|
||||
ifeq ($(HAS_VALID_PROTOC),true) |
||||
ifeq ($(HAS_PLUGIN),true) |
||||
SYSTEM_OK = true
|
||||
endif |
||||
endif |
||||
|
||||
system-check: |
||||
ifneq ($(HAS_VALID_PROTOC),true) |
||||
@echo " DEPENDENCY ERROR"
|
||||
@echo
|
||||
@echo "You don't have protoc 3.0.0 or newer installed in your path."
|
||||
@echo "Please install an up-to-date version of Google protocol buffers."
|
||||
@echo "You can find it here:"
|
||||
@echo
|
||||
@echo " https://github.com/protocolbuffers/protobuf/releases"
|
||||
@echo
|
||||
@echo "Here is what I get when trying to evaluate your version of protoc:"
|
||||
@echo
|
||||
-$(PROTOC) --version
|
||||
@echo
|
||||
@echo
|
||||
endif |
||||
ifneq ($(HAS_PLUGIN),true) |
||||
@echo " DEPENDENCY ERROR"
|
||||
@echo
|
||||
@echo "You don't have the grpc c++ protobuf plugin installed in your path."
|
||||
@echo "Please install grpc. You can find it here:"
|
||||
@echo
|
||||
@echo " https://github.com/grpc/grpc"
|
||||
@echo
|
||||
@echo "Here is what I get when trying to detect if you have the plugin:"
|
||||
@echo
|
||||
-which $(GRPC_CPP_PLUGIN)
|
||||
@echo
|
||||
@echo
|
||||
endif |
||||
ifneq ($(SYSTEM_OK),true) |
||||
@false
|
||||
endif |
@ -0,0 +1,79 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil
|
||||
// version. Refer
|
||||
// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042.
|
||||
#ifndef HAVE_ABSEIL |
||||
#define HAVE_ABSEIL |
||||
#endif |
||||
|
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "opentelemetry/exporters/ostream/metric_exporter.h" |
||||
#include "opentelemetry/exporters/ostream/metric_exporter_factory.h" |
||||
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" |
||||
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_factory.h" |
||||
#include "opentelemetry/sdk/metrics/meter_provider.h" |
||||
|
||||
#include <grpcpp/ext/otel_plugin.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/cpp/otel/util.h" |
||||
#else |
||||
#include "../util.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(std::string, target, "localhost:50051", "Server address"); |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
// Register a global gRPC OpenTelemetry plugin configured with an ostream
|
||||
// exporter.
|
||||
auto ostream_exporter = |
||||
opentelemetry::exporter::metrics::OStreamMetricExporterFactory::Create(); |
||||
opentelemetry::sdk::metrics::PeriodicExportingMetricReaderOptions |
||||
reader_options; |
||||
reader_options.export_interval_millis = std::chrono::milliseconds(1000); |
||||
reader_options.export_timeout_millis = std::chrono::milliseconds(500); |
||||
auto reader = |
||||
opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create( |
||||
std::move(ostream_exporter), reader_options); |
||||
auto meter_provider = |
||||
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(); |
||||
// The default histogram boundaries are not granular enough for RPCs. Override
|
||||
// the "grpc.client.attempt.duration" view as recommended by
|
||||
// https://github.com/grpc/proposal/blob/master/A66-otel-stats.md.
|
||||
AddLatencyView(meter_provider.get(), "grpc.client.attempt.duration", "s"); |
||||
meter_provider->AddMetricReader(std::move(reader)); |
||||
auto status = grpc::OpenTelemetryPluginBuilder() |
||||
.SetMeterProvider(std::move(meter_provider)) |
||||
.BuildAndRegisterGlobal(); |
||||
if (!status.ok()) { |
||||
std::cerr << "Failed to register gRPC OpenTelemetry Plugin: " |
||||
<< status.ToString() << std::endl; |
||||
return static_cast<int>(status.code()); |
||||
} |
||||
|
||||
// Continuously send RPCs every second.
|
||||
RunClient(absl::GetFlag(FLAGS_target)); |
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,78 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2021 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
// Explicitly define HAVE_ABSEIL to avoid conflict with OTel's Abseil
|
||||
// version. Refer
|
||||
// https://github.com/open-telemetry/opentelemetry-cpp/issues/1042.
|
||||
#ifndef HAVE_ABSEIL |
||||
#define HAVE_ABSEIL |
||||
#endif |
||||
|
||||
#include <iostream> |
||||
#include <memory> |
||||
#include <string> |
||||
|
||||
#include "absl/flags/flag.h" |
||||
#include "absl/flags/parse.h" |
||||
#include "opentelemetry/exporters/ostream/metric_exporter.h" |
||||
#include "opentelemetry/exporters/ostream/metric_exporter_factory.h" |
||||
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" |
||||
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader_factory.h" |
||||
#include "opentelemetry/sdk/metrics/meter_provider.h" |
||||
|
||||
#include <grpcpp/ext/otel_plugin.h> |
||||
|
||||
#ifdef BAZEL_BUILD |
||||
#include "examples/cpp/otel/util.h" |
||||
#else |
||||
#include "../util.h" |
||||
#endif |
||||
|
||||
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); |
||||
|
||||
int main(int argc, char** argv) { |
||||
absl::ParseCommandLine(argc, argv); |
||||
// Register a global gRPC OpenTelemetry plugin configured with an ostream
|
||||
// exporter.
|
||||
auto ostream_exporter = |
||||
opentelemetry::exporter::metrics::OStreamMetricExporterFactory::Create(); |
||||
opentelemetry::sdk::metrics::PeriodicExportingMetricReaderOptions |
||||
reader_options; |
||||
reader_options.export_interval_millis = std::chrono::milliseconds(1000); |
||||
reader_options.export_timeout_millis = std::chrono::milliseconds(500); |
||||
auto reader = |
||||
opentelemetry::sdk::metrics::PeriodicExportingMetricReaderFactory::Create( |
||||
std::move(ostream_exporter), reader_options); |
||||
auto meter_provider = |
||||
std::make_shared<opentelemetry::sdk::metrics::MeterProvider>(); |
||||
// The default histogram boundaries are not granular enough for RPCs. Override
|
||||
// the "grpc.server.call.duration" view as recommended by
|
||||
// https://github.com/grpc/proposal/blob/master/A66-otel-stats.md.
|
||||
AddLatencyView(meter_provider.get(), "grpc.server.call.duration", "s"); |
||||
meter_provider->AddMetricReader(std::move(reader)); |
||||
auto status = grpc::OpenTelemetryPluginBuilder() |
||||
.SetMeterProvider(std::move(meter_provider)) |
||||
.BuildAndRegisterGlobal(); |
||||
if (!status.ok()) { |
||||
std::cerr << "Failed to register gRPC OpenTelemetry Plugin: " |
||||
<< status.ToString() << std::endl; |
||||
return static_cast<int>(status.code()); |
||||
} |
||||
RunServer(absl::GetFlag(FLAGS_port)); |
||||
return 0; |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,243 @@ |
||||
//
|
||||
// Copyright 2015 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H |
||||
#define GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include "src/core/client_channel/client_channel_factory.h" |
||||
#include "src/core/client_channel/config_selector.h" |
||||
#include "src/core/client_channel/subchannel.h" |
||||
#include "src/core/ext/filters/channel_idle/idle_filter_state.h" |
||||
#include "src/core/lib/gprpp/single_set_ptr.h" |
||||
#include "src/core/lib/promise/observable.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/load_balancing/lb_policy.h" |
||||
#include "src/core/resolver/resolver.h" |
||||
#include "src/core/service_config/service_config.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class ClientChannel : public Channel { |
||||
public: |
||||
using PickerObservable = |
||||
Observable<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>>; |
||||
|
||||
class CallDestinationFactory { |
||||
public: |
||||
struct RawPointerChannelArgTag {}; |
||||
|
||||
static absl::string_view ChannelArgName() { |
||||
return "grpc.internal.client_channel_call_destination"; |
||||
} |
||||
|
||||
virtual RefCountedPtr<UnstartedCallDestination> CreateCallDestination( |
||||
PickerObservable) = 0; |
||||
|
||||
protected: |
||||
~CallDestinationFactory() = default; |
||||
}; |
||||
|
||||
static absl::StatusOr<OrphanablePtr<Channel>> Create( |
||||
std::string target, ChannelArgs channel_args); |
||||
|
||||
// Do not instantiate directly -- use Create() instead.
|
||||
ClientChannel(std::string target_uri, ChannelArgs args, |
||||
std::string uri_to_resolve, |
||||
RefCountedPtr<ServiceConfig> default_service_config, |
||||
ClientChannelFactory* client_channel_factory, |
||||
CallDestinationFactory* call_destination_factory); |
||||
|
||||
~ClientChannel() override; |
||||
|
||||
void Orphan() override; |
||||
|
||||
grpc_call* CreateCall(grpc_call* parent_call, uint32_t propagation_mask, |
||||
grpc_completion_queue* cq, |
||||
grpc_pollset_set* /*pollset_set_alternative*/, |
||||
Slice path, absl::optional<Slice> authority, |
||||
Timestamp deadline, bool registered_method) override; |
||||
|
||||
CallInitiator CreateCall(ClientMetadataHandle client_initial_metadata); |
||||
|
||||
grpc_event_engine::experimental::EventEngine* event_engine() const override { |
||||
return event_engine_.get(); |
||||
} |
||||
|
||||
// TODO(ctiller): lame channels
|
||||
bool IsLame() const override { return false; } |
||||
|
||||
bool SupportsConnectivityWatcher() const override { return true; } |
||||
|
||||
// Returns the current connectivity state. If try_to_connect is true,
|
||||
// triggers a connection attempt if not already connected.
|
||||
grpc_connectivity_state CheckConnectivityState(bool try_to_connect) override; |
||||
|
||||
void WatchConnectivityState(grpc_connectivity_state last_observed_state, |
||||
Timestamp deadline, grpc_completion_queue* cq, |
||||
void* tag) override; |
||||
|
||||
// Starts and stops a connectivity watch. The watcher will be initially
|
||||
// notified as soon as the state changes from initial_state and then on
|
||||
// every subsequent state change until either the watch is stopped or
|
||||
// it is notified that the state has changed to SHUTDOWN.
|
||||
//
|
||||
// This is intended to be used when starting watches from code inside of
|
||||
// C-core (e.g., for a nested control plane channel for things like xds).
|
||||
void AddConnectivityWatcher( |
||||
grpc_connectivity_state initial_state, |
||||
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) override; |
||||
void RemoveConnectivityWatcher( |
||||
AsyncConnectivityStateWatcherInterface* watcher) override; |
||||
|
||||
void GetInfo(const grpc_channel_info* channel_info) override; |
||||
|
||||
void ResetConnectionBackoff() override; |
||||
|
||||
void Ping(grpc_completion_queue* cq, void* tag) override; |
||||
|
||||
// Flag that this object gets stored in channel args as a raw pointer.
|
||||
struct RawPointerChannelArgTag {}; |
||||
static absl::string_view ChannelArgName() { |
||||
return "grpc.internal.client_channel"; |
||||
} |
||||
|
||||
private: |
||||
class ClientChannelControlHelper; |
||||
class ResolverResultHandler; |
||||
class SubchannelWrapper; |
||||
|
||||
void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
void DestroyResolverAndLbPolicyLocked() |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void OnResolverResultChangedLocked(Resolver::Result result) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
void OnResolverErrorLocked(absl::Status status) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
absl::Status CreateOrUpdateLbPolicyLocked( |
||||
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, |
||||
const absl::optional<std::string>& health_check_service_name, |
||||
Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( |
||||
const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void UpdateServiceConfigInControlPlaneLocked( |
||||
RefCountedPtr<ServiceConfig> service_config, |
||||
RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void UpdateServiceConfigInDataPlaneLocked() |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void UpdateStateLocked(grpc_connectivity_state state, |
||||
const absl::Status& status, const char* reason) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void UpdateStateAndPickerLocked( |
||||
grpc_connectivity_state state, const absl::Status& status, |
||||
const char* reason, |
||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) |
||||
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); |
||||
|
||||
void StartIdleTimer(); |
||||
|
||||
// Applies service config settings from config_selector to the call.
|
||||
// May modify call context and client_initial_metadata.
|
||||
absl::Status ApplyServiceConfigToCall( |
||||
ConfigSelector& config_selector, |
||||
ClientMetadata& client_initial_metadata) const; |
||||
|
||||
const ChannelArgs channel_args_; |
||||
const std::shared_ptr<grpc_event_engine::experimental::EventEngine> |
||||
event_engine_; |
||||
const std::string uri_to_resolve_; |
||||
const size_t service_config_parser_index_; |
||||
const RefCountedPtr<ServiceConfig> default_service_config_; |
||||
ClientChannelFactory* const client_channel_factory_; |
||||
const std::string default_authority_; |
||||
channelz::ChannelNode* const channelz_node_; |
||||
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_; |
||||
|
||||
//
|
||||
// Idleness state.
|
||||
//
|
||||
const Duration idle_timeout_; |
||||
IdleFilterState idle_state_{false}; |
||||
SingleSetPtr<Activity, typename ActivityPtr::deleter_type> idle_activity_; |
||||
|
||||
//
|
||||
// Fields related to name resolution.
|
||||
//
|
||||
struct ResolverDataForCalls { |
||||
RefCountedPtr<ConfigSelector> config_selector; |
||||
RefCountedPtr<UnstartedCallDestination> call_destination; |
||||
}; |
||||
Observable<absl::StatusOr<ResolverDataForCalls>> resolver_data_for_calls_; |
||||
|
||||
//
|
||||
// Fields related to LB picks.
|
||||
//
|
||||
PickerObservable picker_; |
||||
const RefCountedPtr<UnstartedCallDestination> call_destination_; |
||||
|
||||
//
|
||||
// Fields used in the control plane. Guarded by work_serializer.
|
||||
//
|
||||
std::shared_ptr<WorkSerializer> work_serializer_; |
||||
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_); |
||||
OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(*work_serializer_); |
||||
bool previous_resolution_contained_addresses_ |
||||
ABSL_GUARDED_BY(*work_serializer_) = false; |
||||
RefCountedPtr<ServiceConfig> saved_service_config_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
RefCountedPtr<ConfigSelector> saved_config_selector_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
OrphanablePtr<LoadBalancingPolicy> lb_policy_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
// The number of SubchannelWrapper instances referencing a given Subchannel.
|
||||
std::map<Subchannel*, int> subchannel_refcount_map_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
// The set of SubchannelWrappers that currently exist.
|
||||
// No need to hold a ref, since the set is updated in the control-plane
|
||||
// work_serializer when the SubchannelWrappers are created and destroyed.
|
||||
absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_ |
||||
ABSL_GUARDED_BY(*work_serializer_); |
||||
int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1; |
||||
absl::Status disconnect_error_ ABSL_GUARDED_BY(*work_serializer_); |
||||
|
||||
//
|
||||
// Fields accessed via GetChannelInfo().
|
||||
//
|
||||
Mutex info_mu_; |
||||
std::string info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_); |
||||
std::string info_service_config_json_ ABSL_GUARDED_BY(info_mu_); |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_H
|
@ -0,0 +1,335 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/client_channel/load_balanced_call_destination.h" |
||||
|
||||
#include "src/core/client_channel/client_channel.h" |
||||
#include "src/core/client_channel/client_channel_internal.h" |
||||
#include "src/core/client_channel/subchannel.h" |
||||
#include "src/core/lib/channel/status_util.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/telemetry/call_tracer.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Defined in legacy client channel filter.
|
||||
// TODO(roth): Move these here when we remove the legacy filter.
|
||||
extern TraceFlag grpc_client_channel_trace; |
||||
extern TraceFlag grpc_client_channel_call_trace; |
||||
extern TraceFlag grpc_client_channel_lb_call_trace; |
||||
|
||||
namespace { |
||||
|
||||
class LbMetadata : public LoadBalancingPolicy::MetadataInterface { |
||||
public: |
||||
explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} |
||||
|
||||
void Add(absl::string_view key, absl::string_view value) override { |
||||
if (batch_ == nullptr) return; |
||||
// Gross, egregious hack to support legacy grpclb behavior.
|
||||
// TODO(ctiller): Use a promise context for this once that plumbing is done.
|
||||
if (key == GrpcLbClientStatsMetadata::key()) { |
||||
batch_->Set( |
||||
GrpcLbClientStatsMetadata(), |
||||
const_cast<GrpcLbClientStats*>( |
||||
reinterpret_cast<const GrpcLbClientStats*>(value.data()))); |
||||
return; |
||||
} |
||||
batch_->Append(key, Slice::FromStaticString(value), |
||||
[key](absl::string_view error, const Slice& value) { |
||||
gpr_log(GPR_ERROR, "%s", |
||||
absl::StrCat(error, " key:", key, |
||||
" value:", value.as_string_view()) |
||||
.c_str()); |
||||
}); |
||||
} |
||||
|
||||
std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector() |
||||
override { |
||||
if (batch_ == nullptr) return {}; |
||||
Encoder encoder; |
||||
batch_->Encode(&encoder); |
||||
return encoder.Take(); |
||||
} |
||||
|
||||
absl::optional<absl::string_view> Lookup(absl::string_view key, |
||||
std::string* buffer) const override { |
||||
if (batch_ == nullptr) return absl::nullopt; |
||||
return batch_->GetStringValue(key, buffer); |
||||
} |
||||
|
||||
private: |
||||
class Encoder { |
||||
public: |
||||
void Encode(const Slice& key, const Slice& value) { |
||||
out_.emplace_back(std::string(key.as_string_view()), |
||||
std::string(value.as_string_view())); |
||||
} |
||||
|
||||
template <class Which> |
||||
void Encode(Which, const typename Which::ValueType& value) { |
||||
auto value_slice = Which::Encode(value); |
||||
out_.emplace_back(std::string(Which::key()), |
||||
std::string(value_slice.as_string_view())); |
||||
} |
||||
|
||||
void Encode(GrpcTimeoutMetadata, |
||||
const typename GrpcTimeoutMetadata::ValueType&) {} |
||||
void Encode(HttpPathMetadata, const Slice&) {} |
||||
void Encode(HttpMethodMetadata, |
||||
const typename HttpMethodMetadata::ValueType&) {} |
||||
|
||||
std::vector<std::pair<std::string, std::string>> Take() { |
||||
return std::move(out_); |
||||
} |
||||
|
||||
private: |
||||
std::vector<std::pair<std::string, std::string>> out_; |
||||
}; |
||||
|
||||
grpc_metadata_batch* batch_; |
||||
}; |
||||
|
||||
void MaybeCreateCallAttemptTracer(bool is_transparent_retry) { |
||||
auto* call_tracer = MaybeGetContext<ClientCallTracer>(); |
||||
if (call_tracer == nullptr) return; |
||||
auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry); |
||||
SetContext<CallTracerInterface>(tracer); |
||||
} |
||||
|
||||
class LbCallState : public ClientChannelLbCallState { |
||||
public: |
||||
void* Alloc(size_t size) override { return GetContext<Arena>()->Alloc(size); } |
||||
|
||||
// Internal API to allow first-party LB policies to access per-call
|
||||
// attributes set by the ConfigSelector.
|
||||
ServiceConfigCallData::CallAttributeInterface* GetCallAttribute( |
||||
UniqueTypeName type) const override { |
||||
auto* service_config_call_data = GetContext<ServiceConfigCallData>(); |
||||
return service_config_call_data->GetCallAttribute(type); |
||||
} |
||||
|
||||
ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override { |
||||
auto* legacy_context = GetContext<grpc_call_context_element>(); |
||||
return static_cast<ClientCallTracer::CallAttemptTracer*>( |
||||
legacy_context[GRPC_CONTEXT_CALL_TRACER].value); |
||||
} |
||||
}; |
||||
|
||||
// TODO(roth): Remove this in favor of the gprpp Match() function once
|
||||
// we can do that without breaking lock annotations.
|
||||
template <typename T> |
||||
T HandlePickResult( |
||||
LoadBalancingPolicy::PickResult* result, |
||||
std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func, |
||||
std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func, |
||||
std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func, |
||||
std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) { |
||||
auto* complete_pick = |
||||
absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result); |
||||
if (complete_pick != nullptr) { |
||||
return complete_func(complete_pick); |
||||
} |
||||
auto* queue_pick = |
||||
absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result); |
||||
if (queue_pick != nullptr) { |
||||
return queue_func(queue_pick); |
||||
} |
||||
auto* fail_pick = |
||||
absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result); |
||||
if (fail_pick != nullptr) { |
||||
return fail_func(fail_pick); |
||||
} |
||||
auto* drop_pick = |
||||
absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result); |
||||
CHECK(drop_pick != nullptr); |
||||
return drop_func(drop_pick); |
||||
} |
||||
|
||||
// Does an LB pick for a call. Returns one of the following things:
|
||||
// - Continue{}, meaning to queue the pick
|
||||
// - a non-OK status, meaning to fail the call
|
||||
// - a call destination, meaning that the pick is complete
|
||||
// When the pick is complete, pushes client_initial_metadata onto
|
||||
// call_initiator. Also adds the subchannel call tracker (if any) to
|
||||
// context.
|
||||
LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> PickSubchannel( |
||||
LoadBalancingPolicy::SubchannelPicker& picker, |
||||
UnstartedCallHandler& unstarted_handler) { |
||||
// Perform LB pick.
|
||||
auto& client_initial_metadata = |
||||
unstarted_handler.UnprocessedClientInitialMetadata(); |
||||
LoadBalancingPolicy::PickArgs pick_args; |
||||
Slice* path = client_initial_metadata.get_pointer(HttpPathMetadata()); |
||||
CHECK(path != nullptr); |
||||
pick_args.path = path->as_string_view(); |
||||
LbCallState lb_call_state; |
||||
pick_args.call_state = &lb_call_state; |
||||
LbMetadata initial_metadata(&client_initial_metadata); |
||||
pick_args.initial_metadata = &initial_metadata; |
||||
auto result = picker.Pick(pick_args); |
||||
// Handle result.
|
||||
return HandlePickResult< |
||||
LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>>>( |
||||
&result, |
||||
// CompletePick
|
||||
[&](LoadBalancingPolicy::PickResult::Complete* complete_pick) |
||||
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"client_channel: %sLB pick succeeded: subchannel=%p", |
||||
GetContext<Activity>()->DebugTag().c_str(), |
||||
complete_pick->subchannel.get()); |
||||
} |
||||
CHECK(complete_pick->subchannel != nullptr); |
||||
// Grab a ref to the call destination while we're still
|
||||
// holding the data plane mutex.
|
||||
auto call_destination = |
||||
DownCast<SubchannelInterfaceWithCallDestination*>( |
||||
complete_pick->subchannel.get()) |
||||
->call_destination(); |
||||
// If the subchannel has no call destination (e.g., if the
|
||||
// subchannel has moved out of state READY but the LB policy hasn't
|
||||
// yet seen that change and given us a new picker), then just
|
||||
// queue the pick. We'll try again as soon as we get a new picker.
|
||||
if (call_destination == nullptr) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||
gpr_log(GPR_INFO, |
||||
"client_channel: %ssubchannel returned by LB picker " |
||||
"has no connected subchannel; queueing pick", |
||||
GetContext<Activity>()->DebugTag().c_str()); |
||||
} |
||||
return Continue{}; |
||||
} |
||||
// If the LB policy returned a call tracker, inform it that the
|
||||
// call is starting and add it to context, so that we can notify
|
||||
// it when the call finishes.
|
||||
if (complete_pick->subchannel_call_tracker != nullptr) { |
||||
complete_pick->subchannel_call_tracker->Start(); |
||||
SetContext(complete_pick->subchannel_call_tracker.release()); |
||||
} |
||||
// Return the connected subchannel.
|
||||
return call_destination; |
||||
}, |
||||
// QueuePick
|
||||
[&](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||
gpr_log(GPR_INFO, "client_channel: %sLB pick queued", |
||||
GetContext<Activity>()->DebugTag().c_str()); |
||||
} |
||||
return Continue{}; |
||||
}, |
||||
// FailPick
|
||||
[&](LoadBalancingPolicy::PickResult::Fail* fail_pick) |
||||
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||
gpr_log(GPR_INFO, "client_channel: %sLB pick failed: %s", |
||||
GetContext<Activity>()->DebugTag().c_str(), |
||||
fail_pick->status.ToString().c_str()); |
||||
} |
||||
// If wait_for_ready is false, then the error indicates the RPC
|
||||
// attempt's final status.
|
||||
if (!unstarted_handler.UnprocessedClientInitialMetadata() |
||||
.GetOrCreatePointer(WaitForReady()) |
||||
->value) { |
||||
return MaybeRewriteIllegalStatusCode(std::move(fail_pick->status), |
||||
"LB pick"); |
||||
} |
||||
// If wait_for_ready is true, then queue to retry when we get a new
|
||||
// picker.
|
||||
return Continue{}; |
||||
}, |
||||
// DropPick
|
||||
[&](LoadBalancingPolicy::PickResult::Drop* drop_pick) |
||||
-> LoopCtl<absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>> { |
||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { |
||||
gpr_log(GPR_INFO, "client_channel: %sLB pick dropped: %s", |
||||
GetContext<Activity>()->DebugTag().c_str(), |
||||
drop_pick->status.ToString().c_str()); |
||||
} |
||||
return grpc_error_set_int(MaybeRewriteIllegalStatusCode( |
||||
std::move(drop_pick->status), "LB drop"), |
||||
StatusIntProperty::kLbPolicyDrop, 1); |
||||
}); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
void LoadBalancedCallDestination::StartCall( |
||||
UnstartedCallHandler unstarted_handler) { |
||||
// If there is a call tracer, create a call attempt tracer.
|
||||
bool* is_transparent_retry_metadata = |
||||
unstarted_handler.UnprocessedClientInitialMetadata().get_pointer( |
||||
IsTransparentRetry()); |
||||
bool is_transparent_retry = is_transparent_retry_metadata != nullptr |
||||
? *is_transparent_retry_metadata |
||||
: false; |
||||
MaybeCreateCallAttemptTracer(is_transparent_retry); |
||||
// Spawn a promise to do the LB pick.
|
||||
// This will eventually start the call.
|
||||
unstarted_handler.SpawnGuardedUntilCallCompletes( |
||||
"lb_pick", [unstarted_handler, picker = picker_]() mutable { |
||||
return Map( |
||||
// Wait for the LB picker.
|
||||
CheckDelayed(Loop( |
||||
[last_picker = |
||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>(), |
||||
unstarted_handler, picker]() mutable { |
||||
return Map( |
||||
picker.Next(last_picker), |
||||
[unstarted_handler, &last_picker]( |
||||
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> |
||||
picker) mutable { |
||||
last_picker = std::move(picker); |
||||
// Returns 3 possible things:
|
||||
// - Continue to queue the pick
|
||||
// - non-OK status to fail the pick
|
||||
// - a connected subchannel to complete the pick
|
||||
return PickSubchannel(*last_picker, unstarted_handler); |
||||
}); |
||||
})), |
||||
// Create call stack on the connected subchannel.
|
||||
[unstarted_handler]( |
||||
std::tuple< |
||||
absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>, |
||||
bool> |
||||
pick_result) { |
||||
auto& call_destination = std::get<0>(pick_result); |
||||
const bool was_queued = std::get<1>(pick_result); |
||||
if (!call_destination.ok()) { |
||||
return call_destination.status(); |
||||
} |
||||
// LB pick is done, so indicate that we've committed.
|
||||
auto* on_commit = MaybeGetContext<LbOnCommit>(); |
||||
if (on_commit != nullptr && *on_commit != nullptr) { |
||||
(*on_commit)(); |
||||
} |
||||
// If it was queued, add a trace annotation.
|
||||
if (was_queued) { |
||||
auto* tracer = |
||||
MaybeGetContext<ClientCallTracer::CallAttemptTracer>(); |
||||
if (tracer != nullptr) { |
||||
tracer->RecordAnnotation("Delayed LB pick complete."); |
||||
} |
||||
} |
||||
// Delegate to connected subchannel.
|
||||
// TODO(ctiller): need to insert LbCallTracingFilter at the top of
|
||||
// the stack
|
||||
(*call_destination)->StartCall(unstarted_handler); |
||||
return absl::OkStatus(); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,49 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H |
||||
#define GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
|
||||
#include "src/core/client_channel/client_channel.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/transport/call_destination.h" |
||||
#include "src/core/load_balancing/lb_policy.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// Context type for LB on_commit callback.
|
||||
// TODO(ctiller): make this a struct, so we don't accidentally alias context
|
||||
// types
|
||||
using LbOnCommit = absl::AnyInvocable<void()>; |
||||
template <> |
||||
struct ContextType<LbOnCommit> {}; |
||||
|
||||
class LoadBalancedCallDestination final : public UnstartedCallDestination { |
||||
public: |
||||
explicit LoadBalancedCallDestination(ClientChannel::PickerObservable picker) |
||||
: picker_(std::move(picker)) {} |
||||
|
||||
void Orphaned() override {} |
||||
|
||||
void StartCall(UnstartedCallHandler unstarted_handler) override; |
||||
|
||||
private: |
||||
ClientChannel::PickerObservable picker_; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_LOAD_BALANCED_CALL_DESTINATION_H
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue