Import binder transport tests (#26970)

This PR imports unit tests and end-to-end tests of binder transport from the internal repository. No further changes will be made to internal repository.
pull/27124/head
Ta-Wei Tu 3 years ago committed by GitHub
parent 5cdaec9a4f
commit 006464c57d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 272
      CMakeLists.txt
  2. 193
      build_autogenerated.yaml
  3. 3
      src/core/ext/transport/binder/transport/binder_transport.cc
  4. 74
      test/core/transport/binder/BUILD
  5. 38
      test/core/transport/binder/binder_smoke_test.cc
  6. 742
      test/core/transport/binder/binder_transport_test.cc
  7. 106
      test/core/transport/binder/end2end/BUILD
  8. 38
      test/core/transport/binder/end2end/echo.proto
  9. 83
      test/core/transport/binder/end2end/echo_service.cc
  10. 51
      test/core/transport/binder/end2end/echo_service.h
  11. 306
      test/core/transport/binder/end2end/end2end_binder_transport_test.cc
  12. 269
      test/core/transport/binder/end2end/fake_binder.cc
  13. 294
      test/core/transport/binder/end2end/fake_binder.h
  14. 373
      test/core/transport/binder/end2end/fake_binder_test.cc
  15. 126
      test/core/transport/binder/end2end/testing_channel_create.cc
  16. 41
      test/core/transport/binder/end2end/testing_channel_create.h
  17. 57
      test/core/transport/binder/mock_objects.cc
  18. 113
      test/core/transport/binder/mock_objects.h
  19. 287
      test/core/transport/binder/transport_stream_receiver_test.cc
  20. 278
      test/core/transport/binder/wire_reader_test.cc
  21. 181
      test/core/transport/binder/wire_writer_test.cc
  22. 122
      tools/run_tests/generated/tests.json

272
CMakeLists.txt generated

@ -532,6 +532,9 @@ protobuf_generate_grpc_cpp(
protobuf_generate_grpc_cpp(
src/proto/grpc/testing/xds/v3/tls.proto
)
protobuf_generate_grpc_cpp(
test/core/transport/binder/end2end/echo.proto
)
protobuf_generate_grpc_cpp(
test/core/tsi/alts/fake_handshaker/handshaker.proto
)
@ -751,7 +754,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bdp_estimator_test)
endif()
add_dependencies(buildtests_cxx binder_smoke_test)
add_dependencies(buildtests_cxx binder_transport_test)
add_dependencies(buildtests_cxx bitset_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_alarm)
@ -852,6 +855,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test)
add_dependencies(buildtests_cxx dual_ref_counted_test)
add_dependencies(buildtests_cxx duplicate_header_bad_client_test)
add_dependencies(buildtests_cxx end2end_binder_transport_test)
add_dependencies(buildtests_cxx end2end_test)
add_dependencies(buildtests_cxx endpoint_config_test)
add_dependencies(buildtests_cxx error_details_test)
@ -861,6 +865,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx examine_stack_test)
endif()
add_dependencies(buildtests_cxx exception_test)
add_dependencies(buildtests_cxx fake_binder_test)
add_dependencies(buildtests_cxx file_watcher_certificate_provider_factory_test)
add_dependencies(buildtests_cxx filter_end2end_test)
add_dependencies(buildtests_cxx flaky_network_test)
@ -994,11 +999,14 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx timer_test)
add_dependencies(buildtests_cxx tls_security_connector_test)
add_dependencies(buildtests_cxx too_many_pings_test)
add_dependencies(buildtests_cxx transport_stream_receiver_test)
add_dependencies(buildtests_cxx try_join_test)
add_dependencies(buildtests_cxx try_seq_test)
add_dependencies(buildtests_cxx unknown_frame_bad_client_test)
add_dependencies(buildtests_cxx uri_parser_test)
add_dependencies(buildtests_cxx window_overflow_bad_client_test)
add_dependencies(buildtests_cxx wire_reader_test)
add_dependencies(buildtests_cxx wire_writer_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx work_serializer_test)
endif()
@ -8399,13 +8407,20 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(binder_smoke_test
test/core/transport/binder/binder_smoke_test.cc
add_executable(binder_transport_test
src/core/ext/transport/binder/transport/binder_transport.cc
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
test/core/transport/binder/binder_transport_test.cc
test/core/transport/binder/mock_objects.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(binder_smoke_test
target_include_directories(binder_transport_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
@ -8424,7 +8439,7 @@ target_include_directories(binder_smoke_test
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(binder_smoke_test
target_link_libraries(binder_transport_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
@ -10550,6 +10565,92 @@ target_link_libraries(duplicate_header_bad_client_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(end2end_binder_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.h
${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.h
src/core/ext/transport/binder/transport/binder_transport.cc
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
src/cpp/client/channel_cc.cc
src/cpp/client/client_callback.cc
src/cpp/client/client_context.cc
src/cpp/client/client_interceptor.cc
src/cpp/client/create_channel.cc
src/cpp/client/create_channel_internal.cc
src/cpp/client/create_channel_posix.cc
src/cpp/client/credentials_cc.cc
src/cpp/codegen/codegen_init.cc
src/cpp/common/alarm.cc
src/cpp/common/channel_arguments.cc
src/cpp/common/channel_filter.cc
src/cpp/common/completion_queue_cc.cc
src/cpp/common/core_codegen.cc
src/cpp/common/resource_quota_cc.cc
src/cpp/common/rpc_method.cc
src/cpp/common/validate_service_config.cc
src/cpp/common/version_cc.cc
src/cpp/server/async_generic_service.cc
src/cpp/server/channel_argument_option.cc
src/cpp/server/create_default_thread_pool.cc
src/cpp/server/dynamic_thread_pool.cc
src/cpp/server/external_connection_acceptor_impl.cc
src/cpp/server/health/default_health_check_service.cc
src/cpp/server/health/health_check_service.cc
src/cpp/server/health/health_check_service_server_builder_option.cc
src/cpp/server/server_builder.cc
src/cpp/server/server_callback.cc
src/cpp/server/server_cc.cc
src/cpp/server/server_context.cc
src/cpp/server/server_credentials.cc
src/cpp/server/server_posix.cc
src/cpp/thread_manager/thread_manager.cc
src/cpp/util/byte_buffer_cc.cc
src/cpp/util/status.cc
src/cpp/util/string_ref.cc
src/cpp/util/time_cc.cc
test/core/transport/binder/end2end/echo_service.cc
test/core/transport/binder/end2end/end2end_binder_transport_test.cc
test/core/transport/binder/end2end/fake_binder.cc
test/core/transport/binder/end2end/testing_channel_create.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(end2end_binder_transport_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(end2end_binder_transport_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::random_random
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -10837,6 +10938,48 @@ target_link_libraries(exception_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(fake_binder_test
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
test/core/transport/binder/end2end/fake_binder.cc
test/core/transport/binder/end2end/fake_binder_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(fake_binder_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(fake_binder_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::random_random
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -15457,6 +15600,43 @@ target_link_libraries(too_many_pings_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(transport_stream_receiver_test
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/transaction.cc
test/core/transport/binder/transport_stream_receiver_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(transport_stream_receiver_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(transport_stream_receiver_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
@ -15640,6 +15820,88 @@ target_link_libraries(window_overflow_bad_client_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(wire_reader_test
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
test/core/transport/binder/mock_objects.cc
test/core/transport/binder/wire_reader_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(wire_reader_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(wire_reader_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(wire_writer_test
src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
src/core/ext/transport/binder/wire_format/binder_constants.cc
src/core/ext/transport/binder/wire_format/transaction.cc
src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
src/core/ext/transport/binder/wire_format/wire_writer.cc
test/core/transport/binder/mock_objects.cc
test/core/transport/binder/wire_writer_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(wire_writer_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(wire_writer_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)

@ -4212,6 +4212,7 @@ targets:
- src/core/ext/upb-generated/google/rpc/status.upb.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/env.h
- src/core/lib/gpr/log_internal.h
- src/core/lib/gpr/murmur_hash.h
- src/core/lib/gpr/spinlock.h
- src/core/lib/gpr/string.h
@ -4558,13 +4559,31 @@ targets:
- posix
- mac
uses_polling: false
- name: binder_smoke_test
- name: binder_transport_test
gtest: true
build: test
language: c++
headers: []
headers:
- src/core/ext/transport/binder/transport/binder_stream.h
- src/core/ext/transport/binder/transport/binder_transport.h
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/binder.h
- src/core/ext/transport/binder/wire_format/binder_constants.h
- src/core/ext/transport/binder/wire_format/transaction.h
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- test/core/transport/binder/mock_objects.h
src:
- test/core/transport/binder/binder_smoke_test.cc
- src/core/ext/transport/binder/transport/binder_transport.cc
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- test/core/transport/binder/binder_transport_test.cc
- test/core/transport/binder/mock_objects.cc
deps:
- grpc_test_util
uses_polling: false
@ -5327,6 +5346,83 @@ targets:
- test/core/end2end/cq_verifier.cc
deps:
- grpc_test_util
- name: end2end_binder_transport_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/binder/transport/binder_stream.h
- src/core/ext/transport/binder/transport/binder_transport.h
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/binder.h
- src/core/ext/transport/binder/wire_format/binder_constants.h
- src/core/ext/transport/binder/wire_format/transaction.h
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- src/cpp/client/create_channel_internal.h
- src/cpp/common/channel_filter.h
- src/cpp/server/dynamic_thread_pool.h
- src/cpp/server/external_connection_acceptor_impl.h
- src/cpp/server/health/default_health_check_service.h
- src/cpp/server/thread_pool_interface.h
- src/cpp/thread_manager/thread_manager.h
- test/core/transport/binder/end2end/echo_service.h
- test/core/transport/binder/end2end/fake_binder.h
- test/core/transport/binder/end2end/testing_channel_create.h
src:
- test/core/transport/binder/end2end/echo.proto
- src/core/ext/transport/binder/transport/binder_transport.cc
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- src/cpp/client/channel_cc.cc
- src/cpp/client/client_callback.cc
- src/cpp/client/client_context.cc
- src/cpp/client/client_interceptor.cc
- src/cpp/client/create_channel.cc
- src/cpp/client/create_channel_internal.cc
- src/cpp/client/create_channel_posix.cc
- src/cpp/client/credentials_cc.cc
- src/cpp/codegen/codegen_init.cc
- src/cpp/common/alarm.cc
- src/cpp/common/channel_arguments.cc
- src/cpp/common/channel_filter.cc
- src/cpp/common/completion_queue_cc.cc
- src/cpp/common/core_codegen.cc
- src/cpp/common/resource_quota_cc.cc
- src/cpp/common/rpc_method.cc
- src/cpp/common/validate_service_config.cc
- src/cpp/common/version_cc.cc
- src/cpp/server/async_generic_service.cc
- src/cpp/server/channel_argument_option.cc
- src/cpp/server/create_default_thread_pool.cc
- src/cpp/server/dynamic_thread_pool.cc
- src/cpp/server/external_connection_acceptor_impl.cc
- src/cpp/server/health/default_health_check_service.cc
- src/cpp/server/health/health_check_service.cc
- src/cpp/server/health/health_check_service_server_builder_option.cc
- src/cpp/server/server_builder.cc
- src/cpp/server/server_callback.cc
- src/cpp/server/server_cc.cc
- src/cpp/server/server_context.cc
- src/cpp/server/server_credentials.cc
- src/cpp/server/server_posix.cc
- src/cpp/thread_manager/thread_manager.cc
- src/cpp/util/byte_buffer_cc.cc
- src/cpp/util/status.cc
- src/cpp/util/string_ref.cc
- src/cpp/util/time_cc.cc
- test/core/transport/binder/end2end/echo_service.cc
- test/core/transport/binder/end2end/end2end_binder_transport_test.cc
- test/core/transport/binder/end2end/fake_binder.cc
- test/core/transport/binder/end2end/testing_channel_create.cc
deps:
- absl/random:random
- grpc_test_util
- name: end2end_test
gtest: true
build: test
@ -5411,6 +5507,32 @@ targets:
- test/cpp/end2end/exception_test.cc
deps:
- grpc++_test_util
- name: fake_binder_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/binder.h
- src/core/ext/transport/binder/wire_format/binder_constants.h
- src/core/ext/transport/binder/wire_format/transaction.h
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- test/core/transport/binder/end2end/fake_binder.h
src:
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- test/core/transport/binder/end2end/fake_binder.cc
- test/core/transport/binder/end2end/fake_binder_test.cc
deps:
- absl/random:random
- grpc_test_util
uses_polling: false
- name: file_watcher_certificate_provider_factory_test
gtest: true
build: test
@ -7027,6 +7149,21 @@ targets:
deps:
- grpc++_test_config
- grpc++_test_util
- name: transport_stream_receiver_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/transaction.h
src:
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- test/core/transport/binder/transport_stream_receiver_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: try_join_test
gtest: true
build: test
@ -7116,6 +7253,56 @@ targets:
- test/core/end2end/cq_verifier.cc
deps:
- grpc_test_util
- name: wire_reader_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/binder.h
- src/core/ext/transport/binder/wire_format/binder_constants.h
- src/core/ext/transport/binder/wire_format/transaction.h
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- test/core/transport/binder/mock_objects.h
src:
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- test/core/transport/binder/mock_objects.cc
- test/core/transport/binder/wire_reader_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: wire_writer_test
gtest: true
build: test
language: c++
headers:
- src/core/ext/transport/binder/utils/transport_stream_receiver.h
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h
- src/core/ext/transport/binder/wire_format/binder.h
- src/core/ext/transport/binder/wire_format/binder_constants.h
- src/core/ext/transport/binder/wire_format/transaction.h
- src/core/ext/transport/binder/wire_format/wire_reader.h
- src/core/ext/transport/binder/wire_format/wire_reader_impl.h
- src/core/ext/transport/binder/wire_format/wire_writer.h
- test/core/transport/binder/mock_objects.h
src:
- src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc
- src/core/ext/transport/binder/wire_format/binder_constants.cc
- src/core/ext/transport/binder/wire_format/transaction.cc
- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
- src/core/ext/transport/binder/wire_format/wire_writer.cc
- test/core/transport/binder/mock_objects.cc
- test/core/transport/binder/wire_writer_test.cc
deps:
- grpc_test_util
uses_polling: false
- name: work_serializer_test
gtest: true
build: test

@ -201,9 +201,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
// TODO(mingcl): Will we ever has key-value pair here? According to
// wireformat client suffix data is always empty.
tx->SetSuffix(trailing_metadata);
if (op->payload->send_trailing_metadata.sent != nullptr) {
*op->payload->send_trailing_metadata.sent = true;
}
}
if (op->recv_initial_metadata) {
gpr_log(GPR_INFO, "recv_initial_metadata");

@ -12,23 +12,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package")
licenses(["notice"])
grpc_package(name = "test/core/transport/binder")
grpc_cc_library(
name = "mock_objects",
srcs = ["mock_objects.cc"],
hdrs = ["mock_objects.h"],
external_deps = [
"absl/memory",
"gtest",
],
language = "C++",
deps = [
"//src/core/ext/transport/binder/wire_format:binder",
"//src/core/ext/transport/binder/wire_format:wire_reader",
"//src/core/ext/transport/binder/wire_format:wire_writer",
],
)
grpc_cc_test(
name = "wire_writer_test",
srcs = ["wire_writer_test.cc"],
external_deps = [
"absl/memory",
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
":mock_objects",
"//src/core/ext/transport/binder/wire_format:wire_writer",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "wire_reader_test",
srcs = ["wire_reader_test.cc"],
external_deps = [
"absl/memory",
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
":mock_objects",
"//src/core/ext/transport/binder/wire_format:wire_reader",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "transport_stream_receiver_test",
srcs = ["transport_stream_receiver_test.cc"],
external_deps = [
"absl/memory",
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
"//src/core/ext/transport/binder/utils:transport_stream_receiver",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test(
name = "binder_smoke_test",
srcs = ["binder_smoke_test.cc"],
name = "binder_transport_test",
srcs = ["binder_transport_test.cc"],
external_deps = [
"absl/memory",
"absl/strings",
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
"//:gpr",
":mock_objects",
"//:grpc",
"//src/core/ext/transport/binder/transport",
"//test/core/util:grpc_test_util",
],
)

@ -1,38 +0,0 @@
//
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <gtest/gtest.h>
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace testing {
namespace {
TEST(SmokeTest, Empty) { gpr_log(GPR_INFO, __func__); }
} // namespace
} // namespace testing
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}

@ -0,0 +1,742 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unit-tests for grpc_binder_transport
//
// Verify that a calls to the perform_stream_op of grpc_binder_transport
// transform into the correct sequence of binder transactions.
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include <grpc/grpc.h>
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "src/core/ext/transport/binder/transport/binder_stream.h"
#include "test/core/transport/binder/mock_objects.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
namespace {
using ::testing::Expectation;
using ::testing::NiceMock;
using ::testing::Return;
class BinderTransportTest : public ::testing::Test {
public:
BinderTransportTest()
: arena_(grpc_core::Arena::Create(/* initial_size = */ 1)),
transport_(grpc_create_binder_transport_client(
absl::make_unique<NiceMock<MockBinder>>())) {
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
gbt->wire_writer = absl::make_unique<MockWireWriter>();
GRPC_STREAM_REF_INIT(&ref_, 1, nullptr, nullptr, "phony ref");
}
~BinderTransportTest() override {
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
delete gbt;
for (grpc_binder_stream* gbs : stream_buffer_) {
gpr_free(gbs);
}
arena_->Destroy();
}
void PerformStreamOp(grpc_binder_stream* gbs,
grpc_transport_stream_op_batch* op) {
grpc_transport_perform_stream_op(transport_,
reinterpret_cast<grpc_stream*>(gbs), op);
}
grpc_binder_transport* GetBinderTransport() {
return reinterpret_cast<grpc_binder_transport*>(transport_);
}
grpc_binder_stream* InitNewBinderStream() {
grpc_binder_stream* gbs = static_cast<grpc_binder_stream*>(
gpr_malloc(grpc_transport_stream_size(transport_)));
grpc_transport_init_stream(transport_, reinterpret_cast<grpc_stream*>(gbs),
&ref_, nullptr, arena_);
stream_buffer_.push_back(gbs);
return gbs;
}
MockWireWriter& GetWireWriter() {
return *reinterpret_cast<MockWireWriter*>(
GetBinderTransport()->wire_writer.get());
}
static void SetUpTestSuite() { grpc_init(); }
static void TearDownTestSuite() { grpc_shutdown(); }
protected:
grpc_core::Arena* arena_;
grpc_transport* transport_;
grpc_stream_refcount ref_;
std::vector<grpc_binder_stream*> stream_buffer_;
};
void MockCallback(void* arg, grpc_error_handle error);
class MockGrpcClosure {
public:
MockGrpcClosure() {
GRPC_CLOSURE_INIT(&closure_, MockCallback, this, nullptr);
}
grpc_closure* GetGrpcClosure() { return &closure_; }
MOCK_METHOD(void, Callback, (grpc_error_handle), ());
private:
grpc_closure closure_;
};
void MockCallback(void* arg, grpc_error_handle error) {
MockGrpcClosure* mock_closure = static_cast<MockGrpcClosure*>(arg);
mock_closure->Callback(error);
}
// Matches with transactions having the desired flag, method_ref,
// initial_metadata, and message_data.
MATCHER_P4(TransactionMatches, flag, method_ref, initial_metadata, message_data,
"") {
if (arg.GetFlags() != flag) return false;
if (flag & kFlagPrefix) {
if (arg.GetMethodRef() != method_ref) return false;
if (arg.GetPrefixMetadata() != initial_metadata) return false;
}
if (flag & kFlagMessageData) {
if (arg.GetMessageData() != message_data) return false;
}
return true;
}
// Matches with grpc_error having error message containing |msg|.
MATCHER_P(GrpcErrorMessageContains, msg, "") {
return absl::StrContains(grpc_error_std_string(arg), msg);
}
// Verify that the lower-level metadata has the same content as the gRPC
// metadata.
void VerifyMetadataEqual(const Metadata& md, grpc_metadata_batch grpc_md) {
grpc_linked_mdelem* elm = grpc_md.list.head;
for (size_t i = 0; i < md.size(); ++i) {
ASSERT_NE(elm, nullptr);
EXPECT_EQ(grpc_core::StringViewFromSlice(GRPC_MDKEY(elm->md)), md[i].first);
EXPECT_EQ(grpc_core::StringViewFromSlice(GRPC_MDVALUE(elm->md)),
md[i].second);
elm = elm->next;
}
EXPECT_EQ(elm, nullptr);
}
// RAII helper classes for constructing gRPC metadata and receiving callbacks.
struct MakeSendInitialMetadata {
MakeSendInitialMetadata(const Metadata& initial_metadata,
const std::string& method_ref,
grpc_transport_stream_op_batch* op)
: storage(initial_metadata.size()) {
grpc_metadata_batch_init(&grpc_initial_metadata);
size_t i = 0;
for (const auto& md : initial_metadata) {
const std::string& key = md.first;
const std::string& value = md.second;
EXPECT_EQ(grpc_metadata_batch_add_tail(
&grpc_initial_metadata, &storage[i],
grpc_mdelem_from_slices(grpc_slice_from_cpp_string(key),
grpc_slice_from_cpp_string(value))),
GRPC_ERROR_NONE);
i++;
}
if (!method_ref.empty()) {
EXPECT_EQ(
grpc_metadata_batch_add_tail(
&grpc_initial_metadata, &method_ref_storage,
grpc_mdelem_from_slices(GRPC_MDSTR_PATH,
grpc_slice_from_cpp_string(method_ref))),
GRPC_ERROR_NONE);
}
op->send_initial_metadata = true;
op->payload->send_initial_metadata.send_initial_metadata =
&grpc_initial_metadata;
}
~MakeSendInitialMetadata() {
grpc_metadata_batch_destroy(&grpc_initial_metadata);
}
std::vector<grpc_linked_mdelem> storage;
grpc_linked_mdelem method_ref_storage;
grpc_metadata_batch grpc_initial_metadata{};
};
struct MakeSendMessage {
MakeSendMessage(const std::string& message,
grpc_transport_stream_op_batch* op) {
grpc_slice_buffer send_buffer;
grpc_slice_buffer_init(&send_buffer);
grpc_slice send_slice = grpc_slice_from_cpp_string(message);
grpc_slice_buffer_add(&send_buffer, send_slice);
send_stream.Init(&send_buffer, 0);
grpc_slice_buffer_destroy(&send_buffer);
op->send_message = true;
op->payload->send_message.send_message.reset(send_stream.get());
}
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> send_stream;
};
struct MakeSendTrailingMetadata {
explicit MakeSendTrailingMetadata(const Metadata& trailing_metadata,
grpc_transport_stream_op_batch* op) {
EXPECT_TRUE(trailing_metadata.empty());
grpc_metadata_batch_init(&grpc_trailing_metadata);
op->send_trailing_metadata = true;
op->payload->send_trailing_metadata.send_trailing_metadata =
&grpc_trailing_metadata;
}
grpc_metadata_batch grpc_trailing_metadata{};
};
struct MakeRecvInitialMetadata {
explicit MakeRecvInitialMetadata(grpc_transport_stream_op_batch* op,
Expectation* call_before = nullptr) {
grpc_metadata_batch_init(&grpc_initial_metadata);
op->recv_initial_metadata = true;
op->payload->recv_initial_metadata.recv_initial_metadata =
&grpc_initial_metadata;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
ready.GetGrpcClosure();
if (call_before) {
EXPECT_CALL(ready, Callback).After(*call_before);
} else {
EXPECT_CALL(ready, Callback);
}
}
~MakeRecvInitialMetadata() {
grpc_metadata_batch_destroy(&grpc_initial_metadata);
}
MockGrpcClosure ready;
grpc_metadata_batch grpc_initial_metadata;
};
struct MakeRecvMessage {
explicit MakeRecvMessage(grpc_transport_stream_op_batch* op,
Expectation* call_before = nullptr) {
op->recv_message = true;
op->payload->recv_message.recv_message = &grpc_message;
op->payload->recv_message.recv_message_ready = ready.GetGrpcClosure();
if (call_before) {
EXPECT_CALL(ready, Callback).After(*call_before);
} else {
EXPECT_CALL(ready, Callback);
}
}
MockGrpcClosure ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream> grpc_message;
};
struct MakeRecvTrailingMetadata {
explicit MakeRecvTrailingMetadata(grpc_transport_stream_op_batch* op,
Expectation* call_before = nullptr) {
grpc_metadata_batch_init(&grpc_trailing_metadata);
op->recv_trailing_metadata = true;
op->payload->recv_trailing_metadata.recv_trailing_metadata =
&grpc_trailing_metadata;
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
ready.GetGrpcClosure();
if (call_before) {
EXPECT_CALL(ready, Callback).After(*call_before);
} else {
EXPECT_CALL(ready, Callback);
}
}
~MakeRecvTrailingMetadata() {
grpc_metadata_batch_destroy(&grpc_trailing_metadata);
}
MockGrpcClosure ready;
grpc_metadata_batch grpc_trailing_metadata;
};
const Metadata kDefaultMetadata = {
{"", ""},
{"", "value"},
{"key", ""},
{"key", "value"},
};
constexpr char kDefaultMethodRef[] = "/some/path";
constexpr char kDefaultMessage[] = "binder transport message";
constexpr int kDefaultStatus = 0x1234;
Metadata AppendMethodRef(const Metadata& md, const std::string& method_ref) {
Metadata result = md;
result.emplace_back(":path", method_ref);
return result;
}
Metadata AppendStatus(const Metadata& md, int status) {
Metadata result = md;
result.emplace_back("grpc-status", std::to_string(status));
return result;
}
} // namespace
TEST_F(BinderTransportTest, CreateBinderTransport) {
EXPECT_NE(transport_, nullptr);
}
TEST_F(BinderTransportTest, TransactionIdIncrement) {
grpc_binder_stream* gbs0 = InitNewBinderStream();
EXPECT_EQ(gbs0->t, GetBinderTransport());
EXPECT_EQ(gbs0->tx_code, kFirstCallId);
EXPECT_EQ(gbs0->seq, 0);
grpc_binder_stream* gbs1 = InitNewBinderStream();
EXPECT_EQ(gbs1->t, GetBinderTransport());
EXPECT_EQ(gbs1->tx_code, kFirstCallId + 1);
EXPECT_EQ(gbs1->seq, 0);
grpc_binder_stream* gbs2 = InitNewBinderStream();
EXPECT_EQ(gbs2->t, GetBinderTransport());
EXPECT_EQ(gbs2->tx_code, kFirstCallId + 2);
EXPECT_EQ(gbs2->seq, 0);
}
TEST_F(BinderTransportTest, SeqNumIncrement) {
grpc_binder_stream* gbs = InitNewBinderStream();
EXPECT_EQ(gbs->t, GetBinderTransport());
EXPECT_EQ(gbs->tx_code, kFirstCallId);
// A simple batch that contains only "send_initial_metadata"
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeSendInitialMetadata send_initial_metadata(kDefaultMetadata, "", &op);
EXPECT_EQ(gbs->seq, 0);
PerformStreamOp(gbs, &op);
EXPECT_EQ(gbs->tx_code, kFirstCallId);
EXPECT_EQ(gbs->seq, 1);
PerformStreamOp(gbs, &op);
EXPECT_EQ(gbs->tx_code, kFirstCallId);
EXPECT_EQ(gbs->seq, 2);
}
TEST_F(BinderTransportTest, SeqNumNotIncrementWithoutSend) {
{
grpc_binder_stream* gbs = InitNewBinderStream();
EXPECT_EQ(gbs->t, GetBinderTransport());
EXPECT_EQ(gbs->tx_code, kFirstCallId);
// No-op batch.
grpc_transport_stream_op_batch op{};
EXPECT_EQ(gbs->seq, 0);
PerformStreamOp(gbs, &op);
EXPECT_EQ(gbs->tx_code, kFirstCallId);
EXPECT_EQ(gbs->seq, 0);
}
{
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
EXPECT_EQ(gbs->t, GetBinderTransport());
EXPECT_EQ(gbs->tx_code, kFirstCallId + 1);
// Batch with only receiving operations.
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvInitialMetadata recv_initial_metadata(&op);
EXPECT_EQ(gbs->seq, 0);
PerformStreamOp(gbs, &op);
EXPECT_EQ(gbs->tx_code, kFirstCallId + 1);
EXPECT_EQ(gbs->seq, 0);
// Just to trigger the callback.
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
gbt->transport_stream_receiver->NotifyRecvInitialMetadata(gbs->tx_code,
kDefaultMetadata);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
}
TEST_F(BinderTransportTest, PerformSendInitialMetadata) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
const Metadata kInitialMetadata = kDefaultMetadata;
MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, "", &op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
::testing::InSequence sequence;
EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches(
kFlagPrefix, "", kInitialMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
TEST_F(BinderTransportTest, PerformSendInitialMetadataMethodRef) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
const Metadata kInitialMetadata = kDefaultMetadata;
const std::string kMethodRef = kDefaultMethodRef;
MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, kMethodRef,
&op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
::testing::InSequence sequence;
EXPECT_CALL(GetWireWriter(),
RpcCall(TransactionMatches(kFlagPrefix, kMethodRef.substr(1),
kInitialMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
TEST_F(BinderTransportTest, PerformSendMessage) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
const std::string kMessage = kDefaultMessage;
MakeSendMessage send_message(kMessage, &op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
::testing::InSequence sequence;
EXPECT_CALL(
GetWireWriter(),
RpcCall(TransactionMatches(kFlagMessageData, "", Metadata{}, kMessage)));
EXPECT_CALL(mock_on_complete, Callback);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
TEST_F(BinderTransportTest, PerformSendTrailingMetadata) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
// The wireformat guarantees that suffix metadata will always be empty.
// TODO(waynetu): Check whether gRPC can internally add extra trailing
// metadata.
const Metadata kTrailingMetadata = {};
MakeSendTrailingMetadata send_trailing_metadata(kTrailingMetadata, &op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
::testing::InSequence sequence;
EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches(
kFlagSuffix, "", kTrailingMetadata, "")));
EXPECT_CALL(mock_on_complete, Callback);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
TEST_F(BinderTransportTest, PerformSendAll) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
const Metadata kInitialMetadata = kDefaultMetadata;
const std::string kMethodRef = kDefaultMethodRef;
MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, kMethodRef,
&op);
const std::string kMessage = kDefaultMessage;
MakeSendMessage send_message(kMessage, &op);
// The wireformat guarantees that suffix metadata will always be empty.
// TODO(waynetu): Check whether gRPC can internally add extra trailing
// metadata.
const Metadata kTrailingMetadata = {};
MakeSendTrailingMetadata send_trailing_metadata(kTrailingMetadata, &op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
::testing::InSequence sequence;
EXPECT_CALL(GetWireWriter(),
RpcCall(TransactionMatches(
kFlagPrefix | kFlagMessageData | kFlagSuffix,
kMethodRef.substr(1), kInitialMetadata, kMessage)));
EXPECT_CALL(mock_on_complete, Callback);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
}
TEST_F(BinderTransportTest, PerformRecvInitialMetadata) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvInitialMetadata recv_initial_metadata(&op);
const Metadata kInitialMetadata = kDefaultMetadata;
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
gbt->transport_stream_receiver->NotifyRecvInitialMetadata(gbs->tx_code,
kInitialMetadata);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
VerifyMetadataEqual(kInitialMetadata,
recv_initial_metadata.grpc_initial_metadata);
}
TEST_F(BinderTransportTest, PerformRecvInitialMetadataWithMethodRef) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvInitialMetadata recv_initial_metadata(&op);
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
const Metadata kInitialMetadataWithMethodRef =
AppendMethodRef(kDefaultMetadata, kDefaultMethodRef);
gbt->transport_stream_receiver->NotifyRecvInitialMetadata(
gbs->tx_code, kInitialMetadataWithMethodRef);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
VerifyMetadataEqual(kInitialMetadataWithMethodRef,
recv_initial_metadata.grpc_initial_metadata);
}
TEST_F(BinderTransportTest, PerformRecvMessage) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvMessage recv_message(&op);
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
const std::string kMessage = kDefaultMessage;
gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kMessage);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr));
grpc_slice slice;
recv_message.grpc_message->Pull(&slice);
EXPECT_EQ(kMessage,
std::string(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)),
GRPC_SLICE_LENGTH(slice)));
grpc_slice_unref_internal(slice);
}
TEST_F(BinderTransportTest, PerformRecvTrailingMetadata) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvTrailingMetadata recv_trailing_metadata(&op);
const Metadata kTrailingMetadata = kDefaultMetadata;
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
constexpr int kStatus = kDefaultStatus;
gbt->transport_stream_receiver->NotifyRecvTrailingMetadata(
gbs->tx_code, kTrailingMetadata, kStatus);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
VerifyMetadataEqual(AppendStatus(kTrailingMetadata, kStatus),
recv_trailing_metadata.grpc_trailing_metadata);
}
TEST_F(BinderTransportTest, PerformRecvAll) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
MakeRecvInitialMetadata recv_initial_metadata(&op);
MakeRecvMessage recv_message(&op);
MakeRecvTrailingMetadata recv_trailing_metadata(&op);
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
const Metadata kInitialMetadataWithMethodRef =
AppendMethodRef(kDefaultMetadata, kDefaultMethodRef);
gbt->transport_stream_receiver->NotifyRecvInitialMetadata(
gbs->tx_code, kInitialMetadataWithMethodRef);
const std::string kMessage = kDefaultMessage;
gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kMessage);
Metadata trailing_metadata = kDefaultMetadata;
constexpr int kStatus = kDefaultStatus;
gbt->transport_stream_receiver->NotifyRecvTrailingMetadata(
gbs->tx_code, trailing_metadata, kStatus);
PerformStreamOp(gbs, &op);
exec_ctx.Flush();
VerifyMetadataEqual(kInitialMetadataWithMethodRef,
recv_initial_metadata.grpc_initial_metadata);
trailing_metadata.emplace_back("grpc-status", std::to_string(kStatus));
VerifyMetadataEqual(trailing_metadata,
recv_trailing_metadata.grpc_trailing_metadata);
EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr));
grpc_slice slice;
recv_message.grpc_message->Pull(&slice);
EXPECT_EQ(kMessage,
std::string(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)),
GRPC_SLICE_LENGTH(slice)));
grpc_slice_unref_internal(slice);
}
TEST_F(BinderTransportTest, PerformAllOps) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
grpc_transport_stream_op_batch op{};
grpc_transport_stream_op_batch_payload payload(nullptr);
op.payload = &payload;
const Metadata kSendInitialMetadata = kDefaultMetadata;
const std::string kMethodRef = kDefaultMethodRef;
MakeSendInitialMetadata send_initial_metadata(kSendInitialMetadata,
kMethodRef, &op);
const std::string kSendMessage = kDefaultMessage;
MakeSendMessage send_message(kSendMessage, &op);
// The wireformat guarantees that suffix metadata will always be empty.
// TODO(waynetu): Check whether gRPC can internally add extra trailing
// metadata.
const Metadata kSendTrailingMetadata = {};
MakeSendTrailingMetadata send_trailing_metadata(kSendTrailingMetadata, &op);
MockGrpcClosure mock_on_complete;
op.on_complete = mock_on_complete.GetGrpcClosure();
// TODO(waynetu): Currently, we simply drop the prefix '/' from the :path
// argument to obtain the method name. Update the test if this turns out to be
// incorrect.
EXPECT_CALL(GetWireWriter(),
RpcCall(TransactionMatches(
kFlagPrefix | kFlagMessageData | kFlagSuffix,
kMethodRef.substr(1), kSendInitialMetadata, kSendMessage)));
Expectation on_complete = EXPECT_CALL(mock_on_complete, Callback);
// Recv callbacks can happen after the on_complete callback.
MakeRecvInitialMetadata recv_initial_metadata(
&op, /* call_before = */ &on_complete);
MakeRecvMessage recv_message(&op, /* call_before = */ &on_complete);
MakeRecvTrailingMetadata recv_trailing_metadata(
&op, /* call_before = */ &on_complete);
PerformStreamOp(gbs, &op);
// Flush the execution context to force on_complete to run before recv
// callbacks get scheduled.
exec_ctx.Flush();
auto* gbt = reinterpret_cast<grpc_binder_transport*>(transport_);
const Metadata kRecvInitialMetadata =
AppendMethodRef(kDefaultMetadata, kDefaultMethodRef);
gbt->transport_stream_receiver->NotifyRecvInitialMetadata(
gbs->tx_code, kRecvInitialMetadata);
const std::string kRecvMessage = kDefaultMessage;
gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kRecvMessage);
const Metadata kRecvTrailingMetadata = kDefaultMetadata;
constexpr int kStatus = 0x1234;
gbt->transport_stream_receiver->NotifyRecvTrailingMetadata(
gbs->tx_code, kRecvTrailingMetadata, kStatus);
exec_ctx.Flush();
VerifyMetadataEqual(kRecvInitialMetadata,
recv_initial_metadata.grpc_initial_metadata);
VerifyMetadataEqual(AppendStatus(kRecvTrailingMetadata, kStatus),
recv_trailing_metadata.grpc_trailing_metadata);
EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr));
grpc_slice slice;
recv_message.grpc_message->Pull(&slice);
EXPECT_EQ(kRecvMessage,
std::string(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)),
GRPC_SLICE_LENGTH(slice)));
grpc_slice_unref_internal(slice);
}
TEST_F(BinderTransportTest, WireWriterRpcCallErrorPropagates) {
grpc_core::ExecCtx exec_ctx;
grpc_binder_stream* gbs = InitNewBinderStream();
MockGrpcClosure mock_on_complete1;
MockGrpcClosure mock_on_complete2;
EXPECT_CALL(GetWireWriter(), RpcCall)
.WillOnce(Return(absl::OkStatus()))
.WillOnce(Return(absl::InternalError("WireWriter::RpcCall failed")));
EXPECT_CALL(mock_on_complete1, Callback(GRPC_ERROR_NONE));
EXPECT_CALL(mock_on_complete2,
Callback(GrpcErrorMessageContains("WireWriter::RpcCall failed")));
const Metadata kInitialMetadata = {};
grpc_transport_stream_op_batch op1{};
grpc_transport_stream_op_batch_payload payload1(nullptr);
op1.payload = &payload1;
MakeSendInitialMetadata send_initial_metadata1(kInitialMetadata, "", &op1);
op1.on_complete = mock_on_complete1.GetGrpcClosure();
grpc_transport_stream_op_batch op2{};
grpc_transport_stream_op_batch_payload payload2(nullptr);
op2.payload = &payload2;
MakeSendInitialMetadata send_initial_metadata2(kInitialMetadata, "", &op2);
op2.on_complete = mock_on_complete2.GetGrpcClosure();
PerformStreamOp(gbs, &op1);
PerformStreamOp(gbs, &op2);
exec_ctx.Flush();
}
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,106 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_proto_library")
licenses(["notice"])
grpc_package(name = "test/core/transport/binder/end2end")
grpc_cc_library(
name = "fake_binder",
srcs = ["fake_binder.cc"],
hdrs = ["fake_binder.h"],
external_deps = [
"absl/memory",
"absl/random",
"absl/strings",
"absl/strings:str_format",
"absl/time",
"absl/types:variant",
],
deps = [
"//:gpr_base",
"//src/core/ext/transport/binder/wire_format:binder",
"//src/core/ext/transport/binder/wire_format:wire_reader",
],
)
grpc_cc_test(
name = "fake_binder_test",
srcs = ["fake_binder_test.cc"],
external_deps = [
"absl/strings",
"absl/time",
"gtest",
],
language = "C++",
uses_polling = False,
deps = [
":fake_binder",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_library(
name = "end2end_binder_channel",
srcs = ["testing_channel_create.cc"],
hdrs = ["testing_channel_create.h"],
external_deps = [],
deps = [
":fake_binder",
"//:grpc++_base",
"//:grpc_base_c",
"//src/core/ext/transport/binder/transport",
"//src/core/ext/transport/binder/wire_format:wire_reader",
],
)
grpc_proto_library(
name = "echo_grpc_proto",
srcs = ["echo.proto"],
)
grpc_cc_library(
name = "echo_service",
srcs = ["echo_service.cc"],
hdrs = ["echo_service.h"],
external_deps = [
"absl/strings",
"absl/strings:str_format",
"absl/time",
],
deps = [
":echo_grpc_proto",
],
)
grpc_cc_test(
name = "end2end_binder_transport_test",
srcs = ["end2end_binder_transport_test.cc"],
external_deps = [
"absl/memory",
"absl/time",
"gtest",
],
language = "C++",
deps = [
":echo_service",
":end2end_binder_channel",
":fake_binder",
"//src/core/ext/transport/binder/transport",
"//src/core/ext/transport/binder/wire_format:wire_reader",
"//test/core/util:grpc_test_util",
],
)

@ -0,0 +1,38 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A simple RPC service that echos what the client passes in. The request and
// the response simply contains the text represented in a string.
//
// This service is for end-to-end testing with fake binder tunnels.
syntax = "proto3";
// TODO(waynetu): This can be replaced by EchoTestService in
// src/proto/grpc/testing/echo.proto
package grpc_binder.end2end_testing;
message EchoRequest {
string text = 1;
}
message EchoResponse {
string text = 1;
}
service EchoService {
rpc EchoUnaryCall(EchoRequest) returns (EchoResponse);
rpc EchoServerStreamingCall(EchoRequest) returns (stream EchoResponse);
rpc EchoClientStreamingCall(stream EchoRequest) returns (EchoResponse);
rpc EchoBiDirStreamingCall(stream EchoRequest) returns (stream EchoResponse);
}

@ -0,0 +1,83 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/end2end/echo_service.h"
#include <string>
#include "absl/strings/str_format.h"
#include "absl/time/time.h"
namespace grpc_binder {
namespace end2end_testing {
const absl::string_view EchoServer::kCancelledText = "cancel";
const absl::string_view EchoServer::kTimeoutText = "timeout";
const size_t EchoServer::kServerStreamingCounts = 100;
grpc::Status EchoServer::EchoUnaryCall(grpc::ServerContext* /*context*/,
const EchoRequest* request,
EchoResponse* response) {
const std::string& data = request->text();
if (data == kCancelledText) {
return grpc::Status::CANCELLED;
}
if (data == kTimeoutText) {
absl::SleepFor(absl::Seconds(5));
}
response->set_text(data);
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoServerStreamingCall(
grpc::ServerContext* /*context*/, const EchoRequest* request,
grpc::ServerWriter<EchoResponse>* writer) {
const std::string& data = request->text();
if (data == kTimeoutText) {
absl::SleepFor(absl::Seconds(5));
}
for (size_t i = 0; i < kServerStreamingCounts; ++i) {
EchoResponse response;
response.set_text(absl::StrFormat("%s(%d)", data, i));
writer->Write(response);
}
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoClientStreamingCall(
grpc::ServerContext* /*context*/, grpc::ServerReader<EchoRequest>* reader,
EchoResponse* response) {
EchoRequest request;
std::string result = "";
while (reader->Read(&request)) {
result += request.text();
}
response->set_text(result);
return grpc::Status::OK;
}
grpc::Status EchoServer::EchoBiDirStreamingCall(
grpc::ServerContext* /*context*/,
grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
EchoRequest request;
while (stream->Read(&request)) {
EchoResponse response;
response.set_text(request.text());
stream->Write(response);
}
return grpc::Status::OK;
}
} // namespace end2end_testing
} // namespace grpc_binder

@ -0,0 +1,51 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H
#define TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H
#include "absl/strings/string_view.h"
#include "test/core/transport/binder/end2end/echo.grpc.pb.h"
namespace grpc_binder {
namespace end2end_testing {
// TODO(waynetu): Replace this with TestServiceImpl declared in
// test/cpp/end2end/test_service_impl.h
class EchoServer final : public EchoService::Service {
public:
static const absl::string_view kCancelledText;
static const absl::string_view kTimeoutText;
grpc::Status EchoUnaryCall(grpc::ServerContext* context,
const EchoRequest* request,
EchoResponse* response) override;
static const size_t kServerStreamingCounts;
grpc::Status EchoServerStreamingCall(
grpc::ServerContext* context, const EchoRequest* request,
grpc::ServerWriter<EchoResponse>* writer) override;
grpc::Status EchoClientStreamingCall(grpc::ServerContext* context,
grpc::ServerReader<EchoRequest>* reader,
EchoResponse* response) override;
grpc::Status EchoBiDirStreamingCall(
grpc::ServerContext* context,
grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) override;
};
} // namespace end2end_testing
} // namespace grpc_binder
#endif // TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H_

@ -0,0 +1,306 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <grpcpp/grpcpp.h>
#include <string>
#include <thread>
#include <utility>
#include "absl/memory/memory.h"
#include "absl/time/time.h"
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "test/core/transport/binder/end2end/echo_service.h"
#include "test/core/transport/binder/end2end/fake_binder.h"
#include "test/core/transport/binder/end2end/testing_channel_create.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
namespace {
class End2EndBinderTransportTest
: public ::testing::TestWithParam<absl::Duration> {
public:
End2EndBinderTransportTest() {
end2end_testing::g_transaction_processor =
new end2end_testing::TransactionProcessor(GetParam());
}
~End2EndBinderTransportTest() override {
delete end2end_testing::g_transaction_processor;
}
static void SetUpTestSuite() { grpc_init(); }
static void TearDownTestSuite() { grpc_shutdown(); }
std::shared_ptr<grpc::Channel> BinderChannel(
grpc::Server* server, const grpc::ChannelArguments& args) {
return end2end_testing::BinderChannelForTesting(server, args);
}
};
using end2end_testing::EchoRequest;
using end2end_testing::EchoResponse;
using end2end_testing::EchoService;
} // namespace
TEST_P(End2EndBinderTransportTest, SetupTransport) {
grpc_transport *client_transport, *server_transport;
std::tie(client_transport, server_transport) =
end2end_testing::CreateClientServerBindersPairForTesting();
EXPECT_NE(client_transport, nullptr);
EXPECT_NE(server_transport, nullptr);
grpc_transport_destroy(client_transport);
grpc_transport_destroy(server_transport);
}
TEST_P(End2EndBinderTransportTest, UnaryCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
EchoRequest request;
EchoResponse response;
request.set_text("it works!");
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), "it works!");
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelNonOkStatus) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
EchoRequest request;
EchoResponse response;
request.set_text(std::string(end2end_testing::EchoServer::kCancelledText));
// Server will not response the client with message data, however, since all
// callbacks after the trailing metadata are cancelled, we shall not be
// blocked here.
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
EXPECT_FALSE(status.ok());
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelServerTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
EchoResponse response;
request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText));
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
server->Shutdown();
}
// Temporarily disabled due to a potential deadlock in our design.
// TODO(waynetu): Enable this test once the issue is resolved.
TEST_P(End2EndBinderTransportTest,
UnaryCallThroughFakeBinderChannelClientTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
// Set transaction delay to a large number. This happens after the channel
// creation so that we don't need to wait that long for client and server to
// be connected.
end2end_testing::g_transaction_processor->SetDelay(absl::Seconds(5));
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
EchoResponse response;
request.set_text("normal-text");
grpc::Status status = stub->EchoUnaryCall(&context, request, &response);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest,
ServerStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
EchoRequest request;
request.set_text("it works!");
std::unique_ptr<grpc::ClientReader<EchoResponse>> reader =
stub->EchoServerStreamingCall(&context, request);
EchoResponse response;
size_t cnt = 0;
while (reader->Read(&response)) {
EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", cnt));
cnt++;
}
EXPECT_EQ(cnt, end2end_testing::EchoServer::kServerStreamingCounts);
grpc::Status status = reader->Finish();
EXPECT_TRUE(status.ok());
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest,
ServerStreamingCallThroughFakeBinderChannelServerTimeout) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1)));
EchoRequest request;
request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText));
std::unique_ptr<grpc::ClientReader<EchoResponse>> reader =
stub->EchoServerStreamingCall(&context, request);
EchoResponse response;
EXPECT_FALSE(reader->Read(&response));
grpc::Status status = reader->Finish();
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.error_message(), "Deadline Exceeded");
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest,
ClientStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
EchoResponse response;
std::unique_ptr<grpc::ClientWriter<EchoRequest>> writer =
stub->EchoClientStreamingCall(&context, &response);
constexpr size_t kClientStreamingCounts = 100;
std::string expected = "";
for (size_t i = 0; i < kClientStreamingCounts; ++i) {
EchoRequest request;
request.set_text(absl::StrFormat("it works!(%d)", i));
writer->Write(request);
expected += absl::StrFormat("it works!(%d)", i);
}
writer->WritesDone();
grpc::Status status = writer->Finish();
EXPECT_TRUE(status.ok());
EXPECT_EQ(response.text(), expected);
server->Shutdown();
}
TEST_P(End2EndBinderTransportTest, BiDirStreamingCallThroughFakeBinderChannel) {
grpc::ChannelArguments args;
grpc::ServerBuilder builder;
end2end_testing::EchoServer service;
builder.RegisterService(&service);
std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
std::shared_ptr<grpc::Channel> channel = BinderChannel(server.get(), args);
std::unique_ptr<EchoService::Stub> stub = EchoService::NewStub(channel);
grpc::ClientContext context;
EchoResponse response;
std::shared_ptr<grpc::ClientReaderWriter<EchoRequest, EchoResponse>> stream =
stub->EchoBiDirStreamingCall(&context);
constexpr size_t kBiDirStreamingCounts = 100;
struct WriterArgs {
std::shared_ptr<grpc::ClientReaderWriter<EchoRequest, EchoResponse>> stream;
size_t bi_dir_streaming_counts;
} writer_args;
writer_args.stream = stream;
writer_args.bi_dir_streaming_counts = kBiDirStreamingCounts;
auto writer_fn = [](void* arg) {
const WriterArgs& args = *static_cast<WriterArgs*>(arg);
EchoResponse response;
for (size_t i = 0; i < args.bi_dir_streaming_counts; ++i) {
EchoRequest request;
request.set_text(absl::StrFormat("it works!(%d)", i));
args.stream->Write(request);
}
args.stream->WritesDone();
};
grpc_core::Thread writer_thread("writer-thread", writer_fn,
static_cast<void*>(&writer_args));
writer_thread.Start();
for (size_t i = 0; i < kBiDirStreamingCounts; ++i) {
EchoResponse response;
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", i));
}
grpc::Status status = stream->Finish();
EXPECT_TRUE(status.ok());
writer_thread.Join();
server->Shutdown();
}
INSTANTIATE_TEST_SUITE_P(
End2EndBinderTransportTestWithDifferentDelayTimes,
End2EndBinderTransportTest,
testing::Values(absl::ZeroDuration(), absl::Nanoseconds(10),
absl::Microseconds(10), absl::Microseconds(100),
absl::Milliseconds(1), absl::Milliseconds(20)));
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,269 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/end2end/fake_binder.h"
#include <grpc/support/log.h>
#include <string>
#include <utility>
namespace grpc_binder {
namespace end2end_testing {
TransactionProcessor* g_transaction_processor = nullptr;
FakeWritableParcel::FakeWritableParcel() : data_(1) {}
int32_t FakeWritableParcel::GetDataPosition() const { return data_position_; }
absl::Status FakeWritableParcel::SetDataPosition(int32_t pos) {
if (data_.size() < static_cast<size_t>(pos) + 1) {
data_.resize(pos + 1);
}
data_position_ = pos;
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteInt32(int32_t data) {
data_[data_position_] = data;
SetDataPosition(data_position_ + 1).IgnoreError();
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteBinder(HasRawBinder* binder) {
data_[data_position_] = binder->GetRawBinder();
SetDataPosition(data_position_ + 1).IgnoreError();
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteString(absl::string_view s) {
data_[data_position_] = std::string(s);
SetDataPosition(data_position_ + 1).IgnoreError();
return absl::OkStatus();
}
absl::Status FakeWritableParcel::WriteByteArray(const int8_t* buffer,
int32_t length) {
data_[data_position_] = std::vector<int8_t>(buffer, buffer + length);
SetDataPosition(data_position_ + 1).IgnoreError();
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadInt32(int32_t* data) const {
if (data_position_ >= data_.size() ||
!absl::holds_alternative<int32_t>(data_[data_position_])) {
return absl::InternalError("ReadInt32 failed");
}
*data = absl::get<int32_t>(data_[data_position_++]);
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadBinder(
std::unique_ptr<Binder>* data) const {
if (data_position_ >= data_.size() ||
!absl::holds_alternative<void*>(data_[data_position_])) {
return absl::InternalError("ReadBinder failed");
}
void* endpoint = absl::get<void*>(data_[data_position_++]);
if (!endpoint) return absl::InternalError("ReadBinder failed");
*data = absl::make_unique<FakeBinder>(static_cast<FakeEndpoint*>(endpoint));
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadString(char data[111]) const {
if (data_position_ >= data_.size() ||
!absl::holds_alternative<std::string>(data_[data_position_])) {
return absl::InternalError("ReadString failed");
}
const std::string& s = absl::get<std::string>(data_[data_position_++]);
if (s.size() >= 100) return absl::InternalError("ReadString failed");
std::memcpy(data, s.data(), s.size());
return absl::OkStatus();
}
absl::Status FakeReadableParcel::ReadByteArray(std::string* data) const {
if (data_position_ >= data_.size() ||
!absl::holds_alternative<std::vector<int8_t>>(data_[data_position_])) {
return absl::InternalError("ReadByteArray failed");
}
const std::vector<int8_t>& byte_array =
absl::get<std::vector<int8_t>>(data_[data_position_++]);
data->resize(byte_array.size());
for (size_t i = 0; i < byte_array.size(); ++i) {
(*data)[i] = byte_array[i];
}
return absl::OkStatus();
}
absl::Status FakeBinder::Transact(BinderTransportTxCode tx_code) {
endpoint_->tunnel->EnQueueTransaction(endpoint_->other_end, tx_code,
input_->MoveData());
return absl::OkStatus();
}
FakeTransactionReceiver::FakeTransactionReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb transact_cb) {
persistent_tx_receiver_ = &g_transaction_processor->NewPersistentTxReceiver(
std::move(wire_reader_ref), std::move(transact_cb),
absl::make_unique<FakeBinderTunnel>());
}
std::unique_ptr<TransactionReceiver> FakeBinder::ConstructTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb cb) const {
return absl::make_unique<FakeTransactionReceiver>(wire_reader_ref, cb);
}
void* FakeTransactionReceiver::GetRawBinder() {
return persistent_tx_receiver_->tunnel_->GetSendEndpoint();
}
std::unique_ptr<Binder> FakeTransactionReceiver::GetSender() const {
return absl::make_unique<FakeBinder>(
persistent_tx_receiver_->tunnel_->GetSendEndpoint());
}
PersistentFakeTransactionReceiver::PersistentFakeTransactionReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb cb,
std::unique_ptr<FakeBinderTunnel> tunnel)
: wire_reader_ref_(std::move(wire_reader_ref)),
callback_(std::move(cb)),
tunnel_(std::move(tunnel)) {
FakeEndpoint* recv_endpoint = tunnel_->GetRecvEndpoint();
recv_endpoint->owner = this;
}
TransactionProcessor::TransactionProcessor(absl::Duration delay)
: delay_nsec_(absl::ToInt64Nanoseconds(delay)),
tx_thread_(
"process-thread",
[](void* arg) {
auto* self = static_cast<TransactionProcessor*>(arg);
self->ProcessLoop();
},
this),
terminated_(false) {
tx_thread_.Start();
}
void TransactionProcessor::SetDelay(absl::Duration delay) {
delay_nsec_ = absl::ToInt64Nanoseconds(delay);
}
void TransactionProcessor::Terminate() {
if (!terminated_.load(std::memory_order_seq_cst)) {
gpr_log(GPR_INFO, "Terminating the processor");
terminated_.store(true, std::memory_order_seq_cst);
tx_thread_.Join();
gpr_log(GPR_INFO, "Processor terminated");
}
}
void TransactionProcessor::WaitForNextTransaction() {
absl::Time now = absl::Now();
if (now < deliver_time_) {
absl::Duration diff = deliver_time_ - now;
// Release the lock before going to sleep.
mu_.Unlock();
absl::SleepFor(diff);
mu_.Lock();
}
}
void TransactionProcessor::Flush() {
while (true) {
FakeEndpoint* target = nullptr;
BinderTransportTxCode tx_code{};
FakeData data;
mu_.Lock();
if (tx_queue_.empty()) {
mu_.Unlock();
break;
}
WaitForNextTransaction();
std::tie(target, tx_code, data) = std::move(tx_queue_.front());
tx_queue_.pop();
if (!tx_queue_.empty()) {
deliver_time_ = absl::Now() + GetRandomDelay();
}
mu_.Unlock();
auto* tx_receiver =
static_cast<PersistentFakeTransactionReceiver*>(target->owner);
auto parcel = absl::make_unique<FakeReadableParcel>(std::move(data));
tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
}
}
void TransactionProcessor::ProcessLoop() {
while (!terminated_.load(std::memory_order_seq_cst)) {
FakeEndpoint* target = nullptr;
BinderTransportTxCode tx_code{};
FakeData data;
mu_.Lock();
if (tx_queue_.empty()) {
mu_.Unlock();
continue;
}
WaitForNextTransaction();
std::tie(target, tx_code, data) = std::move(tx_queue_.front());
tx_queue_.pop();
if (!tx_queue_.empty()) {
deliver_time_ = absl::Now() + GetRandomDelay();
}
mu_.Unlock();
auto* tx_receiver =
static_cast<PersistentFakeTransactionReceiver*>(target->owner);
auto parcel = absl::make_unique<FakeReadableParcel>(std::move(data));
tx_receiver->Receive(tx_code, parcel.get()).IgnoreError();
}
Flush();
}
absl::Duration TransactionProcessor::GetRandomDelay() {
int64_t delay =
absl::Uniform<int64_t>(bit_gen_, delay_nsec_ / 2, delay_nsec_);
return absl::Nanoseconds(delay);
}
void TransactionProcessor::EnQueueTransaction(FakeEndpoint* target,
BinderTransportTxCode tx_code,
FakeData data) {
grpc_core::MutexLock lock(&mu_);
if (tx_queue_.empty()) {
// This is the first transaction in the queue. Compute its deliver time.
deliver_time_ = absl::Now() + GetRandomDelay();
}
tx_queue_.emplace(target, tx_code, std::move(data));
}
FakeBinderTunnel::FakeBinderTunnel()
: send_endpoint_(absl::make_unique<FakeEndpoint>(this)),
recv_endpoint_(absl::make_unique<FakeEndpoint>(this)) {
send_endpoint_->other_end = recv_endpoint_.get();
recv_endpoint_->other_end = send_endpoint_.get();
}
std::pair<std::unique_ptr<Binder>, std::unique_ptr<TransactionReceiver>>
NewBinderPair(TransactionReceiver::OnTransactCb transact_cb) {
auto tx_receiver = absl::make_unique<FakeTransactionReceiver>(
nullptr, std::move(transact_cb));
std::unique_ptr<Binder> sender = tx_receiver->GetSender();
return std::make_pair(std::move(sender), std::move(tx_receiver));
}
} // namespace end2end_testing
} // namespace grpc_binder

@ -0,0 +1,294 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A collection of fake objects that offers in-memory simulation of data
// transmission from one binder to another.
//
// Once the implementation of Binder is changed from BinderAndroid to
// FakeBinder, we'll be able to test and fuzz our end-to-end binder transport in
// a non-Android environment.
//
// The following diagram shows the high-level overview of how the in-memory
// simulation works (FakeReceiver means FakeTransactionReceiver).
//
// thread boundary
// |
// |
// ---------------- ---------------- | receive
// | FakeBinder | | FakeReceiver | <--|----------------
// ---------------- ---------------- | |
// | ^ | ------------------------
// | endpoint owner | | | TransactionProcessor |
// | | | ------------------------
// v | | ^
// ---------------- ---------------- | |
// | FakeEndpoint | --------> | FakeEndpoint | ---|----------------
// ---------------- other_end ---------------- | enqueue
// | ^ ^ | |
// | | recv_endpoint | | |
// | | | |
// | | send_endpoint | |
// v | | v
// -------------------------------------------
// | FakeBinderTunnel |
// -------------------------------------------
#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H
#define GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H
#include <atomic>
#include <forward_list>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "absl/random/random.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc_binder {
namespace end2end_testing {
using FakeData = std::vector<
absl::variant<int32_t, void*, std::string, std::vector<int8_t>>>;
// A fake writable parcel.
//
// It simulates the functionalities of a real writable parcel and stores all
// written data in memory. The data can then be transferred by calling
// MoveData().
class FakeWritableParcel final : public WritableParcel {
public:
FakeWritableParcel();
int32_t GetDataPosition() const override;
absl::Status SetDataPosition(int32_t pos) override;
absl::Status WriteInt32(int32_t data) override;
absl::Status WriteBinder(HasRawBinder* binder) override;
absl::Status WriteString(absl::string_view s) override;
absl::Status WriteByteArray(const int8_t* buffer, int32_t length) override;
FakeData MoveData() { return std::move(data_); }
private:
FakeData data_;
size_t data_position_ = 0;
};
// A fake readable parcel.
//
// It takes in the data transferred from a FakeWritableParcel and provides
// methods to retrieve those data in the receiving end.
class FakeReadableParcel final : public ReadableParcel {
public:
explicit FakeReadableParcel(FakeData data) : data_(std::move(data)) {}
absl::Status ReadInt32(int32_t* data) const override;
absl::Status ReadBinder(std::unique_ptr<Binder>* data) const override;
absl::Status ReadByteArray(std::string* data) const override;
absl::Status ReadString(char data[111]) const override;
private:
const FakeData data_;
mutable size_t data_position_ = 0;
};
class FakeBinder;
class FakeBinderTunnel;
// FakeEndpoint is a simple struct that holds the pointer to the other end, a
// pointer to the tunnel and a pointer to its owner. This tells the owner where
// the data should be sent.
struct FakeEndpoint {
explicit FakeEndpoint(FakeBinderTunnel* tunnel) : tunnel(tunnel) {}
FakeEndpoint* other_end;
FakeBinderTunnel* tunnel;
// The owner is either a FakeBinder (the sending part) or a
// FakeTransactionReceiver (the receiving part). Both parts hold an endpoint
// with |owner| pointing back to them and |other_end| pointing to each other.
void* owner;
};
class PersistentFakeTransactionReceiver;
// A fake transaction receiver.
//
// This is the receiving part of a pair of binders. When constructed, a binder
// tunnle is created, and the sending part can be retrieved by calling
// GetSender().
//
// It also provides a Receive() function to simulate the on-transaction
// callback of a real Android binder.
class FakeTransactionReceiver : public TransactionReceiver {
public:
FakeTransactionReceiver(grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb cb);
void* GetRawBinder() override;
std::unique_ptr<Binder> GetSender() const;
private:
PersistentFakeTransactionReceiver* persistent_tx_receiver_;
};
// A "persistent" version of the FakeTransactionReceiver. That is, its lifetime
// is managed by the processor and it outlives the wire reader and
// grpc_binder_transport, so we can safely dereference a pointer to it in
// ProcessLoop().
class PersistentFakeTransactionReceiver {
public:
PersistentFakeTransactionReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb cb,
std::unique_ptr<FakeBinderTunnel> tunnel);
absl::Status Receive(BinderTransportTxCode tx_code,
const ReadableParcel* parcel) {
return callback_(static_cast<transaction_code_t>(tx_code), parcel);
}
private:
grpc_core::RefCountedPtr<WireReader> wire_reader_ref_;
TransactionReceiver::OnTransactCb callback_;
std::unique_ptr<FakeBinderTunnel> tunnel_;
friend class FakeTransactionReceiver;
};
// The sending part of a binders pair. It provides a FakeWritableParcel to the
// user, and when Transact() is called, it transfers the written data to the
// other end of the tunnel by following the information in its endpoint.
class FakeBinder final : public Binder {
public:
explicit FakeBinder(FakeEndpoint* endpoint) : endpoint_(endpoint) {
endpoint_->owner = this;
}
void Initialize() override {}
absl::Status PrepareTransaction() override {
input_ = absl::make_unique<FakeWritableParcel>();
return absl::OkStatus();
}
absl::Status Transact(BinderTransportTxCode tx_code) override;
WritableParcel* GetWritableParcel() const override { return input_.get(); }
ReadableParcel* GetReadableParcel() const override { return output_.get(); }
std::unique_ptr<TransactionReceiver> ConstructTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb transact_cb) const override;
void* GetRawBinder() override { return endpoint_->other_end; }
private:
FakeEndpoint* endpoint_;
std::unique_ptr<FakeWritableParcel> input_;
std::unique_ptr<FakeReadableParcel> output_;
};
// A transaction processor.
//
// Once constructed, it'll create a another thread that deliver in-coming
// transactions to their destinations.
class TransactionProcessor {
public:
explicit TransactionProcessor(absl::Duration delay = absl::ZeroDuration());
~TransactionProcessor() { Terminate(); }
void SetDelay(absl::Duration delay);
void Terminate();
void ProcessLoop();
void Flush();
// Issue a transaction with |target| pointing to the target endpoint. The
// transactions will be delivered in the same order they're issued, possibly
// with random delay to simulate real-world situation.
void EnQueueTransaction(FakeEndpoint* target, BinderTransportTxCode tx_code,
FakeData data);
PersistentFakeTransactionReceiver& NewPersistentTxReceiver(
grpc_core::RefCountedPtr<WireReader> wire_reader_ref,
TransactionReceiver::OnTransactCb cb,
std::unique_ptr<FakeBinderTunnel> tunnel) {
grpc_core::MutexLock lock(&tx_receiver_mu_);
storage_.emplace_front(wire_reader_ref, cb, std::move(tunnel));
return storage_.front();
}
private:
absl::Duration GetRandomDelay();
void WaitForNextTransaction() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
grpc_core::Mutex mu_;
std::queue<std::tuple<FakeEndpoint*, BinderTransportTxCode, FakeData>>
tx_queue_ ABSL_GUARDED_BY(mu_);
absl::Time deliver_time_ ABSL_GUARDED_BY(mu_);
int64_t delay_nsec_;
absl::BitGen bit_gen_;
grpc_core::Thread tx_thread_;
std::atomic<bool> terminated_;
grpc_core::Mutex tx_receiver_mu_;
// Use forward_list to avoid invalid pointers resulted by reallocation in
// containers such as std::vector.
std::forward_list<PersistentFakeTransactionReceiver> storage_
ABSL_GUARDED_BY(tx_receiver_mu_);
};
// The global (shared) processor. Test suite should be responsible of
// creating/deleting it.
extern TransactionProcessor* g_transaction_processor;
// A binder tunnel.
//
// It is a simple helper that creates and links two endpoints.
class FakeBinderTunnel {
public:
FakeBinderTunnel();
void EnQueueTransaction(FakeEndpoint* target, BinderTransportTxCode tx_code,
FakeData data) {
g_transaction_processor->EnQueueTransaction(target, tx_code,
std::move(data));
}
FakeEndpoint* GetSendEndpoint() const { return send_endpoint_.get(); }
FakeEndpoint* GetRecvEndpoint() const { return recv_endpoint_.get(); }
private:
std::unique_ptr<FakeEndpoint> send_endpoint_;
std::unique_ptr<FakeEndpoint> recv_endpoint_;
};
// A helper function for constructing a pair of connected binders.
std::pair<std::unique_ptr<Binder>, std::unique_ptr<TransactionReceiver>>
NewBinderPair(TransactionReceiver::OnTransactCb transact_cb);
} // namespace end2end_testing
} // namespace grpc_binder
#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H

@ -0,0 +1,373 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/end2end/fake_binder.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <random>
#include <string>
#include <utility>
#include "absl/strings/str_format.h"
#include "absl/time/time.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
namespace end2end_testing {
TEST(FakeBinderTestWithoutTransaction, WritableParcelDataPosition) {
std::unique_ptr<WritableParcel> parcel =
absl::make_unique<FakeWritableParcel>();
EXPECT_EQ(parcel->GetDataPosition(), 0);
EXPECT_TRUE(parcel->WriteInt32(0).ok());
EXPECT_EQ(parcel->GetDataPosition(), 1);
EXPECT_TRUE(parcel->WriteInt32(1).ok());
EXPECT_TRUE(parcel->WriteInt32(2).ok());
EXPECT_EQ(parcel->GetDataPosition(), 3);
EXPECT_TRUE(parcel->WriteString("").ok());
EXPECT_EQ(parcel->GetDataPosition(), 4);
EXPECT_TRUE(parcel->SetDataPosition(0).ok());
const char kBuffer[] = "test";
EXPECT_TRUE(parcel
->WriteByteArray(reinterpret_cast<const int8_t*>(kBuffer),
strlen(kBuffer))
.ok());
EXPECT_EQ(parcel->GetDataPosition(), 1);
EXPECT_TRUE(parcel->SetDataPosition(100).ok());
EXPECT_EQ(parcel->GetDataPosition(), 100);
}
namespace {
class FakeBinderTest : public ::testing::TestWithParam<absl::Duration> {
public:
FakeBinderTest() {
g_transaction_processor = new TransactionProcessor(GetParam());
}
~FakeBinderTest() override { delete g_transaction_processor; }
};
} // namespace
TEST_P(FakeBinderTest, SendInt32) {
constexpr int kValue = 0x1234;
constexpr int kTxCode = 0x4321;
int called = 0;
std::unique_ptr<Binder> sender;
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
EXPECT_EQ(tx_code, kTxCode);
int value = 0;
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ(value, kValue);
called++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteInt32(kValue).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 1);
}
TEST_P(FakeBinderTest, SendString) {
constexpr char kValue[] = "example-string";
constexpr int kTxCode = 0x4321;
int called = 0;
std::unique_ptr<Binder> sender;
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
EXPECT_EQ(tx_code, kTxCode);
char value[111];
memset(value, 0, sizeof(value));
EXPECT_TRUE(parcel->ReadString(value).ok());
EXPECT_STREQ(value, kValue);
called++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteString(kValue).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 1);
}
TEST_P(FakeBinderTest, SendByteArray) {
constexpr char kValue[] = "example-byte-array";
constexpr int kTxCode = 0x4321;
int called = 0;
std::unique_ptr<Binder> sender;
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
EXPECT_EQ(tx_code, kTxCode);
std::string value;
EXPECT_TRUE(parcel->ReadByteArray(&value).ok());
EXPECT_EQ(value, kValue);
called++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel
->WriteByteArray(reinterpret_cast<const int8_t*>(kValue),
strlen(kValue))
.ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 1);
}
TEST_P(FakeBinderTest, SendMultipleItems) {
constexpr char kByteArray[] = "example-byte-array";
constexpr char kString[] = "example-string";
constexpr int kValue = 0x1234;
constexpr int kTxCode = 0x4321;
int called = 0;
std::unique_ptr<Binder> sender;
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
int value_result;
EXPECT_EQ(tx_code, kTxCode);
EXPECT_TRUE(parcel->ReadInt32(&value_result).ok());
EXPECT_EQ(value_result, kValue);
std::string byte_array_result;
EXPECT_TRUE(parcel->ReadByteArray(&byte_array_result).ok());
EXPECT_EQ(byte_array_result, kByteArray);
char string_result[111];
memset(string_result, 0, sizeof(string_result));
EXPECT_TRUE(parcel->ReadString(string_result).ok());
EXPECT_STREQ(string_result, kString);
called++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteInt32(kValue).ok());
EXPECT_TRUE(parcel
->WriteByteArray(reinterpret_cast<const int8_t*>(kByteArray),
strlen(kByteArray))
.ok());
EXPECT_TRUE(parcel->WriteString(kString).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 1);
}
TEST_P(FakeBinderTest, SendBinder) {
constexpr int kValue = 0x1234;
constexpr int kTxCode = 0x4321;
int called = 0;
std::unique_ptr<Binder> sender;
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
EXPECT_EQ(tx_code, kTxCode);
std::unique_ptr<Binder> binder;
EXPECT_TRUE(parcel->ReadBinder(&binder).ok());
EXPECT_TRUE(binder->PrepareTransaction().ok());
WritableParcel* writable_parcel = binder->GetWritableParcel();
EXPECT_TRUE(writable_parcel->WriteInt32(kValue).ok());
EXPECT_TRUE(binder->Transact(BinderTransportTxCode(kTxCode + 1)).ok());
called++;
return absl::OkStatus();
});
int called2 = 0;
std::unique_ptr<TransactionReceiver> tx_receiver2 =
absl::make_unique<FakeTransactionReceiver>(
nullptr,
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
int value;
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ(value, kValue);
EXPECT_EQ(tx_code, kTxCode + 1);
called2++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteBinder(tx_receiver2.get()).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 1);
EXPECT_EQ(called2, 1);
}
TEST_P(FakeBinderTest, SendTransactionAfterDestruction) {
constexpr int kValue = 0x1234;
constexpr int kTxCode = 0x4321;
std::unique_ptr<Binder> sender;
int called = 0;
{
std::unique_ptr<TransactionReceiver> tx_receiver;
std::tie(sender, tx_receiver) = NewBinderPair(
[&](transaction_code_t tx_code, const ReadableParcel* parcel) {
EXPECT_EQ(tx_code, kTxCode);
int value;
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ(value, kValue + called);
called++;
return absl::OkStatus();
});
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteInt32(kValue).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
}
// tx_receiver gets destructed here. This additional transaction should
// *still* be received.
EXPECT_TRUE(sender->PrepareTransaction().ok());
WritableParcel* parcel = sender->GetWritableParcel();
EXPECT_TRUE(parcel->WriteInt32(kValue + 1).ok());
EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok());
g_transaction_processor->Terminate();
EXPECT_EQ(called, 2);
}
namespace {
struct ThreadArgument {
int tid;
std::vector<std::vector<std::pair<std::unique_ptr<Binder>,
std::unique_ptr<TransactionReceiver>>>>*
global_binder_pairs;
std::vector<std::vector<int>>* global_cnts;
int tx_code;
int num_pairs_per_thread;
int num_transactions_per_pair;
grpc_core::Mutex* mu;
};
} // namespace
// Verify that this system works correctly in a concurrent environment.
//
// In end-to-end tests, there will be at least two threads, one from client to
// server and vice versa. Thus, it's important for us to make sure that the
// simulation is correct in such setup.
TEST_P(FakeBinderTest, StressTest) {
constexpr int kTxCode = 0x4321;
constexpr int kNumThreads = 16;
constexpr int kNumPairsPerThread = 128;
constexpr int kNumTransactionsPerPair = 128;
std::vector<ThreadArgument> args(kNumThreads);
grpc_core::Mutex mu;
std::vector<std::vector<
std::pair<std::unique_ptr<Binder>, std::unique_ptr<TransactionReceiver>>>>
global_binder_pairs(kNumThreads);
std::vector<std::vector<int>> global_cnts(
kNumThreads, std::vector<int>(kNumPairsPerThread, 0));
auto th_function = [](void* arg) {
ThreadArgument* th_arg = static_cast<ThreadArgument*>(arg);
int tid = th_arg->tid;
std::vector<std::pair<std::unique_ptr<Binder>,
std::unique_ptr<TransactionReceiver>>>
binder_pairs;
for (int p = 0; p < th_arg->num_pairs_per_thread; ++p) {
std::unique_ptr<Binder> binder;
std::unique_ptr<TransactionReceiver> tx_receiver;
int expected_tx_code = th_arg->tx_code;
std::vector<std::vector<int>>* cnt = th_arg->global_cnts;
std::tie(binder, tx_receiver) =
NewBinderPair([tid, p, cnt, expected_tx_code](
transaction_code_t tx_code,
const ReadableParcel* parcel) mutable {
EXPECT_EQ(tx_code, expected_tx_code);
int value;
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ(tid, value);
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ(p, value);
EXPECT_TRUE(parcel->ReadInt32(&value).ok());
EXPECT_EQ((*cnt)[tid][p], value);
(*cnt)[tid][p]++;
return absl::OkStatus();
});
binder_pairs.emplace_back(std::move(binder), std::move(tx_receiver));
}
std::vector<int> order;
for (int i = 0; i < th_arg->num_pairs_per_thread; ++i) {
for (int j = 0; j < th_arg->num_transactions_per_pair; ++j) {
order.emplace_back(i);
}
}
std::mt19937 rng(tid);
std::shuffle(order.begin(), order.end(), rng);
std::vector<int> tx_cnt(th_arg->num_pairs_per_thread);
for (int p : order) {
EXPECT_TRUE(binder_pairs[p].first->PrepareTransaction().ok());
WritableParcel* parcel = binder_pairs[p].first->GetWritableParcel();
EXPECT_TRUE(parcel->WriteInt32(th_arg->tid).ok());
EXPECT_TRUE(parcel->WriteInt32(p).ok());
EXPECT_TRUE(parcel->WriteInt32(tx_cnt[p]++).ok());
EXPECT_TRUE(binder_pairs[p]
.first->Transact(BinderTransportTxCode(th_arg->tx_code))
.ok());
}
th_arg->mu->Lock();
(*th_arg->global_binder_pairs)[tid] = std::move(binder_pairs);
th_arg->mu->Unlock();
};
std::vector<grpc_core::Thread> thrs(kNumThreads);
std::vector<std::string> thr_names(kNumThreads);
for (int i = 0; i < kNumThreads; ++i) {
args[i].tid = i;
args[i].global_binder_pairs = &global_binder_pairs;
args[i].global_cnts = &global_cnts;
args[i].tx_code = kTxCode;
args[i].num_pairs_per_thread = kNumPairsPerThread;
args[i].num_transactions_per_pair = kNumTransactionsPerPair;
args[i].mu = &mu;
thr_names[i] = absl::StrFormat("thread-%d", i);
thrs[i] = grpc_core::Thread(thr_names[i].c_str(), th_function, &args[i]);
}
for (auto& th : thrs) th.Start();
for (auto& th : thrs) th.Join();
g_transaction_processor->Terminate();
}
INSTANTIATE_TEST_SUITE_P(FakeBinderTestWithDifferentDelayTimes, FakeBinderTest,
testing::Values(absl::ZeroDuration(),
absl::Nanoseconds(10),
absl::Microseconds(10)));
} // namespace end2end_testing
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,126 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/end2end/testing_channel_create.h"
#include <utility>
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/error_utils.h"
namespace grpc_binder {
namespace end2end_testing {
namespace {
// Since we assume the first half of the transport setup is completed before the
// server side enters WireReader::SetupTransport, we need this helper to wait
// and finish that part of the negotiation for us.
class ServerSetupTransportHelper {
public:
ServerSetupTransportHelper()
: wire_reader_(absl::make_unique<WireReaderImpl>(
/*transport_stream_receiver=*/nullptr, /*is_client=*/false)) {
std::tie(endpoint_binder_, tx_receiver_) = NewBinderPair(
[this](transaction_code_t tx_code, const ReadableParcel* parcel) {
return this->wire_reader_->ProcessTransaction(tx_code, parcel);
});
}
std::unique_ptr<Binder> WaitForClientBinder() {
return wire_reader_->RecvSetupTransport();
}
std::unique_ptr<Binder> GetEndpointBinderForClient() {
return std::move(endpoint_binder_);
}
private:
std::unique_ptr<WireReaderImpl> wire_reader_;
// The endpoint binder for client.
std::unique_ptr<Binder> endpoint_binder_;
std::unique_ptr<TransactionReceiver> tx_receiver_;
};
} // namespace
std::pair<grpc_transport*, grpc_transport*>
CreateClientServerBindersPairForTesting() {
ServerSetupTransportHelper helper;
std::unique_ptr<Binder> endpoint_binder = helper.GetEndpointBinderForClient();
grpc_transport* client_transport = nullptr;
struct ThreadArgs {
std::unique_ptr<Binder> endpoint_binder;
grpc_transport** client_transport;
} args;
args.endpoint_binder = std::move(endpoint_binder);
args.client_transport = &client_transport;
grpc_core::Thread client_thread(
"client-thread",
[](void* arg) {
ThreadArgs* args = static_cast<ThreadArgs*>(arg);
std::unique_ptr<Binder> endpoint_binder =
std::move(args->endpoint_binder);
*args->client_transport =
grpc_create_binder_transport_client(std::move(endpoint_binder));
},
&args);
client_thread.Start();
grpc_transport* server_transport =
grpc_create_binder_transport_server(helper.WaitForClientBinder());
client_thread.Join();
return std::make_pair(client_transport, server_transport);
}
std::shared_ptr<grpc::Channel> BinderChannelForTesting(
grpc::Server* server, const grpc::ChannelArguments& args) {
grpc_channel_args channel_args = args.c_channel_args();
return grpc::CreateChannelInternal(
"",
grpc_binder_channel_create_for_testing(server->c_server(), &channel_args,
nullptr),
std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>());
}
} // namespace end2end_testing
} // namespace grpc_binder
grpc_channel* grpc_binder_channel_create_for_testing(grpc_server* server,
grpc_channel_args* args,
void* /*reserved*/) {
grpc_core::ExecCtx exec_ctx;
grpc_arg default_authority_arg = grpc_channel_arg_string_create(
const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
const_cast<char*>("test.authority"));
grpc_channel_args* client_args =
grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
grpc_transport *client_transport, *server_transport;
std::tie(client_transport, server_transport) =
grpc_binder::end2end_testing::CreateClientServerBindersPairForTesting();
grpc_error_handle error = server->core_server->SetupTransport(
server_transport, nullptr, args, nullptr);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_channel* channel =
grpc_channel_create("binder", client_args, GRPC_CLIENT_DIRECT_CHANNEL,
client_transport, nullptr, 0, &error);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_channel_args_destroy(client_args);
return channel;
}

