From 3ade3e84a974fedfe0a6077f08133403eb856e2a Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 21 Jan 2020 15:36:39 -0800 Subject: [PATCH 1/3] Make use of alarm in test service more robust --- test/cpp/end2end/test_service_impl.cc | 90 ++++++++++++++++----------- test/cpp/end2end/test_service_impl.h | 1 - 2 files changed, 55 insertions(+), 36 deletions(-) diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index d9846fdd8b1..30f7fe52772 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -19,6 +19,7 @@ #include "test/cpp/end2end/test_service_impl.h" #include +#include #include #include #include @@ -120,7 +121,7 @@ void ServerTryCancel(ServerContext* context) { void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { EXPECT_FALSE(context->IsCancelled()); context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); + gpr_log(GPR_INFO, "Server called TryCancelNB() to cancel the request"); } } // namespace @@ -444,16 +445,20 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( } void StartRpc() { - if (req_->has_param() && req_->param().server_sleep_us() > 0) { - // Set an alarm for that much time - alarm_.experimental().Set( - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_micros(req_->param().server_sleep_us(), - GPR_TIMESPAN)), - [this](bool ok) { NonDelayed(ok); }); - } else { - NonDelayed(true); + { + std::lock_guard l(alarm_mu_); + if (req_->has_param() && req_->param().server_sleep_us() > 0 && + !dont_alarm_anymore_) { + // Set an alarm for that much time + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(req_->param().server_sleep_us(), + GPR_TIMESPAN)), + [this](bool ok) { NonDelayed(ok); }); + return; + } } + NonDelayed(true); } void OnSendInitialMetadataDone(bool ok) override { EXPECT_TRUE(ok); @@ -462,10 +467,12 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( void OnCancel() override { EXPECT_TRUE(started_); EXPECT_TRUE(ctx_->IsCancelled()); - // do the actual finish in the main handler only but use this as a chance + on_cancel_invoked_ = true; + // Do the actual finish in the main handler only but use this as a chance // to cancel any alarms. + std::lock_guard l(alarm_mu_); alarm_.Cancel(); - on_cancel_invoked_ = true; + dont_alarm_anymore_ = true; } void OnDone() override { if (req_->has_param() && req_->param().echo_metadata_initially()) { @@ -523,12 +530,17 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( LoopUntilCancelled(req_->param().client_cancel_after_us()); return; } else if (req_->has_param() && req_->param().server_cancel_after_us()) { - alarm_.experimental().Set( - gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(req_->param().server_cancel_after_us(), - GPR_TIMESPAN)), - [this](bool) { Finish(Status::CANCELLED); }); + std::lock_guard l(alarm_mu_); + if (dont_alarm_anymore_) { + Finish(Status::CANCELLED); + } else { + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(req_->param().server_cancel_after_us(), + GPR_TIMESPAN)), + [this](bool) { Finish(Status::CANCELLED); }); + } return; } else if (!req_->has_param() || !req_->param().skip_cancelled_check()) { EXPECT_FALSE(ctx_->IsCancelled()); @@ -579,35 +591,43 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( Finish(Status::OK); } void LoopUntilCancelled(int loop_delay_us) { - if (!ctx_->IsCancelled()) { - alarm_.experimental().Set( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)), - [this, loop_delay_us](bool ok) { - if (!ok) { - EXPECT_TRUE(ctx_->IsCancelled()); - } - LoopUntilCancelled(loop_delay_us); - }); - } else { - Finish(Status::CANCELLED); + { + std::lock_guard l(alarm_mu_); + if (!ctx_->IsCancelled()) { + // dont_alarm_anymore_ wouldn't be set either since that is only set + // in OnCancel + EXPECT_FALSE(dont_alarm_anymore_); + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)), + [this, loop_delay_us](bool ok) { + if (!ok) { + EXPECT_TRUE(ctx_->IsCancelled()); + } + LoopUntilCancelled(loop_delay_us); + }); + return; + } } + Finish(Status::CANCELLED); } CallbackTestServiceImpl* const service_; experimental::CallbackServerContext* const ctx_; const EchoRequest* const req_; EchoResponse* const resp_; - Alarm alarm_; - bool initial_metadata_sent_{false}; - bool started_{false}; - bool on_cancel_invoked_{false}; + std::mutex alarm_mu_; + bool dont_alarm_anymore_ /* GUARDED_BY(alarm_mu_) */ = false; + Alarm alarm_ /* GUARDED_BY(alarm_mu_) */; + bool initial_metadata_sent_ = false; + bool started_ = false; + bool on_cancel_invoked_ = false; std::thread async_cancel_check_; std::thread rpc_wait_thread_; }; return new Reactor(this, context, request, response); -} +} // namespace testing experimental::ServerUnaryReactor* CallbackTestServiceImpl::CheckClientInitialMetadata( diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 924dccf3bee..0978d5f19b7 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -23,7 +23,6 @@ #include #include -#include #include #include "src/proto/grpc/testing/echo.grpc.pb.h" From d173d3fde85205b4450d24d6f8254369d710d038 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 29 Jan 2020 00:33:39 -0800 Subject: [PATCH 2/3] Fix cancellation to make it easier to follow --- test/cpp/end2end/test_service_impl.cc | 73 ++++++++++----------------- 1 file changed, 27 insertions(+), 46 deletions(-) diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 30f7fe52772..4acdc820433 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -121,7 +121,8 @@ void ServerTryCancel(ServerContext* context) { void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) { EXPECT_FALSE(context->IsCancelled()); context->TryCancel(); - gpr_log(GPR_INFO, "Server called TryCancelNB() to cancel the request"); + gpr_log(GPR_INFO, + "Server called TryCancelNonblocking() to cancel the request"); } } // namespace @@ -446,9 +447,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( void StartRpc() { { - std::lock_guard l(alarm_mu_); - if (req_->has_param() && req_->param().server_sleep_us() > 0 && - !dont_alarm_anymore_) { + if (req_->has_param() && req_->param().server_sleep_us() > 0) { // Set an alarm for that much time alarm_.experimental().Set( gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), @@ -468,11 +467,8 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( EXPECT_TRUE(started_); EXPECT_TRUE(ctx_->IsCancelled()); on_cancel_invoked_ = true; - // Do the actual finish in the main handler only but use this as a chance - // to cancel any alarms. - std::lock_guard l(alarm_mu_); - alarm_.Cancel(); - dont_alarm_anymore_ = true; + std::lock_guard l(cancel_mu_); + cancel_cv_.notify_one(); } void OnDone() override { if (req_->has_param() && req_->param().echo_metadata_initially()) { @@ -483,6 +479,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( if (rpc_wait_thread_.joinable()) { rpc_wait_thread_.join(); } + if (finish_when_cancelled_.joinable()) { + finish_when_cancelled_.join(); + } delete this; } @@ -513,7 +512,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( EXPECT_FALSE(ctx_->IsCancelled()); ctx_->TryCancel(); gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request"); - LoopUntilCancelled(1000); + FinishWhenCancelledAsync(); return; } gpr_log(GPR_DEBUG, "Request message was %s", req_->message().c_str()); @@ -527,20 +526,15 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( std::unique_lock lock(service_->mu_); service_->signal_client_ = true; } - LoopUntilCancelled(req_->param().client_cancel_after_us()); + FinishWhenCancelledAsync(); return; } else if (req_->has_param() && req_->param().server_cancel_after_us()) { - std::lock_guard l(alarm_mu_); - if (dont_alarm_anymore_) { - Finish(Status::CANCELLED); - } else { - alarm_.experimental().Set( - gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(req_->param().server_cancel_after_us(), - GPR_TIMESPAN)), - [this](bool) { Finish(Status::CANCELLED); }); - } + alarm_.experimental().Set( + gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(req_->param().server_cancel_after_us(), + GPR_TIMESPAN)), + [this](bool) { Finish(Status::CANCELLED); }); return; } else if (!req_->has_param() || !req_->param().skip_cancelled_check()) { EXPECT_FALSE(ctx_->IsCancelled()); @@ -590,44 +584,31 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( } Finish(Status::OK); } - void LoopUntilCancelled(int loop_delay_us) { - { - std::lock_guard l(alarm_mu_); - if (!ctx_->IsCancelled()) { - // dont_alarm_anymore_ wouldn't be set either since that is only set - // in OnCancel - EXPECT_FALSE(dont_alarm_anymore_); - alarm_.experimental().Set( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)), - [this, loop_delay_us](bool ok) { - if (!ok) { - EXPECT_TRUE(ctx_->IsCancelled()); - } - LoopUntilCancelled(loop_delay_us); - }); - return; - } - } - Finish(Status::CANCELLED); + void FinishWhenCancelledAsync() { + finish_when_cancelled_ = std::thread([this] { + std::unique_lock l(cancel_mu_); + cancel_cv_.wait(l, [this] { return ctx_->IsCancelled(); }); + Finish(Status::CANCELLED); + }); } CallbackTestServiceImpl* const service_; experimental::CallbackServerContext* const ctx_; const EchoRequest* const req_; EchoResponse* const resp_; - std::mutex alarm_mu_; - bool dont_alarm_anymore_ /* GUARDED_BY(alarm_mu_) */ = false; - Alarm alarm_ /* GUARDED_BY(alarm_mu_) */; + Alarm alarm_; + std::mutex cancel_mu_; + std::condition_variable cancel_cv_; bool initial_metadata_sent_ = false; bool started_ = false; bool on_cancel_invoked_ = false; std::thread async_cancel_check_; std::thread rpc_wait_thread_; + std::thread finish_when_cancelled_; }; return new Reactor(this, context, request, response); -} // namespace testing +} experimental::ServerUnaryReactor* CallbackTestServiceImpl::CheckClientInitialMetadata( From c34999ae2cf616c4e9ea18dd6bf6091eaf320808 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 30 Jan 2020 00:20:17 -0800 Subject: [PATCH 3/3] Remove unneeded curly braces --- test/cpp/end2end/test_service_impl.cc | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index 4acdc820433..aed436098c2 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -446,16 +446,14 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo( } void StartRpc() { - { - if (req_->has_param() && req_->param().server_sleep_us() > 0) { - // Set an alarm for that much time - alarm_.experimental().Set( - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_micros(req_->param().server_sleep_us(), - GPR_TIMESPAN)), - [this](bool ok) { NonDelayed(ok); }); - return; - } + if (req_->has_param() && req_->param().server_sleep_us() > 0) { + // Set an alarm for that much time + alarm_.experimental().Set( + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_micros(req_->param().server_sleep_us(), + GPR_TIMESPAN)), + [this](bool ok) { NonDelayed(ok); }); + return; } NonDelayed(true); }