[test interop] Add HookService to the maintenence server (#34413)

This pull request adds another hook service on the maintenance server.
This will enable clients to gradually migrate from the standalone hook
server.

Changes:
1. Hook service can now be used separately.
2. Copied latest protos and updated the hook service to new API.
3. Added the hook service to the maintenance server.
pull/34418/head
Eugene Ostroukhov 1 year ago committed by GitHub
parent fc159a6901
commit 490f6a3ee9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/proto/grpc/testing/messages.proto
  2. 9
      src/proto/grpc/testing/test.proto
  3. 146
      test/cpp/interop/pre_stop_hook_server.cc
  4. 37
      test/cpp/interop/pre_stop_hook_server.h
  5. 119
      test/cpp/interop/pre_stop_hook_server_test.cc
  6. 6
      test/cpp/interop/xds_interop_server.cc
  7. 72
      test/cpp/interop/xds_interop_server_lib.cc
  8. 3
      test/cpp/interop/xds_interop_server_lib.h
  9. 41
      test/cpp/interop/xds_interop_server_test.cc

@ -319,6 +319,12 @@ message TestOrcaReport {
map<string, double> utilization = 4;
}
// Status that will be return to callers of the Hook method
message SetReturnStatusRequest {
int32 grpc_code_to_return = 1;
string grpc_status_description = 2;
}
message HookRequest {
enum HookRequestCommand {
// Default value

@ -91,10 +91,15 @@ service LoadBalancerStatsService {
returns (LoadBalancerAccumulatedStatsResponse) {}
}
// Hook service that may be started on request by calling XdsUpdateHealthService
// with HookRequestCommand::START
// Hook service. Used to keep Kubernetes from shutting the pod down.
service HookService {
// Sends a request that will "hang" until the return status is set by a call
// to a SetReturnStatus
rpc Hook(grpc.testing.Empty) returns (grpc.testing.Empty);
// Sets a return status for pending and upcoming calls to Hook
rpc SetReturnStatus(SetReturnStatusRequest) returns (grpc.testing.Empty);
// Clears the return status. Incoming calls to Hook will "hang"
rpc ClearReturnStatus(grpc.testing.Empty) returns (grpc.testing.Empty);
}
// A service to remotely control health status of an xDS test server.

@ -24,75 +24,14 @@
#include <grpcpp/grpcpp.h>
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/messages.pb.h"
namespace grpc {
namespace testing {
namespace {
class HookServiceImpl final : public HookService::CallbackService {
public:
ServerUnaryReactor* Hook(CallbackServerContext* context,
const Empty* /* request */,
Empty* /* reply */) override {
auto reactor = context->DefaultReactor();
MatchRequestsAndStatuses(reactor, absl::nullopt);
return reactor;
}
void SetReturnStatus(const Status& status) {
MatchRequestsAndStatuses(absl::nullopt, status);
}
bool TestOnlyExpectRequests(size_t expected_requests_count,
const absl::Duration& timeout) {
grpc_core::MutexLock lock(&mu_);
auto deadline = absl::Now() + timeout;
while (pending_requests_.size() < expected_requests_count &&
!request_var_.WaitWithDeadline(&mu_, deadline)) {
}
return pending_requests_.size() >= expected_requests_count;
}
void Stop() {
{
grpc_core::MutexLock lock(&mu_);
done_ = true;
}
MatchRequestsAndStatuses(absl::nullopt, absl::nullopt);
}
private:
void MatchRequestsAndStatuses(absl::optional<ServerUnaryReactor*> new_request,
absl::optional<Status> new_status) {
grpc_core::MutexLock lock(&mu_);
if (new_request.has_value()) {
pending_requests_.push_back(*new_request);
}
if (new_status.has_value()) {
pending_statuses_.push_back(std::move(*new_status));
}
while (!pending_requests_.empty() && !pending_statuses_.empty()) {
pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
pending_requests_.erase(pending_requests_.begin());
pending_statuses_.erase(pending_statuses_.begin());
}
while (!pending_requests_.empty() && done_) {
pending_requests_.front()->Finish(
Status(StatusCode::ABORTED, "Shutting down"));
pending_requests_.erase(pending_requests_.begin());
}
request_var_.SignalAll();
}
grpc_core::Mutex mu_;
grpc_core::CondVar request_var_ ABSL_GUARDED_BY(&mu_);
std::vector<ServerUnaryReactor*> pending_requests_ ABSL_GUARDED_BY(&mu_);
std::vector<Status> pending_statuses_ ABSL_GUARDED_BY(&mu_);
bool done_ ABSL_GUARDED_BY(&mu_) = false;
};
enum class State { kNew, kWaiting, kDone, kShuttingDown };
enum class State : std::uint8_t { kNew, kWaiting, kDone, kShuttingDown };
std::unique_ptr<Server> BuildHookServer(HookServiceImpl* service, int port) {
ServerBuilder builder;
@ -132,7 +71,7 @@ class PreStopHookServer {
}
void SetReturnStatus(const Status& status) {
hook_service_.SetReturnStatus(status);
hook_service_.AddReturnStatus(status);
}
bool TestOnlyExpectRequests(size_t expected_requests_count,
@ -200,5 +139,82 @@ void PreStopHookServerManager::PreStopHookServerDeleter::operator()(
delete server;
}
//
// HookServiceImpl
//
ServerUnaryReactor* HookServiceImpl::Hook(CallbackServerContext* context,
const Empty* /* request */,
Empty* /* reply */) {
auto reactor = context->DefaultReactor();
grpc_core::MutexLock lock(&mu_);
pending_requests_.emplace_back(reactor);
MatchRequestsAndStatuses();
return reactor;
}
ServerUnaryReactor* HookServiceImpl::SetReturnStatus(
CallbackServerContext* context, const SetReturnStatusRequest* request,
Empty* /* reply */) {
auto reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
grpc_core::MutexLock lock(&mu_);
respond_all_status_.emplace(
static_cast<StatusCode>(request->grpc_code_to_return()),
request->grpc_status_description());
MatchRequestsAndStatuses();
return reactor;
}
ServerUnaryReactor* HookServiceImpl::ClearReturnStatus(
CallbackServerContext* context, const Empty* /* request */,
Empty* /* reply */) {
auto reactor = context->DefaultReactor();
reactor->Finish(Status::OK);
grpc_core::MutexLock lock(&mu_);
respond_all_status_.reset();
MatchRequestsAndStatuses();
return reactor;
}
void HookServiceImpl::AddReturnStatus(const Status& status) {
grpc_core::MutexLock lock(&mu_);
pending_statuses_.push_back(status);
MatchRequestsAndStatuses();
}
bool HookServiceImpl::TestOnlyExpectRequests(size_t expected_requests_count,
const absl::Duration& timeout) {
grpc_core::MutexLock lock(&mu_);
auto deadline = absl::Now() + timeout;
while (pending_requests_.size() < expected_requests_count &&
!request_var_.WaitWithDeadline(&mu_, deadline)) {
}
return pending_requests_.size() >= expected_requests_count;
}
void HookServiceImpl::Stop() {
grpc_core::MutexLock lock(&mu_);
if (!respond_all_status_.has_value()) {
respond_all_status_.emplace(StatusCode::ABORTED, "Shutting down");
}
MatchRequestsAndStatuses();
}
void HookServiceImpl::MatchRequestsAndStatuses() {
while (!pending_requests_.empty() && !pending_statuses_.empty()) {
pending_requests_.front()->Finish(std::move(pending_statuses_.front()));
pending_requests_.erase(pending_requests_.begin());
pending_statuses_.erase(pending_statuses_.begin());
}
if (respond_all_status_.has_value()) {
for (const auto& request : pending_requests_) {
request->Finish(*respond_all_status_);
}
pending_requests_.clear();
}
request_var_.SignalAll();
}
} // namespace testing
} // namespace grpc

@ -21,15 +21,46 @@
#include <grpc/support/port_platform.h>
#include <thread>
#include <grpcpp/server.h>
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
namespace grpc {
namespace testing {
class HookServiceImpl final : public HookService::CallbackService {
public:
ServerUnaryReactor* Hook(CallbackServerContext* context,
const Empty* /* request */,
Empty* /* reply */) override;
ServerUnaryReactor* SetReturnStatus(CallbackServerContext* context,
const SetReturnStatusRequest* request,
Empty* /* reply */) override;
ServerUnaryReactor* ClearReturnStatus(CallbackServerContext* context,
const Empty* request,
Empty* /* reply */) override;
void AddReturnStatus(const Status& status);
bool TestOnlyExpectRequests(size_t expected_requests_count,
const absl::Duration& timeout);
void Stop();
private:
void MatchRequestsAndStatuses() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
grpc_core::Mutex mu_;
grpc_core::CondVar request_var_ ABSL_GUARDED_BY(&mu_);
std::vector<ServerUnaryReactor*> pending_requests_ ABSL_GUARDED_BY(&mu_);
std::vector<Status> pending_statuses_ ABSL_GUARDED_BY(&mu_);
absl::optional<Status> respond_all_status_ ABSL_GUARDED_BY(&mu_);
};
// Implementation of the pre-stop hook server. An instance is created to start
// a server and destroyed to stop one.
class PreStopHookServer;

@ -16,8 +16,7 @@
#include "test/cpp/interop/pre_stop_hook_server.h"
#include <map>
#include <memory>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@ -26,7 +25,11 @@
#include <grpc/grpc.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/status.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
@ -44,9 +47,7 @@ struct CallInfo {
absl::optional<Status> WaitForStatus(
absl::Duration timeout = absl::Seconds(1)) {
grpc_core::MutexLock lock(&mu);
if (!status_.has_value()) {
cv.WaitWithTimeout(&mu, timeout);
}
cv.WaitWithTimeout(&mu, timeout);
return status_;
}
@ -62,7 +63,22 @@ struct CallInfo {
absl::optional<Status> status_;
};
TEST(PreStopHookServer, StartDoRequestStop) {
void ServerLoop(HookServiceImpl* service, int port, Server** server,
grpc_core::Mutex* mu, grpc_core::CondVar* condition) {
ServerBuilder builder;
builder.AddListeningPort(absl::StrFormat("0.0.0.0:%d", port),
grpc::InsecureServerCredentials());
builder.RegisterService(service);
auto s = builder.BuildAndStart();
{
grpc_core::MutexLock lock(mu);
*server = s.get();
condition->SignalAll();
}
s->Wait();
}
TEST(StandalonePreStopHookServer, StartDoRequestStop) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port, 15);
@ -73,8 +89,8 @@ TEST(PreStopHookServer, StartDoRequestStop) {
CallInfo info;
HookService::Stub stub(std::move(channel));
stub.async()->Hook(&info.context, &info.request, &info.response,
[&info](Status status) { info.SetStatus(status); });
ASSERT_EQ(server.TestOnlyExpectRequests(1), 1);
[&info](const Status& status) { info.SetStatus(status); });
ASSERT_TRUE(server.TestOnlyExpectRequests(1));
server.Return(StatusCode::INTERNAL, "Just a test");
auto status = info.WaitForStatus();
ASSERT_TRUE(status.has_value());
@ -82,7 +98,7 @@ TEST(PreStopHookServer, StartDoRequestStop) {
EXPECT_EQ(status->error_message(), "Just a test");
}
TEST(PreStopHookServer, StartServerWhileAlreadyRunning) {
TEST(StandalonePreStopHookServer, StartServerWhileAlreadyRunning) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status status = server.Start(port, 15);
@ -92,7 +108,7 @@ TEST(PreStopHookServer, StartServerWhileAlreadyRunning) {
<< status.error_message();
}
TEST(PreStopHookServer, StopServerWhileRequestPending) {
TEST(StandalonePreStopHookServer, StopServerWhileRequestPending) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port, 15);
@ -103,15 +119,15 @@ TEST(PreStopHookServer, StopServerWhileRequestPending) {
CallInfo info;
HookService::Stub stub(std::move(channel));
stub.async()->Hook(&info.context, &info.request, &info.response,
[&info](Status status) { info.SetStatus(status); });
ASSERT_EQ(server.TestOnlyExpectRequests(1), 1);
[&info](const Status& status) { info.SetStatus(status); });
ASSERT_TRUE(server.TestOnlyExpectRequests(1));
ASSERT_TRUE(server.Stop().ok());
auto status = info.WaitForStatus();
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), StatusCode::ABORTED);
}
TEST(PreStopHookServer, MultipleRequests) {
TEST(StandalonePreStopHookServer, MultipleRequests) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port, 15);
@ -123,16 +139,16 @@ TEST(PreStopHookServer, MultipleRequests) {
CallInfo info1, info2, info3;
server.Return(StatusCode::INTERNAL, "First");
stub.async()->Hook(&info1.context, &info1.request, &info1.response,
[&](Status status) { info1.SetStatus(status); });
[&](const Status& status) { info1.SetStatus(status); });
auto status = info1.WaitForStatus();
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), StatusCode::INTERNAL);
EXPECT_EQ(status->error_message(), "First");
stub.async()->Hook(&info2.context, &info2.request, &info2.response,
[&](Status status) { info2.SetStatus(status); });
ASSERT_EQ(server.TestOnlyExpectRequests(1, absl::Milliseconds(500)), 1);
[&](const Status& status) { info2.SetStatus(status); });
ASSERT_TRUE(server.TestOnlyExpectRequests(1, absl::Milliseconds(500)));
stub.async()->Hook(&info3.context, &info3.request, &info3.response,
[&](Status status) { info3.SetStatus(status); });
[&](const Status& status) { info3.SetStatus(status); });
server.Return(StatusCode::RESOURCE_EXHAUSTED, "Second");
server.Return(StatusCode::DEADLINE_EXCEEDED, "Third");
status = info2.WaitForStatus();
@ -145,14 +161,14 @@ TEST(PreStopHookServer, MultipleRequests) {
EXPECT_EQ(status->error_message(), "Third");
}
TEST(PreStopHookServer, StopServerThatNotStarted) {
TEST(StandalonePreStopHookServer, StopServerThatNotStarted) {
PreStopHookServerManager server;
Status status = server.Stop();
EXPECT_EQ(status.error_code(), StatusCode::UNAVAILABLE)
<< status.error_message();
}
TEST(PreStopHookServer, SetStatusBeforeRequestReceived) {
TEST(StandalonePreStopHookServer, SetStatusBeforeRequestReceived) {
int port = grpc_pick_unused_port_or_die();
PreStopHookServerManager server;
Status start_status = server.Start(port, 15);
@ -163,12 +179,71 @@ TEST(PreStopHookServer, SetStatusBeforeRequestReceived) {
ASSERT_TRUE(channel);
HookService::Stub stub(std::move(channel));
CallInfo info;
stub.async()->Hook(&info.context, &info.request, &info.response,
[&info](Status status) { info.SetStatus(status); });
auto status = info.WaitForStatus();
auto status = stub.Hook(&info.context, info.request, &info.response);
EXPECT_EQ(status.error_code(), StatusCode::INTERNAL);
EXPECT_EQ(status.error_message(), "Just a test");
}
TEST(PreStopHookService, StartDoRequestStop) {
int port = grpc_pick_unused_port_or_die();
grpc_core::Mutex mu;
grpc_core::CondVar condition;
Server* server = nullptr;
HookServiceImpl service;
std::thread server_thread(ServerLoop, &service, port, &server, &mu,
&condition);
{
grpc_core::MutexLock lock(&mu);
while (server == nullptr) {
condition.Wait(&mu);
}
}
auto channel = CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
InsecureChannelCredentials());
ASSERT_TRUE(channel);
CallInfo infos[3];
HookService::Stub stub(std::move(channel));
stub.async()->Hook(
&infos[0].context, &infos[0].request, &infos[0].response,
[&infos](const Status& status) { infos[0].SetStatus(status); });
stub.async()->Hook(
&infos[1].context, &infos[1].request, &infos[1].response,
[&infos](const Status& status) { infos[1].SetStatus(status); });
ASSERT_TRUE(service.TestOnlyExpectRequests(2, absl::Milliseconds(100)));
ClientContext ctx;
SetReturnStatusRequest request;
request.set_grpc_code_to_return(StatusCode::INTERNAL);
request.set_grpc_status_description("Just a test");
Empty response;
ASSERT_EQ(stub.SetReturnStatus(&ctx, request, &response).error_code(),
StatusCode::OK);
auto status = infos[0].WaitForStatus();
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), StatusCode::INTERNAL);
EXPECT_EQ(status->error_message(), "Just a test");
status = infos[1].WaitForStatus();
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), StatusCode::INTERNAL);
EXPECT_EQ(status->error_message(), "Just a test");
status = stub.Hook(&infos[2].context, infos[2].request, &infos[2].response);
ASSERT_TRUE(status.has_value());
EXPECT_EQ(status->error_code(), StatusCode::INTERNAL);
EXPECT_EQ(status->error_message(), "Just a test");
CallInfo reset_call_info;
ASSERT_TRUE(stub.ClearReturnStatus(&reset_call_info.context,
reset_call_info.request,
&reset_call_info.response)
.ok());
CallInfo call_hangs;
stub.async()->Hook(
&call_hangs.context, &call_hangs.request, &call_hangs.response,
[&](const Status& status) { call_hangs.SetStatus(status); });
ASSERT_TRUE(service.TestOnlyExpectRequests(1, absl::Milliseconds(100)));
status = call_hangs.WaitForStatus(absl::Milliseconds(100));
EXPECT_FALSE(status.has_value()) << status->error_message();
service.Stop();
server->Shutdown();
server_thread.join();
}
} // namespace

