Merge pull request #21465 from vjpai/fix_cancel_delayed_rpc

Make CancelDelayedRpc test not use sleeps for client-server sync
pull/21445/head
Vijay Pai 5 years ago committed by GitHub
commit e522302e33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/proto/grpc/testing/echo_messages.proto
  2. 20
      test/cpp/end2end/end2end_test.cc
  3. 37
      test/cpp/end2end/test_service_impl.cc
  4. 36
      test/cpp/end2end/test_service_impl.h

@ -47,9 +47,10 @@ message RequestParams {
bool server_die = 12; // Server should not see a request with this set. bool server_die = 12; // Server should not see a request with this set.
string binary_error_details = 13; string binary_error_details = 13;
ErrorStatus expected_error = 14; ErrorStatus expected_error = 14;
int32 server_sleep_us = 15; // Amount to sleep when invoking server int32 server_sleep_us = 15; // sleep when invoking server for deadline tests
int32 backend_channel_idx = 16; // which backend to send request to int32 backend_channel_idx = 16; // which backend to send request to
bool echo_metadata_initially = 17; bool echo_metadata_initially = 17;
bool server_notify_client_when_started = 18;
} }
message EchoRequest { message EchoRequest {

@ -1131,24 +1131,34 @@ TEST_P(End2endTest, CancelRpcBeforeStart) {
} }
} }
// TODO(https://github.com/grpc/grpc/issues/21263): stop using timed sleeps to TEST_P(End2endTest, CancelRpcAfterStart) {
// synchronize cancellation semantics.
TEST_P(End2endTest, CancelDelayedRpc) {
MAYBE_SKIP_TEST; MAYBE_SKIP_TEST;
ResetStub(); ResetStub();
EchoRequest request; EchoRequest request;
EchoResponse response; EchoResponse response;
ClientContext context; ClientContext context;
request.set_message("hello"); request.set_message("hello");
request.mutable_param()->set_server_sleep_us(100 * 1000); request.mutable_param()->set_server_notify_client_when_started(true);
request.mutable_param()->set_skip_cancelled_check(true); request.mutable_param()->set_skip_cancelled_check(true);
Status s; Status s;
std::thread echo_thread([this, &s, &context, &request, &response] { std::thread echo_thread([this, &s, &context, &request, &response] {
s = stub_->Echo(&context, request, &response); s = stub_->Echo(&context, request, &response);
EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
}); });
std::this_thread::sleep_for(std::chrono::microseconds(10 * 1000)); if (!GetParam().callback_server) {
service_.ClientWaitUntilRpcStarted();
} else {
callback_service_.ClientWaitUntilRpcStarted();
}
context.TryCancel(); context.TryCancel();
if (!GetParam().callback_server) {
service_.SignalServerToContinue();
} else {
callback_service_.SignalServerToContinue();
}
echo_thread.join(); echo_thread.join();
EXPECT_EQ("", response.message()); EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());

