diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 1e4c9fd0a23..5a73918c0d0 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -358,7 +358,10 @@ typedef struct { * The default is 15 seconds. */ #define GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS \ "grpc.xds_resource_does_not_exist_timeout_ms" -/* if set, enable xds routing policy */ +/* If set, enable xds routing policy. This boolean argument is currently + * disabled by default; however, it will be changed to enabled by default + * once the functionality proves stable. This arg will eventually + * be removed completely. */ #define GRPC_ARG_XDS_ROUTING_ENABLED "grpc.xds_routing_enabled" /** If non-zero, grpc server's cronet compression workaround will be enabled */ #define GRPC_ARG_WORKAROUND_CRONET_COMPRESSION \ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc index 7bcc05cc018..40174672e96 100644 --- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_routing.cc @@ -610,7 +610,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { std::vector error_list; // action map. XdsRoutingLbConfig::ActionMap action_map; - std::set action_to_be_used; + std::set actions_to_be_used; auto it = json.object_value().find("actions"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -620,6 +620,11 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { "field:actions error:type should be object")); } else { for (const auto& p : it->second.object_value()) { + if (p.first.empty()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "field:actions element error: name cannot be empty")); + continue; + } RefCountedPtr child_config; std::vector child_errors = ParseChildConfig(p.second, &child_config); @@ -634,7 +639,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { error_list.push_back(error); } else { action_map[p.first] = std::move(child_config); - action_to_be_used.insert(p.first); + actions_to_be_used.insert(p.first); } } } @@ -655,7 +660,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { for (size_t i = 0; i < array.size(); ++i) { XdsRoutingLbConfig::Route route; std::vector route_errors = - ParseRoute(array[i], action_map, &route, &action_to_be_used); + ParseRoute(array[i], action_map, &route, &actions_to_be_used); if (!route_errors.empty()) { // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // string is not static in this case. @@ -680,7 +685,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { "default route must not contain service or method"); error_list.push_back(error); } - if (!action_to_be_used.empty()) { + if (!actions_to_be_used.empty()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "some actions were not referenced by any route")); } @@ -760,7 +765,7 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { static std::vector ParseRoute( const Json& json, const XdsRoutingLbConfig::ActionMap& action_map, XdsRoutingLbConfig::Route* route, - std::set* action_to_be_used) { + std::set* actions_to_be_used) { std::vector error_list; if (json.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -790,15 +795,19 @@ class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { "field:action error:should be of type string")); } else { route->action = it->second.string_value(); - // Validate action exists and mark it as used. - if (!route->action.empty() && - action_map.find(route->action) == action_map.end()) { + if (route->action.empty()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - absl::StrCat("field:action error:", route->action, - " does not exist") - .c_str())); + "field:action error:cannot be empty")); + } else { + // Validate action exists and mark it as used. + if (action_map.find(route->action) == action_map.end()) { + error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + absl::StrCat("field:action error:", route->action, + " does not exist") + .c_str())); + } + actions_to_be_used->erase(route->action); } - action_to_be_used->erase(route->action); } return error_list; } diff --git a/src/core/ext/filters/client_channel/xds/xds_api.cc b/src/core/ext/filters/client_channel/xds/xds_api.cc index d74b2ab694f..855d34a780d 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.cc +++ b/src/core/ext/filters/client_channel/xds/xds_api.cc @@ -1013,11 +1013,9 @@ grpc_error* RouteConfigParse( return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "No route found in the virtual host."); } - // If xds_routing is not configured, only look at the last one in the route // list (the default route) - size_t start_index = size - 1; - if (xds_routing_enabled) start_index = 0; + size_t start_index = xds_routing_enabled ? 0 : size - 1; for (size_t i = start_index; i < size; ++i) { const envoy_api_v2_route_Route* route = routes[i]; const envoy_api_v2_route_RouteMatch* match = @@ -1064,6 +1062,10 @@ grpc_error* RouteConfigParse( } const upb_strview action = envoy_api_v2_route_RouteAction_cluster(route_action); + if (action.size == 0) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "RouteAction has empty cluster."); + } rds_route.cluster_name = std::string(action.data, action.size); rds_update->routes.emplace_back(std::move(rds_route)); } diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index 33e9d17c519..2f0657496da 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -903,7 +903,7 @@ void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate( ? lds_update->route_config_name.c_str() : "")); if (lds_update->rds_update.has_value()) { - gpr_log(GPR_INFO, " RouteConfiguration contains %lu routes", this, + gpr_log(GPR_INFO, " RouteConfiguration contains %lu routes", lds_update->rds_update.value().routes.size()); for (const auto& route : lds_update->rds_update.value().routes) { gpr_log(GPR_INFO, @@ -1819,8 +1819,8 @@ grpc_millis GetRequestTimeout(const grpc_channel_args& args) { } bool GetXdsRoutingEnabled(const grpc_channel_args& args) { - return grpc_channel_args_find_integer(&args, GRPC_ARG_XDS_ROUTING_ENABLED, - {0, 0, 1}); + return grpc_channel_args_find_bool(&args, GRPC_ARG_XDS_ROUTING_ENABLED, + false); } } // namespace diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index b8dbf5b4db5..bb6be0c4f02 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -38,7 +38,7 @@ grpc_cc_library( grpc_cc_library( name = "test_multiple_service_impl", testonly = True, - hdrs = ["test_multiple_service_impl.h"], + hdrs = ["test_service_impl.h"], external_deps = [ "gtest", ], diff --git a/test/cpp/end2end/test_multiple_service_impl.h b/test/cpp/end2end/test_multiple_service_impl.h deleted file mode 100644 index 2b06117774c..00000000000 --- a/test/cpp/end2end/test_multiple_service_impl.h +++ /dev/null @@ -1,495 +0,0 @@ -/* - * - * Copyright 2016 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_TEST_CPP_END2END_TEST_MULTIPLE_SERVICE_IMPL_H -#define GRPC_TEST_CPP_END2END_TEST_MULTIPLE_SERVICE_IMPL_H - -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/cpp/util/string_ref_helper.h" - -using std::chrono::system_clock; - -namespace grpc { -namespace testing { - -const int kServerDefaultResponseStreamsToSend = 3; -const char* const kServerResponseStreamsToSend = "server_responses_to_send"; -const char* const kServerTryCancelRequest = "server_try_cancel"; -const char* const kDebugInfoTrailerKey = "debug-info-bin"; -const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; -const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; -const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; -const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; - -typedef enum { - DO_NOT_CANCEL = 0, - CANCEL_BEFORE_PROCESSING, - CANCEL_DURING_PROCESSING, - CANCEL_AFTER_PROCESSING -} ServerTryCancelRequestPhase; - -namespace { - -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(experimental::ServerContextBase* context, - const EchoRequest* request, EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -void CheckServerAuthContext( - const experimental::ServerContextBase* context, - const grpc::string& expected_transport_security_type, - const grpc::string& expected_client_identity) { - std::shared_ptr auth_ctx = context->auth_context(); - std::vector tst = - auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, tst.size()); - EXPECT_EQ(expected_transport_security_type, ToString(tst[0])); - if (expected_client_identity.empty()) { - EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); - EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); - EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); - } else { - auto identity = auth_ctx->GetPeerIdentity(); - EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); - EXPECT_EQ(1u, identity.size()); - EXPECT_EQ(expected_client_identity, identity[0]); - } -} - -// Returns the number of pairs in metadata that exactly match the given -// key-value pair. Returns -1 if the pair wasn't found. -int MetadataMatchCount( - const std::multimap& metadata, - const grpc::string& key, const grpc::string& value) { - int count = 0; - for (const auto& metadatum : metadata) { - if (ToString(metadatum.first) == key && - ToString(metadatum.second) == value) { - count++; - } - } - return count; -} -} // namespace - -namespace { -int GetIntValueFromMetadataHelper( - const char* key, - const std::multimap& metadata, - int default_value) { - if (metadata.find(key) != metadata.end()) { - std::istringstream iss(ToString(metadata.find(key)->second)); - iss >> default_value; - gpr_log(GPR_INFO, "%s : %d", key, default_value); - } - - return default_value; -} - -int GetIntValueFromMetadata( - const char* key, - const std::multimap& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); -} - -void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - // Now wait until it's really canceled - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN))); - } -} - -} // namespace - -class TestMultipleServiceSignaller { - public: - void ClientWaitUntilRpcStarted() { - std::unique_lock lock(mu_); - cv_rpc_started_.wait(lock, [this] { return rpc_started_; }); - } - void ServerWaitToContinue() { - std::unique_lock lock(mu_); - cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); - } - void SignalClientThatRpcStarted() { - std::unique_lock lock(mu_); - rpc_started_ = true; - cv_rpc_started_.notify_one(); - } - void SignalServerToContinue() { - std::unique_lock lock(mu_); - server_should_continue_ = true; - cv_server_continue_.notify_one(); - } - - private: - std::mutex mu_; - std::condition_variable cv_rpc_started_; - bool rpc_started_ /* GUARDED_BY(mu_) */ = false; - std::condition_variable cv_server_continue_; - bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; -}; - -template -class TestMultipleServiceImpl : public RpcService { - public: - TestMultipleServiceImpl() : signal_client_(false), host_() {} - explicit TestMultipleServiceImpl(const grpc::string& host) - : signal_client_(false), host_(new grpc::string(host)) {} - - Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && - request->param().server_notify_client_when_started()) { - signaller_.SignalClientThatRpcStarted(); - signaller_.ServerWaitToContinue(); - } - - // A bit of sleep to make sure that short deadline tests fail - if (request->has_param() && request->param().server_sleep_us() > 0) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_micros(request->param().server_sleep_us(), - GPR_TIMESPAN))); - } - - if (request->has_param() && request->param().server_die()) { - gpr_log(GPR_ERROR, "The request should not reach application handler."); - GPR_ASSERT(0); - } - if (request->has_param() && request->param().has_expected_error()) { - const auto& error = request->param().expected_error(); - return Status(static_cast(error.code()), - error.error_message(), error.binary_error_details()); - } - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - if (server_try_cancel > DO_NOT_CANCEL) { - // Since this is a unary RPC, by the time this server handler is called, - // the 'request' message is already read from the client. So the scenarios - // in server_try_cancel don't make much sense. Just cancel the RPC as long - // as server_try_cancel is not DO_NOT_CANCEL - ServerTryCancel(context); - return Status::CANCELLED; - } - - response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (host_) { - response->mutable_param()->set_host(*host_); - } - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else if (!request->has_param() || - !request->param().skip_cancelled_check()) { - EXPECT_FALSE(context->IsCancelled()); - } - - if (request->has_param() && request->param().echo_metadata_initially()) { - const std::multimap& client_metadata = - context->client_metadata(); - for (const auto& metadatum : client_metadata) { - context->AddInitialMetadata(ToString(metadatum.first), - ToString(metadatum.second)); - } - } - - if (request->has_param() && request->param().echo_metadata()) { - const std::multimap& client_metadata = - context->client_metadata(); - for (const auto& metadatum : client_metadata) { - context->AddTrailingMetadata(ToString(metadatum.first), - ToString(metadatum.second)); - } - // Terminate rpc with error and debug info in trailer. - if (request->param().debug_info().stack_entries_size() || - !request->param().debug_info().detail().empty()) { - grpc::string serialized_debug_info = - request->param().debug_info().SerializeAsString(); - context->AddTrailingMetadata(kDebugInfoTrailerKey, - serialized_debug_info); - return Status::CANCELLED; - } - } - if (request->has_param() && - (request->param().expected_client_identity().length() > 0 || - request->param().check_auth_context())) { - CheckServerAuthContext( - context, request->param().expected_transport_security_type(), - request->param().expected_client_identity()); - } - if (request->has_param() && - request->param().response_message_length() > 0) { - response->set_message( - grpc::string(request->param().response_message_length(), '\0')); - } - if (request->has_param() && request->param().echo_peer()) { - response->mutable_param()->set_peer(context->peer()); - } - return Status::OK; - } - - Status Echo1(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - return Echo(context, request, response); - } - - Status Echo2(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - return Echo(context, request, response); - } - - Status CheckClientInitialMetadata(ServerContext* context, - const SimpleRequest* /*request*/, - SimpleResponse* /*response*/) { - EXPECT_EQ(MetadataMatchCount(context->client_metadata(), - kCheckClientInitialMetadataKey, - kCheckClientInitialMetadataVal), - 1); - EXPECT_EQ(1u, - context->client_metadata().count(kCheckClientInitialMetadataKey)); - return Status::OK; - } - - // Unimplemented is left unimplemented to test the returned error. - Status RequestStream(ServerContext* context, - ServerReader* reader, - EchoResponse* response) { - // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by - // the server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads - // any message from the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading messages from the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads - // all the messages from the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - EchoRequest request; - response->set_message(""); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - int num_msgs_read = 0; - while (reader->Read(&request)) { - response->mutable_message()->append(request.message()); - } - gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - // Return 'kNumResponseStreamMsgs' messages. - // TODO(yangg) make it generic by adding a parameter into EchoRequest - Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter* writer) { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes - // any messages to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // writing messages to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes - // all the messages to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - int server_coalescing_api = GetIntValueFromMetadata( - kServerUseCoalescingApi, context->client_metadata(), 0); - - int server_responses_to_send = GetIntValueFromMetadata( - kServerResponseStreamsToSend, context->client_metadata(), - kServerDefaultResponseStreamsToSend); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - EchoResponse response; - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - for (int i = 0; i < server_responses_to_send; i++) { - response.set_message(request->message() + grpc::to_string(i)); - if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { - writer->WriteLast(response, WriteOptions()); - } else { - writer->Write(response); - } - } - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - Status BidiStream(ServerContext* context, - ServerReaderWriter* stream) { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ - // writes any messages from/to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading/writing messages from/to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server - // reads/writes all messages from/to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - EchoRequest request; - EchoResponse response; - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - // kServerFinishAfterNReads suggests after how many reads, the server should - // write the last message and send status (coalesced using WriteLast) - int server_write_last = GetIntValueFromMetadata( - kServerFinishAfterNReads, context->client_metadata(), 0); - - int read_counts = 0; - while (stream->Read(&request)) { - read_counts++; - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - if (read_counts == server_write_last) { - stream->WriteLast(response, WriteOptions()); - } else { - stream->Write(response); - } - } - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; - } - - // Unimplemented is left unimplemented to test the returned error. - bool signal_client() { - std::unique_lock lock(mu_); - return signal_client_; - } - void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } - void SignalServerToContinue() { signaller_.SignalServerToContinue(); } - - private: - bool signal_client_; - std::mutex mu_; - TestMultipleServiceSignaller signaller_; - std::unique_ptr host_; -}; - -} // namespace testing -} // namespace grpc - -#endif // GRPC_TEST_CPP_END2END_TEST_MULTIPLE_SERVICE_IMPL_H diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index ad1592bf7d8..6517c5d6cc1 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -36,88 +36,6 @@ namespace grpc { namespace testing { namespace { -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(experimental::ServerContextBase* context, - const EchoRequest* request, EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -void CheckServerAuthContext( - const experimental::ServerContextBase* context, - const grpc::string& expected_transport_security_type, - const grpc::string& expected_client_identity) { - std::shared_ptr auth_ctx = context->auth_context(); - std::vector tst = - auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, tst.size()); - EXPECT_EQ(expected_transport_security_type, ToString(tst[0])); - if (expected_client_identity.empty()) { - EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); - EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); - EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); - } else { - auto identity = auth_ctx->GetPeerIdentity(); - EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); - EXPECT_EQ(1u, identity.size()); - EXPECT_EQ(expected_client_identity, identity[0]); - } -} - -// Returns the number of pairs in metadata that exactly match the given -// key-value pair. Returns -1 if the pair wasn't found. -int MetadataMatchCount( - const std::multimap& metadata, - const grpc::string& key, const grpc::string& value) { - int count = 0; - for (const auto& metadatum : metadata) { - if (ToString(metadatum.first) == key && - ToString(metadatum.second) == value) { - count++; - } - } - return count; -} -} // namespace - -namespace { -int GetIntValueFromMetadataHelper( - const char* key, - const std::multimap& metadata, - int default_value) { - if (metadata.find(key) != metadata.end()) { - std::istringstream iss(ToString(metadata.find(key)->second)); - iss >> default_value; - gpr_log(GPR_INFO, "%s : %d", key, default_value); - } - - return default_value; -} - -int GetIntValueFromMetadata( - const char* key, - const std::multimap& metadata, - int default_value) { - return GetIntValueFromMetadataHelper(key, metadata, default_value); -} - -void ServerTryCancel(ServerContext* context) { - EXPECT_FALSE(context->IsCancelled()); - context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - // Now wait until it's really canceled - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000, GPR_TIMESPAN))); - } -} - void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { EXPECT_FALSE(context->IsCancelled()); context->TryCancel(); @@ -127,304 +45,6 @@ void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { } // namespace -Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && - request->param().server_notify_client_when_started()) { - signaller_.SignalClientThatRpcStarted(); - signaller_.ServerWaitToContinue(); - } - - // A bit of sleep to make sure that short deadline tests fail - if (request->has_param() && request->param().server_sleep_us() > 0) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_micros(request->param().server_sleep_us(), - GPR_TIMESPAN))); - } - - if (request->has_param() && request->param().server_die()) { - gpr_log(GPR_ERROR, "The request should not reach application handler."); - GPR_ASSERT(0); - } - if (request->has_param() && request->param().has_expected_error()) { - const auto& error = request->param().expected_error(); - return Status(static_cast(error.code()), error.error_message(), - error.binary_error_details()); - } - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - if (server_try_cancel > DO_NOT_CANCEL) { - // Since this is a unary RPC, by the time this server handler is called, - // the 'request' message is already read from the client. So the scenarios - // in server_try_cancel don't make much sense. Just cancel the RPC as long - // as server_try_cancel is not DO_NOT_CANCEL - ServerTryCancel(context); - return Status::CANCELLED; - } - - response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (host_) { - response->mutable_param()->set_host(*host_); - } - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else if (!request->has_param() || - !request->param().skip_cancelled_check()) { - EXPECT_FALSE(context->IsCancelled()); - } - - if (request->has_param() && request->param().echo_metadata_initially()) { - const std::multimap& client_metadata = - context->client_metadata(); - for (const auto& metadatum : client_metadata) { - context->AddInitialMetadata(ToString(metadatum.first), - ToString(metadatum.second)); - } - } - - if (request->has_param() && request->param().echo_metadata()) { - const std::multimap& client_metadata = - context->client_metadata(); - for (const auto& metadatum : client_metadata) { - context->AddTrailingMetadata(ToString(metadatum.first), - ToString(metadatum.second)); - } - // Terminate rpc with error and debug info in trailer. - if (request->param().debug_info().stack_entries_size() || - !request->param().debug_info().detail().empty()) { - grpc::string serialized_debug_info = - request->param().debug_info().SerializeAsString(); - context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info); - return Status::CANCELLED; - } - } - if (request->has_param() && - (request->param().expected_client_identity().length() > 0 || - request->param().check_auth_context())) { - CheckServerAuthContext(context, - request->param().expected_transport_security_type(), - request->param().expected_client_identity()); - } - if (request->has_param() && request->param().response_message_length() > 0) { - response->set_message( - grpc::string(request->param().response_message_length(), '\0')); - } - if (request->has_param() && request->param().echo_peer()) { - response->mutable_param()->set_peer(context->peer()); - } - return Status::OK; -} - -Status TestServiceImpl::Echo1(ServerContext* context, - const EchoRequest* request, - EchoResponse* response) { - return Echo(context, request, response); -} - -Status TestServiceImpl::Echo2(ServerContext* context, - const EchoRequest* request, - EchoResponse* response) { - return Echo(context, request, response); -} - -Status TestServiceImpl::CheckClientInitialMetadata( - ServerContext* context, const SimpleRequest* /*request*/, - SimpleResponse* /*response*/) { - EXPECT_EQ(MetadataMatchCount(context->client_metadata(), - kCheckClientInitialMetadataKey, - kCheckClientInitialMetadataVal), - 1); - EXPECT_EQ(1u, - context->client_metadata().count(kCheckClientInitialMetadataKey)); - return Status::OK; -} - -// Unimplemented is left unimplemented to test the returned error. - -Status TestServiceImpl::RequestStream(ServerContext* context, - ServerReader* reader, - EchoResponse* response) { - // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by - // the server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads - // any message from the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading messages from the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads - // all the messages from the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - EchoRequest request; - response->set_message(""); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - int num_msgs_read = 0; - while (reader->Read(&request)) { - response->mutable_message()->append(request.message()); - } - gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; -} - -// Return 'kNumResponseStreamMsgs' messages. -// TODO(yangg) make it generic by adding a parameter into EchoRequest -Status TestServiceImpl::ResponseStream(ServerContext* context, - const EchoRequest* request, - ServerWriter* writer) { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes - // any messages to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // writing messages to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes - // all the messages to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - int server_coalescing_api = GetIntValueFromMetadata( - kServerUseCoalescingApi, context->client_metadata(), 0); - - int server_responses_to_send = GetIntValueFromMetadata( - kServerResponseStreamsToSend, context->client_metadata(), - kServerDefaultResponseStreamsToSend); - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - EchoResponse response; - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - for (int i = 0; i < server_responses_to_send; i++) { - response.set_message(request->message() + grpc::to_string(i)); - if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { - writer->WriteLast(response, WriteOptions()); - } else { - writer->Write(response); - } - } - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; -} - -Status TestServiceImpl::BidiStream( - ServerContext* context, - ServerReaderWriter* stream) { - // If server_try_cancel is set in the metadata, the RPC is cancelled by the - // server by calling ServerContext::TryCancel() depending on the value: - // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ - // writes any messages from/to the client - // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is - // reading/writing messages from/to the client - // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server - // reads/writes all messages from/to the client - int server_try_cancel = GetIntValueFromMetadata( - kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - - EchoRequest request; - EchoResponse response; - - if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - std::thread* server_try_cancel_thd = nullptr; - if (server_try_cancel == CANCEL_DURING_PROCESSING) { - server_try_cancel_thd = - new std::thread([context] { ServerTryCancel(context); }); - } - - // kServerFinishAfterNReads suggests after how many reads, the server should - // write the last message and send status (coalesced using WriteLast) - int server_write_last = GetIntValueFromMetadata( - kServerFinishAfterNReads, context->client_metadata(), 0); - - int read_counts = 0; - while (stream->Read(&request)) { - read_counts++; - gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); - response.set_message(request.message()); - if (read_counts == server_write_last) { - stream->WriteLast(response, WriteOptions()); - } else { - stream->Write(response); - } - } - - if (server_try_cancel_thd != nullptr) { - server_try_cancel_thd->join(); - delete server_try_cancel_thd; - return Status::CANCELLED; - } - - if (server_try_cancel == CANCEL_AFTER_PROCESSING) { - ServerTryCancel(context); - return Status::CANCELLED; - } - - return Status::OK; -} - experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( experimental::CallbackServerContext* context, const EchoRequest* request, EchoResponse* response) { diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index e41359f9655..905f2cbea89 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -15,6 +15,7 @@ * limitations under the License. * */ + #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H @@ -23,9 +24,19 @@ #include #include +#include +#include +#include #include +#include + +#include +#include #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/cpp/util/string_ref_helper.h" + +using std::chrono::system_clock; namespace grpc { namespace testing { @@ -46,6 +57,92 @@ typedef enum { CANCEL_AFTER_PROCESSING } ServerTryCancelRequestPhase; +namespace { + +// When echo_deadline is requested, deadline seen in the ServerContext is set in +// the response in seconds. +void MaybeEchoDeadline(experimental::ServerContextBase* context, + const EchoRequest* request, EchoResponse* response) { + if (request->has_param() && request->param().echo_deadline()) { + gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + if (context->deadline() != system_clock::time_point::max()) { + Timepoint2Timespec(context->deadline(), &deadline); + } + response->mutable_param()->set_request_deadline(deadline.tv_sec); + } +} + +void CheckServerAuthContext( + const experimental::ServerContextBase* context, + const grpc::string& expected_transport_security_type, + const grpc::string& expected_client_identity) { + std::shared_ptr auth_ctx = context->auth_context(); + std::vector tst = + auth_ctx->FindPropertyValues("transport_security_type"); + EXPECT_EQ(1u, tst.size()); + EXPECT_EQ(expected_transport_security_type, ToString(tst[0])); + if (expected_client_identity.empty()) { + EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty()); + EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty()); + EXPECT_FALSE(auth_ctx->IsPeerAuthenticated()); + } else { + auto identity = auth_ctx->GetPeerIdentity(); + EXPECT_TRUE(auth_ctx->IsPeerAuthenticated()); + EXPECT_EQ(1u, identity.size()); + EXPECT_EQ(expected_client_identity, identity[0]); + } +} + +// Returns the number of pairs in metadata that exactly match the given +// key-value pair. Returns -1 if the pair wasn't found. +int MetadataMatchCount( + const std::multimap& metadata, + const grpc::string& key, const grpc::string& value) { + int count = 0; + for (const auto& metadatum : metadata) { + if (ToString(metadatum.first) == key && + ToString(metadatum.second) == value) { + count++; + } + } + return count; +} +} // namespace + +namespace { +int GetIntValueFromMetadataHelper( + const char* key, + const std::multimap& metadata, + int default_value) { + if (metadata.find(key) != metadata.end()) { + std::istringstream iss(ToString(metadata.find(key)->second)); + iss >> default_value; + gpr_log(GPR_INFO, "%s : %d", key, default_value); + } + + return default_value; +} + +int GetIntValueFromMetadata( + const char* key, + const std::multimap& metadata, + int default_value) { + return GetIntValueFromMetadataHelper(key, metadata, default_value); +} + +void ServerTryCancel(ServerContext* context) { + EXPECT_FALSE(context->IsCancelled()); + context->TryCancel(); + gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + // Now wait until it's really canceled + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000, GPR_TIMESPAN))); + } +} + +} // namespace + class TestServiceSignaller { public: void ClientWaitUntilRpcStarted() { @@ -75,38 +172,309 @@ class TestServiceSignaller { bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; }; -class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { +template +class TestMultipleServiceImpl : public RpcService { public: - TestServiceImpl() : signal_client_(false), host_() {} - explicit TestServiceImpl(const grpc::string& host) + TestMultipleServiceImpl() : signal_client_(false), host_() {} + explicit TestMultipleServiceImpl(const grpc::string& host) : signal_client_(false), host_(new grpc::string(host)) {} Status Echo(ServerContext* context, const EchoRequest* request, - EchoResponse* response) override; + EchoResponse* response) { + if (request->has_param() && + request->param().server_notify_client_when_started()) { + signaller_.SignalClientThatRpcStarted(); + signaller_.ServerWaitToContinue(); + } + + // A bit of sleep to make sure that short deadline tests fail + if (request->has_param() && request->param().server_sleep_us() > 0) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(request->param().server_sleep_us(), + GPR_TIMESPAN))); + } + + if (request->has_param() && request->param().server_die()) { + gpr_log(GPR_ERROR, "The request should not reach application handler."); + GPR_ASSERT(0); + } + if (request->has_param() && request->param().has_expected_error()) { + const auto& error = request->param().expected_error(); + return Status(static_cast(error.code()), + error.error_message(), error.binary_error_details()); + } + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + if (server_try_cancel > DO_NOT_CANCEL) { + // Since this is a unary RPC, by the time this server handler is called, + // the 'request' message is already read from the client. So the scenarios + // in server_try_cancel don't make much sense. Just cancel the RPC as long + // as server_try_cancel is not DO_NOT_CANCEL + ServerTryCancel(context); + return Status::CANCELLED; + } + + response->set_message(request->message()); + MaybeEchoDeadline(context, request, response); + if (host_) { + response->mutable_param()->set_host(*host_); + } + if (request->has_param() && request->param().client_cancel_after_us()) { + { + std::unique_lock lock(mu_); + signal_client_ = true; + } + while (!context->IsCancelled()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().client_cancel_after_us(), + GPR_TIMESPAN))); + } + return Status::CANCELLED; + } else if (request->has_param() && + request->param().server_cancel_after_us()) { + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(request->param().server_cancel_after_us(), + GPR_TIMESPAN))); + return Status::CANCELLED; + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { + EXPECT_FALSE(context->IsCancelled()); + } + + if (request->has_param() && request->param().echo_metadata_initially()) { + const std::multimap& client_metadata = + context->client_metadata(); + for (const auto& metadatum : client_metadata) { + context->AddInitialMetadata(ToString(metadatum.first), + ToString(metadatum.second)); + } + } + + if (request->has_param() && request->param().echo_metadata()) { + const std::multimap& client_metadata = + context->client_metadata(); + for (const auto& metadatum : client_metadata) { + context->AddTrailingMetadata(ToString(metadatum.first), + ToString(metadatum.second)); + } + // Terminate rpc with error and debug info in trailer. + if (request->param().debug_info().stack_entries_size() || + !request->param().debug_info().detail().empty()) { + grpc::string serialized_debug_info = + request->param().debug_info().SerializeAsString(); + context->AddTrailingMetadata(kDebugInfoTrailerKey, + serialized_debug_info); + return Status::CANCELLED; + } + } + if (request->has_param() && + (request->param().expected_client_identity().length() > 0 || + request->param().check_auth_context())) { + CheckServerAuthContext( + context, request->param().expected_transport_security_type(), + request->param().expected_client_identity()); + } + if (request->has_param() && + request->param().response_message_length() > 0) { + response->set_message( + grpc::string(request->param().response_message_length(), '\0')); + } + if (request->has_param() && request->param().echo_peer()) { + response->mutable_param()->set_peer(context->peer()); + } + return Status::OK; + } Status Echo1(ServerContext* context, const EchoRequest* request, - EchoResponse* response) override; + EchoResponse* response) { + return Echo(context, request, response); + } Status Echo2(ServerContext* context, const EchoRequest* request, - EchoResponse* response) override; + EchoResponse* response) { + return Echo(context, request, response); + } Status CheckClientInitialMetadata(ServerContext* context, - const SimpleRequest* request, - SimpleResponse* response) override; + const SimpleRequest* /*request*/, + SimpleResponse* /*response*/) { + EXPECT_EQ(MetadataMatchCount(context->client_metadata(), + kCheckClientInitialMetadataKey, + kCheckClientInitialMetadataVal), + 1); + EXPECT_EQ(1u, + context->client_metadata().count(kCheckClientInitialMetadataKey)); + return Status::OK; + } // Unimplemented is left unimplemented to test the returned error. - Status RequestStream(ServerContext* context, ServerReader* reader, - EchoResponse* response) override; + EchoResponse* response) { + // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by + // the server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads + // any message from the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading messages from the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads + // all the messages from the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + EchoRequest request; + response->set_message(""); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = nullptr; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread([context] { ServerTryCancel(context); }); + } + + int num_msgs_read = 0; + while (reader->Read(&request)) { + response->mutable_message()->append(request.message()); + } + gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); + + if (server_try_cancel_thd != nullptr) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + return Status::OK; + } + + // Return 'kNumResponseStreamMsgs' messages. + // TODO(yangg) make it generic by adding a parameter into EchoRequest Status ResponseStream(ServerContext* context, const EchoRequest* request, - ServerWriter* writer) override; + ServerWriter* writer) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes + // any messages to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // writing messages to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes + // all the messages to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); - Status BidiStream( - ServerContext* context, - ServerReaderWriter* stream) override; + int server_coalescing_api = GetIntValueFromMetadata( + kServerUseCoalescingApi, context->client_metadata(), 0); + int server_responses_to_send = GetIntValueFromMetadata( + kServerResponseStreamsToSend, context->client_metadata(), + kServerDefaultResponseStreamsToSend); + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + EchoResponse response; + std::thread* server_try_cancel_thd = nullptr; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread([context] { ServerTryCancel(context); }); + } + + for (int i = 0; i < server_responses_to_send; i++) { + response.set_message(request->message() + grpc::to_string(i)); + if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { + writer->WriteLast(response, WriteOptions()); + } else { + writer->Write(response); + } + } + + if (server_try_cancel_thd != nullptr) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + return Status::OK; + } + + Status BidiStream(ServerContext* context, + ServerReaderWriter* stream) { + // If server_try_cancel is set in the metadata, the RPC is cancelled by the + // server by calling ServerContext::TryCancel() depending on the value: + // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ + // writes any messages from/to the client + // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is + // reading/writing messages from/to the client + // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server + // reads/writes all messages from/to the client + int server_try_cancel = GetIntValueFromMetadata( + kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); + + EchoRequest request; + EchoResponse response; + + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + std::thread* server_try_cancel_thd = nullptr; + if (server_try_cancel == CANCEL_DURING_PROCESSING) { + server_try_cancel_thd = + new std::thread([context] { ServerTryCancel(context); }); + } + + // kServerFinishAfterNReads suggests after how many reads, the server should + // write the last message and send status (coalesced using WriteLast) + int server_write_last = GetIntValueFromMetadata( + kServerFinishAfterNReads, context->client_metadata(), 0); + + int read_counts = 0; + while (stream->Read(&request)) { + read_counts++; + gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); + response.set_message(request.message()); + if (read_counts == server_write_last) { + stream->WriteLast(response, WriteOptions()); + } else { + stream->Write(response); + } + } + + if (server_try_cancel_thd != nullptr) { + server_try_cancel_thd->join(); + delete server_try_cancel_thd; + return Status::CANCELLED; + } + + if (server_try_cancel == CANCEL_AFTER_PROCESSING) { + ServerTryCancel(context); + return Status::CANCELLED; + } + + return Status::OK; + } + + // Unimplemented is left unimplemented to test the returned error. bool signal_client() { std::unique_lock lock(mu_); return signal_client_; @@ -162,6 +530,9 @@ class CallbackTestServiceImpl std::unique_ptr host_; }; +using TestServiceImpl = + TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>; + } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 2665f9e9082..2e9d2ae501a 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -57,7 +57,7 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -#include "test/cpp/end2end/test_multiple_service_impl.h" +#include "test/cpp/end2end/test_service_impl.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h" @@ -2316,6 +2316,7 @@ TEST_P(LdsTest, XdsRoutingPathMatching) { Listener listener = balancers_[0]->ads_service()->BuildListener(new_route_config); balancers_[0]->ads_service()->SetLdsResource(listener, kDefaultResourceName); + WaitForAllBackends(0, 2); CheckRpcSendOk(kNumRpcs, 1000, true); CheckEcho1RpcSendOk(kNumRpcs, 1000, true); CheckEcho2RpcSendOk(kNumRpcs, 1000, true);