// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// NOTE(review): the seven directives below have no header name in this copy
// of the file (presumably <...> headers lost in extraction) -- restore them
// from the upstream source before compiling.
#include
#include
#include
#include
#include
#include
#include

#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

// NOTE(review): the five directives below are likewise missing their header
// names in this copy of the file.
#include
#include
#include
#include
#include

#include "src/core/lib/gpr/env.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/security/security_connector/ssl_utils.h"
#include "src/cpp/server/secure_server_credentials.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/http_connection_manager.grpc.pb.h"
#include "src/proto/grpc/testing/xds/v3/http_filter_rbac.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/cpp/end2end/counted_service.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/end2end/xds/xds_server.h"

namespace grpc {
namespace testing {

// The parameter type for INSTANTIATE_TEST_SUITE_P().
//
// Every option starts at the default given by the member initializers at
// the bottom of the class. The set_*() methods modify this object and
// return *this, so a parameter can be built by chaining calls.
class XdsTestType {
 public:
  enum HttpFilterConfigLocation {
    // Set the HTTP filter config directly in LDS.
    kHttpFilterConfigInListener,
    // Enable the HTTP filter in LDS, but override the filter config in route.
    kHttpFilterConfigInRoute,
  };

  enum BootstrapSource {
    kBootstrapFromChannelArg,
    kBootstrapFromFile,
    kBootstrapFromEnvVar,
  };

  // Setters. The boolean setters take no argument and can only turn the
  // option on.
  XdsTestType& set_enable_load_reporting() {
    enable_load_reporting_ = true;
    return *this;
  }

  XdsTestType& set_enable_rds_testing() {
    enable_rds_testing_ = true;
    return *this;
  }

  XdsTestType& set_use_v2() {
    use_v2_ = true;
    return *this;
  }

  XdsTestType& set_use_xds_credentials() {
    use_xds_credentials_ = true;
    return *this;
  }

  XdsTestType& set_use_csds_streaming() {
    use_csds_streaming_ = true;
    return *this;
  }

  XdsTestType& set_filter_config_setup(HttpFilterConfigLocation setup) {
    filter_config_setup_ = setup;
    return *this;
  }

  XdsTestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
    bootstrap_source_ = bootstrap_source;
    return *this;
  }

  XdsTestType& set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action) {
    rbac_action_ = action;
    return *this;
  }

  // Getters.
  bool enable_load_reporting() const { return enable_load_reporting_; }
  bool enable_rds_testing() const { return enable_rds_testing_; }
  bool use_v2() const { return use_v2_; }
  bool use_xds_credentials() const { return use_xds_credentials_; }
  bool use_csds_streaming() const { return use_csds_streaming_; }
  HttpFilterConfigLocation filter_config_setup() const {
    return filter_config_setup_;
  }
  BootstrapSource bootstrap_source() const { return bootstrap_source_; }
  ::envoy::config::rbac::v3::RBAC_Action rbac_action() const {
    return rbac_action_;
  }

  // Returns a string describing this parameter: "V2" or "V3", followed by
  // one tag for each option that differs from its default. The default
  // values (listener filter config, channel-arg bootstrap, RBAC LOG action)
  // contribute nothing.
  std::string AsString() const {
    std::string retval = use_v2_ ? "V2" : "V3";
    if (enable_load_reporting_) retval += "WithLoadReporting";
    if (enable_rds_testing_) retval += "Rds";
    if (use_xds_credentials_) retval += "XdsCreds";
    if (use_csds_streaming_) retval += "CsdsStreaming";
    if (filter_config_setup_ == kHttpFilterConfigInRoute) {
      retval += "FilterPerRouteOverride";
    }
    if (bootstrap_source_ == kBootstrapFromFile) {
      retval += "BootstrapFromFile";
    } else if (bootstrap_source_ == kBootstrapFromEnvVar) {
      retval += "BootstrapFromEnvVar";
    }
    if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_ALLOW) {
      retval += "RbacAllow";
    } else if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_DENY) {
      retval += "RbacDeny";
    }
    return retval;
  }

  // For use as the final parameter in INSTANTIATE_TEST_SUITE_P().
  static std::string Name(const ::testing::TestParamInfo& info) {
    return info.param.AsString();
  }

 private:
  bool enable_load_reporting_ = false;
  bool enable_rds_testing_ = false;
  bool use_v2_ = false;
  bool use_xds_credentials_ = false;
  bool use_csds_streaming_ = false;
  HttpFilterConfigLocation filter_config_setup_ = kHttpFilterConfigInListener;
  BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
  ::envoy::config::rbac::v3::RBAC_Action rbac_action_ =
      ::envoy::config::rbac::v3::RBAC_Action_LOG;
};