@ -127,6 +127,12 @@ void ServerTryCancelNonblocking(experimental::CallbackServerContext* context) {
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request, Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) { EchoResponse* response) {
if (request->has_param() &&
request->param().server_notify_client_when_started()) {
signaller_.SignalClientThatRpcStarted();
signaller_.ServerWaitToContinue();
}
// A bit of sleep to make sure that short deadline tests fail // A bit of sleep to make sure that short deadline tests fail
if (request->has_param() && request->param().server_sleep_us() > 0) { if (request->has_param() && request->param().server_sleep_us() > 0) {
gpr_sleep_until( gpr_sleep_until(
@ -416,19 +422,38 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
: service_(service), ctx_(ctx), req_(request), resp_(response) { : service_(service), ctx_(ctx), req_(request), resp_(response) {
// It should be safe to call IsCancelled here, even though we don't know // It should be safe to call IsCancelled here, even though we don't know
// the result. Call it asynchronously to see if we trigger any data races. // the result. Call it asynchronously to see if we trigger any data races.
// Join it in OnDone (technically that could be blocking but shouldn't be
// for very long).
async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); }); async_cancel_check_ = std::thread([this] { (void)ctx_->IsCancelled(); });
if (request->has_param() && request->param().server_sleep_us() > 0) { started_ = true;
if (request->has_param() &&
request->param().server_notify_client_when_started()) {
service->signaller_.SignalClientThatRpcStarted();
// Block on the "wait to continue" decision in a different thread since
// we can't tie up an EM thread with blocking events. We can join it in
// OnDone since it would definitely be done by then.
rpc_wait_thread_ = std::thread([this] {
service_->signaller_.ServerWaitToContinue();
StartRpc();
});
} else {
StartRpc();
}
}
void StartRpc() {
if (req_->has_param() && req_->param().server_sleep_us() > 0) {
// Set an alarm for that much time // Set an alarm for that much time
alarm_.experimental().Set( alarm_.experimental().Set(
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros( gpr_time_from_micros(req_->param().server_sleep_us(),
request->param().server_sleep_us(), GPR_TIMESPAN)), GPR_TIMESPAN)),
[this](bool ok) { NonDelayed(ok); }); [this](bool ok) { NonDelayed(ok); });
} else { } else {
NonDelayed(true); NonDelayed(true);
} }
started_ = true;
} }
void OnSendInitialMetadataDone(bool ok) override { void OnSendInitialMetadataDone(bool ok) override {
EXPECT_TRUE(ok); EXPECT_TRUE(ok);
@ -448,6 +473,9 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
} }
EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_); EXPECT_EQ(ctx_->IsCancelled(), on_cancel_invoked_);
async_cancel_check_.join(); async_cancel_check_.join();
if (rpc_wait_thread_.joinable()) {
rpc_wait_thread_.join();
}
delete this; delete this;
} }
@ -575,6 +603,7 @@ experimental::ServerUnaryReactor* CallbackTestServiceImpl::Echo(
bool started_{false}; bool started_{false};
bool on_cancel_invoked_{false}; bool on_cancel_invoked_{false};
std::thread async_cancel_check_; std::thread async_cancel_check_;
std::thread rpc_wait_thread_;
}; };
return new Reactor(this, context, request, response); return new Reactor(this, context, request, response);

@ -18,6 +18,7 @@
#ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
#define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
#include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -46,6 +47,35 @@ typedef enum {
CANCEL_AFTER_PROCESSING CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase; } ServerTryCancelRequestPhase;
class TestServiceSignaller {
public:
void ClientWaitUntilRpcStarted() {
std::unique_lock<std::mutex> lock(mu_);
cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
}
void ServerWaitToContinue() {
std::unique_lock<std::mutex> lock(mu_);
cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
}
void SignalClientThatRpcStarted() {
std::unique_lock<std::mutex> lock(mu_);
rpc_started_ = true;
cv_rpc_started_.notify_one();
}
void SignalServerToContinue() {
std::unique_lock<std::mutex> lock(mu_);
server_should_continue_ = true;
cv_server_continue_.notify_one();
}
private:
std::mutex mu_;
std::condition_variable cv_rpc_started_;
bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
std::condition_variable cv_server_continue_;
bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
};
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public: public:
TestServiceImpl() : signal_client_(false), host_() {} TestServiceImpl() : signal_client_(false), host_() {}
@ -76,10 +106,13 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
return signal_client_; return signal_client_;
} }
void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
private: private:
bool signal_client_; bool signal_client_;
std::mutex mu_; std::mutex mu_;
TestServiceSignaller signaller_;
std::unique_ptr<grpc::string> host_; std::unique_ptr<grpc::string> host_;
}; };
@ -114,10 +147,13 @@ class CallbackTestServiceImpl
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
return signal_client_; return signal_client_;
} }
void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
private: private:
bool signal_client_; bool signal_client_;
std::mutex mu_; std::mutex mu_;
TestServiceSignaller signaller_;
std::unique_ptr<grpc::string> host_; std::unique_ptr<grpc::string> host_;
}; };

Loading…
Cancel
Save