@ -0,0 +1,41 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H
#define GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H
#include <grpcpp/grpcpp.h>
#include <utility>
#include "src/core/ext/transport/binder/transport/binder_transport.h"
#include "src/core/lib/surface/server.h"
#include "test/core/transport/binder/end2end/fake_binder.h"
namespace grpc_binder {
namespace end2end_testing {
std::pair<grpc_transport*, grpc_transport*>
CreateClientServerBindersPairForTesting();
std::shared_ptr<grpc::Channel> BinderChannelForTesting(
grpc::Server* server, const grpc::ChannelArguments& args);
} // namespace end2end_testing
} // namespace grpc_binder
grpc_channel* grpc_binder_channel_create_for_testing(grpc_server* server,
grpc_channel_args* args,
void* /*reserved*/);
#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H

@ -0,0 +1,57 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "test/core/transport/binder/mock_objects.h"
#include <memory>
#include "absl/memory/memory.h"
namespace grpc_binder {
using ::testing::Return;
MockReadableParcel::MockReadableParcel() {
ON_CALL(*this, ReadBinder).WillByDefault([](std::unique_ptr<Binder>* binder) {
*binder = absl::make_unique<MockBinder>();
return absl::OkStatus();
});
ON_CALL(*this, ReadInt32).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, ReadByteArray).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, ReadString).WillByDefault(Return(absl::OkStatus()));
}
MockWritableParcel::MockWritableParcel() {
ON_CALL(*this, SetDataPosition).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, WriteInt32).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, WriteBinder).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, WriteString).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, WriteByteArray).WillByDefault(Return(absl::OkStatus()));
}
MockBinder::MockBinder() {
ON_CALL(*this, PrepareTransaction).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, Transact).WillByDefault(Return(absl::OkStatus()));
ON_CALL(*this, GetWritableParcel).WillByDefault(Return(&mock_input_));
ON_CALL(*this, GetReadableParcel).WillByDefault(Return(&mock_output_));
ON_CALL(*this, ConstructTxReceiver)
.WillByDefault(
[this](grpc_core::RefCountedPtr<WireReader> /*wire_reader_ref*/,
TransactionReceiver::OnTransactCb cb) {
return absl::make_unique<MockTransactionReceiver>(
cb, BinderTransportTxCode::SETUP_TRANSPORT, &mock_output_);
});
}
} // namespace grpc_binder