// A base class for xDS end-to-end tests.
//
// An xDS server is provided in balancer_. It is automatically started
// for every test. Additional xDS servers can be started if needed by
// calling CreateAndStartBalancer().
//
// A default set of LDS, RDS, and CDS resources are created for gRPC
// clients, available in default_listener_, default_route_config_, and
// default_cluster_. These resources are automatically loaded into
// balancer_ but can be modified by individual tests. No EDS resource
// is provided by default. There are also default LDS and RDS resources
// for the gRPC server side in default_server_listener_ and
// default_server_route_config_.
// Methods are provided for constructing new
// resources that can be added to the xDS server as needed.
//
// This class provides a mechanism for running backend servers, which will
// be stored in backends_. No servers are created or started by default,
// but tests can call CreateAndStartBackends() to start however many
// backends they want. There are also a number of methods for accessing
// backends by index, which is the index into the backends_ vector.
// For methods that take a start_index and stop_index, this refers to
// the indexes in the range [start_index, stop_index). If stop_index
// is 0, backends_.size() is used. Backends may or may not be
// xDS-enabled, at the discretion of the test.
class XdsEnd2endTest : public ::testing::TestWithParam {
 protected:
  // Short names for the xDS resource proto types used throughout the tests.
  using Cluster = ::envoy::config::cluster::v3::Cluster;
  using ClusterLoadAssignment =
      ::envoy::config::endpoint::v3::ClusterLoadAssignment;
  using Listener = ::envoy::config::listener::v3::Listener;
  using RouteConfiguration = ::envoy::config::route::v3::RouteConfiguration;
  using HttpConnectionManager = ::envoy::extensions::filters::network::
      http_connection_manager::v3::HttpConnectionManager;

  // Default values for locality fields.
  static const char kDefaultLocalityRegion[];
  static const char kDefaultLocalityZone[];
  static const int kDefaultLocalityWeight = 3;
  static const int kDefaultLocalityPriority = 0;

  // Default resource names.
  static const char kServerName[];
  static const char kDefaultRouteConfigurationName[];
  static const char kDefaultClusterName[];
  static const char kDefaultEdsServiceName[];
  static const char kDefaultServerRouteConfigurationName[];

  // TLS certificate paths.
  static const char kCaCertPath[];
  static const char kServerCertPath[];
  static const char kServerKeyPath[];

  // Message used in EchoRequest to the backend.
  static const char kRequestMessage[];

