Merge pull request #21747 from vjpai/synced_alarm

Make use of alarm in test service more robust
reviewable/pr17921/r1
Vijay Pai 5 years ago committed by GitHub
commit fcc5e58a4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      test/cpp/end2end/test_service_impl.cc
  2. 1
      test/cpp/end2end/test_service_impl.h

@ -19,6 +19,7 @@
#include "test/cpp/end2end/test_service_impl.h" #include "test/cpp/end2end/test_service_impl.h"
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -120,7 +121,8 @@ void ServerTryCancel(ServerContext* context) {
void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
EXPECT_FALSE(context->IsCancelled()); EXPECT_FALSE(context->IsCancelled());
context->TryCancel(); context->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); gpr_log(GPR_INFO,
"Server called TryCancelNonblocking() to cancel the request");
} }
} // namespace } // namespace
@ -451,9 +453,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
gpr_time_from_micros(req_->param().server_sleep_us(), gpr_time_from_micros(req_->param().server_sleep_us(),
GPR_TIMESPAN)), GPR_TIMESPAN)),
[this](bool ok) { NonDelayed(ok); }); [this](bool ok) { NonDelayed(ok); });
} else { return;
NonDelayed(true);
} }
NonDelayed(true);
} }
void OnSendInitialMetadataDone(bool ok) override { void OnSendInitialMetadataDone(bool ok) override {
EXPECT_TRUE(ok); EXPECT_TRUE(ok);
@ -462,10 +464,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
void OnCancel() override { void OnCancel() override {
EXPECT_TRUE(started_); EXPECT_TRUE(started_);
EXPECT_TRUE(ctx_->IsCancelled()); EXPECT_TRUE(ctx_->IsCancelled());
// do the actual finish in the main handler only but use this as a chance
// to cancel any alarms.
alarm_.Cancel();
on_cancel_invoked_ = true; on_cancel_invoked_ = true;
std::lock_guard<std::mutex> l(cancel_mu_);
cancel_cv_.notify_one();
} }
void OnDone() override { void OnDone() override {
if (req_->has_param() && req_->param().echo_metadata_initially()) { if (req_->has_param() && req_->param().echo_metadata_initially()) {
@ -476,6 +477,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
if (rpc_wait_thread_.joinable()) { if (rpc_wait_thread_.joinable()) {
rpc_wait_thread_.join(); rpc_wait_thread_.join();
} }
if (finish_when_cancelled_.joinable()) {
finish_when_cancelled_.join();
}
delete this; delete this;
} }
@ -506,7 +510,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
EXPECT_FALSE(ctx_->IsCancelled()); EXPECT_FALSE(ctx_->IsCancelled());
ctx_->TryCancel(); ctx_->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
LoopUntilCancelled(1000); FinishWhenCancelledAsync();
return; return;
} }
gpr_log(GPR_DEBUG, "Request message was %s", req_->message().c_str()); gpr_log(GPR_DEBUG, "Request message was %s", req_->message().c_str());
@ -520,7 +524,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
std::unique_lock<std::mutex> lock(service_->mu_); std::unique_lock<std::mutex> lock(service_->mu_);
service_->signal_client_ = true; service_->signal_client_ = true;
} }
LoopUntilCancelled(req_->param().client_cancel_after_us()); FinishWhenCancelledAsync();
return; return;
} else if (req_->has_param() && req_->param().server_cancel_after_us()) { } else if (req_->has_param() && req_->param().server_cancel_after_us()) {
alarm_.experimental().Set( alarm_.experimental().Set(
@ -578,20 +582,12 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
} }
Finish(Status::OK); Finish(Status::OK);
} }
void LoopUntilCancelled(int loop_delay_us) { void FinishWhenCancelledAsync() {
if (!ctx_->IsCancelled()) { finish_when_cancelled_ = std::thread([this] {
alarm_.experimental().Set( std::unique_lock<std::mutex> l(cancel_mu_);
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); });
gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)),
[this, loop_delay_us](bool ok) {
if (!ok) {
EXPECT_TRUE(ctx_->IsCancelled());
}
LoopUntilCancelled(loop_delay_us);
});
} else {
Finish(Status::CANCELLED); Finish(Status::CANCELLED);
} });
} }
CallbackTestServiceImpl* const service_; CallbackTestServiceImpl* const service_;
@ -599,11 +595,14 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
const EchoRequest* const req_; const EchoRequest* const req_;
EchoResponse* const resp_; EchoResponse* const resp_;
Alarm alarm_; Alarm alarm_;
bool initial_metadata_sent_{false}; std::mutex cancel_mu_;
bool started_{false}; std::condition_variable cancel_cv_;
bool on_cancel_invoked_{false}; bool initial_metadata_sent_ = false;
bool started_ = false;
bool on_cancel_invoked_ = false;
std::thread async_cancel_check_; std::thread async_cancel_check_;
std::thread rpc_wait_thread_; std::thread rpc_wait_thread_;
std::thread finish_when_cancelled_;
}; };
return new Reactor(this, context, request, response); return new Reactor(this, context, request, response);

@ -23,7 +23,6 @@
#include <mutex> #include <mutex>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpcpp/alarm.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
#include "src/proto/grpc/testing/echo.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h"

Loading…
Cancel
Save