Migrate binder transport end-to-end tests to use the existing testing service (#27179)

* Use TestServiceImpl in binder end-to-end tests

* Fix TSAN warnings
pull/27382/head
Ta-Wei Tu 3 years ago committed by GitHub
parent b9a997be11
commit f7998db700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      CMakeLists.txt
  2. 57
      build_autogenerated.yaml
  3. 7
      src/core/ext/transport/binder/transport/binder_stream.h
  4. 12
      src/core/ext/transport/binder/transport/binder_transport.cc
  5. 25
      test/core/transport/binder/end2end/BUILD
  6. 38
      test/core/transport/binder/end2end/binder_server_test.cc
  7. 38
      test/core/transport/binder/end2end/echo.proto
  8. 83
      test/core/transport/binder/end2end/echo_service.cc
  9. 52
      test/core/transport/binder/end2end/echo_service.h
  10. 529
      test/core/transport/binder/end2end/end2end_binder_transport_test.cc

65
CMakeLists.txt generated

@ -532,9 +532,6 @@ protobuf_generate_grpc_cpp(
protobuf_generate_grpc_cpp(
src/proto/grpc/testing/xds/v3/tls.proto
)
protobuf_generate_grpc_cpp(
test/core/transport/binder/end2end/echo.proto
)
protobuf_generate_grpc_cpp(
test/core/tsi/alts/fake_handshaker/handshaker.proto
)
@ -8396,10 +8393,6 @@ add_executable(binder_server_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.h
src/core/ext/transport/binder/client/channel_create_impl.cc
src/core/ext/transport/binder/server/binder_server.cc
src/core/ext/transport/binder/server/binder_server_credentials.cc
@ -8411,7 +8404,6 @@ add_executable(binder_server_test
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
test/core/transport/binder/end2end/binder_server_test.cc
test/core/transport/binder/end2end/echo_service.cc
test/core/transport/binder/end2end/fake_binder.cc
test/cpp/end2end/test_service_impl.cc
third_party/googletest/googletest/src/gtest-all.cc
@ -10678,57 +10670,28 @@ endif()
if(gRPC_BUILD_TESTS)
add_executable(end2end_binder_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.pb.h
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/simple_messages.grpc.pb.h
src/core/ext/transport/binder/transport/binder_transport.cc
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
src/cpp/client/channel_cc.cc
src/cpp/client/client_callback.cc
src/cpp/client/client_context.cc
src/cpp/client/client_interceptor.cc
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/credentials_cc.cc
src/cpp/codegen/codegen_init.cc
src/cpp/common/alarm.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/resource_quota_cc.cc
src/cpp/common/rpc_method.cc
src/cpp/common/validate_service_config.cc
src/cpp/common/version_cc.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/channel_argument_option.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
src/cpp/server/external_connection_acceptor_impl.cc
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
src/cpp/server/server_context.cc
src/cpp/server/server_credentials.cc
src/cpp/server/server_posix.cc
src/cpp/thread_manager/thread_manager.cc
src/cpp/util/byte_buffer_cc.cc
src/cpp/util/status.cc
src/cpp/util/string_ref.cc
src/cpp/util/time_cc.cc
test/core/transport/binder/end2end/echo_service.cc
test/core/transport/binder/end2end/end2end_binder_transport_test.cc
test/core/transport/binder/end2end/fake_binder.cc
test/core/transport/binder/end2end/testing_channel_create.cc
test/cpp/end2end/test_service_impl.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
@ -10756,7 +10719,7 @@ target_link_libraries(end2end_binder_transport_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::random_random
grpc_test_util
grpc++_test_util
)

@ -4572,14 +4572,12 @@ targets:
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- test/core/transport/binder/end2end/echo_service.h
- test/core/transport/binder/end2end/fake_binder.h
- test/cpp/end2end/test_service_impl.h
src:
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- test/core/transport/binder/end2end/echo.proto
- src/core/ext/transport/binder/client/channel_create_impl.cc
- src/core/ext/transport/binder/server/binder_server.cc
- src/core/ext/transport/binder/server/binder_server_credentials.cc
@ -4591,7 +4589,6 @@ targets:
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- test/core/transport/binder/end2end/binder_server_test.cc
- test/core/transport/binder/end2end/echo_service.cc
- test/core/transport/binder/end2end/fake_binder.cc
- test/cpp/end2end/test_service_impl.cc
deps:
@ -5509,68 +5506,26 @@ targets:
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- src/cpp/client/create_channel_internal.h
- src/cpp/common/channel_filter.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/external_connection_acceptor_impl.h
- src/cpp/server/health/default_health_check_service.h
- src/cpp/server/thread_pool_interface.h
- src/cpp/thread_manager/thread_manager.h
- test/core/transport/binder/end2end/echo_service.h
- test/core/transport/binder/end2end/fake_binder.h
- test/core/transport/binder/end2end/testing_channel_create.h
- test/cpp/end2end/test_service_impl.h
src:
- test/core/transport/binder/end2end/echo.proto
- src/proto/grpc/testing/echo.proto
- src/proto/grpc/testing/echo_messages.proto
- src/proto/grpc/testing/simple_messages.proto
- src/core/ext/transport/binder/transport/binder_transport.cc
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- src/cpp/client/channel_cc.cc
- src/cpp/client/client_callback.cc
- src/cpp/client/client_context.cc
- src/cpp/client/client_interceptor.cc
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/credentials_cc.cc
- src/cpp/codegen/codegen_init.cc
- src/cpp/common/alarm.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
- src/cpp/common/core_codegen.cc
- src/cpp/common/resource_quota_cc.cc
- src/cpp/common/rpc_method.cc
- src/cpp/common/validate_service_config.cc
- src/cpp/common/version_cc.cc
- src/cpp/server/async_generic_service.cc
- src/cpp/server/channel_argument_option.cc
- src/cpp/server/create_default_thread_pool.cc
- src/cpp/server/dynamic_thread_pool.cc
- src/cpp/server/external_connection_acceptor_impl.cc
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
- src/cpp/server/server_context.cc
- src/cpp/server/server_credentials.cc
- src/cpp/server/server_posix.cc
- src/cpp/thread_manager/thread_manager.cc
- src/cpp/util/byte_buffer_cc.cc
- src/cpp/util/status.cc
- src/cpp/util/string_ref.cc
- src/cpp/util/time_cc.cc
- test/core/transport/binder/end2end/echo_service.cc
- test/core/transport/binder/end2end/end2end_binder_transport_test.cc
- test/core/transport/binder/end2end/fake_binder.cc
- test/core/transport/binder/end2end/testing_channel_create.cc
- test/cpp/end2end/test_service_impl.cc
deps:
- absl/random:random
- grpc_test_util
- grpc++_test_util
- name: end2end_test
gtest: true
build: test

@ -52,7 +52,8 @@ struct grpc_binder_stream {
refcount(refcount),
arena(arena),
tx_code(tx_code),
is_client(is_client) {
is_client(is_client),
is_closed(false) {
// TODO(waynetu): Should this be protected?
t->registered_stream[tx_code] = this;
@ -79,8 +80,8 @@ struct grpc_binder_stream {
grpc_core::Arena* arena;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
int tx_code;
bool is_client;
bool is_closed = false;
const bool is_client;
bool is_closed;
grpc_closure* destroy_stream_then_closure = nullptr;
grpc_closure destroy_stream;

@ -482,10 +482,6 @@ static void perform_stream_op_locked(void* stream_op,
tx_code, [tx_code, gbs,
gbt](absl::StatusOr<grpc_binder::Metadata> initial_metadata) {
grpc_core::ExecCtx exec_ctx;
if (gbs->is_closed) {
GRPC_BINDER_STREAM_UNREF(gbs, "recv_initial_metadata");
return;
}
gbs->recv_initial_metadata_args.tx_code = tx_code;
gbs->recv_initial_metadata_args.initial_metadata =
std::move(initial_metadata);
@ -506,10 +502,6 @@ static void perform_stream_op_locked(void* stream_op,
gbt->transport_stream_receiver->RegisterRecvMessage(
tx_code, [tx_code, gbs, gbt](absl::StatusOr<std::string> message) {
grpc_core::ExecCtx exec_ctx;
if (gbs->is_closed) {
GRPC_BINDER_STREAM_UNREF(gbs, "recv_message");
return;
}
gbs->recv_message_args.tx_code = tx_code;
gbs->recv_message_args.message = std::move(message);
gbt->combiner->Run(
@ -530,10 +522,6 @@ static void perform_stream_op_locked(void* stream_op,
absl::StatusOr<grpc_binder::Metadata> trailing_metadata,
int status) {
grpc_core::ExecCtx exec_ctx;
if (gbs->is_closed) {
GRPC_BINDER_STREAM_UNREF(gbs, "recv_trailing_metadata");
return;
}
gbs->recv_trailing_metadata_args.tx_code = tx_code;
gbs->recv_trailing_metadata_args.trailing_metadata =
std::move(trailing_metadata);

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_proto_library")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])
@ -72,26 +72,6 @@ grpc_cc_library(
],
)
grpc_proto_library(
name = "echo_grpc_proto",
srcs = ["echo.proto"],
)
grpc_cc_library(
name = "echo_service",
testonly = 1,
srcs = ["echo_service.cc"],
hdrs = ["echo_service.h"],
external_deps = [
"absl/strings",
"absl/strings:str_format",
"absl/time",
],
deps = [
":echo_grpc_proto",
],
)
grpc_cc_test(
name = "end2end_binder_transport_test",
srcs = ["end2end_binder_transport_test.cc"],
@ -102,12 +82,12 @@ grpc_cc_test(
],
language = "C++",
deps = [
":echo_service",
":end2end_binder_channel",
":fake_binder",
"//src/core/ext/transport/binder/transport:binder_transport",
"//src/core/ext/transport/binder/wire_format:wire_reader",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:test_service_impl",
],
)
@ -121,7 +101,6 @@ grpc_cc_test(
"//:grpc++",
"//src/core/ext/transport/binder/client:grpc_transport_binder_client_impl",
"//src/core/ext/transport/binder/server:grpc_transport_binder_server",
"//test/core/transport/binder/end2end:echo_service",
"//test/core/transport/binder/end2end:fake_binder",
"//test/core/util:grpc_test_util",
"//test/cpp/end2end:test_service_impl",

@ -28,7 +28,6 @@
#include "src/core/ext/transport/binder/client/channel_create_impl.h"
#include "src/core/ext/transport/binder/server/binder_server.h"
#include "src/core/ext/transport/binder/server/binder_server_credentials.h"
#include "test/core/transport/binder/end2end/echo_service.h"
#include "test/core/transport/binder/end2end/fake_binder.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
@ -110,7 +109,7 @@ TEST(BinderServerCredentialsTest,
TEST_F(BinderServerTest, BuildAndStart) {
grpc::ServerBuilder server_builder;
grpc_binder::end2end_testing::EchoServer service;
grpc::testing::TestServiceImpl service;
server_builder.RegisterService(&service);
server_builder.AddListeningPort("binder://example.service",
grpc::testing::BinderServerCredentials());
@ -124,7 +123,7 @@ TEST_F(BinderServerTest, BuildAndStart) {
TEST_F(BinderServerTest, BuildAndStartFailed) {
grpc::ServerBuilder server_builder;
grpc_binder::end2end_testing::EchoServer service;
grpc::testing::TestServiceImpl service;
server_builder.RegisterService(&service);
// Error: binder address should begin with binder:
server_builder.AddListeningPort("localhost:12345",
@ -135,7 +134,7 @@ TEST_F(BinderServerTest, BuildAndStartFailed) {
TEST_F(BinderServerTest, CreateChannelWithEndpointBinder) {
grpc::ServerBuilder server_builder;
grpc_binder::end2end_testing::EchoServer service;
grpc::testing::TestServiceImpl service;
server_builder.RegisterService(&service);
server_builder.AddListeningPort("binder://example.service",
grpc::testing::BinderServerCredentials());
@ -148,21 +147,21 @@ TEST_F(BinderServerTest, CreateChannelWithEndpointBinder) {
raw_endpoint_binder));
std::shared_ptr<grpc::Channel> channel =
grpc::testing::CreateBinderChannel(std::move(endpoint_binder));
std::unique_ptr<grpc_binder::end2end_testing::EchoService::Stub> stub =
grpc_binder::end2end_testing::EchoService::NewStub(channel);
grpc_binder::end2end_testing::EchoRequest request;
grpc_binder::end2end_testing::EchoResponse response;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub =
grpc::testing::EchoTestService::NewStub(channel);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
grpc::ClientContext context;
request.set_text("BinderServerBuilder");
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
request.set_message("BinderServerBuilder");
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), "BinderServerBuilder");
EXPECT_EQ(response.message(), "BinderServerBuilder");
server->Shutdown();
}
TEST_F(BinderServerTest, CreateChannelWithEndpointBinderMultipleConnections) {
grpc::ServerBuilder server_builder;
grpc_binder::end2end_testing::EchoServer service;
grpc::testing::TestServiceImpl service;
server_builder.RegisterService(&service);
server_builder.AddListeningPort(
"binder://example.service.multiple.connections",
@ -179,15 +178,16 @@ TEST_F(BinderServerTest, CreateChannelWithEndpointBinderMultipleConnections) {
raw_endpoint_binder));
std::shared_ptr<grpc::Channel> channel =
grpc::testing::CreateBinderChannel(std::move(endpoint_binder));
std::unique_ptr<grpc_binder::end2end_testing::EchoService::Stub> stub =
grpc_binder::end2end_testing::EchoService::NewStub(channel);
grpc_binder::end2end_testing::EchoRequest request;
grpc_binder::end2end_testing::EchoResponse response;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub =
grpc::testing::EchoTestService::NewStub(channel);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
grpc::ClientContext context;
request.set_text(absl::StrFormat("BinderServerBuilder-%d", id));
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
request.set_message(absl::StrFormat("BinderServerBuilder-%d", id));
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), absl::StrFormat("BinderServerBuilder-%d", id));
EXPECT_EQ(response.message(),
absl::StrFormat("BinderServerBuilder-%d", id));
};
std::vector<std::thread> threads(kNumThreads);

@ -1,38 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A simple RPC service that echos what the client passes in. The request and
// the response simply contains the text represented in a string.
//
// This service is for end-to-end testing with fake binder tunnels.
syntax = "proto3";
// TODO(waynetu): This can be replaced by EchoTestService in
// src/proto/grpc/testing/echo.proto
package grpc_binder.end2end_testing;
message EchoRequest {
string text = 1;
}
message EchoResponse {
string text = 1;
}
service EchoService {
rpc EchoUnaryCall(EchoRequest) returns (EchoResponse);
rpc EchoServerStreamingCall(EchoRequest) returns (stream EchoResponse);
rpc EchoClientStreamingCall(stream EchoRequest) returns (EchoResponse);
rpc EchoBiDirStreamingCall(stream EchoRequest) returns (stream EchoResponse);
}

@ -1,83 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/end2end/echo_service.h"
#include <string>
#include "absl/strings/str_format.h"
#include "absl/time/time.h"
namespace grpc_binder {
namespace end2end_testing {
const absl::string_view EchoServer::kCancelledText = "cancel";
const absl::string_view EchoServer::kTimeoutText = "timeout";
const size_t EchoServer::kServerStreamingCounts = 100;
grpc::Status EchoServer::EchoUnaryCall(grpc::ServerContext* /*context*/,
const EchoRequest* request,
EchoResponse* response) {
const std::string& data = request->text();
if (data == kCancelledText) {
return grpc::Status::CANCELLED;
}
if (data == kTimeoutText) {
absl::SleepFor(absl::Seconds(5));
}
response->set_text(data);
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoServerStreamingCall(
grpc::ServerContext* /*context*/, const EchoRequest* request,
grpc::ServerWriter<EchoResponse>* writer) {
const std::string& data = request->text();
if (data == kTimeoutText) {
absl::SleepFor(absl::Seconds(5));
}
for (size_t i = 0; i < kServerStreamingCounts; ++i) {
EchoResponse response;
response.set_text(absl::StrFormat("%s(%d)", data, i));
writer->Write(response);
}
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoClientStreamingCall(
grpc::ServerContext* /*context*/, grpc::ServerReader<EchoRequest>* reader,
EchoResponse* response) {
EchoRequest request;
std::string result = "";
while (reader->Read(&request)) {
result += request.text();
}
response->set_text(result);
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoBiDirStreamingCall(
grpc::ServerContext* /*context*/,
grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
EchoRequest request;
while (stream->Read(&request)) {
EchoResponse response;
response.set_text(request.text());
stream->Write(response);
}
return grpc::Status::OK;
}
} // namespace end2end_testing
} // namespace grpc_binder

@ -1,52 +0,0 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H
#define TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H
#include "absl/strings/string_view.h"
#include "test/core/transport/binder/end2end/echo.grpc.pb.h"
namespace grpc_binder {
namespace end2end_testing {
// TODO(waynetu): Replace this with TestServiceImpl declared in
// test/cpp/end2end/test_service_impl.h
class EchoServer final : public EchoService::Service {
public:
static const absl::string_view kCancelledText;
static const absl::string_view kTimeoutText;
grpc::Status EchoUnaryCall(grpc::ServerContext* context,
const EchoRequest* request,
EchoResponse* response) override;
static const size_t kServerStreamingCounts;
grpc::Status EchoServerStreamingCall(
grpc::ServerContext* context, const EchoRequest* request,
grpc::ServerWriter<EchoResponse>* writer) override;
grpc::Status EchoClientStreamingCall(grpc::ServerContext* context,
grpc::ServerReader<EchoRequest>* reader,
EchoResponse* response) override;
grpc::Status EchoBiDirStreamingCall(
grpc::ServerContext* context,
grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) override;
};
} // namespace end2end_testing
} // namespace grpc_binder
#endif // TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H_

@ -26,10 +26,10 @@
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "test/core/transport/binder/end2end/echo_service.h"
#include "test/core/transport/binder/end2end/fake_binder.h"
#include "test/core/transport/binder/end2end/testing_channel_create.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
namespace grpc_binder {
@ -41,12 +41,24 @@ class End2EndBinderTransportTest
End2EndBinderTransportTest() {
end2end_testing::g_transaction_processor =
new end2end_testing::TransactionProcessor(GetParam());
service_ = absl::make_unique<grpc::testing::TestServiceImpl>();
grpc::ServerBuilder builder;
builder.RegisterService(service_.get());
server_ = builder.BuildAndStart();
}
~End2EndBinderTransportTest() override {
server_->Shutdown();
service_.reset();
delete end2end_testing::g_transaction_processor;
}
std::unique_ptr<grpc::testing::EchoTestService::Stub> NewStub() {
grpc::ChannelArguments args;
std::shared_ptr<grpc::Channel> channel = BinderChannel(server_.get(), args);
return grpc::testing::EchoTestService::NewStub(channel);
}
static void SetUpTestSuite() { grpc_init(); }
static void TearDownTestSuite() { grpc_shutdown(); }
@ -54,11 +66,11 @@ class End2EndBinderTransportTest
grpc::Server* server, const grpc::ChannelArguments& args) {
return end2end_testing::BinderChannelForTesting(server, args);
}
};
using end2end_testing::EchoRequest;
using end2end_testing::EchoResponse;
using end2end_testing::EchoService;
protected:
std::unique_ptr<grpc::testing::TestServiceImpl> service_;
std::unique_ptr<grpc::Server> server_;
};
} // namespace
@ -74,79 +86,54 @@ TEST_P(End2EndBinderTransportTest, SetupTransport) {
grpc_transport_destroy(server_transport);
}
TEST_P(End2EndBinderTransportTest, UnaryCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, UnaryCall) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
EchoRequest request;
EchoResponse response;
request.set_text("it works!");
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCall");
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), "it works!");
server->Shutdown();
EXPECT_EQ(response.message(), "UnaryCall");
}
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelNonOkStatus) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, UnaryCallWithNonOkStatus) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
EchoRequest request;
EchoResponse response;
request.set_text(std::string(end2end_testing::EchoServer::kCancelledText));
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallWithNonOkStatus");
request.mutable_param()->mutable_expected_error()->set_code(
grpc::StatusCode::INTERNAL);
request.mutable_param()->mutable_expected_error()->set_error_message(
"expected to fail");
// Server will not response the client with message data, however, since all
// callbacks after the trailing metadata are cancelled, we shall not be
// blocked here.
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
server->Shutdown();
EXPECT_EQ(status.error_code(), grpc::StatusCode::INTERNAL);
EXPECT_THAT(status.error_message(), ::testing::HasSubstr("expected to fail"));
}
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelServerTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, UnaryCallServerTimeout) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
EchoResponse response;
request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText));
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallServerTimeout");
// Server will sleep for 2 seconds before responding us.
request.mutable_param()->set_server_sleep_us(2000000);
// Disable cancellation check because the request will time out.
request.mutable_param()->set_skip_cancelled_check(true);
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
server->Shutdown();
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
}
// Temporarily disabled due to a potential deadlock in our design.
// TODO(waynetu): Enable this test once the issue is resolved.
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelClientTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, UnaryCallClientTimeout) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
// Set transaction delay to a large number. This happens after the channel
// creation so that we don't need to wait that long for client and server to
@ -155,112 +142,314 @@ TEST_P(End2EndBinderTransportTest,
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
EchoResponse response;
request.set_text("normal-text");
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallClientTimeout");
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
EXPECT_EQ(status.error_code(), grpc::StatusCode::DEADLINE_EXCEEDED);
}
TEST_P(End2EndBinderTransportTest, UnaryCallUnimplemented) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
server->Shutdown();
grpc::ClientContext context;
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallUnimplemented");
grpc::Status status = stub->Unimplemented(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::UNIMPLEMENTED);
}
TEST_P(End2EndBinderTransportTest,
ServerStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, UnaryCallClientCancel) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallClientCancel");
context.TryCancel();
grpc::Status status = stub->Unimplemented(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest, UnaryCallEchoMetadataInitially) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallEchoMetadataInitially");
request.mutable_param()->set_echo_metadata_initially(true);
context.AddMetadata("key1", "value1");
context.AddMetadata("key2", "value2");
grpc::Status status = stub->Echo(&context, request, &response);
const auto& initial_metadata = context.GetServerInitialMetadata();
EXPECT_EQ(initial_metadata.find("key1")->second, "value1");
EXPECT_EQ(initial_metadata.find("key2")->second, "value2");
}
TEST_P(End2EndBinderTransportTest, UnaryCallEchoMetadata) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallEchoMetadata");
request.mutable_param()->set_echo_metadata(true);
context.AddMetadata("key1", "value1");
context.AddMetadata("key2", "value2");
grpc::Status status = stub->Echo(&context, request, &response);
const auto& initial_metadata = context.GetServerTrailingMetadata();
EXPECT_EQ(initial_metadata.find("key1")->second, "value1");
EXPECT_EQ(initial_metadata.find("key2")->second, "value2");
}
TEST_P(End2EndBinderTransportTest, UnaryCallResponseMessageLength) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
for (size_t response_length : {1, 2, 5, 10, 100, 1000000}) {
grpc::ClientContext context;
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallResponseMessageLength");
request.mutable_param()->set_response_message_length(response_length);
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_EQ(response.message().length(), response_length);
}
}
TEST_P(End2EndBinderTransportTest, UnaryCallTryCancel) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_BEFORE_PROCESSING));
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message("UnaryCallTryCancel");
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest, ServerStreamingCall) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kServerResponseStreamsToSend = 100;
grpc::ClientContext context;
EchoRequest request;
request.set_text("it works!");
std::unique_ptr<grpc::ClientReader<EchoResponse>> reader =
stub->EchoServerStreamingCall(&context, request);
EchoResponse response;
context.AddMetadata(grpc::testing::kServerResponseStreamsToSend,
std::to_string(kServerResponseStreamsToSend));
grpc::testing::EchoRequest request;
request.set_message("ServerStreamingCall");
std::unique_ptr<grpc::ClientReader<grpc::testing::EchoResponse>> reader =
stub->ResponseStream(&context, request);
grpc::testing::EchoResponse response;
size_t cnt = 0;
while (reader->Read(&response)) {
EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", cnt));
EXPECT_EQ(response.message(), "ServerStreamingCall" + std::to_string(cnt));
cnt++;
}
EXPECT_EQ(cnt, end2end_testing::EchoServer::kServerStreamingCounts);
EXPECT_EQ(cnt, kServerResponseStreamsToSend);
grpc::Status status = reader->Finish();
EXPECT_TRUE(status.ok());
}
server->Shutdown();
TEST_P(End2EndBinderTransportTest, ServerStreamingCallCoalescingApi) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kServerResponseStreamsToSend = 100;
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerResponseStreamsToSend,
std::to_string(kServerResponseStreamsToSend));
context.AddMetadata(grpc::testing::kServerUseCoalescingApi, "1");
grpc::testing::EchoRequest request;
request.set_message("ServerStreamingCallCoalescingApi");
std::unique_ptr<grpc::ClientReader<grpc::testing::EchoResponse>> reader =
stub->ResponseStream(&context, request);
grpc::testing::EchoResponse response;
size_t cnt = 0;
while (reader->Read(&response)) {
EXPECT_EQ(response.message(),
"ServerStreamingCallCoalescingApi" + std::to_string(cnt));
cnt++;
}
EXPECT_EQ(cnt, kServerResponseStreamsToSend);
grpc::Status status = reader->Finish();
EXPECT_TRUE(status.ok());
}
TEST_P(End2EndBinderTransportTest,
ServerStreamingCallThroughFakeBinderChannelServerTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
ServerStreamingCallTryCancelBeforeProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kServerResponseStreamsToSend = 100;
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText));
std::unique_ptr<grpc::ClientReader<EchoResponse>> reader =
stub->EchoServerStreamingCall(&context, request);
EchoResponse response;
context.AddMetadata(grpc::testing::kServerResponseStreamsToSend,
std::to_string(kServerResponseStreamsToSend));
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_BEFORE_PROCESSING));
grpc::testing::EchoRequest request;
request.set_message("ServerStreamingCallTryCancelBeforeProcessing");
std::unique_ptr<grpc::ClientReader<grpc::testing::EchoResponse>> reader =
stub->ResponseStream(&context, request);
grpc::testing::EchoResponse response;
EXPECT_FALSE(reader->Read(&response));
grpc::Status status = reader->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
server->Shutdown();
TEST_P(End2EndBinderTransportTest,
ServerSteramingCallTryCancelDuringProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kServerResponseStreamsToSend = 2;
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerResponseStreamsToSend,
std::to_string(kServerResponseStreamsToSend));
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_DURING_PROCESSING));
grpc::testing::EchoRequest request;
request.set_message("ServerStreamingCallTryCancelDuringProcessing");
std::unique_ptr<grpc::ClientReader<grpc::testing::EchoResponse>> reader =
stub->ResponseStream(&context, request);
grpc::testing::EchoResponse response;
size_t cnt = 0;
while (reader->Read(&response)) {
EXPECT_EQ(
response.message(),
"ServerStreamingCallTryCancelDuringProcessing" + std::to_string(cnt));
cnt++;
}
grpc::Status status = reader->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest,
ClientStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
ServerSteramingCallTryCancelAfterProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kServerResponseStreamsToSend = 100;
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerResponseStreamsToSend,
std::to_string(kServerResponseStreamsToSend));
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_AFTER_PROCESSING));
grpc::testing::EchoRequest request;
request.set_message("ServerStreamingCallTryCancelAfterProcessing");
std::unique_ptr<grpc::ClientReader<grpc::testing::EchoResponse>> reader =
stub->ResponseStream(&context, request);
grpc::testing::EchoResponse response;
size_t cnt = 0;
while (reader->Read(&response)) {
EXPECT_EQ(
response.message(),
"ServerStreamingCallTryCancelAfterProcessing" + std::to_string(cnt));
cnt++;
}
EXPECT_EQ(cnt, kServerResponseStreamsToSend);
grpc::Status status = reader->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest, ClientStreamingCall) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
EchoResponse response;
std::unique_ptr<grpc::ClientWriter<EchoRequest>> writer =
stub->EchoClientStreamingCall(&context, &response);
grpc::testing::EchoResponse response;
std::unique_ptr<grpc::ClientWriter<grpc::testing::EchoRequest>> writer =
stub->RequestStream(&context, &response);
constexpr size_t kClientStreamingCounts = 100;
std::string expected = "";
for (size_t i = 0; i < kClientStreamingCounts; ++i) {
EchoRequest request;
request.set_text(absl::StrFormat("it works!(%d)", i));
writer->Write(request);
expected += absl::StrFormat("it works!(%d)", i);
grpc::testing::EchoRequest request;
request.set_message("ClientStreamingCall" + std::to_string(i));
EXPECT_TRUE(writer->Write(request));
expected += "ClientStreamingCall" + std::to_string(i);
}
writer->WritesDone();
grpc::Status status = writer->Finish();
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), expected);
EXPECT_EQ(response.message(), expected);
}
TEST_P(End2EndBinderTransportTest,
ClientStreamingCallTryCancelBeforeProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_BEFORE_PROCESSING));
grpc::testing::EchoResponse response;
std::unique_ptr<grpc::ClientWriter<grpc::testing::EchoRequest>> writer =
stub->RequestStream(&context, &response);
constexpr size_t kClientStreamingCounts = 100;
for (size_t i = 0; i < kClientStreamingCounts; ++i) {
grpc::testing::EchoRequest request;
request.set_message("ClientStreamingCallBeforeProcessing" +
std::to_string(i));
writer->Write(request);
}
writer->WritesDone();
grpc::Status status = writer->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
server->Shutdown();
TEST_P(End2EndBinderTransportTest,
ClientStreamingCallTryCancelDuringProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_DURING_PROCESSING));
grpc::testing::EchoResponse response;
std::unique_ptr<grpc::ClientWriter<grpc::testing::EchoRequest>> writer =
stub->RequestStream(&context, &response);
constexpr size_t kClientStreamingCounts = 100;
for (size_t i = 0; i < kClientStreamingCounts; ++i) {
grpc::testing::EchoRequest request;
request.set_message("ClientStreamingCallDuringProcessing" +
std::to_string(i));
writer->Write(request);
}
writer->WritesDone();
grpc::Status status = writer->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest,
ClientStreamingCallTryCancelAfterProcessing) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerTryCancelRequest,
std::to_string(grpc::testing::CANCEL_AFTER_PROCESSING));
grpc::testing::EchoResponse response;
std::unique_ptr<grpc::ClientWriter<grpc::testing::EchoRequest>> writer =
stub->RequestStream(&context, &response);
constexpr size_t kClientStreamingCounts = 100;
for (size_t i = 0; i < kClientStreamingCounts; ++i) {
grpc::testing::EchoRequest request;
request.set_message("ClientStreamingCallAfterProcessing" +
std::to_string(i));
writer->Write(request);
}
writer->WritesDone();
grpc::Status status = writer->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_code(), grpc::StatusCode::CANCELLED);
}
TEST_P(End2EndBinderTransportTest, BiDirStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
TEST_P(End2EndBinderTransportTest, BiDirStreamingCall) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
grpc::ClientContext context;
EchoResponse response;
std::shared_ptr<grpc::ClientReaderWriter<EchoRequest, EchoResponse>> stream =
stub->EchoBiDirStreamingCall(&context);
std::shared_ptr<grpc::ClientReaderWriter<grpc::testing::EchoRequest,
grpc::testing::EchoResponse>>
stream = stub->BidiStream(&context);
constexpr size_t kBiDirStreamingCounts = 100;
struct WriterArgs {
std::shared_ptr<grpc::ClientReaderWriter<EchoRequest, EchoResponse>> stream;
std::shared_ptr<grpc::ClientReaderWriter<grpc::testing::EchoRequest,
grpc::testing::EchoResponse>>
stream;
size_t bi_dir_streaming_counts;
} writer_args;
@ -269,10 +458,9 @@ TEST_P(End2EndBinderTransportTest, BiDirStreamingCallThroughFakeBinderChannel) {
auto writer_fn = [](void* arg) {
const WriterArgs& args = *static_cast<WriterArgs*>(arg);
EchoResponse response;
for (size_t i = 0; i < args.bi_dir_streaming_counts; ++i) {
EchoRequest request;
request.set_text(absl::StrFormat("it works!(%d)", i));
grpc::testing::EchoRequest request;
request.set_message("BiDirStreamingCall" + std::to_string(i));
args.stream->Write(request);
}
args.stream->WritesDone();
@ -282,37 +470,78 @@ TEST_P(End2EndBinderTransportTest, BiDirStreamingCallThroughFakeBinderChannel) {
static_cast<void*>(&writer_args));
writer_thread.Start();
for (size_t i = 0; i < kBiDirStreamingCounts; ++i) {
EchoResponse response;
grpc::testing::EchoResponse response;
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", i));
EXPECT_EQ(response.message(), "BiDirStreamingCall" + std::to_string(i));
}
grpc::Status status = stream->Finish();
EXPECT_TRUE(status.ok());
writer_thread.Join();
}
TEST_P(End2EndBinderTransportTest, BiDirStreamingCallServerFinishesHalfway) {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
constexpr size_t kBiDirStreamingCounts = 100;
grpc::ClientContext context;
context.AddMetadata(grpc::testing::kServerFinishAfterNReads,
std::to_string(kBiDirStreamingCounts / 2));
std::shared_ptr<grpc::ClientReaderWriter<grpc::testing::EchoRequest,
grpc::testing::EchoResponse>>
stream = stub->BidiStream(&context);
struct WriterArgs {
std::shared_ptr<grpc::ClientReaderWriter<grpc::testing::EchoRequest,
grpc::testing::EchoResponse>>
stream;
size_t bi_dir_streaming_counts;
} writer_args;
writer_args.stream = stream;
writer_args.bi_dir_streaming_counts = kBiDirStreamingCounts;
server->Shutdown();
auto writer_fn = [](void* arg) {
const WriterArgs& args = *static_cast<WriterArgs*>(arg);
for (size_t i = 0; i < args.bi_dir_streaming_counts; ++i) {
grpc::testing::EchoRequest request;
request.set_message("BiDirStreamingCallServerFinishesHalfway" +
std::to_string(i));
if (!args.stream->Write(request)) {
return;
}
}
args.stream->WritesDone();
};
grpc_core::Thread writer_thread("writer-thread", writer_fn,
static_cast<void*>(&writer_args));
writer_thread.Start();
for (size_t i = 0; i < kBiDirStreamingCounts / 2; ++i) {
grpc::testing::EchoResponse response;
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.message(),
"BiDirStreamingCallServerFinishesHalfway" + std::to_string(i));
}
grpc::testing::EchoResponse response;
EXPECT_FALSE(stream->Read(&response));
writer_thread.Join();
grpc::Status status = stream->Finish();
EXPECT_TRUE(status.ok());
}
TEST_P(End2EndBinderTransportTest, LargeMessages) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub = NewStub();
for (size_t size = 1; size <= 1024 * 1024; size *= 4) {
grpc::ClientContext context;
EchoRequest request;
EchoResponse response;
request.set_text(std::string(size, 'a'));
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
grpc::testing::EchoRequest request;
grpc::testing::EchoResponse response;
request.set_message(std::string(size, 'a'));
grpc::Status status = stub->Echo(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text().size(), size);
EXPECT_TRUE(std::all_of(response.text().begin(), response.text().end(),
EXPECT_EQ(response.message().size(), size);
EXPECT_TRUE(std::all_of(response.message().begin(),
response.message().end(),
[](char c) { return c == 'a'; }));
}
server->Shutdown();
}
INSTANTIATE_TEST_SUITE_P(

Loading…
Cancel
Save