  // A base class for server threads.
  class ServerThread {
   public:
    // A status notifier for xDS-enabled servers.
class XdsServingStatusNotifier : public grpc::experimental::XdsServerServingStatusNotifierInterface { public: void OnServingStatusUpdate(std::string uri, ServingStatusUpdate update) override; void WaitOnServingStatusChange(std::string uri, grpc::StatusCode expected_status); private: grpc_core::Mutex mu_; grpc_core::CondVar cond_; std::map status_map ABSL_GUARDED_BY(mu_); }; // If use_xds_enabled_server is true, the server will use xDS. explicit ServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server = false) : test_obj_(test_obj), port_(grpc_pick_unused_port_or_die()), use_xds_enabled_server_(use_xds_enabled_server) {} virtual ~ServerThread() { Shutdown(); } void Start(); void Shutdown(); virtual std::shared_ptr Credentials() { return std::make_shared( grpc_fake_transport_security_server_credentials_create()); } int port() const { return port_; } bool use_xds_enabled_server() const { return use_xds_enabled_server_; } XdsServingStatusNotifier* notifier() { return ¬ifier_; } private: class XdsChannelArgsServerBuilderOption; virtual const char* Type() = 0; virtual void RegisterAllServices(ServerBuilder* builder) = 0; virtual void StartAllServices() = 0; virtual void ShutdownAllServices() = 0; void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond); XdsEnd2endTest* test_obj_; const int port_; std::unique_ptr server_; XdsServingStatusNotifier notifier_; std::unique_ptr thread_; bool running_ = false; const bool use_xds_enabled_server_; }; // A server thread for a backend server. class BackendServerThread : public ServerThread { public: // A wrapper around the backend echo test service impl that counts // requests and responses. 
template class BackendServiceImpl : public CountedService> { public: BackendServiceImpl() {} Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { auto peer_identity = context->auth_context()->GetPeerIdentity(); CountedService< TestMultipleServiceImpl>::IncreaseRequestCount(); const auto status = TestMultipleServiceImpl::Echo( context, request, response); CountedService< TestMultipleServiceImpl>::IncreaseResponseCount(); { grpc_core::MutexLock lock(&mu_); clients_.insert(context->peer()); last_peer_identity_.clear(); for (const auto& entry : peer_identity) { last_peer_identity_.emplace_back(entry.data(), entry.size()); } } return status; } Status Echo1(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { return Echo(context, request, response); } Status Echo2(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { return Echo(context, request, response); } void Start() {} void Shutdown() {} std::set clients() { grpc_core::MutexLock lock(&mu_); return clients_; } const std::vector& last_peer_identity() { grpc_core::MutexLock lock(&mu_); return last_peer_identity_; } private: grpc_core::Mutex mu_; std::set clients_ ABSL_GUARDED_BY(mu_); std::vector last_peer_identity_ ABSL_GUARDED_BY(mu_); }; // If use_xds_enabled_server is true, the server will use xDS. BackendServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server); BackendServiceImpl* backend_service() { return &backend_service_; } BackendServiceImpl* backend_service1() { return &backend_service1_; } BackendServiceImpl* backend_service2() { return &backend_service2_; } // If XdsTestType::use_xds_credentials() and use_xds_enabled_server() // are both true, returns XdsServerCredentials. // Otherwise, if XdsTestType::use_xds_credentials() is true and // use_xds_enabled_server() is false, returns TlsServerCredentials. // Otherwise, returns fake credentials. 
std::shared_ptr Credentials() override;

   private:
    const char* Type() override { return "Backend"; }
    void RegisterAllServices(ServerBuilder* builder) override;
    void StartAllServices() override;
    void ShutdownAllServices() override;

    BackendServiceImpl backend_service_;
    BackendServiceImpl backend_service1_;
    BackendServiceImpl backend_service2_;
  };

  // A server thread for the xDS server.
  class BalancerServerThread : public ServerThread {
   public:
    explicit BalancerServerThread(XdsEnd2endTest* test_obj);

    // Non-owning accessors for the ADS and LRS service implementations.
    AdsServiceImpl* ads_service() { return ads_service_.get(); }
    LrsServiceImpl* lrs_service() { return lrs_service_.get(); }

   private:
    const char* Type() override { return "Balancer"; }
    void RegisterAllServices(ServerBuilder* builder) override;
    void StartAllServices() override;
    void ShutdownAllServices() override;

    std::shared_ptr ads_service_;
    std::shared_ptr lrs_service_;
  };

  // A builder for the xDS bootstrap config.
  //
  // Each setter stores its argument and returns *this so calls can be
  // chained; Build() produces the resulting config string.
  class BootstrapBuilder {
   public:
    BootstrapBuilder() {}

    BootstrapBuilder& SetV2() {
      v2_ = true;
      return *this;
    }

    BootstrapBuilder& SetDefaultServer(const std::string& server) {
      top_server_ = server;
      return *this;
    }

    BootstrapBuilder& SetClientDefaultListenerResourceNameTemplate(
        const std::string& client_default_listener_resource_name_template) {
      client_default_listener_resource_name_template_ =
          client_default_listener_resource_name_template;
      return *this;
    }

    // Registers a certificate provider plugin under key. Calling this again
    // with the same key replaces the earlier entry.
    BootstrapBuilder& AddCertificateProviderPlugin(
        const std::string& key, const std::string& name,
        const std::string& plugin_config = "") {
      plugins_[key] = {name, plugin_config};
      return *this;
    }

    // Registers an authority. Calling this again with the same authority
    // name replaces the earlier entry.
    BootstrapBuilder& AddAuthority(
        const std::string& authority, const std::string& servers = "",
        const std::string& client_listener_resource_name_template = "") {
      authorities_[authority] = {servers,
                                 client_listener_resource_name_template};
      return *this;
    }

    BootstrapBuilder& SetServerListenerResourceNameTemplate(
        const std::string& server_listener_resource_name_template = "") {
      server_listener_resource_name_template_ =
server_listener_resource_name_template;
      return *this;
    }

    std::string Build();

   private:
    struct PluginInfo {
      std::string name;
      std::string plugin_config;
    };

    struct AuthorityInfo {
      std::string server;
      std::string client_listener_resource_name_template;
    };

    std::string MakeXdsServersText(absl::string_view server_uri);
    std::string MakeNodeText();
    std::string MakeCertificateProviderText();
    std::string MakeAuthorityText();

    bool v2_ = false;
    std::string top_server_;
    std::string client_default_listener_resource_name_template_;
    std::map plugins_;
    std::map authorities_;
    std::string server_listener_resource_name_template_ =
        "grpc/server?xds.resource.listening_address=%s";
  };

  // Sets the given environment variable to "true" on construction and
  // unsets it on destruction.
  //
  // Only the pointer is stored, so env_var must remain valid for the
  // lifetime of this object.
  // NOTE(review): copy operations are not deleted; a copy would unset the
  // variable a second time when it is destroyed.
  class ScopedExperimentalEnvVar {
   public:
    explicit ScopedExperimentalEnvVar(const char* env_var)
        : env_var_(env_var) {
      gpr_setenv(env_var_, "true");
    }

    ~ScopedExperimentalEnvVar() { gpr_unsetenv(env_var_); }

   private:
    const char* env_var_;
  };

  // RPC services used to talk to the backends.
  enum RpcService {
    SERVICE_ECHO,
    SERVICE_ECHO1,
    SERVICE_ECHO2,
  };

  // RPC methods used to talk to the backends.
  enum RpcMethod {
    METHOD_ECHO,
    METHOD_ECHO1,
    METHOD_ECHO2,
  };

  XdsEnd2endTest();

  // Calls InitClient() with its default arguments.
  void SetUp() override { InitClient(); }

  void TearDown() override;

  //
  // xDS server management
  //

  // Creates and starts a new balancer, running in its own thread.
  // Most tests will not need to call this; instead, they can use
  // balancer_, which is already populated with default resources.
  std::unique_ptr CreateAndStartBalancer();

  // Returns the name of the server-side xDS Listener resource for a
  // backend on the specified port.
  std::string GetServerListenerName(int port);

  // Returns a copy of listener_template with the server-side resource
  // name and the port in the socket address populated.
  Listener PopulateServerListenerNameAndPort(const Listener& listener_template,
                                             int port);

  // Interface for accessing HttpConnectionManager config in Listener.
class HcmAccessor {
   public:
    virtual ~HcmAccessor() = default;
    // Extracts the HttpConnectionManager config from listener.
    virtual HttpConnectionManager Unpack(const Listener& listener) const = 0;
    // Stores hcm into listener.
    virtual void Pack(const HttpConnectionManager& hcm,
                      Listener* listener) const = 0;
  };

  // Client-side impl.
  class ClientHcmAccessor : public HcmAccessor {
   public:
    HttpConnectionManager Unpack(const Listener& listener) const override;
    void Pack(const HttpConnectionManager& hcm,
              Listener* listener) const override;
  };

  // Server-side impl.
  class ServerHcmAccessor : public HcmAccessor {
   public:
    HttpConnectionManager Unpack(const Listener& listener) const override;
    void Pack(const HttpConnectionManager& hcm,
              Listener* listener) const override;
  };

  // Sets the Listener and RouteConfiguration resource on the specified
  // balancer. If RDS is in use, they will be set as separate resources;
  // otherwise, the RouteConfig will be inlined into the Listener.
  // hcm_accessor defaults to the client-side accessor.
  void SetListenerAndRouteConfiguration(
      BalancerServerThread* balancer, Listener listener,
      const RouteConfiguration& route_config,
      const HcmAccessor& hcm_accessor = ClientHcmAccessor());

  // A convenient wrapper for setting the Listener and
  // RouteConfiguration resources on the server side.
  // The listener is first passed through
  // PopulateServerListenerNameAndPort() for the given port, and the
  // server-side HCM accessor is used.
  void SetServerListenerNameAndRouteConfiguration(
      BalancerServerThread* balancer, Listener listener, int port,
      const RouteConfiguration& route_config) {
    SetListenerAndRouteConfiguration(
        balancer, PopulateServerListenerNameAndPort(listener, port),
        route_config, ServerHcmAccessor());
  }

  // Sets the RouteConfiguration resource on the specified balancer.
  // If RDS is in use, it will be set directly as an independent
  // resource; otherwise, it will be inlined into a Listener resource
  // (either listener_to_copy, or if that is null, default_listener_).
  void SetRouteConfiguration(BalancerServerThread* balancer,
                             const RouteConfiguration& route_config,
                             const Listener* listener_to_copy = nullptr);

  // Arguments for constructing an EDS resource.
struct EdsResourceArgs {
    // An individual endpoint for a backend running on a specified port.
    struct Endpoint {
      explicit Endpoint(
          int port,
          ::envoy::config::endpoint::v3::HealthStatus health_status =
              ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
          int lb_weight = 1)
          : port(port), health_status(health_status), lb_weight(lb_weight) {}

      int port;
      ::envoy::config::endpoint::v3::HealthStatus health_status;
      int lb_weight;
    };

    // A locality.
    struct Locality {
      // sub_zone and endpoints are taken by value and moved into the
      // members of the same name.
      Locality(std::string sub_zone, std::vector endpoints,
               int lb_weight = kDefaultLocalityWeight,
               int priority = kDefaultLocalityPriority)
          : sub_zone(std::move(sub_zone)),
            endpoints(std::move(endpoints)),
            lb_weight(lb_weight),
            priority(priority) {}

      const std::string sub_zone;
      std::vector endpoints;
      int lb_weight;
      int priority;
    };

    EdsResourceArgs() = default;
    explicit EdsResourceArgs(std::vector locality_list)
        : locality_list(std::move(locality_list)) {}

    std::vector locality_list;
    std::map drop_categories;
    ::envoy::type::v3::FractionalPercent::DenominatorType drop_denominator =
        ::envoy::type::v3::FractionalPercent::MILLION;
  };

  // Helper method for generating an endpoint for a backend, for use in
  // constructing an EDS resource.
  // backend_idx indexes backends_ and is not bounds-checked here.
  EdsResourceArgs::Endpoint CreateEndpoint(
      size_t backend_idx,
      ::envoy::config::endpoint::v3::HealthStatus health_status =
          ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
      int lb_weight = 1) {
    return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
                                     health_status, lb_weight);
  }