@ -0,0 +1,113 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H
#define GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H
#include <gmock/gmock.h>
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
namespace grpc_binder {
class MockWritableParcel : public WritableParcel {
public:
MOCK_METHOD(int32_t, GetDataPosition, (), (const, override));
MOCK_METHOD(absl::Status, SetDataPosition, (int32_t), (override));
MOCK_METHOD(absl::Status, WriteInt32, (int32_t), (override));
MOCK_METHOD(absl::Status, WriteBinder, (HasRawBinder*), (override));
MOCK_METHOD(absl::Status, WriteString, (absl::string_view), (override));
MOCK_METHOD(absl::Status, WriteByteArray, (const int8_t*, int32_t),
(override));
MockWritableParcel();
};
class MockReadableParcel : public ReadableParcel {
public:
MOCK_METHOD(absl::Status, ReadInt32, (int32_t*), (const, override));
MOCK_METHOD(absl::Status, ReadBinder, (std::unique_ptr<Binder>*),
(const, override));
MOCK_METHOD(absl::Status, ReadByteArray, (std::string*), (const, override));
MOCK_METHOD(absl::Status, ReadString, (char[111]), (const, override));
MockReadableParcel();
};
class MockBinder : public Binder {
public:
MOCK_METHOD(void, Initialize, (), (override));
MOCK_METHOD(absl::Status, PrepareTransaction, (), (override));
MOCK_METHOD(absl::Status, Transact, (BinderTransportTxCode), (override));
MOCK_METHOD(WritableParcel*, GetWritableParcel, (), (const, override));
MOCK_METHOD(ReadableParcel*, GetReadableParcel, (), (const, override));
MOCK_METHOD(std::unique_ptr<TransactionReceiver>, ConstructTxReceiver,
(grpc_core::RefCountedPtr<WireReader>,
TransactionReceiver::OnTransactCb),
(const, override));
MOCK_METHOD(void*, GetRawBinder, (), (override));
MockBinder();
MockWritableParcel& GetWriter() { return mock_input_; }
MockReadableParcel& GetReader() { return mock_output_; }
private:
MockWritableParcel mock_input_;
MockReadableParcel mock_output_;
};
// TODO(waynetu): Implement transaction injection later for more thorough
// testing.
class MockTransactionReceiver : public TransactionReceiver {
public:
explicit MockTransactionReceiver(OnTransactCb transact_cb,
BinderTransportTxCode code,
const ReadableParcel* output) {
transact_cb(static_cast<transaction_code_t>(code), output).IgnoreError();
}
MOCK_METHOD(void*, GetRawBinder, (), (override));
};
class MockWireWriter : public WireWriter {
public:
MOCK_METHOD(absl::Status, RpcCall, (const Transaction&), (override));
};
class MockTransportStreamReceiver : public TransportStreamReceiver {
public:
MOCK_METHOD(void, RegisterRecvInitialMetadata,
(StreamIdentifier, InitialMetadataCallbackType), (override));
MOCK_METHOD(void, RegisterRecvMessage,
(StreamIdentifier, MessageDataCallbackType), (override));
MOCK_METHOD(void, RegisterRecvTrailingMetadata,
(StreamIdentifier, TrailingMetadataCallbackType), (override));
MOCK_METHOD(void, NotifyRecvInitialMetadata,
(StreamIdentifier, absl::StatusOr<Metadata>), (override));
MOCK_METHOD(void, NotifyRecvMessage,
(StreamIdentifier, absl::StatusOr<std::string>), (override));
MOCK_METHOD(void, NotifyRecvTrailingMetadata,
(StreamIdentifier, absl::StatusOr<Metadata>, int), (override));
MOCK_METHOD(void, CancelRecvMessageCallbacksDueToTrailingMetadata,
(StreamIdentifier), (override));
MOCK_METHOD(void, Clear, (StreamIdentifier), (override));
MOCK_METHOD(void, CancelStream, (StreamIdentifier, absl::Status), (override));
};
} // namespace grpc_binder
#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H