@ -56,9 +56,9 @@ int main(int argc, char** argv) {
return 1;
}
grpc::EnableDefaultHealthCheckService(false);
grpc::testing::RunServer(absl::GetFlag(FLAGS_secure_mode), port,
maintenance_port, hostname,
absl::GetFlag(FLAGS_server_id));
grpc::testing::RunServer(
absl::GetFlag(FLAGS_secure_mode), port, maintenance_port, hostname,
absl::GetFlag(FLAGS_server_id), [](grpc::Server* /* unused */) {});
return 0;
}

@ -18,11 +18,10 @@
#include "test/cpp/interop/xds_interop_server_lib.h"
#include <sstream>
#include <memory>
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/synchronization/mutex.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
@ -34,9 +33,6 @@
#include <grpcpp/server_context.h>
#include <grpcpp/xds_server_builder.h>
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/transport/transport.h"
#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
@ -157,6 +153,38 @@ class XdsUpdateHealthServiceImpl : public XdsUpdateHealthService::Service {
HealthCheckServiceImpl* const health_check_service_;
std::unique_ptr<PreStopHookServerManager> pre_stop_hook_server_;
};
class MaintenanceServices {
public:
MaintenanceServices()
: update_health_service_(&health_check_service_,
std::make_unique<PreStopHookServerManager>()) {
health_check_service_.SetStatus(
"", grpc::health::v1::HealthCheckResponse::SERVING);
health_check_service_.SetStatus(
"grpc.testing.TestService",
grpc::health::v1::HealthCheckResponse::SERVING);
health_check_service_.SetStatus(
"grpc.testing.XdsUpdateHealthService",
grpc::health::v1::HealthCheckResponse::SERVING);
}
std::unique_ptr<ServerBuilder> InitializeServerBuilder(int port) {
auto builder = std::make_unique<ServerBuilder>();
builder->RegisterService(&health_check_service_);
builder->RegisterService(&update_health_service_);
builder->RegisterService(&hook_service_);
grpc::AddAdminServices(builder.get());
builder->AddListeningPort(absl::StrCat("0.0.0.0:", port),
grpc::InsecureServerCredentials());
return builder;
}
private:
HealthCheckServiceImpl health_check_service_;
XdsUpdateHealthServiceImpl update_health_service_;
HookServiceImpl hook_service_;
};
} // namespace
absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
@ -200,23 +228,14 @@ absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
}
void RunServer(bool secure_mode, const int port, const int maintenance_port,
absl::string_view hostname, absl::string_view server_id) {
absl::string_view hostname, absl::string_view server_id,
const std::function<void(Server*)>& server_callback) {
std::unique_ptr<Server> xds_enabled_server;
std::unique_ptr<Server> server;
TestServiceImpl service(hostname, server_id);
HealthCheckServiceImpl health_check_service;
health_check_service.SetStatus(
"", grpc::health::v1::HealthCheckResponse::SERVING);
health_check_service.SetStatus(
"grpc.testing.TestService",
grpc::health::v1::HealthCheckResponse::SERVING);
health_check_service.SetStatus(
"grpc.testing.XdsUpdateHealthService",
grpc::health::v1::HealthCheckResponse::SERVING);
XdsUpdateHealthServiceImpl update_health_service(
&health_check_service, std::make_unique<PreStopHookServerManager>());
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
MaintenanceServices maintenance_services;
ServerBuilder builder;
if (secure_mode) {
XdsServerBuilder xds_builder;
@ -226,25 +245,16 @@ void RunServer(bool secure_mode, const int port, const int maintenance_port,
grpc::XdsServerCredentials(grpc::InsecureServerCredentials()));
xds_enabled_server = xds_builder.BuildAndStart();
gpr_log(GPR_INFO, "Server starting on 0.0.0.0:%d", port);
builder.RegisterService(&health_check_service);
builder.RegisterService(&update_health_service);
grpc::AddAdminServices(&builder);
builder.AddListeningPort(absl::StrCat("0.0.0.0:", maintenance_port),
grpc::InsecureServerCredentials());
server = builder.BuildAndStart();
server = maintenance_services.InitializeServerBuilder(maintenance_port)
->BuildAndStart();
gpr_log(GPR_INFO, "Maintenance server listening on 0.0.0.0:%d",
maintenance_port);
} else {
builder.RegisterService(&service);
builder.RegisterService(&health_check_service);
builder.RegisterService(&update_health_service);
grpc::AddAdminServices(&builder);
builder.AddListeningPort(absl::StrCat("0.0.0.0:", port),
grpc::InsecureServerCredentials());
server = builder.BuildAndStart();
auto builder = maintenance_services.InitializeServerBuilder(port);
server = builder->RegisterService(&service).BuildAndStart();
gpr_log(GPR_INFO, "Server listening on 0.0.0.0:%d", port);
}
server_callback(server.get());
server->Wait();
}