  // Creates a vector of endpoints for a specified range of backends,
  // for use in constructing an EDS resource.
  std::vector CreateEndpointsForBackends(
      size_t start_index = 0, size_t stop_index = 0,
      ::envoy::config::endpoint::v3::HealthStatus health_status =
          ::envoy::config::endpoint::v3::HealthStatus::UNKNOWN,
      int lb_weight = 1);

  // Returns an endpoint for an unused port, for use in constructing an
  // EDS resource.
EdsResourceArgs::Endpoint MakeNonExistantEndpoint() {
    return EdsResourceArgs::Endpoint(grpc_pick_unused_port_or_die());
  }

  // Constructs an EDS resource.
  ClusterLoadAssignment BuildEdsResource(
      const EdsResourceArgs& args,
      const char* eds_service_name = kDefaultEdsServiceName);

  //
  // Backend management
  //

  // Creates num_backends backends and stores them in backends_, but
  // does not actually start them. If xds_enabled is true, the backends
  // are xDS-enabled.
  // New backends are appended; existing entries in backends_ are kept.
  void CreateBackends(size_t num_backends, bool xds_enabled = false) {
    for (size_t i = 0; i < num_backends; ++i) {
      backends_.emplace_back(new BackendServerThread(this, xds_enabled));
    }
  }

