Fix cancellation to make it easier to follow

pull/21747/head
Vijay Pai 5 years ago
parent e695974375
commit d173d3fde8
  1. 73
      test/cpp/end2end/test_service_impl.cc

@ -121,7 +121,8 @@ void ServerTryCancel(ServerContext* context) {
void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
EXPECT_FALSE(context->IsCancelled());
context->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancelNB() to cancel the request");
gpr_log(GPR_INFO,
"Server called TryCancelNonblocking() to cancel the request");
}
} // namespace
@ -446,9 +447,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
void StartRpc() {
{
std::lock_guard<std::mutex> l(alarm_mu_);
if (req_->has_param() && req_->param().server_sleep_us() > 0 &&
!dont_alarm_anymore_) {
if (req_->has_param() && req_->param().server_sleep_us() > 0) {
// Set an alarm for that much time
alarm_.experimental().Set(
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
@ -468,11 +467,8 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
EXPECT_TRUE(started_);
EXPECT_TRUE(ctx_->IsCancelled());
on_cancel_invoked_ = true;
// Do the actual finish in the main handler only but use this as a chance
// to cancel any alarms.
std::lock_guard<std::mutex> l(alarm_mu_);
alarm_.Cancel();
dont_alarm_anymore_ = true;
std::lock_guard<std::mutex> l(cancel_mu_);
cancel_cv_.notify_one();
}
void OnDone() override {
if (req_->has_param() && req_->param().echo_metadata_initially()) {
@ -483,6 +479,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
if (rpc_wait_thread_.joinable()) {
rpc_wait_thread_.join();
}
if (finish_when_cancelled_.joinable()) {
finish_when_cancelled_.join();
}
delete this;
}
@ -513,7 +512,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
EXPECT_FALSE(ctx_->IsCancelled());
ctx_->TryCancel();
gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
LoopUntilCancelled(1000);
FinishWhenCancelledAsync();
return;
}
gpr_log(GPR_DEBUG, "Request message was %s", req_->message().c_str());
@ -527,20 +526,15 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
std::unique_lock<std::mutex> lock(service_->mu_);
service_->signal_client_ = true;
}
LoopUntilCancelled(req_->param().client_cancel_after_us());
FinishWhenCancelledAsync();
return;
} else if (req_->has_param() && req_->param().server_cancel_after_us()) {
std::lock_guard<std::mutex> l(alarm_mu_);
if (dont_alarm_anymore_) {
Finish(Status::CANCELLED);
} else {
alarm_.experimental().Set(
gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(req_->param().server_cancel_after_us(),
GPR_TIMESPAN)),
[this](bool) { Finish(Status::CANCELLED); });
}
alarm_.experimental().Set(
gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(req_->param().server_cancel_after_us(),
GPR_TIMESPAN)),
[this](bool) { Finish(Status::CANCELLED); });
return;
} else if (!req_->has_param() || !req_->param().skip_cancelled_check()) {
EXPECT_FALSE(ctx_->IsCancelled());
@ -590,44 +584,31 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
}
Finish(Status::OK);
}
void LoopUntilCancelled(int loop_delay_us) {
{
std::lock_guard<std::mutex> l(alarm_mu_);
if (!ctx_->IsCancelled()) {
// dont_alarm_anymore_ wouldn't be set either since that is only set
// in OnCancel
EXPECT_FALSE(dont_alarm_anymore_);
alarm_.experimental().Set(
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)),
[this, loop_delay_us](bool ok) {
if (!ok) {
EXPECT_TRUE(ctx_->IsCancelled());
}
LoopUntilCancelled(loop_delay_us);
});
return;
}
}
Finish(Status::CANCELLED);
void FinishWhenCancelledAsync() {
finish_when_cancelled_ = std::thread([this] {
std::unique_lock<std::mutex> l(cancel_mu_);
cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); });
Finish(Status::CANCELLED);
});
}
CallbackTestServiceImpl* const service_;
experimental::CallbackServerContext* const ctx_;
const EchoRequest* const req_;
EchoResponse* const resp_;
std::mutex alarm_mu_;
bool dont_alarm_anymore_ /* GUARDED_BY(alarm_mu_) */ = false;
Alarm alarm_ /* GUARDED_BY(alarm_mu_) */;
Alarm alarm_;
std::mutex cancel_mu_;
std::condition_variable cancel_cv_;
bool initial_metadata_sent_ = false;
bool started_ = false;
bool on_cancel_invoked_ = false;
std::thread async_cancel_check_;
std::thread rpc_wait_thread_;
std::thread finish_when_cancelled_;
};
return new Reactor(this, context, request, response);
} // namespace testing
}
experimental::ServerUnaryReactor*
CallbackTestServiceImpl::CheckClientInitialMetadata(

Loading…
Cancel
Save