Service Config Changes to set channel in transient failure on invalid service config

reviewable/pr18946/r1
Yash Tibrewal 6 years ago
parent f1dfe791ab
commit db1ccad039
  1. 48
      CMakeLists.txt
  2. 52
      Makefile
  3. 13
      build.yaml
  4. 12
      src/core/ext/filters/client_channel/client_channel.cc
  5. 24
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  6. 17
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  7. 9
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  8. 10
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  9. 3
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  10. 21
      test/cpp/end2end/BUILD
  11. 442
      test/cpp/end2end/service_config_end2end_test.cc
  12. 22
      tools/run_tests/generated/sources_and_headers.json
  13. 24
      tools/run_tests/generated/tests.json

@ -703,6 +703,7 @@ add_dependencies(buildtests_cxx server_crash_test_client)
add_dependencies(buildtests_cxx server_early_return_test) add_dependencies(buildtests_cxx server_early_return_test)
add_dependencies(buildtests_cxx server_interceptors_end2end_test) add_dependencies(buildtests_cxx server_interceptors_end2end_test)
add_dependencies(buildtests_cxx server_request_call_test) add_dependencies(buildtests_cxx server_request_call_test)
add_dependencies(buildtests_cxx service_config_end2end_test)
add_dependencies(buildtests_cxx service_config_test) add_dependencies(buildtests_cxx service_config_test)
add_dependencies(buildtests_cxx shutdown_test) add_dependencies(buildtests_cxx shutdown_test)
add_dependencies(buildtests_cxx slice_hash_table_test) add_dependencies(buildtests_cxx slice_hash_table_test)
@ -15936,6 +15937,53 @@ target_link_libraries(server_request_call_test
) )
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(service_config_end2end_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/lb/v1/load_balancer.grpc.pb.h
test/cpp/end2end/service_config_end2end_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
protobuf_generate_grpc_cpp(
src/proto/grpc/lb/v1/load_balancer.proto
)
target_include_directories(service_config_end2end_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(service_config_end2end_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS)

@ -1265,6 +1265,7 @@ server_crash_test_client: $(BINDIR)/$(CONFIG)/server_crash_test_client
server_early_return_test: $(BINDIR)/$(CONFIG)/server_early_return_test server_early_return_test: $(BINDIR)/$(CONFIG)/server_early_return_test
server_interceptors_end2end_test: $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test server_interceptors_end2end_test: $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test
server_request_call_test: $(BINDIR)/$(CONFIG)/server_request_call_test server_request_call_test: $(BINDIR)/$(CONFIG)/server_request_call_test
service_config_end2end_test: $(BINDIR)/$(CONFIG)/service_config_end2end_test
service_config_test: $(BINDIR)/$(CONFIG)/service_config_test service_config_test: $(BINDIR)/$(CONFIG)/service_config_test
shutdown_test: $(BINDIR)/$(CONFIG)/shutdown_test shutdown_test: $(BINDIR)/$(CONFIG)/shutdown_test
slice_hash_table_test: $(BINDIR)/$(CONFIG)/slice_hash_table_test slice_hash_table_test: $(BINDIR)/$(CONFIG)/slice_hash_table_test
@ -1736,6 +1737,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_early_return_test \ $(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \ $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \
$(BINDIR)/$(CONFIG)/server_request_call_test \ $(BINDIR)/$(CONFIG)/server_request_call_test \
$(BINDIR)/$(CONFIG)/service_config_end2end_test \
$(BINDIR)/$(CONFIG)/service_config_test \ $(BINDIR)/$(CONFIG)/service_config_test \
$(BINDIR)/$(CONFIG)/shutdown_test \ $(BINDIR)/$(CONFIG)/shutdown_test \
$(BINDIR)/$(CONFIG)/slice_hash_table_test \ $(BINDIR)/$(CONFIG)/slice_hash_table_test \
@ -1882,6 +1884,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_early_return_test \ $(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \ $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test \
$(BINDIR)/$(CONFIG)/server_request_call_test \ $(BINDIR)/$(CONFIG)/server_request_call_test \
$(BINDIR)/$(CONFIG)/service_config_end2end_test \
$(BINDIR)/$(CONFIG)/service_config_test \ $(BINDIR)/$(CONFIG)/service_config_test \
$(BINDIR)/$(CONFIG)/shutdown_test \ $(BINDIR)/$(CONFIG)/shutdown_test \
$(BINDIR)/$(CONFIG)/slice_hash_table_test \ $(BINDIR)/$(CONFIG)/slice_hash_table_test \
@ -2402,6 +2405,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test || ( echo test server_interceptors_end2end_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/server_interceptors_end2end_test || ( echo test server_interceptors_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing server_request_call_test" $(E) "[RUN] Testing server_request_call_test"
$(Q) $(BINDIR)/$(CONFIG)/server_request_call_test || ( echo test server_request_call_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/server_request_call_test || ( echo test server_request_call_test failed ; exit 1 )
$(E) "[RUN] Testing service_config_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/service_config_end2end_test || ( echo test service_config_end2end_test failed ; exit 1 )
$(E) "[RUN] Testing service_config_test" $(E) "[RUN] Testing service_config_test"
$(Q) $(BINDIR)/$(CONFIG)/service_config_test || ( echo test service_config_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/service_config_test || ( echo test service_config_test failed ; exit 1 )
$(E) "[RUN] Testing shutdown_test" $(E) "[RUN] Testing shutdown_test"
@ -18895,6 +18900,53 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/server/server_request_call_test.o: $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(OBJDIR)/$(CONFIG)/test/cpp/server/server_request_call_test.o: $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc
SERVICE_CONFIG_END2END_TEST_SRC = \
$(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc \
test/cpp/end2end/service_config_end2end_test.cc \
SERVICE_CONFIG_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVICE_CONFIG_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/service_config_end2end_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+.
$(BINDIR)/$(CONFIG)/service_config_end2end_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/service_config_end2end_test: $(PROTOBUF_DEP) $(SERVICE_CONFIG_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(SERVICE_CONFIG_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/service_config_end2end_test
endif
endif
$(OBJDIR)/$(CONFIG)/src/proto/grpc/lb/v1/load_balancer.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/service_config_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_service_config_end2end_test: $(SERVICE_CONFIG_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(SERVICE_CONFIG_END2END_TEST_OBJS:.o=.dep)
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/service_config_end2end_test.o: $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.pb.cc $(GENDIR)/src/proto/grpc/lb/v1/load_balancer.grpc.pb.cc
SERVICE_CONFIG_TEST_SRC = \ SERVICE_CONFIG_TEST_SRC = \
test/core/client_channel/service_config_test.cc \ test/core/client_channel/service_config_test.cc \

@ -5520,6 +5520,19 @@ targets:
- grpc++_unsecure - grpc++_unsecure
- grpc_unsecure - grpc_unsecure
- gpr - gpr
- name: service_config_end2end_test
gtest: true
build: test
language: c++
src:
- src/proto/grpc/lb/v1/load_balancer.proto
- test/cpp/end2end/service_config_end2end_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr
- name: service_config_test - name: service_config_test
gtest: true gtest: true
build: test build: test

@ -224,7 +224,8 @@ class ChannelData {
static bool ProcessResolverResultLocked( static bool ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name, void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config); RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);
grpc_error* DoPingLocked(grpc_transport_op* op); grpc_error* DoPingLocked(grpc_transport_op* op);
@ -1132,9 +1133,16 @@ ChannelData::~ChannelData() {
// resolver result update. // resolver result update.
bool ChannelData::ProcessResolverResultLocked( bool ChannelData::ProcessResolverResultLocked(
void* arg, const Resolver::Result& result, const char** lb_policy_name, void* arg, const Resolver::Result& result, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config) { RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error) {
ChannelData* chand = static_cast<ChannelData*>(arg); ChannelData* chand = static_cast<ChannelData*>(arg);
ProcessedResolverResult resolver_result(result); ProcessedResolverResult resolver_result(result);
*service_config_error = resolver_result.service_config_error();
if (*service_config_error != GRPC_ERROR_NONE) {
// We got an invalid service config.
return false;
}
char* service_config_json = gpr_strdup(resolver_result.service_config_json()); char* service_config_json = gpr_strdup(resolver_result.service_config_json());
if (grpc_client_channel_routing_trace.enabled()) { if (grpc_client_channel_routing_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"", gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",

@ -110,6 +110,8 @@ class AresDnsResolver : public Resolver {
UniquePtr<ServerAddressList> addresses_; UniquePtr<ServerAddressList> addresses_;
/// currently resolving service config /// currently resolving service config
char* service_config_json_ = nullptr; char* service_config_json_ = nullptr;
/// last valid service config
RefCountedPtr<ServiceConfig> saved_service_config_;
// has shutdown been initiated // has shutdown been initiated
bool shutdown_initiated_ = false; bool shutdown_initiated_ = false;
// timeout in milliseconds for active DNS queries // timeout in milliseconds for active DNS queries
@ -310,13 +312,29 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s",
r, service_config_string); r, service_config_string);
grpc_error* service_config_error = GRPC_ERROR_NONE; grpc_error* service_config_error = GRPC_ERROR_NONE;
result.service_config = auto new_service_config =
ServiceConfig::Create(service_config_string, &service_config_error); ServiceConfig::Create(service_config_string, &service_config_error);
// Error is currently unused. if (service_config_error == GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(service_config_error); // Valid service config receivd
r->saved_service_config_ = std::move(new_service_config);
} else {
if (r->saved_service_config_ != nullptr) {
// Ignore the new service config error, since we have a previously
// saved service config
GRPC_ERROR_UNREF(service_config_error);
} else {
// No previously valid service config found.
// service_config_error is passed to the channel.
result.service_config_error = service_config_error;
}
}
} }
gpr_free(service_config_string); gpr_free(service_config_string);
} else {
// No service config received
r->saved_service_config_.reset();
} }
result.service_config = r->saved_service_config_;
result.args = grpc_channel_args_copy(r->channel_args_); result.args = grpc_channel_args_copy(r->channel_args_);
r->result_handler()->ReturnResult(std::move(result)); r->result_handler()->ReturnResult(std::move(result));
r->addresses_.reset(); r->addresses_.reset();

@ -61,17 +61,24 @@ void ClientChannelServiceConfigParser::Register() {
ProcessedResolverResult::ProcessedResolverResult( ProcessedResolverResult::ProcessedResolverResult(
const Resolver::Result& resolver_result) const Resolver::Result& resolver_result)
: service_config_(resolver_result.service_config) { : service_config_(resolver_result.service_config) {
// If resolver did not return a service config, use the default // If resolver did not return a service config or returned an invalid service config, use the default
// specified via the client API. // specified via the client API.
if (service_config_ == nullptr) { if (service_config_ == nullptr) {
const char* service_config_json = grpc_channel_arg_get_string( const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(resolver_result.args, GRPC_ARG_SERVICE_CONFIG)); grpc_channel_args_find(resolver_result.args, GRPC_ARG_SERVICE_CONFIG));
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
grpc_error* error = GRPC_ERROR_NONE; service_config_ =
service_config_ = ServiceConfig::Create(service_config_json, &error); ServiceConfig::Create(service_config_json, &service_config_error_);
// Error is currently unused. } else {
GRPC_ERROR_UNREF(error); service_config_error_ = GRPC_ERROR_REF(resolver_result.service_config_error);
} }
} else {
service_config_error_ =
GRPC_ERROR_REF(resolver_result.service_config_error);
}
if (service_config_error_ != GRPC_ERROR_NONE) {
// We got an invalid service config. Don't process any further.
return;
} }
// Process service config. // Process service config.
const ClientChannelGlobalParsedObject* parsed_object = nullptr; const ClientChannelGlobalParsedObject* parsed_object = nullptr;

@ -127,6 +127,8 @@ class ProcessedResolverResult {
// for later consumption. // for later consumption.
ProcessedResolverResult(const Resolver::Result& resolver_result); ProcessedResolverResult(const Resolver::Result& resolver_result);
~ProcessedResolverResult() { GRPC_ERROR_UNREF(service_config_error_); }
// Getters. Any managed object's ownership is transferred. // Getters. Any managed object's ownership is transferred.
const char* service_config_json() { return service_config_json_; } const char* service_config_json() { return service_config_json_; }
@ -144,6 +146,12 @@ class ProcessedResolverResult {
const char* health_check_service_name() { return health_check_service_name_; } const char* health_check_service_name() { return health_check_service_name_; }
grpc_error* service_config_error() {
grpc_error* return_error = service_config_error_;
service_config_error_ = GRPC_ERROR_NONE;
return return_error;
}
private: private:
// Finds the service config; extracts LB config and (maybe) retry throttle // Finds the service config; extracts LB config and (maybe) retry throttle
// params from it. // params from it.
@ -167,6 +175,7 @@ class ProcessedResolverResult {
// Service config. // Service config.
const char* service_config_json_ = nullptr; const char* service_config_json_ = nullptr;
RefCountedPtr<ServiceConfig> service_config_; RefCountedPtr<ServiceConfig> service_config_;
grpc_error* service_config_error_ = GRPC_ERROR_NONE;
// LB policy. // LB policy.
UniquePtr<char> lb_policy_name_; UniquePtr<char> lb_policy_name_;
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config_; RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config_;

@ -533,9 +533,13 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config; RefCountedPtr<ParsedLoadBalancingConfig> lb_policy_config;
bool service_config_changed = false; bool service_config_changed = false;
if (process_resolver_result_ != nullptr) { if (process_resolver_result_ != nullptr) {
service_config_changed = grpc_error* service_config_error = GRPC_ERROR_NONE;
process_resolver_result_(process_resolver_result_user_data_, result, service_config_changed = process_resolver_result_(
&lb_policy_name, &lb_policy_config); process_resolver_result_user_data_, result, &lb_policy_name,
&lb_policy_config, &service_config_error);
if (service_config_error != GRPC_ERROR_NONE) {
return OnResolverError(service_config_error);
}
} else { } else {
lb_policy_name = child_policy_name_.get(); lb_policy_name = child_policy_name_.get();
lb_policy_config = child_lb_config_; lb_policy_config = child_lb_config_;

@ -68,7 +68,8 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
typedef bool (*ProcessResolverResultCallback)( typedef bool (*ProcessResolverResultCallback)(
void* user_data, const Resolver::Result& result, void* user_data, const Resolver::Result& result,
const char** lb_policy_name, const char** lb_policy_name,
RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config); RefCountedPtr<ParsedLoadBalancingConfig>* lb_policy_config,
grpc_error** service_config_error);
// If error is set when this returns, then construction failed, and // If error is set when this returns, then construction failed, and
// the caller may not use the new object. // the caller may not use the new object.
ResolvingLoadBalancingPolicy( ResolvingLoadBalancingPolicy(

@ -423,6 +423,27 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "service_config_end2end_test",
srcs = ["service_config_end2end_test.cc"],
external_deps = [
"gmock",
"gtest",
],
deps = [
":test_service_impl",
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:test_lb_policies",
"//test/cpp/util:test_util",
],
)
grpc_cc_test( grpc_cc_test(
name = "grpclb_end2end_test", name = "grpclb_end2end_test",
srcs = ["grpclb_end2end_test.cc"], srcs = ["grpclb_end2end_test.cc"],

@ -0,0 +1,442 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <algorithm>
#include <memory>
#include <mutex>
#include <random>
#include <set>
#include <thread>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/sync.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/cpp/client/secure_credentials.h"
#include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/core/util/test_lb_policies.h"
#include "test/cpp/end2end/test_service_impl.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
// Subclass of TestServiceImpl that increments a request counter for
// every call to the Echo RPC.
class MyTestServiceImpl : public TestServiceImpl {
public:
MyTestServiceImpl() : request_count_(0) {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
{
grpc::internal::MutexLock lock(&mu_);
++request_count_;
}
AddClient(context->peer());
return TestServiceImpl::Echo(context, request, response);
}
int request_count() {
grpc::internal::MutexLock lock(&mu_);
return request_count_;
}
void ResetCounters() {
grpc::internal::MutexLock lock(&mu_);
request_count_ = 0;
}
std::set<grpc::string> clients() {
grpc::internal::MutexLock lock(&clients_mu_);
return clients_;
}
private:
void AddClient(const grpc::string& client) {
grpc::internal::MutexLock lock(&clients_mu_);
clients_.insert(client);
}
grpc::internal::Mutex mu_;
int request_count_;
grpc::internal::Mutex clients_mu_;
std::set<grpc::string> clients_;
};
class ServiceConfigEnd2endTest : public ::testing::Test {
protected:
ServiceConfigEnd2endTest()
: server_host_("localhost"),
kRequestMessage_("Live long and prosper."),
creds_(new SecureChannelCredentials(
grpc_fake_transport_security_credentials_create())) {
// Make the backup poller poll very frequently in order to pick up
// updates from all the subchannels's FDs.
GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 1);
}
void SetUp() override {
grpc_init();
response_generator_ =
grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
}
void TearDown() override {
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown();
}
// Explicitly destroy all the members so that we can make sure grpc_shutdown
// has finished by the end of this function, and thus all the registered
// LB policy factories are removed.
stub_.reset();
servers_.clear();
creds_.reset();
grpc_shutdown_blocking();
}
void CreateServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
servers_.clear();
for (size_t i = 0; i < num_servers; ++i) {
int port = 0;
if (ports.size() == num_servers) port = ports[i];
servers_.emplace_back(new ServerData(port));
}
}
void StartServer(size_t index) { servers_[index]->Start(server_host_); }
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
CreateServers(num_servers, std::move(ports));
for (size_t i = 0; i < num_servers; ++i) {
StartServer(i);
}
}
grpc_core::Resolver::Result BuildFakeResults(const std::vector<int>& ports) {
grpc_core::Resolver::Result result;
for (const int& port : ports) {
char* lb_uri_str;
gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", port);
grpc_uri* lb_uri = grpc_uri_parse(lb_uri_str, true);
GPR_ASSERT(lb_uri != nullptr);
grpc_resolved_address address;
GPR_ASSERT(grpc_parse_uri(lb_uri, &address));
result.addresses.emplace_back(address.addr, address.len,
nullptr /* args */);
grpc_uri_destroy(lb_uri);
gpr_free(lb_uri_str);
}
return result;
}
void SetNextResolutionNoServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
response_generator_->SetResponse(result);
}
void SetNextResolutionValidServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config = grpc_core::ServiceConfig::Create("{}", &result.service_config_error);
response_generator_->SetResponse(result);
}
void SetNextResolutionInvalidServiceConfig(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
grpc_core::Resolver::Result result = BuildFakeResults(ports);
result.service_config = grpc_core::ServiceConfig::Create("{", &result.service_config_error);
response_generator_->SetResponse(result);
}
void SetNextResolutionUponError(const std::vector<int>& ports) {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetReresolutionResponse(BuildFakeResults(ports));
}
void SetFailureOnReresolution() {
grpc_core::ExecCtx exec_ctx;
response_generator_->SetFailureOnReresolution();
}
std::vector<int> GetServersPorts(size_t start_index = 0) {
std::vector<int> ports;
for (size_t i = start_index; i < servers_.size(); ++i) {
ports.push_back(servers_[i]->port_);
}
return ports;
}
std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub(
const std::shared_ptr<Channel>& channel) {
return grpc::testing::EchoTestService::NewStub(channel);
}
std::shared_ptr<Channel> BuildChannel(
ChannelArguments args = ChannelArguments()) {
args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
response_generator_.get());
return ::grpc::CreateCustomChannel("fake:///", creds_, args);
}
bool SendRpc(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
EchoResponse* response = nullptr, int timeout_ms = 1000,
Status* result = nullptr, bool wait_for_ready = false) {
const bool local_response = (response == nullptr);
if (local_response) response = new EchoResponse;
EchoRequest request;
request.set_message(kRequestMessage_);
ClientContext context;
context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
if (wait_for_ready) context.set_wait_for_ready(true);
Status status = stub->Echo(&context, request, response);
if (result != nullptr) *result = status;
if (local_response) delete response;
return status.ok();
}
void CheckRpcSendOk(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
const grpc_core::DebugLocation& location, bool wait_for_ready = false) {
EchoResponse response;
Status status;
const bool success =
SendRpc(stub, &response, 2000, &status, wait_for_ready);
ASSERT_TRUE(success) << "From " << location.file() << ":" << location.line()
<< "\n"
<< "Error: " << status.error_message() << " "
<< status.error_details();
ASSERT_EQ(response.message(), kRequestMessage_)
<< "From " << location.file() << ":" << location.line();
if (!success) abort();
}
void CheckRpcSendFailure(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub) {
const bool success = SendRpc(stub);
EXPECT_FALSE(success);
}
struct ServerData {
int port_;
std::unique_ptr<Server> server_;
MyTestServiceImpl service_;
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
bool started_ = false;
explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
}
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
started_ = true;
grpc::internal::Mutex mu;
grpc::internal::MutexLock lock(&mu);
grpc::internal::CondVar cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.WaitUntil(&mu, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Serve(const grpc::string& server_host, grpc::internal::Mutex* mu,
grpc::internal::CondVar* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
ServerBuilder builder;
std::shared_ptr<ServerCredentials> creds(new SecureServerCredentials(
grpc_fake_transport_security_server_credentials_create()));
builder.AddListeningPort(server_address.str(), std::move(creds));
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
grpc::internal::MutexLock lock(mu);
server_ready_ = true;
cond->Signal();
}
void Shutdown() {
if (!started_) return;
server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
thread_->join();
started_ = false;
}
void SetServingStatus(const grpc::string& service, bool serving) {
server_->GetHealthCheckService()->SetServingStatus(service, serving);
}
};
void ResetCounters() {
for (const auto& server : servers_) server->service_.ResetCounters();
}
void WaitForServer(
const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub,
size_t server_idx, const grpc_core::DebugLocation& location,
bool ignore_failure = false) {
do {
if (ignore_failure) {
SendRpc(stub);
} else {
CheckRpcSendOk(stub, location, true);
}
} while (servers_[server_idx]->service_.request_count() == 0);
ResetCounters();
}
bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(false /* try_to_connect */)) ==
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
bool WaitForChannelReady(Channel* channel, int timeout_seconds = 5) {
const gpr_timespec deadline =
grpc_timeout_seconds_to_deadline(timeout_seconds);
grpc_connectivity_state state;
while ((state = channel->GetState(true /* try_to_connect */)) !=
GRPC_CHANNEL_READY) {
if (!channel->WaitForStateChange(state, deadline)) return false;
}
return true;
}
bool SeenAllServers() {
for (const auto& server : servers_) {
if (server->service_.request_count() == 0) return false;
}
return true;
}
// Updates \a connection_order by appending to it the index of the newly
// connected server. Must be called after every single RPC.
void UpdateConnectionOrder(
const std::vector<std::unique_ptr<ServerData>>& servers,
std::vector<int>* connection_order) {
for (size_t i = 0; i < servers.size(); ++i) {
if (servers[i]->service_.request_count() == 1) {
// Was the server index known? If not, update connection_order.
const auto it =
std::find(connection_order->begin(), connection_order->end(), i);
if (it == connection_order->end()) {
connection_order->push_back(i);
return;
}
}
}
}
const grpc::string server_host_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::vector<std::unique_ptr<ServerData>> servers_;
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
response_generator_;
const grpc::string kRequestMessage_;
std::shared_ptr<ChannelCredentials> creds_;
};
TEST_F(ServiceConfigEnd2endTest, BasicTest) {
StartServers(1);
auto channel = BuildChannel();
auto stub = BuildStub(channel);
SetNextResolutionNoServiceConfig(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigTest) {
StartServers(1);
auto channel = BuildChannel();
auto stub = BuildStub(channel);
SetNextResolutionInvalidServiceConfig(GetServersPorts());
CheckRpcSendFailure(stub);
}
TEST_F(ServiceConfigEnd2endTest, ValidServiceConfigAfterInvalidTest) {
StartServers(1);
auto channel = BuildChannel();
auto stub = BuildStub(channel);
SetNextResolutionInvalidServiceConfig(GetServersPorts());
CheckRpcSendFailure(stub);
SetNextResolutionValidServiceConfig(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
TEST_F(ServiceConfigEnd2endTest, InvalidServiceConfigWithDefaultConfigTest) {
StartServers(1);
ChannelArguments args;
args.SetServiceConfigJSON("{}");
auto channel = BuildChannel(args);
auto stub = BuildStub(channel);
SetNextResolutionInvalidServiceConfig(GetServersPorts());
CheckRpcSendOk(stub, DEBUG_LOCATION);
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
const auto result = RUN_ALL_TESTS();
return result;
}

@ -4795,6 +4795,28 @@
"third_party": false, "third_party": false,
"type": "target" "type": "target"
}, },
{
"deps": [
"gpr",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
],
"headers": [
"src/proto/grpc/lb/v1/load_balancer.grpc.pb.h",
"src/proto/grpc/lb/v1/load_balancer.pb.h",
"src/proto/grpc/lb/v1/load_balancer_mock.grpc.pb.h"
],
"is_filegroup": false,
"language": "c++",
"name": "service_config_end2end_test",
"src": [
"test/cpp/end2end/service_config_end2end_test.cc"
],
"third_party": false,
"type": "target"
},
{ {
"deps": [ "deps": [
"gpr", "gpr",

@ -5445,6 +5445,30 @@
], ],
"uses_polling": true "uses_polling": true
}, },
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "service_config_end2end_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{ {
"args": [], "args": [],
"benchmark": false, "benchmark": false,

Loading…
Cancel
Save