  // Starts all backends in backends_.
  void StartAllBackends() {
    for (auto& backend : backends_) backend->Start();
  }

  // Same as CreateBackends(), but also starts the backends.
  // Note that this calls StartAllBackends(), so Start() is invoked on every
  // entry in backends_, not only on the newly created ones.
  void CreateAndStartBackends(size_t num_backends, bool xds_enabled = false) {
    CreateBackends(num_backends, xds_enabled);
    StartAllBackends();
  }

  // Starts the backend at backends_[index].
  void StartBackend(size_t index) { backends_[index]->Start(); }

  // Shuts down all backends in backends_.
  void ShutdownAllBackends() {
    for (auto& backend : backends_) backend->Shutdown();
  }

  // Shuts down the backend at backends_[index].
  void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }

  // Resets the request counters for backends in the specified range.
  void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0);

  // Returns true if the specified backend has received requests for the
  // specified service.
  bool SeenBackend(size_t backend_idx,
                   const RpcService rpc_service = SERVICE_ECHO);

  // Returns true if all backends in the specified range have received
  // requests for the specified service.
  bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0,
                       const RpcService rpc_service = SERVICE_ECHO);

  // Returns a vector containing the port for every backend in the
  // specified range.
std::vector GetBackendPorts(size_t start_index = 0,
                              size_t stop_index = 0) const;

  //
  // Client management
  //

  // Initializes global state for the client, such as the bootstrap file
  // and channel args for the XdsClient. Then calls ResetStub().
  // All tests must call this exactly once at the start of the test.
  // NOTE(review): SetUp() in this class already calls InitClient() with
  // default arguments -- confirm how that interacts with the "exactly once"
  // requirement for tests that need non-default arguments.
  void InitClient(BootstrapBuilder builder = BootstrapBuilder(),
                  std::string lb_expected_authority = "",
                  int xds_resource_does_not_exist_timeout_ms = 0);

  // Sets channel_, stub_, stub1_, and stub2_.
  void ResetStub(int failover_timeout_ms = 0,
                 ChannelArguments* args = nullptr);

  // Creates a new client channel. Requires that InitClient() has
  // already been called.
  std::shared_ptr CreateChannel(int failover_timeout_ms = 0,
                                const char* server_name = kServerName,
                                const char* xds_authority = "",
                                ChannelArguments* args = nullptr);

  //
  // Sending RPCs
  //

  // Options used for sending an RPC.
  // Fields are public and also have chainable set_*() methods that return
  // *this.
  struct RpcOptions {
    RpcService service = SERVICE_ECHO;
    RpcMethod method = METHOD_ECHO;
    int timeout_ms = 1000;
    bool wait_for_ready = false;
    std::vector> metadata;
    // These options are used by the backend service impl.
bool server_fail = false;
    int server_sleep_us = 0;
    int client_cancel_after_us = 0;
    bool skip_cancelled_check = false;
    StatusCode server_expected_error = StatusCode::OK;

    RpcOptions() {}

    // Chainable setters; each stores its argument in the field of the
    // corresponding name and returns *this.
    RpcOptions& set_rpc_service(RpcService rpc_service) {
      service = rpc_service;
      return *this;
    }

    RpcOptions& set_rpc_method(RpcMethod rpc_method) {
      method = rpc_method;
      return *this;
    }

    RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
      timeout_ms = rpc_timeout_ms;
      return *this;
    }

    RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
      wait_for_ready = rpc_wait_for_ready;
      return *this;
    }

    // rpc_metadata is taken by value and moved into place.
    RpcOptions& set_metadata(std::vector> rpc_metadata) {
      metadata = std::move(rpc_metadata);
      return *this;
    }

    RpcOptions& set_server_fail(bool rpc_server_fail) {
      server_fail = rpc_server_fail;
      return *this;
    }

    RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) {
      server_sleep_us = rpc_server_sleep_us;
      return *this;
    }

    RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) {
      client_cancel_after_us = rpc_client_cancel_after_us;
      return *this;
    }

    RpcOptions& set_skip_cancelled_check(bool rpc_skip_cancelled_check) {
      skip_cancelled_check = rpc_skip_cancelled_check;
      return *this;
    }

    RpcOptions& set_server_expected_error(StatusCode code) {
      server_expected_error = code;
      return *this;
    }

    // Populates context and request.
    void SetupRpc(ClientContext* context, EchoRequest* request) const;
  };

  // Sends an RPC with the specified options.
  // If response is non-null, it will be populated with the response.
  // Returns the status of the RPC.
  Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
                 EchoResponse* response = nullptr);

  // Internal helper function for SendRpc().
