Also add streaming tests that cancel from client

pull/18136/head
Vijay Pai 6 years ago
parent 7eb08ad72e
commit 05b98a4896
  1. 142
      test/cpp/end2end/client_callback_end2end_test.cc

@ -16,6 +16,7 @@
*
*/
#include <algorithm>
#include <functional>
#include <mutex>
#include <sstream>
@ -104,8 +105,8 @@ class ClientCallbackEnd2endTest
do_not_test_ = true;
return;
}
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
picked_port_ = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << picked_port_;
builder.AddListeningPort(server_address_.str(), server_creds);
}
if (!GetParam().callback_server) {
@ -166,6 +167,9 @@ class ClientCallbackEnd2endTest
if (is_server_started_) {
server_->Shutdown();
}
if (picked_port_ > 0) {
grpc_recycle_unused_port(picked_port_);
}
}
void SendRpcs(int num_rpcs, bool with_binary_metadata) {
@ -321,6 +325,7 @@ class ClientCallbackEnd2endTest
}
bool do_not_test_{false};
bool is_server_started_{false};
int picked_port_{0};
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<grpc::GenericStub> generic_stub_;
@ -489,13 +494,22 @@ TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
}
}
struct ClientCancelInfo {
bool cancel{false};
int ops_before_cancel;
ClientCancelInfo() : cancel{false} {}
// Allow the single-op version to be non-explicit for ease of use
ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public:
WriteClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send)
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
num_msgs_to_send_(num_msgs_to_send) {
num_msgs_to_send_(num_msgs_to_send),
client_cancel_{client_cancel} {
grpc::string msg{"Hello server."};
for (int i = 0; i < num_msgs_to_send; i++) {
desired_ += msg;
@ -512,13 +526,17 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
MaybeWrite();
}
void OnWriteDone(bool ok) override {
num_msgs_sent_++;
if (ok) {
num_msgs_sent_++;
MaybeWrite();
}
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
int num_to_send =
(client_cancel_.cancel)
? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
: num_msgs_to_send_;
switch (server_try_cancel_) {
case CANCEL_BEFORE_PROCESSING:
case CANCEL_DURING_PROCESSING:
@ -526,19 +544,19 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
// client, it means that the client most likely did not get a chance to
// send all the messages it wanted to send. i.e num_msgs_sent <=
// num_msgs_to_send
EXPECT_LE(num_msgs_sent_, num_msgs_to_send_);
EXPECT_LE(num_msgs_sent_, num_to_send);
break;
case DO_NOT_CANCEL:
case CANCEL_AFTER_PROCESSING:
// If the RPC was not canceled or canceled after all messages were read
// by the server, the client did get a chance to send all its messages
EXPECT_EQ(num_msgs_sent_, num_msgs_to_send_);
EXPECT_EQ(num_msgs_sent_, num_to_send);
break;
default:
assert(false);
break;
}
if (server_try_cancel_ == DO_NOT_CANCEL) {
if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(response_.message(), desired_);
} else {
@ -558,7 +576,10 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
private:
void MaybeWrite() {
if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
if (client_cancel_.cancel &&
num_msgs_sent_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
StartWrite(&request_);
} else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
StartWriteLast(&request_, WriteOptions());
@ -571,6 +592,7 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
int num_msgs_sent_{0};
const int num_msgs_to_send_;
grpc::string desired_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
@ -627,8 +649,9 @@ TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
ReadClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel)
: server_try_cancel_(server_try_cancel) {
ServerTryCancelRequestPhase server_try_cancel,
ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
@ -636,12 +659,18 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
}
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL) {
if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
}
} else {
@ -649,6 +678,12 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_));
reads_complete_++;
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
}
}
@ -656,8 +691,19 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
EXPECT_TRUE(s.ok());
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
kServerDefaultResponseStreamsToSend) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
// Status might be ok or cancelled depending on whether server
// sent status before client cancel went through
if (!s.ok()) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
@ -694,6 +740,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int reads_complete_{0};
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
@ -710,6 +757,15 @@ TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), DO_NOT_CANCEL, 2};
test.Await();
// Because cancel in this case races with server finish, we can't be sure that
// server interceptors even see cancellation
}
// Server to cancel before sending any response messages
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
@ -752,8 +808,10 @@ class BidiClient
public:
BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send)
: server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send} {
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
@ -761,14 +819,18 @@ class BidiClient
}
request_.set_message("Hello fren ");
stub->experimental_async()->BidiStream(&context_, this);
MaybeWrite();
StartRead(&response_);
StartWrite(&request_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_EQ(reads_complete_, msgs_to_send_);
if (!client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, msgs_to_send_);
} else {
EXPECT_LE(reads_complete_, writes_complete_);
}
}
} else {
EXPECT_LE(reads_complete_, msgs_to_send_);
@ -783,20 +845,25 @@ class BidiClient
} else if (!ok) {
return;
}
if (++writes_complete_ == msgs_to_send_) {
StartWritesDone();
} else {
StartWrite(&request_);
}
writes_complete_++;
MaybeWrite();
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
EXPECT_TRUE(s.ok());
EXPECT_EQ(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, writes_complete_);
if (!client_cancel_.cancel ||
client_cancel_.ops_before_cancel > msgs_to_send_) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, writes_complete_);
} else {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, writes_complete_);
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
@ -837,6 +904,16 @@ class BidiClient
}
private:
void MaybeWrite() {
if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (writes_complete_ == msgs_to_send_) {
StartWritesDone();
} else {
StartWrite(&request_);
}
}
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
@ -844,6 +921,7 @@ class BidiClient
int reads_complete_{0};
int writes_complete_{0};
const int msgs_to_send_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
@ -861,6 +939,18 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend, 2};
test.Await();
// Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel before reading/writing any requests/responses on the stream
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST;

Loading…
Cancel
Save