Revert "Sends out-of-band close for streams from client side when destroying transport (#31814)" (#31833)

This reverts commit 1daa3877ed.
pull/31834/head
Alisha Nanda 2 years ago committed by GitHub
parent fa42edef83
commit 38fd9c87e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      src/core/ext/transport/binder/transport/binder_transport.cc
  2. 1
      src/core/ext/transport/binder/wire_format/transaction.cc
  3. 9
      src/core/ext/transport/binder/wire_format/transaction.h
  4. 118
      test/core/transport/binder/binder_transport_test.cc

@ -149,26 +149,7 @@ static void cancel_stream_locked(grpc_binder_transport* gbt,
grpc_binder_stream* gbs, grpc_binder_stream* gbs,
grpc_error_handle error) { grpc_error_handle error) {
gpr_log(GPR_INFO, "cancel_stream_locked"); gpr_log(GPR_INFO, "cancel_stream_locked");
if (!gbs->is_closed) { if (!gbs->is_closed) {
if (gbt->is_client) {
// Always do an out-of-band close to play it safe. Due the design of gRPC,
// a bidi-streaming call can be "closed" from the client side, but a
// server-streaming call can only be "cancelled". An out-of-band close
// effectively "cancels" both bidi-streaming and server-streaming call.
//
// TODO(littlecvr): Investigate how BinderTransport work in Java and align
// with their behavior.
auto cancel_tx = std::make_unique<grpc_binder::Transaction>(
gbs->GetTxCode(), gbt->is_client);
cancel_tx->SetOutOfBandClose();
cancel_tx->SetStatus(GRPC_STATUS_CANCELLED);
gpr_log(GPR_INFO,
"Sending out-of-band close, gbt = %p, gbs = %p, is_client = %d",
gbt, gbs, gbt->is_client);
gbt->wire_writer->RpcCall(std::move(cancel_tx)).IgnoreError();
}
GPR_ASSERT(gbs->cancel_self_error.ok()); GPR_ASSERT(gbs->cancel_self_error.ok());
gbs->is_closed = true; gbs->is_closed = true;
gbs->cancel_self_error = error; gbs->cancel_self_error = error;

@ -28,7 +28,6 @@ ABSL_CONST_INIT const int kFlagExpectSingleMessage = 0x10;
ABSL_CONST_INIT const int kFlagStatusDescription = 0x20; ABSL_CONST_INIT const int kFlagStatusDescription = 0x20;
ABSL_CONST_INIT const int kFlagMessageDataIsParcelable = 0x40; ABSL_CONST_INIT const int kFlagMessageDataIsParcelable = 0x40;
ABSL_CONST_INIT const int kFlagMessageDataIsPartial = 0x80; ABSL_CONST_INIT const int kFlagMessageDataIsPartial = 0x80;
ABSL_CONST_INIT const int kStatusCodeShift = 16;
} // namespace grpc_binder } // namespace grpc_binder
#endif #endif

@ -34,7 +34,6 @@ ABSL_CONST_INIT extern const int kFlagExpectSingleMessage;
ABSL_CONST_INIT extern const int kFlagStatusDescription; ABSL_CONST_INIT extern const int kFlagStatusDescription;
ABSL_CONST_INIT extern const int kFlagMessageDataIsParcelable; ABSL_CONST_INIT extern const int kFlagMessageDataIsParcelable;
ABSL_CONST_INIT extern const int kFlagMessageDataIsPartial; ABSL_CONST_INIT extern const int kFlagMessageDataIsPartial;
ABSL_CONST_INIT extern const int kStatusCodeShift;
using Metadata = std::vector<std::pair<std::string, std::string>>; using Metadata = std::vector<std::pair<std::string, std::string>>;
@ -68,11 +67,11 @@ class Transaction {
GPR_ASSERT((flags_ & kFlagStatusDescription) == 0); GPR_ASSERT((flags_ & kFlagStatusDescription) == 0);
status_desc_ = status_desc; status_desc_ = status_desc;
} }
void SetOutOfBandClose() { flags_ |= kFlagOutOfBandClose; }
void SetStatus(int status) { void SetStatus(int status) {
GPR_ASSERT((flags_ >> kStatusCodeShift) == 0); GPR_ASSERT(!is_client_);
GPR_ASSERT(status < (1 << kStatusCodeShift)); GPR_ASSERT((flags_ >> 16) == 0);
flags_ |= (status << kStatusCodeShift); GPR_ASSERT(status < (1 << 16));
flags_ |= (status << 16);
} }
bool IsClient() const { return is_client_; } bool IsClient() const { return is_client_; }

@ -44,59 +44,6 @@ using ::testing::Expectation;
using ::testing::NiceMock; using ::testing::NiceMock;
using ::testing::Return; using ::testing::Return;
std::string MetadataString(const Metadata& a) {
return absl::StrCat(
"{",
absl::StrJoin(
a, ", ",
[](std::string* out, const std::pair<std::string, std::string>& kv) {
out->append(
absl::StrCat("\"", kv.first, "\": \"", kv.second, "\""));
}),
"}");
}
bool MetadataEquivalent(Metadata a, Metadata b) {
std::sort(a.begin(), a.end());
std::sort(b.begin(), b.end());
return a == b;
}
// Matches with transactions having the desired flag, method_ref,
// initial_metadata, and message_data.
MATCHER_P4(TransactionMatches, flag, method_ref, initial_metadata, message_data,
"") {
if (arg->GetFlags() != flag) return false;
if (flag & kFlagPrefix) {
if (arg->GetMethodRef() != method_ref) {
printf("METHOD REF NOT EQ: %s %s\n",
std::string(arg->GetMethodRef()).c_str(),
std::string(method_ref).c_str());
return false;
}
if (!MetadataEquivalent(arg->GetPrefixMetadata(), initial_metadata)) {
printf("METADATA NOT EQUIVALENT: %s %s\n",
MetadataString(arg->GetPrefixMetadata()).c_str(),
MetadataString(initial_metadata).c_str());
return false;
}
}
if (flag & kFlagMessageData) {
if (arg->GetMessageData() != message_data) {
printf("MESSAGE NOT EQUIVALENT: %s %s\n",
std::string(arg->GetMessageData()).c_str(),
std::string(message_data).c_str());
return false;
}
}
return true;
}
// Matches with grpc_error having error message containing |msg|.
MATCHER_P(GrpcErrorMessageContains, msg, "") {
return absl::StrContains(grpc_core::StatusToString(arg), msg);
}
class BinderTransportTest : public ::testing::Test { class BinderTransportTest : public ::testing::Test {
public: public:
BinderTransportTest() BinderTransportTest()
@ -144,14 +91,6 @@ class BinderTransportTest : public ::testing::Test {
GetBinderTransport()->wire_writer.get()); GetBinderTransport()->wire_writer.get());
} }
void ExpectOutOfBandCloseCalled() {
EXPECT_CALL(
GetWireWriter(),
RpcCall(TransactionMatches(
kFlagOutOfBandClose | (GRPC_STATUS_CANCELLED << kStatusCodeShift),
"", Metadata{}, "")));
}
static void SetUpTestSuite() { grpc_init(); } static void SetUpTestSuite() { grpc_init(); }
static void TearDownTestSuite() { grpc_shutdown(); } static void TearDownTestSuite() { grpc_shutdown(); }
@ -193,6 +132,55 @@ void MockCallback(void* arg, grpc_error_handle error) {
} }
} }
std::string MetadataString(const Metadata& a) {
return absl::StrCat(
"{",
absl::StrJoin(
a, ", ",
[](std::string* out, const std::pair<std::string, std::string>& kv) {
out->append(
absl::StrCat("\"", kv.first, "\": \"", kv.second, "\""));
}),
"}");
}
bool MetadataEquivalent(Metadata a, Metadata b) {
std::sort(a.begin(), a.end());
std::sort(b.begin(), b.end());
return a == b;
}
// Matches with transactions having the desired flag, method_ref,
// initial_metadata, and message_data.
MATCHER_P4(TransactionMatches, flag, method_ref, initial_metadata, message_data,
"") {
if (arg->GetFlags() != flag) return false;
if (flag & kFlagPrefix) {
if (arg->GetMethodRef() != method_ref) {
printf("METHOD REF NOT EQ: %s %s\n",
std::string(arg->GetMethodRef()).c_str(),
std::string(method_ref).c_str());
return false;
}
if (!MetadataEquivalent(arg->GetPrefixMetadata(), initial_metadata)) {
printf("METADATA NOT EQUIVALENT: %s %s\n",
MetadataString(arg->GetPrefixMetadata()).c_str(),
MetadataString(initial_metadata).c_str());
return false;
}
}
if (flag & kFlagMessageData) {
if (arg->GetMessageData() != message_data) return false;
}
return true;
}
// Matches with grpc_error having error message containing |msg|.
MATCHER_P(GrpcErrorMessageContains, msg, "") {
return absl::StrContains(grpc_core::StatusToString(arg), msg);
}
namespace {
class MetadataEncoder { class MetadataEncoder {
public: public:
void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) { void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
@ -213,6 +201,7 @@ class MetadataEncoder {
private: private:
Metadata metadata_; Metadata metadata_;
}; };
} // namespace
// Verify that the lower-level metadata has the same content as the gRPC // Verify that the lower-level metadata has the same content as the gRPC
// metadata. // metadata.
@ -418,7 +407,6 @@ TEST_F(BinderTransportTest, PerformSendInitialMetadata) {
EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches( EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches(
kFlagPrefix, "", kInitialMetadata, ""))); kFlagPrefix, "", kInitialMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback); EXPECT_CALL(mock_on_complete, Callback);
ExpectOutOfBandCloseCalled();
PerformStreamOp(gbs, &op); PerformStreamOp(gbs, &op);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
@ -442,7 +430,6 @@ TEST_F(BinderTransportTest, PerformSendInitialMetadataMethodRef) {
RpcCall(TransactionMatches(kFlagPrefix, kMethodRef.substr(1), RpcCall(TransactionMatches(kFlagPrefix, kMethodRef.substr(1),
kInitialMetadata, ""))); kInitialMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback); EXPECT_CALL(mock_on_complete, Callback);
ExpectOutOfBandCloseCalled();
PerformStreamOp(gbs, &op); PerformStreamOp(gbs, &op);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
@ -465,7 +452,6 @@ TEST_F(BinderTransportTest, PerformSendMessage) {
GetWireWriter(), GetWireWriter(),
RpcCall(TransactionMatches(kFlagMessageData, "", Metadata{}, kMessage))); RpcCall(TransactionMatches(kFlagMessageData, "", Metadata{}, kMessage)));
EXPECT_CALL(mock_on_complete, Callback); EXPECT_CALL(mock_on_complete, Callback);
ExpectOutOfBandCloseCalled();
PerformStreamOp(gbs, &op); PerformStreamOp(gbs, &op);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
@ -489,7 +475,6 @@ TEST_F(BinderTransportTest, PerformSendTrailingMetadata) {
EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches( EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches(
kFlagSuffix, "", kTrailingMetadata, ""))); kFlagSuffix, "", kTrailingMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback); EXPECT_CALL(mock_on_complete, Callback);
ExpectOutOfBandCloseCalled();
PerformStreamOp(gbs, &op); PerformStreamOp(gbs, &op);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
@ -525,7 +510,6 @@ TEST_F(BinderTransportTest, PerformSendAll) {
kFlagPrefix | kFlagMessageData | kFlagSuffix, kFlagPrefix | kFlagMessageData | kFlagSuffix,
kMethodRef.substr(1), kInitialMetadata, kMessage))); kMethodRef.substr(1), kInitialMetadata, kMessage)));
EXPECT_CALL(mock_on_complete, Callback); EXPECT_CALL(mock_on_complete, Callback);
ExpectOutOfBandCloseCalled();
PerformStreamOp(gbs, &op); PerformStreamOp(gbs, &op);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
@ -683,7 +667,6 @@ TEST_F(BinderTransportTest, PerformAllOps) {
RpcCall(TransactionMatches( RpcCall(TransactionMatches(
kFlagPrefix | kFlagMessageData | kFlagSuffix, kFlagPrefix | kFlagMessageData | kFlagSuffix,
kMethodRef.substr(1), kSendInitialMetadata, kSendMessage))); kMethodRef.substr(1), kSendInitialMetadata, kSendMessage)));
ExpectOutOfBandCloseCalled();
Expectation on_complete = EXPECT_CALL(mock_on_complete, Callback); Expectation on_complete = EXPECT_CALL(mock_on_complete, Callback);
// Recv callbacks can happen after the on_complete callback. // Recv callbacks can happen after the on_complete callback.
@ -737,7 +720,6 @@ TEST_F(BinderTransportTest, WireWriterRpcCallErrorPropagates) {
EXPECT_CALL(mock_on_complete1, Callback(absl::OkStatus())); EXPECT_CALL(mock_on_complete1, Callback(absl::OkStatus()));
EXPECT_CALL(mock_on_complete2, EXPECT_CALL(mock_on_complete2,
Callback(GrpcErrorMessageContains("WireWriter::RpcCall failed"))); Callback(GrpcErrorMessageContains("WireWriter::RpcCall failed")));
ExpectOutOfBandCloseCalled();
const Metadata kInitialMetadata = {}; const Metadata kInitialMetadata = {};
grpc_transport_stream_op_batch op1{}; grpc_transport_stream_op_batch op1{};

Loading…
Cancel
Save