template static Status SendRpcMethod(Stub* stub,
                                       const RpcOptions& rpc_options,
                                       ClientContext* context,
                                       EchoRequest& request,
                                       EchoResponse* response) {
    // Dispatch to the stub method selected by rpc_options.method.
    switch (rpc_options.method) {
      case METHOD_ECHO:
        return stub->Echo(context, request, response);
      case METHOD_ECHO1:
        return stub->Echo1(context, request, response);
      case METHOD_ECHO2:
        return stub->Echo2(context, request, response);
    }
    // Every enumerator returns above.
    GPR_UNREACHABLE_CODE();
  }

  // Sends the specified number of RPCs and fails if the RPC fails.
  void CheckRpcSendOk(const size_t times = 1,
                      const RpcOptions& rpc_options = RpcOptions());

  // Options to use with CheckRpcSendFailure().
  struct CheckRpcSendFailureOptions {
    // The default predicate is true only for i == 0.
    std::function continue_predicate = [](size_t i) { return i < 1; };
    RpcOptions rpc_options;
    StatusCode expected_error_code = StatusCode::OK;

    CheckRpcSendFailureOptions() {}

    // Replaces continue_predicate with one that is true while i < times.
    CheckRpcSendFailureOptions& set_times(size_t times) {
      continue_predicate = [times](size_t i) { return i < times; };
      return *this;
    }

    CheckRpcSendFailureOptions& set_continue_predicate(std::function pred) {
      continue_predicate = std::move(pred);
      return *this;
    }

    CheckRpcSendFailureOptions& set_rpc_options(const RpcOptions& options) {
      rpc_options = options;
      return *this;
    }

    CheckRpcSendFailureOptions& set_expected_error_code(StatusCode code) {
      expected_error_code = code;
      return *this;
    }
  };

  // Sends RPCs and expects them to fail.
  void CheckRpcSendFailure(
      const CheckRpcSendFailureOptions& options = CheckRpcSendFailureOptions());

  // Sends num_rpcs RPCs, counting how many of them fail with a message
  // matching the specified drop_error_message_prefix.
  // Any failure with a non-matching message is a test failure.
  size_t SendRpcsAndCountFailuresWithMessage(
      size_t num_rpcs, const char* drop_error_message_prefix,
      const RpcOptions& rpc_options = RpcOptions());

  // A class for running a long-running RPC in its own thread.
  // TODO(roth): Maybe consolidate this and SendConcurrentRpcs()
  // somehow?