@ -0,0 +1,287 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <cassert>
#include <string>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
namespace {
// TODO(waynetu): These are hacks to make callbacks aware of their stream IDs
// and sequence numbers. Remove/Refactor these hacks when possible.
template <typename T>
std::pair<StreamIdentifier, int> Decode(const T& /*data*/) {
assert(false && "This should not be called");
return {};
}
template <>
std::pair<StreamIdentifier, int> Decode<std::string>(const std::string& data) {
assert(data.size() == sizeof(StreamIdentifier) + sizeof(int));
StreamIdentifier id{};
int seq_num{};
std::memcpy(&id, data.data(), sizeof(StreamIdentifier));
std::memcpy(&seq_num, data.data() + sizeof(StreamIdentifier), sizeof(int));
return std::make_pair(id, seq_num);
}
template <>
std::pair<StreamIdentifier, int> Decode<Metadata>(const Metadata& data) {
assert(data.size() == 1);
const std::string& encoding = data[0].first;
return Decode(encoding);
}
template <typename T>
T Encode(StreamIdentifier /*id*/, int /*seq_num*/) {
assert(false && "This should not be called");
return {};
}
template <>
std::string Encode<std::string>(StreamIdentifier id, int seq_num) {
char result[sizeof(StreamIdentifier) + sizeof(int)];
std::memcpy(result, &id, sizeof(StreamIdentifier));
std::memcpy(result + sizeof(StreamIdentifier), &seq_num, sizeof(int));
return std::string(result, sizeof(StreamIdentifier) + sizeof(int));
}
template <>
Metadata Encode<Metadata>(StreamIdentifier id, int seq_num) {
return {{Encode<std::string>(id, seq_num), ""}};
}
MATCHER_P2(StreamIdAndSeqNumMatch, id, seq_num, "") {
auto p = Decode(arg.value());
return p.first == id && p.second == seq_num;
}
// MockCallback is used to verify the every callback passed to transaction
// receiver will eventually be invoked with the artifact of its corresponding
// binder transaction.
template <typename FirstArg, typename... TrailingArgs>
class MockCallback {
public:
explicit MockCallback(StreamIdentifier id, int seq_num)
: id_(id), seq_num_(seq_num) {}
MOCK_METHOD(void, ActualCallback, (FirstArg), ());
std::function<void(FirstArg, TrailingArgs...)> GetHandle() {
return [this](FirstArg first_arg, TrailingArgs...) {
this->ActualCallback(first_arg);
};
}
void ExpectCallbackInvocation() {
EXPECT_CALL(*this, ActualCallback(StreamIdAndSeqNumMatch(id_, seq_num_)));
}
private:
StreamIdentifier id_;
int seq_num_;
};
using MockInitialMetadataCallback = MockCallback<absl::StatusOr<Metadata>>;
using MockMessageCallback = MockCallback<absl::StatusOr<std::string>>;
using MockTrailingMetadataCallback =
MockCallback<absl::StatusOr<Metadata>, int>;
class MockOpBatch {
public:
MockOpBatch(StreamIdentifier id, int flag, int seq_num)
: id_(id), flag_(flag), seq_num_(seq_num) {
if (flag_ & kFlagPrefix) {
initial_metadata_callback_ =
absl::make_unique<MockInitialMetadataCallback>(id_, seq_num_);
}
if (flag_ & kFlagMessageData) {
message_callback_ = absl::make_unique<MockMessageCallback>(id_, seq_num_);
}
if (flag_ & kFlagSuffix) {
trailing_metadata_callback_ =
absl::make_unique<MockTrailingMetadataCallback>(id_, seq_num_);
}
}
void Complete(TransportStreamReceiver& receiver) {
if (flag_ & kFlagPrefix) {
initial_metadata_callback_->ExpectCallbackInvocation();
receiver.NotifyRecvInitialMetadata(id_, Encode<Metadata>(id_, seq_num_));
}
if (flag_ & kFlagMessageData) {
message_callback_->ExpectCallbackInvocation();
receiver.NotifyRecvMessage(id_, Encode<std::string>(id_, seq_num_));
}
if (flag_ & kFlagSuffix) {
trailing_metadata_callback_->ExpectCallbackInvocation();
receiver.NotifyRecvTrailingMetadata(id_, Encode<Metadata>(id_, seq_num_),
0);
}
}
void RequestRecv(TransportStreamReceiver& receiver) {
if (flag_ & kFlagPrefix) {
receiver.RegisterRecvInitialMetadata(
id_, initial_metadata_callback_->GetHandle());
}
if (flag_ & kFlagMessageData) {
receiver.RegisterRecvMessage(id_, message_callback_->GetHandle());
}
if (flag_ & kFlagSuffix) {
receiver.RegisterRecvTrailingMetadata(
id_, trailing_metadata_callback_->GetHandle());
}
}
MockOpBatch NextBatch(int flag) const {
return MockOpBatch(id_, flag, seq_num_ + 1);
}
private:
std::unique_ptr<MockInitialMetadataCallback> initial_metadata_callback_;
std::unique_ptr<MockMessageCallback> message_callback_;
std::unique_ptr<MockTrailingMetadataCallback> trailing_metadata_callback_;
int id_, flag_, seq_num_;
};
class TransportStreamReceiverTest : public ::testing::Test {
protected:
MockOpBatch NewGrpcStream(int flag) {
return MockOpBatch(current_id_++, flag, 0);
}
StreamIdentifier current_id_ = 0;
};
const int kFlagAll = kFlagPrefix | kFlagMessageData | kFlagSuffix;
} // namespace
TEST_F(TransportStreamReceiverTest, MultipleStreamRequestThenComplete) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagAll);
t0.RequestRecv(receiver);
t0.Complete(receiver);
}
TEST_F(TransportStreamReceiverTest, MultipleStreamCompleteThenRequest) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagAll);
t0.Complete(receiver);
t0.RequestRecv(receiver);
}
TEST_F(TransportStreamReceiverTest, MultipleStreamInterleaved) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagAll);
MockOpBatch t1 = NewGrpcStream(kFlagAll);
t1.Complete(receiver);
t0.Complete(receiver);
t0.RequestRecv(receiver);
t1.RequestRecv(receiver);
}
TEST_F(TransportStreamReceiverTest, MultipleStreamInterleavedReversed) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagAll);
MockOpBatch t1 = NewGrpcStream(kFlagAll);
t0.RequestRecv(receiver);
t1.RequestRecv(receiver);
t1.Complete(receiver);
t0.Complete(receiver);
}
TEST_F(TransportStreamReceiverTest, MultipleStreamMoreInterleaved) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagAll);
MockOpBatch t1 = NewGrpcStream(kFlagAll);
t0.RequestRecv(receiver);
t1.Complete(receiver);
MockOpBatch t2 = NewGrpcStream(kFlagAll);
t2.RequestRecv(receiver);
t0.Complete(receiver);
t1.RequestRecv(receiver);
t2.Complete(receiver);
}
TEST_F(TransportStreamReceiverTest, SingleStreamUnaryCall) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagPrefix);
MockOpBatch t1 = t0.NextBatch(kFlagMessageData);
MockOpBatch t2 = t1.NextBatch(kFlagSuffix);
t0.RequestRecv(receiver);
t1.RequestRecv(receiver);
t2.RequestRecv(receiver);
t0.Complete(receiver);
t1.Complete(receiver);
t2.Complete(receiver);
}
TEST_F(TransportStreamReceiverTest, SingleStreamStreamingCall) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagPrefix);
t0.RequestRecv(receiver);
t0.Complete(receiver);
MockOpBatch t1 = t0.NextBatch(kFlagMessageData);
t1.Complete(receiver);
t1.RequestRecv(receiver);
MockOpBatch t2 = t1.NextBatch(kFlagMessageData);
t2.RequestRecv(receiver);
t2.Complete(receiver);
MockOpBatch t3 = t2.NextBatch(kFlagMessageData);
MockOpBatch t4 = t3.NextBatch(kFlagMessageData);
t3.Complete(receiver);
t4.Complete(receiver);
t3.RequestRecv(receiver);
t4.RequestRecv(receiver);
}
TEST_F(TransportStreamReceiverTest, DISABLED_SingleStreamBufferedCallbacks) {
TransportStreamReceiverImpl receiver(/*is_client=*/true);
MockOpBatch t0 = NewGrpcStream(kFlagPrefix);
MockOpBatch t1 = t0.NextBatch(kFlagMessageData);
MockOpBatch t2 = t1.NextBatch(kFlagMessageData);
MockOpBatch t3 = t2.NextBatch(kFlagSuffix);
t0.RequestRecv(receiver);
// TODO(waynetu): Can gRPC issues recv_message before it actually receives the
// previous one?
t1.RequestRecv(receiver);
t2.RequestRecv(receiver);
t3.RequestRecv(receiver);
t0.Complete(receiver);
t1.Complete(receiver);
t2.Complete(receiver);
t3.Complete(receiver);
}
// TODO(waynetu): Should we have some concurrent stress tests to make sure that
// thread safety is well taken care of?
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,278 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unit tests for WireReaderImpl.
//
// WireReaderImpl is responsible for turning incoming transactions into
// top-level metadata. The following tests verify that the interactions between
// WireReaderImpl and both the output (readable) parcel and the transport stream
// receiver are correct in all possible situations.
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <utility>
#include "absl/memory/memory.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "test/core/transport/binder/mock_objects.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
using ::testing::DoAll;
using ::testing::Return;
using ::testing::SetArgPointee;
using ::testing::StrictMock;
namespace {
class WireReaderTest : public ::testing::Test {
public:
WireReaderTest()
: transport_stream_receiver_(
std::make_shared<StrictMock<MockTransportStreamReceiver>>()),
wire_reader_(transport_stream_receiver_, /*is_client=*/true) {}
protected:
void ExpectReadInt32(int result) {
EXPECT_CALL(mock_readable_parcel_, ReadInt32)
.WillOnce(DoAll(SetArgPointee<0>(result), Return(absl::OkStatus())));
}
void ExpectReadByteArray(const std::string& buffer) {
ExpectReadInt32(buffer.length());
if (!buffer.empty()) {
EXPECT_CALL(mock_readable_parcel_, ReadByteArray)
.WillOnce([buffer](std::string* data) {
*data = buffer;
return absl::OkStatus();
});
}
}
template <typename T>
absl::Status CallProcessTransaction(T tx_code) {
return wire_reader_.ProcessTransaction(
static_cast<transaction_code_t>(tx_code), &mock_readable_parcel_);
}
std::shared_ptr<StrictMock<MockTransportStreamReceiver>>
transport_stream_receiver_;
WireReaderImpl wire_reader_;
StrictMock<MockReadableParcel> mock_readable_parcel_;
};
MATCHER_P(StatusOrStrEq, target, "") {
if (!arg.ok()) return false;
return arg.value() == target;
}
MATCHER_P(StatusOrContainerEq, target, "") {
if (!arg.ok()) return false;
return arg.value() == target;
}
} // namespace
TEST_F(WireReaderTest, SetupTransport) {
auto mock_binder = absl::make_unique<MockBinder>();
MockBinder& mock_binder_ref = *mock_binder;
::testing::InSequence sequence;
EXPECT_CALL(mock_binder_ref, Initialize);
EXPECT_CALL(mock_binder_ref, PrepareTransaction);
const MockReadableParcel mock_readable_parcel;
EXPECT_CALL(mock_binder_ref, GetWritableParcel);
// Write version.
EXPECT_CALL(mock_binder_ref.GetWriter(), WriteInt32(77));
// The transaction receiver immediately informs the wire writer that the
// transport has been successfully set up.
EXPECT_CALL(mock_binder_ref, ConstructTxReceiver);
EXPECT_CALL(mock_binder_ref.GetReader(), ReadInt32);
EXPECT_CALL(mock_binder_ref.GetReader(), ReadBinder);
// Write transaction receiver.
EXPECT_CALL(mock_binder_ref.GetWriter(), WriteBinder);
// Perform transaction.
EXPECT_CALL(mock_binder_ref, Transact);
wire_reader_.SetupTransport(std::move(mock_binder));
}
TEST_F(WireReaderTest, ProcessTransactionControlMessageSetupTransport) {
::testing::InSequence sequence;
EXPECT_CALL(mock_readable_parcel_, ReadInt32);
EXPECT_CALL(mock_readable_parcel_, ReadBinder)
.WillOnce([&](std::unique_ptr<Binder>* binder) {
auto mock_binder = absl::make_unique<MockBinder>();
// binder that is read from the output parcel must first be initialized
// before it can be used.
EXPECT_CALL(*mock_binder, Initialize);
*binder = std::move(mock_binder);
return absl::OkStatus();
});
EXPECT_TRUE(
CallProcessTransaction(BinderTransportTxCode::SETUP_TRANSPORT).ok());
}
TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) {
EXPECT_CALL(mock_readable_parcel_, ReadInt32);
EXPECT_TRUE(
CallProcessTransaction(BinderTransportTxCode::PING_RESPONSE).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) {
::testing::InSequence sequence;
// first transaction: empty flag
ExpectReadInt32(0);
// Won't further read sequence number.
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest,
ProcessTransactionServerRpcDataFlagPrefixWithoutMetadata) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagPrefix);
// sequence number
ExpectReadInt32(0);
// count
ExpectReadInt32(0);
EXPECT_CALL(
*transport_stream_receiver_,
NotifyRecvInitialMetadata(kFirstCallId, StatusOrContainerEq(Metadata{})));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagPrefix);
// sequence number
ExpectReadInt32(0);
const std::vector<std::pair<std::string, std::string>> kMetadata = {
{"", ""},
{"", "value"},
{"key", ""},
{"key", "value"},
{"another-key", "another-value"},
};
// count
ExpectReadInt32(kMetadata.size());
for (const auto& md : kMetadata) {
// metadata key
ExpectReadByteArray(md.first);
// metadata val
// TODO(waynetu): metadata value can also be "parcelable".
ExpectReadByteArray(md.second);
}
EXPECT_CALL(
*transport_stream_receiver_,
NotifyRecvInitialMetadata(kFirstCallId, StatusOrContainerEq(kMetadata)));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagMessageData);
// sequence number
ExpectReadInt32(0);
// message data
// TODO(waynetu): message data can also be "parcelable".
const std::string kMessageData = "message data";
ExpectReadByteArray(kMessageData);
EXPECT_CALL(*transport_stream_receiver_,
NotifyRecvMessage(kFirstCallId, StatusOrStrEq(kMessageData)));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagMessageData);
// sequence number
ExpectReadInt32(0);
// message data
// TODO(waynetu): message data can also be "parcelable".
const std::string kMessageData = "";
ExpectReadByteArray(kMessageData);
EXPECT_CALL(*transport_stream_receiver_,
NotifyRecvMessage(kFirstCallId, StatusOrStrEq(kMessageData)));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) {
::testing::InSequence sequence;
constexpr int kStatus = 0x1234;
// flag
ExpectReadInt32(kFlagSuffix | kFlagStatusDescription | (kStatus << 16));
// sequence number
ExpectReadInt32(0);
// status description
EXPECT_CALL(mock_readable_parcel_, ReadString);
// metadata count
ExpectReadInt32(0);
EXPECT_CALL(*transport_stream_receiver_,
NotifyRecvTrailingMetadata(
kFirstCallId, StatusOrContainerEq(Metadata{}), kStatus));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) {
::testing::InSequence sequence;
// flag
ExpectReadInt32(kFlagSuffix);
// sequence number
ExpectReadInt32(0);
// No status description
// metadata count
ExpectReadInt32(0);
EXPECT_CALL(*transport_stream_receiver_,
NotifyRecvTrailingMetadata(kFirstCallId,
StatusOrContainerEq(Metadata{}), 0));
EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok());
}
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,181 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include <gtest/gtest.h>
#include <string>
#include <utility>
#include "absl/memory/memory.h"
#include "test/core/transport/binder/mock_objects.h"
#include "test/core/util/test_config.h"
namespace grpc_binder {
using ::testing::Return;
using ::testing::StrictMock;
MATCHER_P(StrEqInt8Ptr, target, "") {
return std::string(reinterpret_cast<const char*>(arg)) == target;
}
TEST(WireWriterTest, RpcCall) {
auto mock_binder = absl::make_unique<MockBinder>();
MockBinder& mock_binder_ref = *mock_binder;
StrictMock<MockWritableParcel> mock_writable_parcel;
ON_CALL(mock_binder_ref, GetWritableParcel)
.WillByDefault(Return(&mock_writable_parcel));
WireWriterImpl wire_writer(std::move(mock_binder));
auto ExpectWriteByteArray = [&](const std::string& target) {
// length
EXPECT_CALL(mock_writable_parcel, WriteInt32(target.size()));
if (!target.empty()) {
// content
EXPECT_CALL(mock_writable_parcel,
WriteByteArray(StrEqInt8Ptr(target), target.size()));
}
};
::testing::InSequence sequence;
int sequence_number = 0;
int tx_code = kFirstCallId;
{
// flag
EXPECT_CALL(mock_writable_parcel, WriteInt32(0));
// sequence number
EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number));
EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code)));
Transaction tx(tx_code, sequence_number, /*is_client=*/true);
EXPECT_TRUE(wire_writer.RpcCall(tx).ok());
sequence_number++;
tx_code++;
}
{
// flag
EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagPrefix));
// sequence number
EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number));
EXPECT_CALL(mock_writable_parcel,
WriteString(absl::string_view("/example/method/ref")));
const std::vector<std::pair<std::string, std::string>> kMetadata = {
{"", ""},
{"", "value"},
{"key", ""},
{"key", "value"},
{"another-key", "another-value"},
};
// Number of metadata
EXPECT_CALL(mock_writable_parcel, WriteInt32(kMetadata.size()));
for (const auto& md : kMetadata) {
ExpectWriteByteArray(md.first);
ExpectWriteByteArray(md.second);
}
EXPECT_CALL(mock_binder_ref,
Transact(BinderTransportTxCode(kFirstCallId + 1)));
Transaction tx(kFirstCallId + 1, 1, /*is_client=*/true);
tx.SetPrefix(kMetadata);
tx.SetMethodRef("/example/method/ref");
EXPECT_TRUE(wire_writer.RpcCall(tx).ok());
sequence_number++;
tx_code++;
}
{
// flag
EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagMessageData));
// sequence number
EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number));
ExpectWriteByteArray("data");
EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code)));
Transaction tx(tx_code, sequence_number, /*is_client=*/true);
tx.SetData("data");
EXPECT_TRUE(wire_writer.RpcCall(tx).ok());
}
{
// flag
EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagSuffix));
// sequence number
EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number));
EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code)));
Transaction tx(tx_code, sequence_number, /*is_client=*/true);
tx.SetSuffix({});
EXPECT_TRUE(wire_writer.RpcCall(tx).ok());
sequence_number++;
tx_code++;
}
{
// flag
EXPECT_CALL(mock_writable_parcel,
WriteInt32(kFlagPrefix | kFlagMessageData | kFlagSuffix));
// sequence number
EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number));
EXPECT_CALL(mock_writable_parcel,
WriteString(absl::string_view("/example/method/ref")));
const std::vector<std::pair<std::string, std::string>> kMetadata = {
{"", ""},
{"", "value"},
{"key", ""},
{"key", "value"},
{"another-key", "another-value"},
};
// Number of metadata
EXPECT_CALL(mock_writable_parcel, WriteInt32(kMetadata.size()));
for (const auto& md : kMetadata) {
ExpectWriteByteArray(md.first);
ExpectWriteByteArray(md.second);
}
// Empty message data
ExpectWriteByteArray("");
EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code)));
Transaction tx(tx_code, sequence_number, /*is_client=*/true);
// TODO(waynetu): Implement a helper function that automatically creates
// EXPECT_CALL based on the tx object.
tx.SetPrefix(kMetadata);
tx.SetMethodRef("/example/method/ref");
tx.SetData("");
tx.SetSuffix({});
EXPECT_TRUE(wire_writer.RpcCall(tx).ok());
sequence_number++;
tx_code++;
}
}
} // namespace grpc_binder
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc::testing::TestEnvironment env(argc, argv);
return RUN_ALL_TESTS();
}

@ -3468,7 +3468,7 @@
"flaky": false,
"gtest": true,
"language": "c++",
"name": "binder_smoke_test",
"name": "binder_transport_test",
"platforms": [
"linux",
"mac",
@ -4537,6 +4537,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "end2end_binder_transport_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
@ -4679,6 +4703,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "fake_binder_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -6985,6 +7033,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "transport_stream_receiver_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -7105,6 +7177,54 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "wire_reader_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "wire_writer_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save