@ -31,7 +31,8 @@ absl::optional<grpc::Status> GetStatusForRpcBehaviorMetadata(
absl::string_view header_value, absl::string_view hostname);
void RunServer(bool secure_mode, const int port, const int maintenance_port,
absl::string_view hostname, absl::string_view server_id);
absl::string_view hostname, absl::string_view server_id,
const std::function<void(Server*)>& server_callback);
} // namespace testing
} // namespace grpc

@ -14,13 +14,20 @@
// limitations under the License.
//
#include <memory>
#include <thread>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "absl/strings/str_format.h"
#include <grpc/grpc.h>
#include <grpcpp/grpcpp.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/interop/xds_interop_server_lib.h"
@ -28,6 +35,16 @@ namespace grpc {
namespace testing {
namespace {
void ServerLoop(int port, grpc_core::Mutex* mutex,
grpc_core::CondVar* condition, Server** server) {
RunServer(false, port, /* should not be used */ -1, "127.0.0.1",
"test_server", [&](Server* s) {
grpc_core::MutexLock lock(mutex);
*server = s;
condition->Signal();
});
}
TEST(GetRpcBehaviorMetadataTest, ErrorCodeNoFilter) {
auto status = GetStatusForRpcBehaviorMetadata("error-code-16", "hostname");
ASSERT_TRUE(status.has_value());
@ -69,6 +86,28 @@ TEST(GetRpcBehaviorMetadataTest, ErrorWhenUnsupported) {
<< status->error_message();
}
TEST(MaintenanceServerHookServiceTest, HookServiceInstalled) {
int port = grpc_pick_unused_port_or_die();
grpc_core::Mutex mutex;
grpc_core::CondVar condition;
Server* server = nullptr;
std::thread thread(ServerLoop, port, &mutex, &condition, &server);
{
grpc_core::MutexLock lock(&mutex);
while (server == nullptr) {
condition.Wait(&mutex);
}
}
HookService::Stub stub(CreateChannel(absl::StrFormat("127.0.0.1:%d", port),
InsecureChannelCredentials()));
ClientContext ctx;
Empty req, res;
auto status = stub.ClearReturnStatus(&ctx, req, &res);
EXPECT_EQ(status.error_code(), StatusCode::OK);
server->Shutdown();
thread.join();
}
} // namespace
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save