// LongRunningRpc has a cleaner API, but SendConcurrentRpcs()
  // uses the callback API, which is probably better.
  class LongRunningRpc {
   public:
    // Starts the RPC.
    // The default options set timeout_ms to 0 and client_cancel_after_us to
    // one second (1 * 1000 * 1000 us).
    void StartRpc(grpc::testing::EchoTestService::Stub* stub,
                  const RpcOptions& rpc_options =
                      RpcOptions().set_timeout_ms(0).set_client_cancel_after_us(
                          1 * 1000 * 1000));

    // Cancels the RPC.
    void CancelRpc();

    // Gets the RPC's status. Blocks if the RPC is not yet complete.
    Status GetStatus();

   private:
    std::thread sender_thread_;
    ClientContext context_;
    Status status_;
  };

  // Starts a set of concurrent RPCs.
  // ConcurrentRpc holds the per-RPC state for one of them.
  struct ConcurrentRpc {
    ClientContext context;
    Status status;
    grpc_core::Duration elapsed_time;
    EchoResponse response;
  };

  std::vector SendConcurrentRpcs(grpc::testing::EchoTestService::Stub* stub,
                                 size_t num_rpcs,
                                 const RpcOptions& rpc_options);

  //
  // Waiting for individual backends to be seen by the client
  //

  struct WaitForBackendOptions {
    // If true, resets the backend counters before returning.
    bool reset_counters = true;
    // If true, RPC failures will not cause the test to fail.
    bool allow_failures = false;
    // How long to wait for the backend(s) to see requests.
    int timeout_ms = 5000;

    WaitForBackendOptions() {}

    // Chainable setters; each returns *this.
    WaitForBackendOptions& set_reset_counters(bool enable) {
      reset_counters = enable;
      return *this;
    }

    WaitForBackendOptions& set_allow_failures(bool enable) {
      allow_failures = enable;
      return *this;
    }

    WaitForBackendOptions& set_timeout_ms(int ms) {
      timeout_ms = ms;
      return *this;
    }
  };

  // Sends RPCs until all of the backends in the specified range see requests.
  // Returns the total number of RPCs sent.
  size_t WaitForAllBackends(
      size_t start_index = 0, size_t stop_index = 0,
      const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
      const RpcOptions& rpc_options = RpcOptions());

  // Sends RPCs until the backend at index backend_idx sees requests.
void WaitForBackend(
      size_t backend_idx,
      const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
      const RpcOptions& rpc_options = RpcOptions()) {
    // Waits on the single-element range [backend_idx, backend_idx + 1).
    WaitForAllBackends(backend_idx, backend_idx + 1, wait_options,
                       rpc_options);
  }

  //
  // Waiting for xDS NACKs
  //
  // These methods send RPCs in a loop until they see a NACK from the
  // xDS server, or until a timeout expires.

  // Sends RPCs until get_state() returns a response.
  absl::optional WaitForNack(
      std::function()> get_state,
      StatusCode expected_status = StatusCode::UNAVAILABLE);

  // Sends RPCs until an LDS NACK is seen.
  // The state is read from balancer_'s ADS service.
  absl::optional WaitForLdsNack(
      StatusCode expected_status = StatusCode::UNAVAILABLE) {
    return WaitForNack(
        [&]() { return balancer_->ads_service()->lds_response_state(); },
        expected_status);
  }

  // Sends RPCs until an RDS NACK is seen.
  // The state is read via RouteConfigurationResponseState() on balancer_.
  absl::optional WaitForRdsNack(
      StatusCode expected_status = StatusCode::UNAVAILABLE) {
    return WaitForNack(
        [&]() { return RouteConfigurationResponseState(balancer_.get()); },
        expected_status);
  }

  // Sends RPCs until a CDS NACK is seen.
  absl::optional WaitForCdsNack(
      StatusCode expected_status = StatusCode::UNAVAILABLE) {
    return WaitForNack(
        [&]() { return balancer_->ads_service()->cds_response_state(); },
        expected_status);
  }

  // Sends RPCs until an EDS NACK is seen.
  // Unlike the other waiters, this takes no expected_status and always uses
  // WaitForNack()'s default.
  absl::optional WaitForEdsNack() {
    return WaitForNack(
        [&]() { return balancer_->ads_service()->eds_response_state(); });
  }

  // Convenient front-end to wait for RouteConfiguration to be NACKed,
  // regardless of whether it's sent in LDS or RDS.
  absl::optional WaitForRouteConfigNack(
      StatusCode expected_status = StatusCode::UNAVAILABLE) {
    if (GetParam().enable_rds_testing()) {
      return WaitForRdsNack(expected_status);
    }
    return WaitForLdsNack(expected_status);
  }

  // Convenient front-end for accessing xDS response state for a
  // RouteConfiguration, regardless of whether it's sent in LDS or RDS.
absl::optional RouteConfigurationResponseState(
      BalancerServerThread* balancer) const {
    AdsServiceImpl* ads_service = balancer->ads_service();
    // With RDS testing enabled, report the RDS state; otherwise the LDS
    // state.
    if (GetParam().enable_rds_testing()) {
      return ads_service->rds_response_state();
    }
    return ads_service->lds_response_state();
  }

  //
  // Miscellaneous helper methods
  //

  // There is a slight difference between the time fetched by GPR and by the
  // C++ system clock API. It's unclear if they are using the same syscall,
  // but we do know that GPR rounds the number at millisecond level. This
  // creates a 1ms difference, which could cause flakes.
  static grpc_core::Timestamp NowFromCycleCounter() {
    return grpc_core::Timestamp::FromTimespecRoundDown(
        gpr_now(GPR_CLOCK_MONOTONIC));
  }

  // Returns the number of RPCs needed to pass error_tolerance at 99.99994%
  // chance. Rolling dice in drop/fault-injection generates a binomial
  // distribution (if our code is not horribly wrong). Let's make "n" the number
  // of samples, "p" the probability. If we have np>5 & n(1-p)>5, we can
  // approximately treat the binomial distribution as a normal distribution.
  //
  // For a normal distribution, we can easily look up how many standard
  // deviations we need to reach 99.99994%. Based on Wiki's table
  // https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule, we need 5.00
  // sigma (standard deviation) to cover the probability area of 99.99994%. In
  // other words, for a sample with size "n" probability "p" error-tolerance
  // "k", we want the error to always land within 5.00 sigma. The sigma of a
  // binomial distribution can be computed as sqrt(np(1-p)).
Hence, we have // the equation: // // kn <= 5.00 * sqrt(np(1-p)) static size_t ComputeIdealNumRpcs(double p, double error_tolerance) { GPR_ASSERT(p >= 0 && p <= 1); size_t num_rpcs = ceil(p * (1 - p) * 5.00 * 5.00 / error_tolerance / error_tolerance); gpr_log(GPR_INFO, "Sending %" PRIuPTR " RPCs for percentage=%.3f error_tolerance=%.3f", num_rpcs, p, error_tolerance); return num_rpcs; } // Returns the contents of the specified file. static std::string ReadFile(const char* file_path); // Returns a private key pair, read from local files. static grpc_core::PemKeyCertPairList ReadTlsIdentityPair( const char* key_path, const char* cert_path); // Returns client credentials suitable for using as fallback // credentials for XdsCredentials. static std::shared_ptr CreateTlsFallbackCredentials(); bool ipv6_only_ = false; std::unique_ptr balancer_; std::shared_ptr channel_; std::unique_ptr stub_; std::unique_ptr stub1_; std::unique_ptr stub2_; std::vector> backends_; // Default xDS resources. Listener default_listener_; RouteConfiguration default_route_config_; Listener default_server_listener_; RouteConfiguration default_server_route_config_; Cluster default_cluster_; int xds_drain_grace_time_ms_ = 10 * 60 * 1000; // 10 mins bool bootstrap_contents_from_env_var_; std::string bootstrap_; char* bootstrap_file_ = nullptr; absl::InlinedVector xds_channel_args_to_add_; grpc_channel_args xds_channel_args_; }; } // namespace testing } // namespace grpc