diff --git a/CMakeLists.txt b/CMakeLists.txt index d53df198ef0..61291be72be 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -532,6 +532,9 @@ protobuf_generate_grpc_cpp( protobuf_generate_grpc_cpp( src/proto/grpc/testing/xds/v3/tls.proto ) +protobuf_generate_grpc_cpp( + test/core/transport/binder/end2end/echo.proto +) protobuf_generate_grpc_cpp( test/core/tsi/alts/fake_handshaker/handshaker.proto ) @@ -751,7 +754,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bdp_estimator_test) endif() - add_dependencies(buildtests_cxx binder_smoke_test) + add_dependencies(buildtests_cxx binder_transport_test) add_dependencies(buildtests_cxx bitset_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_alarm) @@ -852,6 +855,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx destroy_grpclb_channel_with_active_connect_stress_test) add_dependencies(buildtests_cxx dual_ref_counted_test) add_dependencies(buildtests_cxx duplicate_header_bad_client_test) + add_dependencies(buildtests_cxx end2end_binder_transport_test) add_dependencies(buildtests_cxx end2end_test) add_dependencies(buildtests_cxx endpoint_config_test) add_dependencies(buildtests_cxx error_details_test) @@ -861,6 +865,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx examine_stack_test) endif() add_dependencies(buildtests_cxx exception_test) + add_dependencies(buildtests_cxx fake_binder_test) add_dependencies(buildtests_cxx file_watcher_certificate_provider_factory_test) add_dependencies(buildtests_cxx filter_end2end_test) add_dependencies(buildtests_cxx flaky_network_test) @@ -994,11 +999,14 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx timer_test) add_dependencies(buildtests_cxx tls_security_connector_test) add_dependencies(buildtests_cxx too_many_pings_test) + add_dependencies(buildtests_cxx transport_stream_receiver_test) add_dependencies(buildtests_cxx try_join_test) add_dependencies(buildtests_cxx try_seq_test) add_dependencies(buildtests_cxx unknown_frame_bad_client_test) add_dependencies(buildtests_cxx uri_parser_test) add_dependencies(buildtests_cxx window_overflow_bad_client_test) + add_dependencies(buildtests_cxx wire_reader_test) + add_dependencies(buildtests_cxx wire_writer_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx work_serializer_test) endif() @@ -8399,13 +8407,20 @@ endif() endif() if(gRPC_BUILD_TESTS) -add_executable(binder_smoke_test - test/core/transport/binder/binder_smoke_test.cc +add_executable(binder_transport_test + src/core/ext/transport/binder/transport/binder_transport.cc + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/binder_constants.cc + src/core/ext/transport/binder/wire_format/transaction.cc + src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + src/core/ext/transport/binder/wire_format/wire_writer.cc + test/core/transport/binder/binder_transport_test.cc + test/core/transport/binder/mock_objects.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) -target_include_directories(binder_smoke_test +target_include_directories(binder_transport_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/include @@ -8424,7 +8439,7 @@ target_include_directories(binder_smoke_test ${_gRPC_PROTO_GENS_DIR} ) -target_link_libraries(binder_smoke_test +target_link_libraries(binder_transport_test ${_gRPC_PROTOBUF_LIBRARIES} ${_gRPC_ALLTARGETS_LIBRARIES} grpc_test_util @@ -10550,6 +10565,92 @@ target_link_libraries(duplicate_header_bad_client_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(end2end_binder_transport_test + ${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/transport/binder/end2end/echo.grpc.pb.h + src/core/ext/transport/binder/transport/binder_transport.cc + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/binder_constants.cc + src/core/ext/transport/binder/wire_format/transaction.cc + src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + src/core/ext/transport/binder/wire_format/wire_writer.cc + src/cpp/client/channel_cc.cc + src/cpp/client/client_callback.cc + src/cpp/client/client_context.cc + src/cpp/client/client_interceptor.cc + src/cpp/client/create_channel.cc + src/cpp/client/create_channel_internal.cc + src/cpp/client/create_channel_posix.cc + src/cpp/client/credentials_cc.cc + src/cpp/codegen/codegen_init.cc + src/cpp/common/alarm.cc + src/cpp/common/channel_arguments.cc + src/cpp/common/channel_filter.cc + src/cpp/common/completion_queue_cc.cc + src/cpp/common/core_codegen.cc + src/cpp/common/resource_quota_cc.cc + src/cpp/common/rpc_method.cc + src/cpp/common/validate_service_config.cc + src/cpp/common/version_cc.cc + src/cpp/server/async_generic_service.cc + src/cpp/server/channel_argument_option.cc + src/cpp/server/create_default_thread_pool.cc + src/cpp/server/dynamic_thread_pool.cc + src/cpp/server/external_connection_acceptor_impl.cc + src/cpp/server/health/default_health_check_service.cc + src/cpp/server/health/health_check_service.cc + src/cpp/server/health/health_check_service_server_builder_option.cc + src/cpp/server/server_builder.cc + src/cpp/server/server_callback.cc + src/cpp/server/server_cc.cc + src/cpp/server/server_context.cc + src/cpp/server/server_credentials.cc + src/cpp/server/server_posix.cc + src/cpp/thread_manager/thread_manager.cc + src/cpp/util/byte_buffer_cc.cc + src/cpp/util/status.cc + src/cpp/util/string_ref.cc + src/cpp/util/time_cc.cc + test/core/transport/binder/end2end/echo_service.cc + test/core/transport/binder/end2end/end2end_binder_transport_test.cc + test/core/transport/binder/end2end/fake_binder.cc + test/core/transport/binder/end2end/testing_channel_create.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(end2end_binder_transport_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(end2end_binder_transport_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::random_random + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) @@ -10837,6 +10938,48 @@ target_link_libraries(exception_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(fake_binder_test + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/binder_constants.cc + src/core/ext/transport/binder/wire_format/transaction.cc + src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + src/core/ext/transport/binder/wire_format/wire_writer.cc + test/core/transport/binder/end2end/fake_binder.cc + test/core/transport/binder/end2end/fake_binder_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(fake_binder_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(fake_binder_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::random_random + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) @@ -15457,6 +15600,43 @@ target_link_libraries(too_many_pings_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(transport_stream_receiver_test + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/transaction.cc + test/core/transport/binder/transport_stream_receiver_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(transport_stream_receiver_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(transport_stream_receiver_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) @@ -15640,6 +15820,88 @@ target_link_libraries(window_overflow_bad_client_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(wire_reader_test + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/binder_constants.cc + src/core/ext/transport/binder/wire_format/transaction.cc + src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + src/core/ext/transport/binder/wire_format/wire_writer.cc + test/core/transport/binder/mock_objects.cc + test/core/transport/binder/wire_reader_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(wire_reader_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(wire_reader_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + +endif() +if(gRPC_BUILD_TESTS) + +add_executable(wire_writer_test + src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + src/core/ext/transport/binder/wire_format/binder_constants.cc + src/core/ext/transport/binder/wire_format/transaction.cc + src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + src/core/ext/transport/binder/wire_format/wire_writer.cc + test/core/transport/binder/mock_objects.cc + test/core/transport/binder/wire_writer_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(wire_writer_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(wire_writer_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index d8bf90db668..14bffc0a241 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -4212,6 +4212,7 @@ targets: - src/core/ext/upb-generated/google/rpc/status.upb.h - src/core/lib/gpr/alloc.h - src/core/lib/gpr/env.h + - src/core/lib/gpr/log_internal.h - src/core/lib/gpr/murmur_hash.h - src/core/lib/gpr/spinlock.h - src/core/lib/gpr/string.h @@ -4558,13 +4559,31 @@ targets: - posix - mac uses_polling: false -- name: binder_smoke_test +- name: binder_transport_test gtest: true build: test language: c++ - headers: [] + headers: + - src/core/ext/transport/binder/transport/binder_stream.h + - src/core/ext/transport/binder/transport/binder_transport.h + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/binder.h + - src/core/ext/transport/binder/wire_format/binder_constants.h + - src/core/ext/transport/binder/wire_format/transaction.h + - src/core/ext/transport/binder/wire_format/wire_reader.h + - src/core/ext/transport/binder/wire_format/wire_reader_impl.h + - src/core/ext/transport/binder/wire_format/wire_writer.h + - test/core/transport/binder/mock_objects.h src: - - test/core/transport/binder/binder_smoke_test.cc + - src/core/ext/transport/binder/transport/binder_transport.cc + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/binder_constants.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + - src/core/ext/transport/binder/wire_format/wire_writer.cc + - test/core/transport/binder/binder_transport_test.cc + - test/core/transport/binder/mock_objects.cc deps: - grpc_test_util uses_polling: false @@ -5327,6 +5346,83 @@ targets: - test/core/end2end/cq_verifier.cc deps: - grpc_test_util +- name: end2end_binder_transport_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/binder/transport/binder_stream.h + - src/core/ext/transport/binder/transport/binder_transport.h + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/binder.h + - src/core/ext/transport/binder/wire_format/binder_constants.h + - src/core/ext/transport/binder/wire_format/transaction.h + - src/core/ext/transport/binder/wire_format/wire_reader.h + - src/core/ext/transport/binder/wire_format/wire_reader_impl.h + - src/core/ext/transport/binder/wire_format/wire_writer.h + - src/cpp/client/create_channel_internal.h + - src/cpp/common/channel_filter.h + - src/cpp/server/dynamic_thread_pool.h + - src/cpp/server/external_connection_acceptor_impl.h + - src/cpp/server/health/default_health_check_service.h + - src/cpp/server/thread_pool_interface.h + - src/cpp/thread_manager/thread_manager.h + - test/core/transport/binder/end2end/echo_service.h + - test/core/transport/binder/end2end/fake_binder.h + - test/core/transport/binder/end2end/testing_channel_create.h + src: + - test/core/transport/binder/end2end/echo.proto + - src/core/ext/transport/binder/transport/binder_transport.cc + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/binder_constants.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + - src/core/ext/transport/binder/wire_format/wire_writer.cc + - src/cpp/client/channel_cc.cc + - src/cpp/client/client_callback.cc + - src/cpp/client/client_context.cc + - src/cpp/client/client_interceptor.cc + - src/cpp/client/create_channel.cc + - src/cpp/client/create_channel_internal.cc + - src/cpp/client/create_channel_posix.cc + - src/cpp/client/credentials_cc.cc + - src/cpp/codegen/codegen_init.cc + - src/cpp/common/alarm.cc + - src/cpp/common/channel_arguments.cc + - src/cpp/common/channel_filter.cc + - src/cpp/common/completion_queue_cc.cc + - src/cpp/common/core_codegen.cc + - src/cpp/common/resource_quota_cc.cc + - src/cpp/common/rpc_method.cc + - src/cpp/common/validate_service_config.cc + - src/cpp/common/version_cc.cc + - src/cpp/server/async_generic_service.cc + - src/cpp/server/channel_argument_option.cc + - src/cpp/server/create_default_thread_pool.cc + - src/cpp/server/dynamic_thread_pool.cc + - src/cpp/server/external_connection_acceptor_impl.cc + - src/cpp/server/health/default_health_check_service.cc + - src/cpp/server/health/health_check_service.cc + - src/cpp/server/health/health_check_service_server_builder_option.cc + - src/cpp/server/server_builder.cc + - src/cpp/server/server_callback.cc + - src/cpp/server/server_cc.cc + - src/cpp/server/server_context.cc + - src/cpp/server/server_credentials.cc + - src/cpp/server/server_posix.cc + - src/cpp/thread_manager/thread_manager.cc + - src/cpp/util/byte_buffer_cc.cc + - src/cpp/util/status.cc + - src/cpp/util/string_ref.cc + - src/cpp/util/time_cc.cc + - test/core/transport/binder/end2end/echo_service.cc + - test/core/transport/binder/end2end/end2end_binder_transport_test.cc + - test/core/transport/binder/end2end/fake_binder.cc + - test/core/transport/binder/end2end/testing_channel_create.cc + deps: + - absl/random:random + - grpc_test_util - name: end2end_test gtest: true build: test @@ -5411,6 +5507,32 @@ targets: - test/cpp/end2end/exception_test.cc deps: - grpc++_test_util +- name: fake_binder_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/binder.h + - src/core/ext/transport/binder/wire_format/binder_constants.h + - src/core/ext/transport/binder/wire_format/transaction.h + - src/core/ext/transport/binder/wire_format/wire_reader.h + - src/core/ext/transport/binder/wire_format/wire_reader_impl.h + - src/core/ext/transport/binder/wire_format/wire_writer.h + - test/core/transport/binder/end2end/fake_binder.h + src: + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/binder_constants.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + - src/core/ext/transport/binder/wire_format/wire_writer.cc + - test/core/transport/binder/end2end/fake_binder.cc + - test/core/transport/binder/end2end/fake_binder_test.cc + deps: + - absl/random:random + - grpc_test_util + uses_polling: false - name: file_watcher_certificate_provider_factory_test gtest: true build: test @@ -7027,6 +7149,21 @@ targets: deps: - grpc++_test_config - grpc++_test_util +- name: transport_stream_receiver_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/transaction.h + src: + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - test/core/transport/binder/transport_stream_receiver_test.cc + deps: + - grpc_test_util + uses_polling: false - name: try_join_test gtest: true build: test @@ -7116,6 +7253,56 @@ targets: - test/core/end2end/cq_verifier.cc deps: - grpc_test_util +- name: wire_reader_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/binder.h + - src/core/ext/transport/binder/wire_format/binder_constants.h + - src/core/ext/transport/binder/wire_format/transaction.h + - src/core/ext/transport/binder/wire_format/wire_reader.h + - src/core/ext/transport/binder/wire_format/wire_reader_impl.h + - src/core/ext/transport/binder/wire_format/wire_writer.h + - test/core/transport/binder/mock_objects.h + src: + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/binder_constants.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + - src/core/ext/transport/binder/wire_format/wire_writer.cc + - test/core/transport/binder/mock_objects.cc + - test/core/transport/binder/wire_reader_test.cc + deps: + - grpc_test_util + uses_polling: false +- name: wire_writer_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/binder/utils/transport_stream_receiver.h + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h + - src/core/ext/transport/binder/wire_format/binder.h + - src/core/ext/transport/binder/wire_format/binder_constants.h + - src/core/ext/transport/binder/wire_format/transaction.h + - src/core/ext/transport/binder/wire_format/wire_reader.h + - src/core/ext/transport/binder/wire_format/wire_reader_impl.h + - src/core/ext/transport/binder/wire_format/wire_writer.h + - test/core/transport/binder/mock_objects.h + src: + - src/core/ext/transport/binder/utils/transport_stream_receiver_impl.cc + - src/core/ext/transport/binder/wire_format/binder_constants.cc + - src/core/ext/transport/binder/wire_format/transaction.cc + - src/core/ext/transport/binder/wire_format/wire_reader_impl.cc + - src/core/ext/transport/binder/wire_format/wire_writer.cc + - test/core/transport/binder/mock_objects.cc + - test/core/transport/binder/wire_writer_test.cc + deps: + - grpc_test_util + uses_polling: false - name: work_serializer_test gtest: true build: test diff --git a/src/core/ext/transport/binder/transport/binder_transport.cc b/src/core/ext/transport/binder/transport/binder_transport.cc index 7cd275b9933..9d00c6dc3d3 100644 --- a/src/core/ext/transport/binder/transport/binder_transport.cc +++ b/src/core/ext/transport/binder/transport/binder_transport.cc @@ -201,9 +201,6 @@ static void perform_stream_op(grpc_transport* gt, grpc_stream* gs, // TODO(mingcl): Will we ever has key-value pair here? According to // wireformat client suffix data is always empty. tx->SetSuffix(trailing_metadata); - if (op->payload->send_trailing_metadata.sent != nullptr) { - *op->payload->send_trailing_metadata.sent = true; - } } if (op->recv_initial_metadata) { gpr_log(GPR_INFO, "recv_initial_metadata"); diff --git a/test/core/transport/binder/BUILD b/test/core/transport/binder/BUILD index 7478fb63589..e6b972c9300 100644 --- a/test/core/transport/binder/BUILD +++ b/test/core/transport/binder/BUILD @@ -12,23 +12,89 @@ # See the License for the specific language governing permissions and # limitations under the License. -load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_package") +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package") licenses(["notice"]) grpc_package(name = "test/core/transport/binder") +grpc_cc_library( + name = "mock_objects", + srcs = ["mock_objects.cc"], + hdrs = ["mock_objects.h"], + external_deps = [ + "absl/memory", + "gtest", + ], + language = "C++", + deps = [ + "//src/core/ext/transport/binder/wire_format:binder", + "//src/core/ext/transport/binder/wire_format:wire_reader", + "//src/core/ext/transport/binder/wire_format:wire_writer", + ], +) + +grpc_cc_test( + name = "wire_writer_test", + srcs = ["wire_writer_test.cc"], + external_deps = [ + "absl/memory", + "gtest", + ], + language = "C++", + uses_polling = False, + deps = [ + ":mock_objects", + "//src/core/ext/transport/binder/wire_format:wire_writer", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "wire_reader_test", + srcs = ["wire_reader_test.cc"], + external_deps = [ + "absl/memory", + "gtest", + ], + language = "C++", + uses_polling = False, + deps = [ + ":mock_objects", + "//src/core/ext/transport/binder/wire_format:wire_reader", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_test( + name = "transport_stream_receiver_test", + srcs = ["transport_stream_receiver_test.cc"], + external_deps = [ + "absl/memory", + "gtest", + ], + language = "C++", + uses_polling = False, + deps = [ + "//src/core/ext/transport/binder/utils:transport_stream_receiver", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( - name = "binder_smoke_test", - srcs = ["binder_smoke_test.cc"], + name = "binder_transport_test", + srcs = ["binder_transport_test.cc"], external_deps = [ + "absl/memory", + "absl/strings", "gtest", ], language = "C++", uses_polling = False, deps = [ - "//:gpr", + ":mock_objects", "//:grpc", + "//src/core/ext/transport/binder/transport", "//test/core/util:grpc_test_util", ], ) diff --git a/test/core/transport/binder/binder_smoke_test.cc b/test/core/transport/binder/binder_smoke_test.cc deleted file mode 100644 index 565d6765f31..00000000000 --- a/test/core/transport/binder/binder_smoke_test.cc +++ /dev/null @@ -1,38 +0,0 @@ -// -// Copyright 2021 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include -#include -#include - -#include "test/core/util/test_config.h" - -namespace grpc_core { -namespace testing { -namespace { -TEST(SmokeTest, Empty) { gpr_log(GPR_INFO, __func__); } -} // namespace -} // namespace testing -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc::testing::TestEnvironment env(argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; -} diff --git a/test/core/transport/binder/binder_transport_test.cc b/test/core/transport/binder/binder_transport_test.cc new file mode 100644 index 00000000000..cb0e9bebe27 --- /dev/null +++ b/test/core/transport/binder/binder_transport_test.cc @@ -0,0 +1,742 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Unit-tests for grpc_binder_transport +// +// Verify that a calls to the perform_stream_op of grpc_binder_transport +// transform into the correct sequence of binder transactions. +#include "src/core/ext/transport/binder/transport/binder_transport.h" + +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/strings/match.h" +#include "src/core/ext/transport/binder/transport/binder_stream.h" +#include "test/core/transport/binder/mock_objects.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { +namespace { + +using ::testing::Expectation; +using ::testing::NiceMock; +using ::testing::Return; + +class BinderTransportTest : public ::testing::Test { + public: + BinderTransportTest() + : arena_(grpc_core::Arena::Create(/* initial_size = */ 1)), + transport_(grpc_create_binder_transport_client( + absl::make_unique>())) { + auto* gbt = reinterpret_cast(transport_); + gbt->wire_writer = absl::make_unique(); + GRPC_STREAM_REF_INIT(&ref_, 1, nullptr, nullptr, "phony ref"); + } + + ~BinderTransportTest() override { + auto* gbt = reinterpret_cast(transport_); + delete gbt; + for (grpc_binder_stream* gbs : stream_buffer_) { + gpr_free(gbs); + } + arena_->Destroy(); + } + + void PerformStreamOp(grpc_binder_stream* gbs, + grpc_transport_stream_op_batch* op) { + grpc_transport_perform_stream_op(transport_, + reinterpret_cast(gbs), op); + } + + grpc_binder_transport* GetBinderTransport() { + return reinterpret_cast(transport_); + } + + grpc_binder_stream* InitNewBinderStream() { + grpc_binder_stream* gbs = static_cast( + gpr_malloc(grpc_transport_stream_size(transport_))); + grpc_transport_init_stream(transport_, reinterpret_cast(gbs), + &ref_, nullptr, arena_); + stream_buffer_.push_back(gbs); + return gbs; + } + + MockWireWriter& GetWireWriter() { + return *reinterpret_cast( + GetBinderTransport()->wire_writer.get()); + } + + static void SetUpTestSuite() { grpc_init(); } + static void TearDownTestSuite() { grpc_shutdown(); } + + protected: + grpc_core::Arena* arena_; + grpc_transport* transport_; + grpc_stream_refcount ref_; + std::vector stream_buffer_; +}; + +void MockCallback(void* arg, grpc_error_handle error); + +class MockGrpcClosure { + public: + MockGrpcClosure() { + GRPC_CLOSURE_INIT(&closure_, MockCallback, this, nullptr); + } + + grpc_closure* GetGrpcClosure() { return &closure_; } + MOCK_METHOD(void, Callback, (grpc_error_handle), ()); + + private: + grpc_closure closure_; +}; + +void MockCallback(void* arg, grpc_error_handle error) { + MockGrpcClosure* mock_closure = static_cast(arg); + mock_closure->Callback(error); +} + +// Matches with transactions having the desired flag, method_ref, +// initial_metadata, and message_data. +MATCHER_P4(TransactionMatches, flag, method_ref, initial_metadata, message_data, + "") { + if (arg.GetFlags() != flag) return false; + if (flag & kFlagPrefix) { + if (arg.GetMethodRef() != method_ref) return false; + if (arg.GetPrefixMetadata() != initial_metadata) return false; + } + if (flag & kFlagMessageData) { + if (arg.GetMessageData() != message_data) return false; + } + return true; +} + +// Matches with grpc_error having error message containing |msg|. +MATCHER_P(GrpcErrorMessageContains, msg, "") { + return absl::StrContains(grpc_error_std_string(arg), msg); +} + +// Verify that the lower-level metadata has the same content as the gRPC +// metadata. +void VerifyMetadataEqual(const Metadata& md, grpc_metadata_batch grpc_md) { + grpc_linked_mdelem* elm = grpc_md.list.head; + for (size_t i = 0; i < md.size(); ++i) { + ASSERT_NE(elm, nullptr); + EXPECT_EQ(grpc_core::StringViewFromSlice(GRPC_MDKEY(elm->md)), md[i].first); + EXPECT_EQ(grpc_core::StringViewFromSlice(GRPC_MDVALUE(elm->md)), + md[i].second); + elm = elm->next; + } + EXPECT_EQ(elm, nullptr); +} + +// RAII helper classes for constructing gRPC metadata and receiving callbacks. +struct MakeSendInitialMetadata { + MakeSendInitialMetadata(const Metadata& initial_metadata, + const std::string& method_ref, + grpc_transport_stream_op_batch* op) + : storage(initial_metadata.size()) { + grpc_metadata_batch_init(&grpc_initial_metadata); + size_t i = 0; + for (const auto& md : initial_metadata) { + const std::string& key = md.first; + const std::string& value = md.second; + EXPECT_EQ(grpc_metadata_batch_add_tail( + &grpc_initial_metadata, &storage[i], + grpc_mdelem_from_slices(grpc_slice_from_cpp_string(key), + grpc_slice_from_cpp_string(value))), + GRPC_ERROR_NONE); + i++; + } + if (!method_ref.empty()) { + EXPECT_EQ( + grpc_metadata_batch_add_tail( + &grpc_initial_metadata, &method_ref_storage, + grpc_mdelem_from_slices(GRPC_MDSTR_PATH, + grpc_slice_from_cpp_string(method_ref))), + GRPC_ERROR_NONE); + } + op->send_initial_metadata = true; + op->payload->send_initial_metadata.send_initial_metadata = + &grpc_initial_metadata; + } + ~MakeSendInitialMetadata() { + grpc_metadata_batch_destroy(&grpc_initial_metadata); + } + + std::vector storage; + grpc_linked_mdelem method_ref_storage; + grpc_metadata_batch grpc_initial_metadata{}; +}; + +struct MakeSendMessage { + MakeSendMessage(const std::string& message, + grpc_transport_stream_op_batch* op) { + grpc_slice_buffer send_buffer; + grpc_slice_buffer_init(&send_buffer); + grpc_slice send_slice = grpc_slice_from_cpp_string(message); + grpc_slice_buffer_add(&send_buffer, send_slice); + + send_stream.Init(&send_buffer, 0); + grpc_slice_buffer_destroy(&send_buffer); + + op->send_message = true; + op->payload->send_message.send_message.reset(send_stream.get()); + } + + grpc_core::ManualConstructor send_stream; +}; + +struct MakeSendTrailingMetadata { + explicit MakeSendTrailingMetadata(const Metadata& trailing_metadata, + grpc_transport_stream_op_batch* op) { + EXPECT_TRUE(trailing_metadata.empty()); + grpc_metadata_batch_init(&grpc_trailing_metadata); + + op->send_trailing_metadata = true; + op->payload->send_trailing_metadata.send_trailing_metadata = + &grpc_trailing_metadata; + } + + grpc_metadata_batch grpc_trailing_metadata{}; +}; + +struct MakeRecvInitialMetadata { + explicit MakeRecvInitialMetadata(grpc_transport_stream_op_batch* op, + Expectation* call_before = nullptr) { + grpc_metadata_batch_init(&grpc_initial_metadata); + op->recv_initial_metadata = true; + op->payload->recv_initial_metadata.recv_initial_metadata = + &grpc_initial_metadata; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + ready.GetGrpcClosure(); + if (call_before) { + EXPECT_CALL(ready, Callback).After(*call_before); + } else { + EXPECT_CALL(ready, Callback); + } + } + + ~MakeRecvInitialMetadata() { + grpc_metadata_batch_destroy(&grpc_initial_metadata); + } + + MockGrpcClosure ready; + grpc_metadata_batch grpc_initial_metadata; +}; + +struct MakeRecvMessage { + explicit MakeRecvMessage(grpc_transport_stream_op_batch* op, + Expectation* call_before = nullptr) { + op->recv_message = true; + op->payload->recv_message.recv_message = &grpc_message; + op->payload->recv_message.recv_message_ready = ready.GetGrpcClosure(); + if (call_before) { + EXPECT_CALL(ready, Callback).After(*call_before); + } else { + EXPECT_CALL(ready, Callback); + } + } + + MockGrpcClosure ready; + grpc_core::OrphanablePtr grpc_message; +}; + +struct MakeRecvTrailingMetadata { + explicit MakeRecvTrailingMetadata(grpc_transport_stream_op_batch* op, + Expectation* call_before = nullptr) { + grpc_metadata_batch_init(&grpc_trailing_metadata); + op->recv_trailing_metadata = true; + op->payload->recv_trailing_metadata.recv_trailing_metadata = + &grpc_trailing_metadata; + op->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + ready.GetGrpcClosure(); + if (call_before) { + EXPECT_CALL(ready, Callback).After(*call_before); + } else { + EXPECT_CALL(ready, Callback); + } + } + + ~MakeRecvTrailingMetadata() { + grpc_metadata_batch_destroy(&grpc_trailing_metadata); + } + + MockGrpcClosure ready; + grpc_metadata_batch grpc_trailing_metadata; +}; + +const Metadata kDefaultMetadata = { + {"", ""}, + {"", "value"}, + {"key", ""}, + {"key", "value"}, +}; + +constexpr char kDefaultMethodRef[] = "/some/path"; +constexpr char kDefaultMessage[] = "binder transport message"; +constexpr int kDefaultStatus = 0x1234; + +Metadata AppendMethodRef(const Metadata& md, const std::string& method_ref) { + Metadata result = md; + result.emplace_back(":path", method_ref); + return result; +} + +Metadata AppendStatus(const Metadata& md, int status) { + Metadata result = md; + result.emplace_back("grpc-status", std::to_string(status)); + return result; +} + +} // namespace + +TEST_F(BinderTransportTest, CreateBinderTransport) { + EXPECT_NE(transport_, nullptr); +} + +TEST_F(BinderTransportTest, TransactionIdIncrement) { + grpc_binder_stream* gbs0 = InitNewBinderStream(); + EXPECT_EQ(gbs0->t, GetBinderTransport()); + EXPECT_EQ(gbs0->tx_code, kFirstCallId); + EXPECT_EQ(gbs0->seq, 0); + grpc_binder_stream* gbs1 = InitNewBinderStream(); + EXPECT_EQ(gbs1->t, GetBinderTransport()); + EXPECT_EQ(gbs1->tx_code, kFirstCallId + 1); + EXPECT_EQ(gbs1->seq, 0); + grpc_binder_stream* gbs2 = InitNewBinderStream(); + EXPECT_EQ(gbs2->t, GetBinderTransport()); + EXPECT_EQ(gbs2->tx_code, kFirstCallId + 2); + EXPECT_EQ(gbs2->seq, 0); +} + +TEST_F(BinderTransportTest, SeqNumIncrement) { + grpc_binder_stream* gbs = InitNewBinderStream(); + EXPECT_EQ(gbs->t, GetBinderTransport()); + EXPECT_EQ(gbs->tx_code, kFirstCallId); + // A simple batch that contains only "send_initial_metadata" + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + MakeSendInitialMetadata send_initial_metadata(kDefaultMetadata, "", &op); + EXPECT_EQ(gbs->seq, 0); + PerformStreamOp(gbs, &op); + EXPECT_EQ(gbs->tx_code, kFirstCallId); + EXPECT_EQ(gbs->seq, 1); + PerformStreamOp(gbs, &op); + EXPECT_EQ(gbs->tx_code, kFirstCallId); + EXPECT_EQ(gbs->seq, 2); +} + +TEST_F(BinderTransportTest, SeqNumNotIncrementWithoutSend) { + { + grpc_binder_stream* gbs = InitNewBinderStream(); + EXPECT_EQ(gbs->t, GetBinderTransport()); + EXPECT_EQ(gbs->tx_code, kFirstCallId); + // No-op batch. + grpc_transport_stream_op_batch op{}; + EXPECT_EQ(gbs->seq, 0); + PerformStreamOp(gbs, &op); + EXPECT_EQ(gbs->tx_code, kFirstCallId); + EXPECT_EQ(gbs->seq, 0); + } + { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + EXPECT_EQ(gbs->t, GetBinderTransport()); + EXPECT_EQ(gbs->tx_code, kFirstCallId + 1); + // Batch with only receiving operations. + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + MakeRecvInitialMetadata recv_initial_metadata(&op); + EXPECT_EQ(gbs->seq, 0); + PerformStreamOp(gbs, &op); + EXPECT_EQ(gbs->tx_code, kFirstCallId + 1); + EXPECT_EQ(gbs->seq, 0); + + // Just to trigger the callback. + auto* gbt = reinterpret_cast(transport_); + gbt->transport_stream_receiver->NotifyRecvInitialMetadata(gbs->tx_code, + kDefaultMetadata); + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + } +} + +TEST_F(BinderTransportTest, PerformSendInitialMetadata) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + const Metadata kInitialMetadata = kDefaultMetadata; + MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, "", &op); + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + ::testing::InSequence sequence; + EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches( + kFlagPrefix, "", kInitialMetadata, ""))); + EXPECT_CALL(mock_on_complete, Callback); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); +} + +TEST_F(BinderTransportTest, PerformSendInitialMetadataMethodRef) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + const Metadata kInitialMetadata = kDefaultMetadata; + const std::string kMethodRef = kDefaultMethodRef; + MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, kMethodRef, + &op); + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + ::testing::InSequence sequence; + EXPECT_CALL(GetWireWriter(), + RpcCall(TransactionMatches(kFlagPrefix, kMethodRef.substr(1), + kInitialMetadata, ""))); + EXPECT_CALL(mock_on_complete, Callback); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); +} + +TEST_F(BinderTransportTest, PerformSendMessage) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + const std::string kMessage = kDefaultMessage; + MakeSendMessage send_message(kMessage, &op); + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + ::testing::InSequence sequence; + EXPECT_CALL( + GetWireWriter(), + RpcCall(TransactionMatches(kFlagMessageData, "", Metadata{}, kMessage))); + EXPECT_CALL(mock_on_complete, Callback); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); +} + +TEST_F(BinderTransportTest, PerformSendTrailingMetadata) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + // The wireformat guarantees that suffix metadata will always be empty. + // TODO(waynetu): Check whether gRPC can internally add extra trailing + // metadata. + const Metadata kTrailingMetadata = {}; + MakeSendTrailingMetadata send_trailing_metadata(kTrailingMetadata, &op); + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + ::testing::InSequence sequence; + EXPECT_CALL(GetWireWriter(), RpcCall(TransactionMatches( + kFlagSuffix, "", kTrailingMetadata, ""))); + EXPECT_CALL(mock_on_complete, Callback); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); +} + +TEST_F(BinderTransportTest, PerformSendAll) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + const Metadata kInitialMetadata = kDefaultMetadata; + const std::string kMethodRef = kDefaultMethodRef; + MakeSendInitialMetadata send_initial_metadata(kInitialMetadata, kMethodRef, + &op); + + const std::string kMessage = kDefaultMessage; + MakeSendMessage send_message(kMessage, &op); + + // The wireformat guarantees that suffix metadata will always be empty. + // TODO(waynetu): Check whether gRPC can internally add extra trailing + // metadata. + const Metadata kTrailingMetadata = {}; + MakeSendTrailingMetadata send_trailing_metadata(kTrailingMetadata, &op); + + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + ::testing::InSequence sequence; + EXPECT_CALL(GetWireWriter(), + RpcCall(TransactionMatches( + kFlagPrefix | kFlagMessageData | kFlagSuffix, + kMethodRef.substr(1), kInitialMetadata, kMessage))); + EXPECT_CALL(mock_on_complete, Callback); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); +} + +TEST_F(BinderTransportTest, PerformRecvInitialMetadata) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + MakeRecvInitialMetadata recv_initial_metadata(&op); + + const Metadata kInitialMetadata = kDefaultMetadata; + auto* gbt = reinterpret_cast(transport_); + gbt->transport_stream_receiver->NotifyRecvInitialMetadata(gbs->tx_code, + kInitialMetadata); + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + + VerifyMetadataEqual(kInitialMetadata, + recv_initial_metadata.grpc_initial_metadata); +} + +TEST_F(BinderTransportTest, PerformRecvInitialMetadataWithMethodRef) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + MakeRecvInitialMetadata recv_initial_metadata(&op); + + auto* gbt = reinterpret_cast(transport_); + const Metadata kInitialMetadataWithMethodRef = + AppendMethodRef(kDefaultMetadata, kDefaultMethodRef); + gbt->transport_stream_receiver->NotifyRecvInitialMetadata( + gbs->tx_code, kInitialMetadataWithMethodRef); + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + + VerifyMetadataEqual(kInitialMetadataWithMethodRef, + recv_initial_metadata.grpc_initial_metadata); +} + +TEST_F(BinderTransportTest, PerformRecvMessage) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + MakeRecvMessage recv_message(&op); + + auto* gbt = reinterpret_cast(transport_); + const std::string kMessage = kDefaultMessage; + gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kMessage); + + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr)); + grpc_slice slice; + recv_message.grpc_message->Pull(&slice); + EXPECT_EQ(kMessage, + std::string(reinterpret_cast(GRPC_SLICE_START_PTR(slice)), + GRPC_SLICE_LENGTH(slice))); + grpc_slice_unref_internal(slice); +} + +TEST_F(BinderTransportTest, PerformRecvTrailingMetadata) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + MakeRecvTrailingMetadata recv_trailing_metadata(&op); + + const Metadata kTrailingMetadata = kDefaultMetadata; + auto* gbt = reinterpret_cast(transport_); + constexpr int kStatus = kDefaultStatus; + gbt->transport_stream_receiver->NotifyRecvTrailingMetadata( + gbs->tx_code, kTrailingMetadata, kStatus); + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + VerifyMetadataEqual(AppendStatus(kTrailingMetadata, kStatus), + recv_trailing_metadata.grpc_trailing_metadata); +} + +TEST_F(BinderTransportTest, PerformRecvAll) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + MakeRecvInitialMetadata recv_initial_metadata(&op); + MakeRecvMessage recv_message(&op); + MakeRecvTrailingMetadata recv_trailing_metadata(&op); + + auto* gbt = reinterpret_cast(transport_); + const Metadata kInitialMetadataWithMethodRef = + AppendMethodRef(kDefaultMetadata, kDefaultMethodRef); + gbt->transport_stream_receiver->NotifyRecvInitialMetadata( + gbs->tx_code, kInitialMetadataWithMethodRef); + + const std::string kMessage = kDefaultMessage; + gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kMessage); + + Metadata trailing_metadata = kDefaultMetadata; + constexpr int kStatus = kDefaultStatus; + gbt->transport_stream_receiver->NotifyRecvTrailingMetadata( + gbs->tx_code, trailing_metadata, kStatus); + PerformStreamOp(gbs, &op); + exec_ctx.Flush(); + + VerifyMetadataEqual(kInitialMetadataWithMethodRef, + recv_initial_metadata.grpc_initial_metadata); + trailing_metadata.emplace_back("grpc-status", std::to_string(kStatus)); + VerifyMetadataEqual(trailing_metadata, + recv_trailing_metadata.grpc_trailing_metadata); + EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr)); + grpc_slice slice; + recv_message.grpc_message->Pull(&slice); + EXPECT_EQ(kMessage, + std::string(reinterpret_cast(GRPC_SLICE_START_PTR(slice)), + GRPC_SLICE_LENGTH(slice))); + grpc_slice_unref_internal(slice); +} + +TEST_F(BinderTransportTest, PerformAllOps) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + grpc_transport_stream_op_batch op{}; + grpc_transport_stream_op_batch_payload payload(nullptr); + op.payload = &payload; + + const Metadata kSendInitialMetadata = kDefaultMetadata; + const std::string kMethodRef = kDefaultMethodRef; + MakeSendInitialMetadata send_initial_metadata(kSendInitialMetadata, + kMethodRef, &op); + + const std::string kSendMessage = kDefaultMessage; + MakeSendMessage send_message(kSendMessage, &op); + + // The wireformat guarantees that suffix metadata will always be empty. + // TODO(waynetu): Check whether gRPC can internally add extra trailing + // metadata. + const Metadata kSendTrailingMetadata = {}; + MakeSendTrailingMetadata send_trailing_metadata(kSendTrailingMetadata, &op); + + MockGrpcClosure mock_on_complete; + op.on_complete = mock_on_complete.GetGrpcClosure(); + + // TODO(waynetu): Currently, we simply drop the prefix '/' from the :path + // argument to obtain the method name. Update the test if this turns out to be + // incorrect. + EXPECT_CALL(GetWireWriter(), + RpcCall(TransactionMatches( + kFlagPrefix | kFlagMessageData | kFlagSuffix, + kMethodRef.substr(1), kSendInitialMetadata, kSendMessage))); + Expectation on_complete = EXPECT_CALL(mock_on_complete, Callback); + + // Recv callbacks can happen after the on_complete callback. + MakeRecvInitialMetadata recv_initial_metadata( + &op, /* call_before = */ &on_complete); + MakeRecvMessage recv_message(&op, /* call_before = */ &on_complete); + MakeRecvTrailingMetadata recv_trailing_metadata( + &op, /* call_before = */ &on_complete); + + PerformStreamOp(gbs, &op); + + // Flush the execution context to force on_complete to run before recv + // callbacks get scheduled. + exec_ctx.Flush(); + + auto* gbt = reinterpret_cast(transport_); + const Metadata kRecvInitialMetadata = + AppendMethodRef(kDefaultMetadata, kDefaultMethodRef); + gbt->transport_stream_receiver->NotifyRecvInitialMetadata( + gbs->tx_code, kRecvInitialMetadata); + const std::string kRecvMessage = kDefaultMessage; + gbt->transport_stream_receiver->NotifyRecvMessage(gbs->tx_code, kRecvMessage); + const Metadata kRecvTrailingMetadata = kDefaultMetadata; + constexpr int kStatus = 0x1234; + gbt->transport_stream_receiver->NotifyRecvTrailingMetadata( + gbs->tx_code, kRecvTrailingMetadata, kStatus); + + exec_ctx.Flush(); + VerifyMetadataEqual(kRecvInitialMetadata, + recv_initial_metadata.grpc_initial_metadata); + VerifyMetadataEqual(AppendStatus(kRecvTrailingMetadata, kStatus), + recv_trailing_metadata.grpc_trailing_metadata); + + EXPECT_TRUE(recv_message.grpc_message->Next(SIZE_MAX, nullptr)); + grpc_slice slice; + recv_message.grpc_message->Pull(&slice); + EXPECT_EQ(kRecvMessage, + std::string(reinterpret_cast(GRPC_SLICE_START_PTR(slice)), + GRPC_SLICE_LENGTH(slice))); + grpc_slice_unref_internal(slice); +} + +TEST_F(BinderTransportTest, WireWriterRpcCallErrorPropagates) { + grpc_core::ExecCtx exec_ctx; + grpc_binder_stream* gbs = InitNewBinderStream(); + + MockGrpcClosure mock_on_complete1; + MockGrpcClosure mock_on_complete2; + + EXPECT_CALL(GetWireWriter(), RpcCall) + .WillOnce(Return(absl::OkStatus())) + .WillOnce(Return(absl::InternalError("WireWriter::RpcCall failed"))); + EXPECT_CALL(mock_on_complete1, Callback(GRPC_ERROR_NONE)); + EXPECT_CALL(mock_on_complete2, + Callback(GrpcErrorMessageContains("WireWriter::RpcCall failed"))); + + const Metadata kInitialMetadata = {}; + grpc_transport_stream_op_batch op1{}; + grpc_transport_stream_op_batch_payload payload1(nullptr); + op1.payload = &payload1; + MakeSendInitialMetadata send_initial_metadata1(kInitialMetadata, "", &op1); + op1.on_complete = mock_on_complete1.GetGrpcClosure(); + + grpc_transport_stream_op_batch op2{}; + grpc_transport_stream_op_batch_payload payload2(nullptr); + op2.payload = &payload2; + MakeSendInitialMetadata send_initial_metadata2(kInitialMetadata, "", &op2); + op2.on_complete = mock_on_complete2.GetGrpcClosure(); + + PerformStreamOp(gbs, &op1); + PerformStreamOp(gbs, &op2); + exec_ctx.Flush(); +} + +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/transport/binder/end2end/BUILD b/test/core/transport/binder/end2end/BUILD new file mode 100644 index 00000000000..82af1ed1b19 --- /dev/null +++ b/test/core/transport/binder/end2end/BUILD @@ -0,0 +1,106 @@ +# Copyright 2021 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_package", "grpc_proto_library") + +licenses(["notice"]) + +grpc_package(name = "test/core/transport/binder/end2end") + +grpc_cc_library( + name = "fake_binder", + srcs = ["fake_binder.cc"], + hdrs = ["fake_binder.h"], + external_deps = [ + "absl/memory", + "absl/random", + "absl/strings", + "absl/strings:str_format", + "absl/time", + "absl/types:variant", + ], + deps = [ + "//:gpr_base", + "//src/core/ext/transport/binder/wire_format:binder", + "//src/core/ext/transport/binder/wire_format:wire_reader", + ], +) + +grpc_cc_test( + name = "fake_binder_test", + srcs = ["fake_binder_test.cc"], + external_deps = [ + "absl/strings", + "absl/time", + "gtest", + ], + language = "C++", + uses_polling = False, + deps = [ + ":fake_binder", + "//test/core/util:grpc_test_util", + ], +) + +grpc_cc_library( + name = "end2end_binder_channel", + srcs = ["testing_channel_create.cc"], + hdrs = ["testing_channel_create.h"], + external_deps = [], + deps = [ + ":fake_binder", + "//:grpc++_base", + "//:grpc_base_c", + "//src/core/ext/transport/binder/transport", + "//src/core/ext/transport/binder/wire_format:wire_reader", + ], +) + +grpc_proto_library( + name = "echo_grpc_proto", + srcs = ["echo.proto"], +) + +grpc_cc_library( + name = "echo_service", + srcs = ["echo_service.cc"], + hdrs = ["echo_service.h"], + external_deps = [ + "absl/strings", + "absl/strings:str_format", + "absl/time", + ], + deps = [ + ":echo_grpc_proto", + ], +) + +grpc_cc_test( + name = "end2end_binder_transport_test", + srcs = ["end2end_binder_transport_test.cc"], + external_deps = [ + "absl/memory", + "absl/time", + "gtest", + ], + language = "C++", + deps = [ + ":echo_service", + ":end2end_binder_channel", + ":fake_binder", + "//src/core/ext/transport/binder/transport", + "//src/core/ext/transport/binder/wire_format:wire_reader", + "//test/core/util:grpc_test_util", + ], +) diff --git a/test/core/transport/binder/end2end/echo.proto b/test/core/transport/binder/end2end/echo.proto new file mode 100644 index 00000000000..6669e1f28c7 --- /dev/null +++ b/test/core/transport/binder/end2end/echo.proto @@ -0,0 +1,38 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// A simple RPC service that echos what the client passes in. The request and +// the response simply contains the text represented in a string. +// +// This service is for end-to-end testing with fake binder tunnels. +syntax = "proto3"; + +// TODO(waynetu): This can be replaced by EchoTestService in +// src/proto/grpc/testing/echo.proto +package grpc_binder.end2end_testing; + +message EchoRequest { + string text = 1; +} + +message EchoResponse { + string text = 1; +} + +service EchoService { + rpc EchoUnaryCall(EchoRequest) returns (EchoResponse); + rpc EchoServerStreamingCall(EchoRequest) returns (stream EchoResponse); + rpc EchoClientStreamingCall(stream EchoRequest) returns (EchoResponse); + rpc EchoBiDirStreamingCall(stream EchoRequest) returns (stream EchoResponse); +} diff --git a/test/core/transport/binder/end2end/echo_service.cc b/test/core/transport/binder/end2end/echo_service.cc new file mode 100644 index 00000000000..def23f57d2d --- /dev/null +++ b/test/core/transport/binder/end2end/echo_service.cc @@ -0,0 +1,83 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/binder/end2end/echo_service.h" + +#include + +#include "absl/strings/str_format.h" +#include "absl/time/time.h" + +namespace grpc_binder { +namespace end2end_testing { + +const absl::string_view EchoServer::kCancelledText = "cancel"; +const absl::string_view EchoServer::kTimeoutText = "timeout"; +const size_t EchoServer::kServerStreamingCounts = 100; + +grpc::Status EchoServer::EchoUnaryCall(grpc::ServerContext* /*context*/, + const EchoRequest* request, + EchoResponse* response) { + const std::string& data = request->text(); + if (data == kCancelledText) { + return grpc::Status::CANCELLED; + } + if (data == kTimeoutText) { + absl::SleepFor(absl::Seconds(5)); + } + response->set_text(data); + return grpc::Status::OK; +} + +grpc::Status EchoServer::EchoServerStreamingCall( + grpc::ServerContext* /*context*/, const EchoRequest* request, + grpc::ServerWriter* writer) { + const std::string& data = request->text(); + if (data == kTimeoutText) { + absl::SleepFor(absl::Seconds(5)); + } + for (size_t i = 0; i < kServerStreamingCounts; ++i) { + EchoResponse response; + response.set_text(absl::StrFormat("%s(%d)", data, i)); + writer->Write(response); + } + return grpc::Status::OK; +} + +grpc::Status EchoServer::EchoClientStreamingCall( + grpc::ServerContext* /*context*/, grpc::ServerReader* reader, + EchoResponse* response) { + EchoRequest request; + std::string result = ""; + while (reader->Read(&request)) { + result += request.text(); + } + response->set_text(result); + return grpc::Status::OK; +} + +grpc::Status EchoServer::EchoBiDirStreamingCall( + grpc::ServerContext* /*context*/, + grpc::ServerReaderWriter* stream) { + EchoRequest request; + while (stream->Read(&request)) { + EchoResponse response; + response.set_text(request.text()); + stream->Write(response); + } + return grpc::Status::OK; +} + +} // namespace end2end_testing +} // namespace grpc_binder diff --git a/test/core/transport/binder/end2end/echo_service.h b/test/core/transport/binder/end2end/echo_service.h new file mode 100644 index 00000000000..8d86ca1579b --- /dev/null +++ b/test/core/transport/binder/end2end/echo_service.h @@ -0,0 +1,51 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H +#define TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H + +#include "absl/strings/string_view.h" +#include "test/core/transport/binder/end2end/echo.grpc.pb.h" + +namespace grpc_binder { +namespace end2end_testing { + +// TODO(waynetu): Replace this with TestServiceImpl declared in +// test/cpp/end2end/test_service_impl.h +class EchoServer final : public EchoService::Service { + public: + static const absl::string_view kCancelledText; + static const absl::string_view kTimeoutText; + + grpc::Status EchoUnaryCall(grpc::ServerContext* context, + const EchoRequest* request, + EchoResponse* response) override; + + static const size_t kServerStreamingCounts; + + grpc::Status EchoServerStreamingCall( + grpc::ServerContext* context, const EchoRequest* request, + grpc::ServerWriter* writer) override; + grpc::Status EchoClientStreamingCall(grpc::ServerContext* context, + grpc::ServerReader* reader, + EchoResponse* response) override; + grpc::Status EchoBiDirStreamingCall( + grpc::ServerContext* context, + grpc::ServerReaderWriter* stream) override; +}; + +} // namespace end2end_testing +} // namespace grpc_binder + +#endif // TEST_CORE_TRANSPORT_BINDER_END2END_ECHO_SERVICE_H_ diff --git a/test/core/transport/binder/end2end/end2end_binder_transport_test.cc b/test/core/transport/binder/end2end/end2end_binder_transport_test.cc new file mode 100644 index 00000000000..8ca18c96b2d --- /dev/null +++ b/test/core/transport/binder/end2end/end2end_binder_transport_test.cc @@ -0,0 +1,306 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/time/time.h" +#include "src/core/ext/transport/binder/transport/binder_transport.h" +#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h" +#include "test/core/transport/binder/end2end/echo_service.h" +#include "test/core/transport/binder/end2end/fake_binder.h" +#include "test/core/transport/binder/end2end/testing_channel_create.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { + +namespace { + +class End2EndBinderTransportTest + : public ::testing::TestWithParam { + public: + End2EndBinderTransportTest() { + end2end_testing::g_transaction_processor = + new end2end_testing::TransactionProcessor(GetParam()); + } + + ~End2EndBinderTransportTest() override { + delete end2end_testing::g_transaction_processor; + } + + static void SetUpTestSuite() { grpc_init(); } + static void TearDownTestSuite() { grpc_shutdown(); } + + std::shared_ptr BinderChannel( + grpc::Server* server, const grpc::ChannelArguments& args) { + return end2end_testing::BinderChannelForTesting(server, args); + } +}; + +using end2end_testing::EchoRequest; +using end2end_testing::EchoResponse; +using end2end_testing::EchoService; + +} // namespace + +TEST_P(End2EndBinderTransportTest, SetupTransport) { + grpc_transport *client_transport, *server_transport; + std::tie(client_transport, server_transport) = + end2end_testing::CreateClientServerBindersPairForTesting(); + EXPECT_NE(client_transport, nullptr); + EXPECT_NE(server_transport, nullptr); + + grpc_transport_destroy(client_transport); + grpc_transport_destroy(server_transport); +} + +TEST_P(End2EndBinderTransportTest, UnaryCallThroughFakeBinderChannel) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + EchoRequest request; + EchoResponse response; + request.set_text("it works!"); + grpc::Status status = stub->EchoUnaryCall(&context, request, &response); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.text(), "it works!"); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, + UnaryCallThroughFakeBinderChannelNonOkStatus) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + EchoRequest request; + EchoResponse response; + request.set_text(std::string(end2end_testing::EchoServer::kCancelledText)); + // Server will not response the client with message data, however, since all + // callbacks after the trailing metadata are cancelled, we shall not be + // blocked here. + grpc::Status status = stub->EchoUnaryCall(&context, request, &response); + EXPECT_FALSE(status.ok()); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, + UnaryCallThroughFakeBinderChannelServerTimeout) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1))); + EchoRequest request; + EchoResponse response; + request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText)); + grpc::Status status = stub->EchoUnaryCall(&context, request, &response); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Deadline Exceeded"); + + server->Shutdown(); +} + +// Temporarily disabled due to a potential deadlock in our design. +// TODO(waynetu): Enable this test once the issue is resolved. +TEST_P(End2EndBinderTransportTest, + UnaryCallThroughFakeBinderChannelClientTimeout) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + + // Set transaction delay to a large number. This happens after the channel + // creation so that we don't need to wait that long for client and server to + // be connected. + end2end_testing::g_transaction_processor->SetDelay(absl::Seconds(5)); + + grpc::ClientContext context; + context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1))); + EchoRequest request; + EchoResponse response; + request.set_text("normal-text"); + grpc::Status status = stub->EchoUnaryCall(&context, request, &response); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Deadline Exceeded"); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, + ServerStreamingCallThroughFakeBinderChannel) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + EchoRequest request; + request.set_text("it works!"); + std::unique_ptr> reader = + stub->EchoServerStreamingCall(&context, request); + EchoResponse response; + size_t cnt = 0; + while (reader->Read(&response)) { + EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", cnt)); + cnt++; + } + EXPECT_EQ(cnt, end2end_testing::EchoServer::kServerStreamingCounts); + grpc::Status status = reader->Finish(); + EXPECT_TRUE(status.ok()); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, + ServerStreamingCallThroughFakeBinderChannelServerTimeout) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + context.set_deadline(absl::ToChronoTime(absl::Now() + absl::Seconds(1))); + EchoRequest request; + request.set_text(std::string(end2end_testing::EchoServer::kTimeoutText)); + std::unique_ptr> reader = + stub->EchoServerStreamingCall(&context, request); + EchoResponse response; + EXPECT_FALSE(reader->Read(&response)); + grpc::Status status = reader->Finish(); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.error_message(), "Deadline Exceeded"); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, + ClientStreamingCallThroughFakeBinderChannel) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + EchoResponse response; + std::unique_ptr> writer = + stub->EchoClientStreamingCall(&context, &response); + constexpr size_t kClientStreamingCounts = 100; + std::string expected = ""; + for (size_t i = 0; i < kClientStreamingCounts; ++i) { + EchoRequest request; + request.set_text(absl::StrFormat("it works!(%d)", i)); + writer->Write(request); + expected += absl::StrFormat("it works!(%d)", i); + } + writer->WritesDone(); + grpc::Status status = writer->Finish(); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.text(), expected); + + server->Shutdown(); +} + +TEST_P(End2EndBinderTransportTest, BiDirStreamingCallThroughFakeBinderChannel) { + grpc::ChannelArguments args; + grpc::ServerBuilder builder; + end2end_testing::EchoServer service; + builder.RegisterService(&service); + std::unique_ptr server = builder.BuildAndStart(); + std::shared_ptr channel = BinderChannel(server.get(), args); + std::unique_ptr stub = EchoService::NewStub(channel); + grpc::ClientContext context; + EchoResponse response; + std::shared_ptr> stream = + stub->EchoBiDirStreamingCall(&context); + constexpr size_t kBiDirStreamingCounts = 100; + + struct WriterArgs { + std::shared_ptr> stream; + size_t bi_dir_streaming_counts; + } writer_args; + + writer_args.stream = stream; + writer_args.bi_dir_streaming_counts = kBiDirStreamingCounts; + + auto writer_fn = [](void* arg) { + const WriterArgs& args = *static_cast(arg); + EchoResponse response; + for (size_t i = 0; i < args.bi_dir_streaming_counts; ++i) { + EchoRequest request; + request.set_text(absl::StrFormat("it works!(%d)", i)); + args.stream->Write(request); + } + args.stream->WritesDone(); + }; + + grpc_core::Thread writer_thread("writer-thread", writer_fn, + static_cast(&writer_args)); + writer_thread.Start(); + for (size_t i = 0; i < kBiDirStreamingCounts; ++i) { + EchoResponse response; + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.text(), absl::StrFormat("it works!(%d)", i)); + } + grpc::Status status = stream->Finish(); + EXPECT_TRUE(status.ok()); + writer_thread.Join(); + + server->Shutdown(); +} + +INSTANTIATE_TEST_SUITE_P( + End2EndBinderTransportTestWithDifferentDelayTimes, + End2EndBinderTransportTest, + testing::Values(absl::ZeroDuration(), absl::Nanoseconds(10), + absl::Microseconds(10), absl::Microseconds(100), + absl::Milliseconds(1), absl::Milliseconds(20))); + +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/transport/binder/end2end/fake_binder.cc b/test/core/transport/binder/end2end/fake_binder.cc new file mode 100644 index 00000000000..43fb8a4ddd5 --- /dev/null +++ b/test/core/transport/binder/end2end/fake_binder.cc @@ -0,0 +1,269 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/binder/end2end/fake_binder.h" + +#include + +#include +#include + +namespace grpc_binder { +namespace end2end_testing { + +TransactionProcessor* g_transaction_processor = nullptr; + +FakeWritableParcel::FakeWritableParcel() : data_(1) {} + +int32_t FakeWritableParcel::GetDataPosition() const { return data_position_; } + +absl::Status FakeWritableParcel::SetDataPosition(int32_t pos) { + if (data_.size() < static_cast(pos) + 1) { + data_.resize(pos + 1); + } + data_position_ = pos; + return absl::OkStatus(); +} + +absl::Status FakeWritableParcel::WriteInt32(int32_t data) { + data_[data_position_] = data; + SetDataPosition(data_position_ + 1).IgnoreError(); + return absl::OkStatus(); +} + +absl::Status FakeWritableParcel::WriteBinder(HasRawBinder* binder) { + data_[data_position_] = binder->GetRawBinder(); + SetDataPosition(data_position_ + 1).IgnoreError(); + return absl::OkStatus(); +} + +absl::Status FakeWritableParcel::WriteString(absl::string_view s) { + data_[data_position_] = std::string(s); + SetDataPosition(data_position_ + 1).IgnoreError(); + return absl::OkStatus(); +} + +absl::Status FakeWritableParcel::WriteByteArray(const int8_t* buffer, + int32_t length) { + data_[data_position_] = std::vector(buffer, buffer + length); + SetDataPosition(data_position_ + 1).IgnoreError(); + return absl::OkStatus(); +} + +absl::Status FakeReadableParcel::ReadInt32(int32_t* data) const { + if (data_position_ >= data_.size() || + !absl::holds_alternative(data_[data_position_])) { + return absl::InternalError("ReadInt32 failed"); + } + *data = absl::get(data_[data_position_++]); + return absl::OkStatus(); +} + +absl::Status FakeReadableParcel::ReadBinder( + std::unique_ptr* data) const { + if (data_position_ >= data_.size() || + !absl::holds_alternative(data_[data_position_])) { + return absl::InternalError("ReadBinder failed"); + } + void* endpoint = absl::get(data_[data_position_++]); + if (!endpoint) return absl::InternalError("ReadBinder failed"); + *data = absl::make_unique(static_cast(endpoint)); + return absl::OkStatus(); +} + +absl::Status FakeReadableParcel::ReadString(char data[111]) const { + if (data_position_ >= data_.size() || + !absl::holds_alternative(data_[data_position_])) { + return absl::InternalError("ReadString failed"); + } + const std::string& s = absl::get(data_[data_position_++]); + if (s.size() >= 100) return absl::InternalError("ReadString failed"); + std::memcpy(data, s.data(), s.size()); + return absl::OkStatus(); +} + +absl::Status FakeReadableParcel::ReadByteArray(std::string* data) const { + if (data_position_ >= data_.size() || + !absl::holds_alternative>(data_[data_position_])) { + return absl::InternalError("ReadByteArray failed"); + } + const std::vector& byte_array = + absl::get>(data_[data_position_++]); + data->resize(byte_array.size()); + for (size_t i = 0; i < byte_array.size(); ++i) { + (*data)[i] = byte_array[i]; + } + return absl::OkStatus(); +} + +absl::Status FakeBinder::Transact(BinderTransportTxCode tx_code) { + endpoint_->tunnel->EnQueueTransaction(endpoint_->other_end, tx_code, + input_->MoveData()); + return absl::OkStatus(); +} + +FakeTransactionReceiver::FakeTransactionReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb transact_cb) { + persistent_tx_receiver_ = &g_transaction_processor->NewPersistentTxReceiver( + std::move(wire_reader_ref), std::move(transact_cb), + absl::make_unique()); +} + +std::unique_ptr FakeBinder::ConstructTxReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb cb) const { + return absl::make_unique(wire_reader_ref, cb); +} + +void* FakeTransactionReceiver::GetRawBinder() { + return persistent_tx_receiver_->tunnel_->GetSendEndpoint(); +} + +std::unique_ptr FakeTransactionReceiver::GetSender() const { + return absl::make_unique( + persistent_tx_receiver_->tunnel_->GetSendEndpoint()); +} + +PersistentFakeTransactionReceiver::PersistentFakeTransactionReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb cb, + std::unique_ptr tunnel) + : wire_reader_ref_(std::move(wire_reader_ref)), + callback_(std::move(cb)), + tunnel_(std::move(tunnel)) { + FakeEndpoint* recv_endpoint = tunnel_->GetRecvEndpoint(); + recv_endpoint->owner = this; +} + +TransactionProcessor::TransactionProcessor(absl::Duration delay) + : delay_nsec_(absl::ToInt64Nanoseconds(delay)), + tx_thread_( + "process-thread", + [](void* arg) { + auto* self = static_cast(arg); + self->ProcessLoop(); + }, + this), + terminated_(false) { + tx_thread_.Start(); +} + +void TransactionProcessor::SetDelay(absl::Duration delay) { + delay_nsec_ = absl::ToInt64Nanoseconds(delay); +} + +void TransactionProcessor::Terminate() { + if (!terminated_.load(std::memory_order_seq_cst)) { + gpr_log(GPR_INFO, "Terminating the processor"); + terminated_.store(true, std::memory_order_seq_cst); + tx_thread_.Join(); + gpr_log(GPR_INFO, "Processor terminated"); + } +} + +void TransactionProcessor::WaitForNextTransaction() { + absl::Time now = absl::Now(); + if (now < deliver_time_) { + absl::Duration diff = deliver_time_ - now; + // Release the lock before going to sleep. + mu_.Unlock(); + absl::SleepFor(diff); + mu_.Lock(); + } +} + +void TransactionProcessor::Flush() { + while (true) { + FakeEndpoint* target = nullptr; + BinderTransportTxCode tx_code{}; + FakeData data; + mu_.Lock(); + if (tx_queue_.empty()) { + mu_.Unlock(); + break; + } + WaitForNextTransaction(); + std::tie(target, tx_code, data) = std::move(tx_queue_.front()); + tx_queue_.pop(); + if (!tx_queue_.empty()) { + deliver_time_ = absl::Now() + GetRandomDelay(); + } + mu_.Unlock(); + auto* tx_receiver = + static_cast(target->owner); + auto parcel = absl::make_unique(std::move(data)); + tx_receiver->Receive(tx_code, parcel.get()).IgnoreError(); + } +} + +void TransactionProcessor::ProcessLoop() { + while (!terminated_.load(std::memory_order_seq_cst)) { + FakeEndpoint* target = nullptr; + BinderTransportTxCode tx_code{}; + FakeData data; + mu_.Lock(); + if (tx_queue_.empty()) { + mu_.Unlock(); + continue; + } + WaitForNextTransaction(); + std::tie(target, tx_code, data) = std::move(tx_queue_.front()); + tx_queue_.pop(); + if (!tx_queue_.empty()) { + deliver_time_ = absl::Now() + GetRandomDelay(); + } + mu_.Unlock(); + auto* tx_receiver = + static_cast(target->owner); + auto parcel = absl::make_unique(std::move(data)); + tx_receiver->Receive(tx_code, parcel.get()).IgnoreError(); + } + Flush(); +} + +absl::Duration TransactionProcessor::GetRandomDelay() { + int64_t delay = + absl::Uniform(bit_gen_, delay_nsec_ / 2, delay_nsec_); + return absl::Nanoseconds(delay); +} + +void TransactionProcessor::EnQueueTransaction(FakeEndpoint* target, + BinderTransportTxCode tx_code, + FakeData data) { + grpc_core::MutexLock lock(&mu_); + if (tx_queue_.empty()) { + // This is the first transaction in the queue. Compute its deliver time. + deliver_time_ = absl::Now() + GetRandomDelay(); + } + tx_queue_.emplace(target, tx_code, std::move(data)); +} + +FakeBinderTunnel::FakeBinderTunnel() + : send_endpoint_(absl::make_unique(this)), + recv_endpoint_(absl::make_unique(this)) { + send_endpoint_->other_end = recv_endpoint_.get(); + recv_endpoint_->other_end = send_endpoint_.get(); +} + +std::pair, std::unique_ptr> +NewBinderPair(TransactionReceiver::OnTransactCb transact_cb) { + auto tx_receiver = absl::make_unique( + nullptr, std::move(transact_cb)); + std::unique_ptr sender = tx_receiver->GetSender(); + return std::make_pair(std::move(sender), std::move(tx_receiver)); +} + +} // namespace end2end_testing +} // namespace grpc_binder diff --git a/test/core/transport/binder/end2end/fake_binder.h b/test/core/transport/binder/end2end/fake_binder.h new file mode 100644 index 00000000000..bfb6835e585 --- /dev/null +++ b/test/core/transport/binder/end2end/fake_binder.h @@ -0,0 +1,294 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// A collection of fake objects that offers in-memory simulation of data +// transmission from one binder to another. +// +// Once the implementation of Binder is changed from BinderAndroid to +// FakeBinder, we'll be able to test and fuzz our end-to-end binder transport in +// a non-Android environment. +// +// The following diagram shows the high-level overview of how the in-memory +// simulation works (FakeReceiver means FakeTransactionReceiver). +// +// thread boundary +// | +// | +// ---------------- ---------------- | receive +// | FakeBinder | | FakeReceiver | <--|---------------- +// ---------------- ---------------- | | +// | ^ | ------------------------ +// | endpoint owner | | | TransactionProcessor | +// | | | ------------------------ +// v | | ^ +// ---------------- ---------------- | | +// | FakeEndpoint | --------> | FakeEndpoint | ---|---------------- +// ---------------- other_end ---------------- | enqueue +// | ^ ^ | | +// | | recv_endpoint | | | +// | | | | +// | | send_endpoint | | +// v | | v +// ------------------------------------------- +// | FakeBinderTunnel | +// ------------------------------------------- + +#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H +#define GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "absl/random/random.h" +#include "absl/strings/str_format.h" +#include "absl/strings/string_view.h" +#include "absl/time/time.h" +#include "absl/types/variant.h" +#include "src/core/ext/transport/binder/wire_format/binder.h" +#include "src/core/ext/transport/binder/wire_format/wire_reader.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" + +namespace grpc_binder { +namespace end2end_testing { + +using FakeData = std::vector< + absl::variant>>; + +// A fake writable parcel. +// +// It simulates the functionalities of a real writable parcel and stores all +// written data in memory. The data can then be transferred by calling +// MoveData(). +class FakeWritableParcel final : public WritableParcel { + public: + FakeWritableParcel(); + int32_t GetDataPosition() const override; + absl::Status SetDataPosition(int32_t pos) override; + absl::Status WriteInt32(int32_t data) override; + absl::Status WriteBinder(HasRawBinder* binder) override; + absl::Status WriteString(absl::string_view s) override; + absl::Status WriteByteArray(const int8_t* buffer, int32_t length) override; + + FakeData MoveData() { return std::move(data_); } + + private: + FakeData data_; + size_t data_position_ = 0; +}; + +// A fake readable parcel. +// +// It takes in the data transferred from a FakeWritableParcel and provides +// methods to retrieve those data in the receiving end. +class FakeReadableParcel final : public ReadableParcel { + public: + explicit FakeReadableParcel(FakeData data) : data_(std::move(data)) {} + absl::Status ReadInt32(int32_t* data) const override; + absl::Status ReadBinder(std::unique_ptr* data) const override; + absl::Status ReadByteArray(std::string* data) const override; + absl::Status ReadString(char data[111]) const override; + + private: + const FakeData data_; + mutable size_t data_position_ = 0; +}; + +class FakeBinder; +class FakeBinderTunnel; + +// FakeEndpoint is a simple struct that holds the pointer to the other end, a +// pointer to the tunnel and a pointer to its owner. This tells the owner where +// the data should be sent. +struct FakeEndpoint { + explicit FakeEndpoint(FakeBinderTunnel* tunnel) : tunnel(tunnel) {} + + FakeEndpoint* other_end; + FakeBinderTunnel* tunnel; + // The owner is either a FakeBinder (the sending part) or a + // FakeTransactionReceiver (the receiving part). Both parts hold an endpoint + // with |owner| pointing back to them and |other_end| pointing to each other. + void* owner; +}; + +class PersistentFakeTransactionReceiver; + +// A fake transaction receiver. +// +// This is the receiving part of a pair of binders. When constructed, a binder +// tunnle is created, and the sending part can be retrieved by calling +// GetSender(). +// +// It also provides a Receive() function to simulate the on-transaction +// callback of a real Android binder. +class FakeTransactionReceiver : public TransactionReceiver { + public: + FakeTransactionReceiver(grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb cb); + + void* GetRawBinder() override; + + std::unique_ptr GetSender() const; + + private: + PersistentFakeTransactionReceiver* persistent_tx_receiver_; +}; + +// A "persistent" version of the FakeTransactionReceiver. That is, its lifetime +// is managed by the processor and it outlives the wire reader and +// grpc_binder_transport, so we can safely dereference a pointer to it in +// ProcessLoop(). +class PersistentFakeTransactionReceiver { + public: + PersistentFakeTransactionReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb cb, + std::unique_ptr tunnel); + + absl::Status Receive(BinderTransportTxCode tx_code, + const ReadableParcel* parcel) { + return callback_(static_cast(tx_code), parcel); + } + + private: + grpc_core::RefCountedPtr wire_reader_ref_; + TransactionReceiver::OnTransactCb callback_; + std::unique_ptr tunnel_; + + friend class FakeTransactionReceiver; +}; + +// The sending part of a binders pair. It provides a FakeWritableParcel to the +// user, and when Transact() is called, it transfers the written data to the +// other end of the tunnel by following the information in its endpoint. +class FakeBinder final : public Binder { + public: + explicit FakeBinder(FakeEndpoint* endpoint) : endpoint_(endpoint) { + endpoint_->owner = this; + } + + void Initialize() override {} + absl::Status PrepareTransaction() override { + input_ = absl::make_unique(); + return absl::OkStatus(); + } + + absl::Status Transact(BinderTransportTxCode tx_code) override; + + WritableParcel* GetWritableParcel() const override { return input_.get(); } + ReadableParcel* GetReadableParcel() const override { return output_.get(); } + + std::unique_ptr ConstructTxReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb transact_cb) const override; + + void* GetRawBinder() override { return endpoint_->other_end; } + + private: + FakeEndpoint* endpoint_; + std::unique_ptr input_; + std::unique_ptr output_; +}; + +// A transaction processor. +// +// Once constructed, it'll create a another thread that deliver in-coming +// transactions to their destinations. +class TransactionProcessor { + public: + explicit TransactionProcessor(absl::Duration delay = absl::ZeroDuration()); + ~TransactionProcessor() { Terminate(); } + + void SetDelay(absl::Duration delay); + + void Terminate(); + void ProcessLoop(); + void Flush(); + + // Issue a transaction with |target| pointing to the target endpoint. The + // transactions will be delivered in the same order they're issued, possibly + // with random delay to simulate real-world situation. + void EnQueueTransaction(FakeEndpoint* target, BinderTransportTxCode tx_code, + FakeData data); + + PersistentFakeTransactionReceiver& NewPersistentTxReceiver( + grpc_core::RefCountedPtr wire_reader_ref, + TransactionReceiver::OnTransactCb cb, + std::unique_ptr tunnel) { + grpc_core::MutexLock lock(&tx_receiver_mu_); + storage_.emplace_front(wire_reader_ref, cb, std::move(tunnel)); + return storage_.front(); + } + + private: + absl::Duration GetRandomDelay(); + void WaitForNextTransaction() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + + grpc_core::Mutex mu_; + std::queue> + tx_queue_ ABSL_GUARDED_BY(mu_); + absl::Time deliver_time_ ABSL_GUARDED_BY(mu_); + int64_t delay_nsec_; + absl::BitGen bit_gen_; + grpc_core::Thread tx_thread_; + std::atomic terminated_; + + grpc_core::Mutex tx_receiver_mu_; + // Use forward_list to avoid invalid pointers resulted by reallocation in + // containers such as std::vector. + std::forward_list storage_ + ABSL_GUARDED_BY(tx_receiver_mu_); +}; + +// The global (shared) processor. Test suite should be responsible of +// creating/deleting it. +extern TransactionProcessor* g_transaction_processor; + +// A binder tunnel. +// +// It is a simple helper that creates and links two endpoints. +class FakeBinderTunnel { + public: + FakeBinderTunnel(); + + void EnQueueTransaction(FakeEndpoint* target, BinderTransportTxCode tx_code, + FakeData data) { + g_transaction_processor->EnQueueTransaction(target, tx_code, + std::move(data)); + } + + FakeEndpoint* GetSendEndpoint() const { return send_endpoint_.get(); } + FakeEndpoint* GetRecvEndpoint() const { return recv_endpoint_.get(); } + + private: + std::unique_ptr send_endpoint_; + std::unique_ptr recv_endpoint_; +}; + +// A helper function for constructing a pair of connected binders. +std::pair, std::unique_ptr> +NewBinderPair(TransactionReceiver::OnTransactCb transact_cb); + +} // namespace end2end_testing +} // namespace grpc_binder + +#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_FAKE_BINDER_H diff --git a/test/core/transport/binder/end2end/fake_binder_test.cc b/test/core/transport/binder/end2end/fake_binder_test.cc new file mode 100644 index 00000000000..ef52ef2a31e --- /dev/null +++ b/test/core/transport/binder/end2end/fake_binder_test.cc @@ -0,0 +1,373 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/binder/end2end/fake_binder.h" + +#include +#include +#include +#include +#include +#include + +#include "absl/strings/str_format.h" +#include "absl/time/time.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { +namespace end2end_testing { + +TEST(FakeBinderTestWithoutTransaction, WritableParcelDataPosition) { + std::unique_ptr parcel = + absl::make_unique(); + EXPECT_EQ(parcel->GetDataPosition(), 0); + EXPECT_TRUE(parcel->WriteInt32(0).ok()); + EXPECT_EQ(parcel->GetDataPosition(), 1); + EXPECT_TRUE(parcel->WriteInt32(1).ok()); + EXPECT_TRUE(parcel->WriteInt32(2).ok()); + EXPECT_EQ(parcel->GetDataPosition(), 3); + EXPECT_TRUE(parcel->WriteString("").ok()); + EXPECT_EQ(parcel->GetDataPosition(), 4); + EXPECT_TRUE(parcel->SetDataPosition(0).ok()); + const char kBuffer[] = "test"; + EXPECT_TRUE(parcel + ->WriteByteArray(reinterpret_cast(kBuffer), + strlen(kBuffer)) + .ok()); + EXPECT_EQ(parcel->GetDataPosition(), 1); + EXPECT_TRUE(parcel->SetDataPosition(100).ok()); + EXPECT_EQ(parcel->GetDataPosition(), 100); +} + +namespace { + +class FakeBinderTest : public ::testing::TestWithParam { + public: + FakeBinderTest() { + g_transaction_processor = new TransactionProcessor(GetParam()); + } + ~FakeBinderTest() override { delete g_transaction_processor; } +}; + +} // namespace + +TEST_P(FakeBinderTest, SendInt32) { + constexpr int kValue = 0x1234; + constexpr int kTxCode = 0x4321; + int called = 0; + std::unique_ptr sender; + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + EXPECT_EQ(tx_code, kTxCode); + int value = 0; + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ(value, kValue); + called++; + return absl::OkStatus(); + }); + + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteInt32(kValue).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 1); +} + +TEST_P(FakeBinderTest, SendString) { + constexpr char kValue[] = "example-string"; + constexpr int kTxCode = 0x4321; + int called = 0; + std::unique_ptr sender; + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + EXPECT_EQ(tx_code, kTxCode); + char value[111]; + memset(value, 0, sizeof(value)); + EXPECT_TRUE(parcel->ReadString(value).ok()); + EXPECT_STREQ(value, kValue); + called++; + return absl::OkStatus(); + }); + + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteString(kValue).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 1); +} + +TEST_P(FakeBinderTest, SendByteArray) { + constexpr char kValue[] = "example-byte-array"; + constexpr int kTxCode = 0x4321; + int called = 0; + std::unique_ptr sender; + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + EXPECT_EQ(tx_code, kTxCode); + std::string value; + EXPECT_TRUE(parcel->ReadByteArray(&value).ok()); + EXPECT_EQ(value, kValue); + called++; + return absl::OkStatus(); + }); + + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel + ->WriteByteArray(reinterpret_cast(kValue), + strlen(kValue)) + .ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 1); +} + +TEST_P(FakeBinderTest, SendMultipleItems) { + constexpr char kByteArray[] = "example-byte-array"; + constexpr char kString[] = "example-string"; + constexpr int kValue = 0x1234; + constexpr int kTxCode = 0x4321; + int called = 0; + std::unique_ptr sender; + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + int value_result; + EXPECT_EQ(tx_code, kTxCode); + EXPECT_TRUE(parcel->ReadInt32(&value_result).ok()); + EXPECT_EQ(value_result, kValue); + std::string byte_array_result; + EXPECT_TRUE(parcel->ReadByteArray(&byte_array_result).ok()); + EXPECT_EQ(byte_array_result, kByteArray); + char string_result[111]; + memset(string_result, 0, sizeof(string_result)); + EXPECT_TRUE(parcel->ReadString(string_result).ok()); + EXPECT_STREQ(string_result, kString); + called++; + return absl::OkStatus(); + }); + + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteInt32(kValue).ok()); + EXPECT_TRUE(parcel + ->WriteByteArray(reinterpret_cast(kByteArray), + strlen(kByteArray)) + .ok()); + EXPECT_TRUE(parcel->WriteString(kString).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 1); +} + +TEST_P(FakeBinderTest, SendBinder) { + constexpr int kValue = 0x1234; + constexpr int kTxCode = 0x4321; + int called = 0; + std::unique_ptr sender; + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + EXPECT_EQ(tx_code, kTxCode); + std::unique_ptr binder; + EXPECT_TRUE(parcel->ReadBinder(&binder).ok()); + EXPECT_TRUE(binder->PrepareTransaction().ok()); + WritableParcel* writable_parcel = binder->GetWritableParcel(); + EXPECT_TRUE(writable_parcel->WriteInt32(kValue).ok()); + EXPECT_TRUE(binder->Transact(BinderTransportTxCode(kTxCode + 1)).ok()); + called++; + return absl::OkStatus(); + }); + + int called2 = 0; + std::unique_ptr tx_receiver2 = + absl::make_unique( + nullptr, + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + int value; + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ(value, kValue); + EXPECT_EQ(tx_code, kTxCode + 1); + called2++; + return absl::OkStatus(); + }); + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteBinder(tx_receiver2.get()).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 1); + EXPECT_EQ(called2, 1); +} + +TEST_P(FakeBinderTest, SendTransactionAfterDestruction) { + constexpr int kValue = 0x1234; + constexpr int kTxCode = 0x4321; + std::unique_ptr sender; + int called = 0; + { + std::unique_ptr tx_receiver; + std::tie(sender, tx_receiver) = NewBinderPair( + [&](transaction_code_t tx_code, const ReadableParcel* parcel) { + EXPECT_EQ(tx_code, kTxCode); + int value; + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ(value, kValue + called); + called++; + return absl::OkStatus(); + }); + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteInt32(kValue).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + } + // tx_receiver gets destructed here. This additional transaction should + // *still* be received. + EXPECT_TRUE(sender->PrepareTransaction().ok()); + WritableParcel* parcel = sender->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteInt32(kValue + 1).ok()); + EXPECT_TRUE(sender->Transact(BinderTransportTxCode(kTxCode)).ok()); + + g_transaction_processor->Terminate(); + EXPECT_EQ(called, 2); +} + +namespace { + +struct ThreadArgument { + int tid; + std::vector, + std::unique_ptr>>>* + global_binder_pairs; + std::vector>* global_cnts; + int tx_code; + int num_pairs_per_thread; + int num_transactions_per_pair; + grpc_core::Mutex* mu; +}; + +} // namespace + +// Verify that this system works correctly in a concurrent environment. +// +// In end-to-end tests, there will be at least two threads, one from client to +// server and vice versa. Thus, it's important for us to make sure that the +// simulation is correct in such setup. +TEST_P(FakeBinderTest, StressTest) { + constexpr int kTxCode = 0x4321; + constexpr int kNumThreads = 16; + constexpr int kNumPairsPerThread = 128; + constexpr int kNumTransactionsPerPair = 128; + std::vector args(kNumThreads); + + grpc_core::Mutex mu; + std::vector, std::unique_ptr>>> + global_binder_pairs(kNumThreads); + std::vector> global_cnts( + kNumThreads, std::vector(kNumPairsPerThread, 0)); + + auto th_function = [](void* arg) { + ThreadArgument* th_arg = static_cast(arg); + int tid = th_arg->tid; + std::vector, + std::unique_ptr>> + binder_pairs; + for (int p = 0; p < th_arg->num_pairs_per_thread; ++p) { + std::unique_ptr binder; + std::unique_ptr tx_receiver; + int expected_tx_code = th_arg->tx_code; + std::vector>* cnt = th_arg->global_cnts; + std::tie(binder, tx_receiver) = + NewBinderPair([tid, p, cnt, expected_tx_code]( + transaction_code_t tx_code, + const ReadableParcel* parcel) mutable { + EXPECT_EQ(tx_code, expected_tx_code); + int value; + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ(tid, value); + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ(p, value); + EXPECT_TRUE(parcel->ReadInt32(&value).ok()); + EXPECT_EQ((*cnt)[tid][p], value); + (*cnt)[tid][p]++; + return absl::OkStatus(); + }); + binder_pairs.emplace_back(std::move(binder), std::move(tx_receiver)); + } + std::vector order; + for (int i = 0; i < th_arg->num_pairs_per_thread; ++i) { + for (int j = 0; j < th_arg->num_transactions_per_pair; ++j) { + order.emplace_back(i); + } + } + std::mt19937 rng(tid); + std::shuffle(order.begin(), order.end(), rng); + std::vector tx_cnt(th_arg->num_pairs_per_thread); + for (int p : order) { + EXPECT_TRUE(binder_pairs[p].first->PrepareTransaction().ok()); + WritableParcel* parcel = binder_pairs[p].first->GetWritableParcel(); + EXPECT_TRUE(parcel->WriteInt32(th_arg->tid).ok()); + EXPECT_TRUE(parcel->WriteInt32(p).ok()); + EXPECT_TRUE(parcel->WriteInt32(tx_cnt[p]++).ok()); + EXPECT_TRUE(binder_pairs[p] + .first->Transact(BinderTransportTxCode(th_arg->tx_code)) + .ok()); + } + th_arg->mu->Lock(); + (*th_arg->global_binder_pairs)[tid] = std::move(binder_pairs); + th_arg->mu->Unlock(); + }; + + std::vector thrs(kNumThreads); + std::vector thr_names(kNumThreads); + for (int i = 0; i < kNumThreads; ++i) { + args[i].tid = i; + args[i].global_binder_pairs = &global_binder_pairs; + args[i].global_cnts = &global_cnts; + args[i].tx_code = kTxCode; + args[i].num_pairs_per_thread = kNumPairsPerThread; + args[i].num_transactions_per_pair = kNumTransactionsPerPair; + args[i].mu = μ + thr_names[i] = absl::StrFormat("thread-%d", i); + thrs[i] = grpc_core::Thread(thr_names[i].c_str(), th_function, &args[i]); + } + for (auto& th : thrs) th.Start(); + for (auto& th : thrs) th.Join(); + g_transaction_processor->Terminate(); +} + +INSTANTIATE_TEST_SUITE_P(FakeBinderTestWithDifferentDelayTimes, FakeBinderTest, + testing::Values(absl::ZeroDuration(), + absl::Nanoseconds(10), + absl::Microseconds(10))); + +} // namespace end2end_testing +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/transport/binder/end2end/testing_channel_create.cc b/test/core/transport/binder/end2end/testing_channel_create.cc new file mode 100644 index 00000000000..04f09f57341 --- /dev/null +++ b/test/core/transport/binder/end2end/testing_channel_create.cc @@ -0,0 +1,126 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/binder/end2end/testing_channel_create.h" + +#include + +#include "src/core/ext/transport/binder/transport/binder_transport.h" +#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/error_utils.h" + +namespace grpc_binder { +namespace end2end_testing { + +namespace { +// Since we assume the first half of the transport setup is completed before the +// server side enters WireReader::SetupTransport, we need this helper to wait +// and finish that part of the negotiation for us. +class ServerSetupTransportHelper { + public: + ServerSetupTransportHelper() + : wire_reader_(absl::make_unique( + /*transport_stream_receiver=*/nullptr, /*is_client=*/false)) { + std::tie(endpoint_binder_, tx_receiver_) = NewBinderPair( + [this](transaction_code_t tx_code, const ReadableParcel* parcel) { + return this->wire_reader_->ProcessTransaction(tx_code, parcel); + }); + } + std::unique_ptr WaitForClientBinder() { + return wire_reader_->RecvSetupTransport(); + } + + std::unique_ptr GetEndpointBinderForClient() { + return std::move(endpoint_binder_); + } + + private: + std::unique_ptr wire_reader_; + // The endpoint binder for client. + std::unique_ptr endpoint_binder_; + std::unique_ptr tx_receiver_; +}; +} // namespace + +std::pair +CreateClientServerBindersPairForTesting() { + ServerSetupTransportHelper helper; + std::unique_ptr endpoint_binder = helper.GetEndpointBinderForClient(); + grpc_transport* client_transport = nullptr; + + struct ThreadArgs { + std::unique_ptr endpoint_binder; + grpc_transport** client_transport; + } args; + + args.endpoint_binder = std::move(endpoint_binder); + args.client_transport = &client_transport; + + grpc_core::Thread client_thread( + "client-thread", + [](void* arg) { + ThreadArgs* args = static_cast(arg); + std::unique_ptr endpoint_binder = + std::move(args->endpoint_binder); + *args->client_transport = + grpc_create_binder_transport_client(std::move(endpoint_binder)); + }, + &args); + client_thread.Start(); + grpc_transport* server_transport = + grpc_create_binder_transport_server(helper.WaitForClientBinder()); + client_thread.Join(); + return std::make_pair(client_transport, server_transport); +} + +std::shared_ptr BinderChannelForTesting( + grpc::Server* server, const grpc::ChannelArguments& args) { + grpc_channel_args channel_args = args.c_channel_args(); + return grpc::CreateChannelInternal( + "", + grpc_binder_channel_create_for_testing(server->c_server(), &channel_args, + nullptr), + std::vector>()); +} + +} // namespace end2end_testing +} // namespace grpc_binder + +grpc_channel* grpc_binder_channel_create_for_testing(grpc_server* server, + grpc_channel_args* args, + void* /*reserved*/) { + grpc_core::ExecCtx exec_ctx; + + grpc_arg default_authority_arg = grpc_channel_arg_string_create( + const_cast(GRPC_ARG_DEFAULT_AUTHORITY), + const_cast("test.authority")); + grpc_channel_args* client_args = + grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); + + grpc_transport *client_transport, *server_transport; + std::tie(client_transport, server_transport) = + grpc_binder::end2end_testing::CreateClientServerBindersPairForTesting(); + grpc_error_handle error = server->core_server->SetupTransport( + server_transport, nullptr, args, nullptr); + GPR_ASSERT(error == GRPC_ERROR_NONE); + grpc_channel* channel = + grpc_channel_create("binder", client_args, GRPC_CLIENT_DIRECT_CHANNEL, + client_transport, nullptr, 0, &error); + GPR_ASSERT(error == GRPC_ERROR_NONE); + grpc_channel_args_destroy(client_args); + return channel; +} diff --git a/test/core/transport/binder/end2end/testing_channel_create.h b/test/core/transport/binder/end2end/testing_channel_create.h new file mode 100644 index 00000000000..321e633f3f2 --- /dev/null +++ b/test/core/transport/binder/end2end/testing_channel_create.h @@ -0,0 +1,41 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H +#define GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H + +#include +#include + +#include "src/core/ext/transport/binder/transport/binder_transport.h" +#include "src/core/lib/surface/server.h" +#include "test/core/transport/binder/end2end/fake_binder.h" + +namespace grpc_binder { +namespace end2end_testing { + +std::pair +CreateClientServerBindersPairForTesting(); + +std::shared_ptr BinderChannelForTesting( + grpc::Server* server, const grpc::ChannelArguments& args); + +} // namespace end2end_testing +} // namespace grpc_binder + +grpc_channel* grpc_binder_channel_create_for_testing(grpc_server* server, + grpc_channel_args* args, + void* /*reserved*/); + +#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_END2END_TESTING_CHANNEL_CREATE_H diff --git a/test/core/transport/binder/mock_objects.cc b/test/core/transport/binder/mock_objects.cc new file mode 100644 index 00000000000..98261b0db4a --- /dev/null +++ b/test/core/transport/binder/mock_objects.cc @@ -0,0 +1,57 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test/core/transport/binder/mock_objects.h" + +#include + +#include "absl/memory/memory.h" + +namespace grpc_binder { + +using ::testing::Return; + +MockReadableParcel::MockReadableParcel() { + ON_CALL(*this, ReadBinder).WillByDefault([](std::unique_ptr* binder) { + *binder = absl::make_unique(); + return absl::OkStatus(); + }); + ON_CALL(*this, ReadInt32).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, ReadByteArray).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, ReadString).WillByDefault(Return(absl::OkStatus())); +} + +MockWritableParcel::MockWritableParcel() { + ON_CALL(*this, SetDataPosition).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, WriteInt32).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, WriteBinder).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, WriteString).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, WriteByteArray).WillByDefault(Return(absl::OkStatus())); +} + +MockBinder::MockBinder() { + ON_CALL(*this, PrepareTransaction).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, Transact).WillByDefault(Return(absl::OkStatus())); + ON_CALL(*this, GetWritableParcel).WillByDefault(Return(&mock_input_)); + ON_CALL(*this, GetReadableParcel).WillByDefault(Return(&mock_output_)); + ON_CALL(*this, ConstructTxReceiver) + .WillByDefault( + [this](grpc_core::RefCountedPtr /*wire_reader_ref*/, + TransactionReceiver::OnTransactCb cb) { + return absl::make_unique( + cb, BinderTransportTxCode::SETUP_TRANSPORT, &mock_output_); + }); +} + +} // namespace grpc_binder diff --git a/test/core/transport/binder/mock_objects.h b/test/core/transport/binder/mock_objects.h new file mode 100644 index 00000000000..32e2171147d --- /dev/null +++ b/test/core/transport/binder/mock_objects.h @@ -0,0 +1,113 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H +#define GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H + +#include + +#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" +#include "src/core/ext/transport/binder/wire_format/binder.h" +#include "src/core/ext/transport/binder/wire_format/wire_reader.h" +#include "src/core/ext/transport/binder/wire_format/wire_writer.h" + +namespace grpc_binder { + +class MockWritableParcel : public WritableParcel { + public: + MOCK_METHOD(int32_t, GetDataPosition, (), (const, override)); + MOCK_METHOD(absl::Status, SetDataPosition, (int32_t), (override)); + MOCK_METHOD(absl::Status, WriteInt32, (int32_t), (override)); + MOCK_METHOD(absl::Status, WriteBinder, (HasRawBinder*), (override)); + MOCK_METHOD(absl::Status, WriteString, (absl::string_view), (override)); + MOCK_METHOD(absl::Status, WriteByteArray, (const int8_t*, int32_t), + (override)); + + MockWritableParcel(); +}; + +class MockReadableParcel : public ReadableParcel { + public: + MOCK_METHOD(absl::Status, ReadInt32, (int32_t*), (const, override)); + MOCK_METHOD(absl::Status, ReadBinder, (std::unique_ptr*), + (const, override)); + MOCK_METHOD(absl::Status, ReadByteArray, (std::string*), (const, override)); + MOCK_METHOD(absl::Status, ReadString, (char[111]), (const, override)); + + MockReadableParcel(); +}; + +class MockBinder : public Binder { + public: + MOCK_METHOD(void, Initialize, (), (override)); + MOCK_METHOD(absl::Status, PrepareTransaction, (), (override)); + MOCK_METHOD(absl::Status, Transact, (BinderTransportTxCode), (override)); + MOCK_METHOD(WritableParcel*, GetWritableParcel, (), (const, override)); + MOCK_METHOD(ReadableParcel*, GetReadableParcel, (), (const, override)); + MOCK_METHOD(std::unique_ptr, ConstructTxReceiver, + (grpc_core::RefCountedPtr, + TransactionReceiver::OnTransactCb), + (const, override)); + MOCK_METHOD(void*, GetRawBinder, (), (override)); + + MockBinder(); + MockWritableParcel& GetWriter() { return mock_input_; } + MockReadableParcel& GetReader() { return mock_output_; } + + private: + MockWritableParcel mock_input_; + MockReadableParcel mock_output_; +}; + +// TODO(waynetu): Implement transaction injection later for more thorough +// testing. +class MockTransactionReceiver : public TransactionReceiver { + public: + explicit MockTransactionReceiver(OnTransactCb transact_cb, + BinderTransportTxCode code, + const ReadableParcel* output) { + transact_cb(static_cast(code), output).IgnoreError(); + } + + MOCK_METHOD(void*, GetRawBinder, (), (override)); +}; + +class MockWireWriter : public WireWriter { + public: + MOCK_METHOD(absl::Status, RpcCall, (const Transaction&), (override)); +}; + +class MockTransportStreamReceiver : public TransportStreamReceiver { + public: + MOCK_METHOD(void, RegisterRecvInitialMetadata, + (StreamIdentifier, InitialMetadataCallbackType), (override)); + MOCK_METHOD(void, RegisterRecvMessage, + (StreamIdentifier, MessageDataCallbackType), (override)); + MOCK_METHOD(void, RegisterRecvTrailingMetadata, + (StreamIdentifier, TrailingMetadataCallbackType), (override)); + MOCK_METHOD(void, NotifyRecvInitialMetadata, + (StreamIdentifier, absl::StatusOr), (override)); + MOCK_METHOD(void, NotifyRecvMessage, + (StreamIdentifier, absl::StatusOr), (override)); + MOCK_METHOD(void, NotifyRecvTrailingMetadata, + (StreamIdentifier, absl::StatusOr, int), (override)); + MOCK_METHOD(void, CancelRecvMessageCallbacksDueToTrailingMetadata, + (StreamIdentifier), (override)); + MOCK_METHOD(void, Clear, (StreamIdentifier), (override)); + MOCK_METHOD(void, CancelStream, (StreamIdentifier, absl::Status), (override)); +}; + +} // namespace grpc_binder + +#endif // GRPC_TEST_CORE_TRANSPORT_BINDER_MOCK_OBJECTS_H diff --git a/test/core/transport/binder/transport_stream_receiver_test.cc b/test/core/transport/binder/transport_stream_receiver_test.cc new file mode 100644 index 00000000000..1659783a840 --- /dev/null +++ b/test/core/transport/binder/transport_stream_receiver_test.cc @@ -0,0 +1,287 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include +#include +#include +#include + +#include "absl/memory/memory.h" +#include "src/core/ext/transport/binder/utils/transport_stream_receiver_impl.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { +namespace { + +// TODO(waynetu): These are hacks to make callbacks aware of their stream IDs +// and sequence numbers. Remove/Refactor these hacks when possible. +template +std::pair Decode(const T& /*data*/) { + assert(false && "This should not be called"); + return {}; +} + +template <> +std::pair Decode(const std::string& data) { + assert(data.size() == sizeof(StreamIdentifier) + sizeof(int)); + StreamIdentifier id{}; + int seq_num{}; + std::memcpy(&id, data.data(), sizeof(StreamIdentifier)); + std::memcpy(&seq_num, data.data() + sizeof(StreamIdentifier), sizeof(int)); + return std::make_pair(id, seq_num); +} + +template <> +std::pair Decode(const Metadata& data) { + assert(data.size() == 1); + const std::string& encoding = data[0].first; + return Decode(encoding); +} + +template +T Encode(StreamIdentifier /*id*/, int /*seq_num*/) { + assert(false && "This should not be called"); + return {}; +} + +template <> +std::string Encode(StreamIdentifier id, int seq_num) { + char result[sizeof(StreamIdentifier) + sizeof(int)]; + std::memcpy(result, &id, sizeof(StreamIdentifier)); + std::memcpy(result + sizeof(StreamIdentifier), &seq_num, sizeof(int)); + return std::string(result, sizeof(StreamIdentifier) + sizeof(int)); +} + +template <> +Metadata Encode(StreamIdentifier id, int seq_num) { + return {{Encode(id, seq_num), ""}}; +} + +MATCHER_P2(StreamIdAndSeqNumMatch, id, seq_num, "") { + auto p = Decode(arg.value()); + return p.first == id && p.second == seq_num; +} + +// MockCallback is used to verify the every callback passed to transaction +// receiver will eventually be invoked with the artifact of its corresponding +// binder transaction. +template +class MockCallback { + public: + explicit MockCallback(StreamIdentifier id, int seq_num) + : id_(id), seq_num_(seq_num) {} + + MOCK_METHOD(void, ActualCallback, (FirstArg), ()); + + std::function GetHandle() { + return [this](FirstArg first_arg, TrailingArgs...) { + this->ActualCallback(first_arg); + }; + } + + void ExpectCallbackInvocation() { + EXPECT_CALL(*this, ActualCallback(StreamIdAndSeqNumMatch(id_, seq_num_))); + } + + private: + StreamIdentifier id_; + int seq_num_; +}; + +using MockInitialMetadataCallback = MockCallback>; +using MockMessageCallback = MockCallback>; +using MockTrailingMetadataCallback = + MockCallback, int>; + +class MockOpBatch { + public: + MockOpBatch(StreamIdentifier id, int flag, int seq_num) + : id_(id), flag_(flag), seq_num_(seq_num) { + if (flag_ & kFlagPrefix) { + initial_metadata_callback_ = + absl::make_unique(id_, seq_num_); + } + if (flag_ & kFlagMessageData) { + message_callback_ = absl::make_unique(id_, seq_num_); + } + if (flag_ & kFlagSuffix) { + trailing_metadata_callback_ = + absl::make_unique(id_, seq_num_); + } + } + + void Complete(TransportStreamReceiver& receiver) { + if (flag_ & kFlagPrefix) { + initial_metadata_callback_->ExpectCallbackInvocation(); + receiver.NotifyRecvInitialMetadata(id_, Encode(id_, seq_num_)); + } + if (flag_ & kFlagMessageData) { + message_callback_->ExpectCallbackInvocation(); + receiver.NotifyRecvMessage(id_, Encode(id_, seq_num_)); + } + if (flag_ & kFlagSuffix) { + trailing_metadata_callback_->ExpectCallbackInvocation(); + receiver.NotifyRecvTrailingMetadata(id_, Encode(id_, seq_num_), + 0); + } + } + + void RequestRecv(TransportStreamReceiver& receiver) { + if (flag_ & kFlagPrefix) { + receiver.RegisterRecvInitialMetadata( + id_, initial_metadata_callback_->GetHandle()); + } + if (flag_ & kFlagMessageData) { + receiver.RegisterRecvMessage(id_, message_callback_->GetHandle()); + } + if (flag_ & kFlagSuffix) { + receiver.RegisterRecvTrailingMetadata( + id_, trailing_metadata_callback_->GetHandle()); + } + } + + MockOpBatch NextBatch(int flag) const { + return MockOpBatch(id_, flag, seq_num_ + 1); + } + + private: + std::unique_ptr initial_metadata_callback_; + std::unique_ptr message_callback_; + std::unique_ptr trailing_metadata_callback_; + int id_, flag_, seq_num_; +}; + +class TransportStreamReceiverTest : public ::testing::Test { + protected: + MockOpBatch NewGrpcStream(int flag) { + return MockOpBatch(current_id_++, flag, 0); + } + + StreamIdentifier current_id_ = 0; +}; + +const int kFlagAll = kFlagPrefix | kFlagMessageData | kFlagSuffix; + +} // namespace + +TEST_F(TransportStreamReceiverTest, MultipleStreamRequestThenComplete) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagAll); + t0.RequestRecv(receiver); + t0.Complete(receiver); +} + +TEST_F(TransportStreamReceiverTest, MultipleStreamCompleteThenRequest) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagAll); + t0.Complete(receiver); + t0.RequestRecv(receiver); +} + +TEST_F(TransportStreamReceiverTest, MultipleStreamInterleaved) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagAll); + MockOpBatch t1 = NewGrpcStream(kFlagAll); + t1.Complete(receiver); + t0.Complete(receiver); + t0.RequestRecv(receiver); + t1.RequestRecv(receiver); +} + +TEST_F(TransportStreamReceiverTest, MultipleStreamInterleavedReversed) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagAll); + MockOpBatch t1 = NewGrpcStream(kFlagAll); + t0.RequestRecv(receiver); + t1.RequestRecv(receiver); + t1.Complete(receiver); + t0.Complete(receiver); +} + +TEST_F(TransportStreamReceiverTest, MultipleStreamMoreInterleaved) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagAll); + MockOpBatch t1 = NewGrpcStream(kFlagAll); + t0.RequestRecv(receiver); + t1.Complete(receiver); + MockOpBatch t2 = NewGrpcStream(kFlagAll); + t2.RequestRecv(receiver); + t0.Complete(receiver); + t1.RequestRecv(receiver); + t2.Complete(receiver); +} + +TEST_F(TransportStreamReceiverTest, SingleStreamUnaryCall) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagPrefix); + MockOpBatch t1 = t0.NextBatch(kFlagMessageData); + MockOpBatch t2 = t1.NextBatch(kFlagSuffix); + t0.RequestRecv(receiver); + t1.RequestRecv(receiver); + t2.RequestRecv(receiver); + t0.Complete(receiver); + t1.Complete(receiver); + t2.Complete(receiver); +} + +TEST_F(TransportStreamReceiverTest, SingleStreamStreamingCall) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagPrefix); + t0.RequestRecv(receiver); + t0.Complete(receiver); + MockOpBatch t1 = t0.NextBatch(kFlagMessageData); + t1.Complete(receiver); + t1.RequestRecv(receiver); + MockOpBatch t2 = t1.NextBatch(kFlagMessageData); + t2.RequestRecv(receiver); + t2.Complete(receiver); + MockOpBatch t3 = t2.NextBatch(kFlagMessageData); + MockOpBatch t4 = t3.NextBatch(kFlagMessageData); + t3.Complete(receiver); + t4.Complete(receiver); + t3.RequestRecv(receiver); + t4.RequestRecv(receiver); +} + +TEST_F(TransportStreamReceiverTest, DISABLED_SingleStreamBufferedCallbacks) { + TransportStreamReceiverImpl receiver(/*is_client=*/true); + MockOpBatch t0 = NewGrpcStream(kFlagPrefix); + MockOpBatch t1 = t0.NextBatch(kFlagMessageData); + MockOpBatch t2 = t1.NextBatch(kFlagMessageData); + MockOpBatch t3 = t2.NextBatch(kFlagSuffix); + t0.RequestRecv(receiver); + // TODO(waynetu): Can gRPC issues recv_message before it actually receives the + // previous one? + t1.RequestRecv(receiver); + t2.RequestRecv(receiver); + t3.RequestRecv(receiver); + t0.Complete(receiver); + t1.Complete(receiver); + t2.Complete(receiver); + t3.Complete(receiver); +} + +// TODO(waynetu): Should we have some concurrent stress tests to make sure that +// thread safety is well taken care of? + +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/transport/binder/wire_reader_test.cc b/test/core/transport/binder/wire_reader_test.cc new file mode 100644 index 00000000000..6d3dc71959d --- /dev/null +++ b/test/core/transport/binder/wire_reader_test.cc @@ -0,0 +1,278 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Unit tests for WireReaderImpl. +// +// WireReaderImpl is responsible for turning incoming transactions into +// top-level metadata. The following tests verify that the interactions between +// WireReaderImpl and both the output (readable) parcel and the transport stream +// receiver are correct in all possible situations. +#include + +#include +#include +#include + +#include "absl/memory/memory.h" +#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h" +#include "test/core/transport/binder/mock_objects.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { + +using ::testing::DoAll; +using ::testing::Return; +using ::testing::SetArgPointee; +using ::testing::StrictMock; + +namespace { + +class WireReaderTest : public ::testing::Test { + public: + WireReaderTest() + : transport_stream_receiver_( + std::make_shared>()), + wire_reader_(transport_stream_receiver_, /*is_client=*/true) {} + + protected: + void ExpectReadInt32(int result) { + EXPECT_CALL(mock_readable_parcel_, ReadInt32) + .WillOnce(DoAll(SetArgPointee<0>(result), Return(absl::OkStatus()))); + } + + void ExpectReadByteArray(const std::string& buffer) { + ExpectReadInt32(buffer.length()); + if (!buffer.empty()) { + EXPECT_CALL(mock_readable_parcel_, ReadByteArray) + .WillOnce([buffer](std::string* data) { + *data = buffer; + return absl::OkStatus(); + }); + } + } + + template + absl::Status CallProcessTransaction(T tx_code) { + return wire_reader_.ProcessTransaction( + static_cast(tx_code), &mock_readable_parcel_); + } + + std::shared_ptr> + transport_stream_receiver_; + WireReaderImpl wire_reader_; + StrictMock mock_readable_parcel_; +}; + +MATCHER_P(StatusOrStrEq, target, "") { + if (!arg.ok()) return false; + return arg.value() == target; +} + +MATCHER_P(StatusOrContainerEq, target, "") { + if (!arg.ok()) return false; + return arg.value() == target; +} + +} // namespace + +TEST_F(WireReaderTest, SetupTransport) { + auto mock_binder = absl::make_unique(); + MockBinder& mock_binder_ref = *mock_binder; + + ::testing::InSequence sequence; + EXPECT_CALL(mock_binder_ref, Initialize); + EXPECT_CALL(mock_binder_ref, PrepareTransaction); + const MockReadableParcel mock_readable_parcel; + EXPECT_CALL(mock_binder_ref, GetWritableParcel); + + // Write version. + EXPECT_CALL(mock_binder_ref.GetWriter(), WriteInt32(77)); + + // The transaction receiver immediately informs the wire writer that the + // transport has been successfully set up. + EXPECT_CALL(mock_binder_ref, ConstructTxReceiver); + + EXPECT_CALL(mock_binder_ref.GetReader(), ReadInt32); + EXPECT_CALL(mock_binder_ref.GetReader(), ReadBinder); + + // Write transaction receiver. + EXPECT_CALL(mock_binder_ref.GetWriter(), WriteBinder); + // Perform transaction. + EXPECT_CALL(mock_binder_ref, Transact); + + wire_reader_.SetupTransport(std::move(mock_binder)); +} + +TEST_F(WireReaderTest, ProcessTransactionControlMessageSetupTransport) { + ::testing::InSequence sequence; + + EXPECT_CALL(mock_readable_parcel_, ReadInt32); + EXPECT_CALL(mock_readable_parcel_, ReadBinder) + .WillOnce([&](std::unique_ptr* binder) { + auto mock_binder = absl::make_unique(); + // binder that is read from the output parcel must first be initialized + // before it can be used. + EXPECT_CALL(*mock_binder, Initialize); + *binder = std::move(mock_binder); + return absl::OkStatus(); + }); + + EXPECT_TRUE( + CallProcessTransaction(BinderTransportTxCode::SETUP_TRANSPORT).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionControlMessagePingResponse) { + EXPECT_CALL(mock_readable_parcel_, ReadInt32); + EXPECT_TRUE( + CallProcessTransaction(BinderTransportTxCode::PING_RESPONSE).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataEmptyFlagIgnored) { + ::testing::InSequence sequence; + + // first transaction: empty flag + ExpectReadInt32(0); + // Won't further read sequence number. + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, + ProcessTransactionServerRpcDataFlagPrefixWithoutMetadata) { + ::testing::InSequence sequence; + + // flag + ExpectReadInt32(kFlagPrefix); + // sequence number + ExpectReadInt32(0); + + // count + ExpectReadInt32(0); + EXPECT_CALL( + *transport_stream_receiver_, + NotifyRecvInitialMetadata(kFirstCallId, StatusOrContainerEq(Metadata{}))); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagPrefixWithMetadata) { + ::testing::InSequence sequence; + + // flag + ExpectReadInt32(kFlagPrefix); + // sequence number + ExpectReadInt32(0); + + const std::vector> kMetadata = { + {"", ""}, + {"", "value"}, + {"key", ""}, + {"key", "value"}, + {"another-key", "another-value"}, + }; + + // count + ExpectReadInt32(kMetadata.size()); + for (const auto& md : kMetadata) { + // metadata key + ExpectReadByteArray(md.first); + // metadata val + // TODO(waynetu): metadata value can also be "parcelable". + ExpectReadByteArray(md.second); + } + EXPECT_CALL( + *transport_stream_receiver_, + NotifyRecvInitialMetadata(kFirstCallId, StatusOrContainerEq(kMetadata))); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataNonEmpty) { + ::testing::InSequence sequence; + + // flag + ExpectReadInt32(kFlagMessageData); + // sequence number + ExpectReadInt32(0); + + // message data + // TODO(waynetu): message data can also be "parcelable". + const std::string kMessageData = "message data"; + ExpectReadByteArray(kMessageData); + EXPECT_CALL(*transport_stream_receiver_, + NotifyRecvMessage(kFirstCallId, StatusOrStrEq(kMessageData))); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagMessageDataEmpty) { + ::testing::InSequence sequence; + + // flag + ExpectReadInt32(kFlagMessageData); + // sequence number + ExpectReadInt32(0); + + // message data + // TODO(waynetu): message data can also be "parcelable". + const std::string kMessageData = ""; + ExpectReadByteArray(kMessageData); + EXPECT_CALL(*transport_stream_receiver_, + NotifyRecvMessage(kFirstCallId, StatusOrStrEq(kMessageData))); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithStatus) { + ::testing::InSequence sequence; + + constexpr int kStatus = 0x1234; + // flag + ExpectReadInt32(kFlagSuffix | kFlagStatusDescription | (kStatus << 16)); + // sequence number + ExpectReadInt32(0); + // status description + EXPECT_CALL(mock_readable_parcel_, ReadString); + // metadata count + ExpectReadInt32(0); + EXPECT_CALL(*transport_stream_receiver_, + NotifyRecvTrailingMetadata( + kFirstCallId, StatusOrContainerEq(Metadata{}), kStatus)); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +TEST_F(WireReaderTest, ProcessTransactionServerRpcDataFlagSuffixWithoutStatus) { + ::testing::InSequence sequence; + + // flag + ExpectReadInt32(kFlagSuffix); + // sequence number + ExpectReadInt32(0); + // No status description + // metadata count + ExpectReadInt32(0); + EXPECT_CALL(*transport_stream_receiver_, + NotifyRecvTrailingMetadata(kFirstCallId, + StatusOrContainerEq(Metadata{}), 0)); + + EXPECT_TRUE(CallProcessTransaction(kFirstCallId).ok()); +} + +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/test/core/transport/binder/wire_writer_test.cc b/test/core/transport/binder/wire_writer_test.cc new file mode 100644 index 00000000000..129a68c4474 --- /dev/null +++ b/test/core/transport/binder/wire_writer_test.cc @@ -0,0 +1,181 @@ +// Copyright 2021 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/ext/transport/binder/wire_format/wire_writer.h" + +#include +#include +#include + +#include "absl/memory/memory.h" +#include "test/core/transport/binder/mock_objects.h" +#include "test/core/util/test_config.h" + +namespace grpc_binder { + +using ::testing::Return; +using ::testing::StrictMock; + +MATCHER_P(StrEqInt8Ptr, target, "") { + return std::string(reinterpret_cast(arg)) == target; +} + +TEST(WireWriterTest, RpcCall) { + auto mock_binder = absl::make_unique(); + MockBinder& mock_binder_ref = *mock_binder; + StrictMock mock_writable_parcel; + ON_CALL(mock_binder_ref, GetWritableParcel) + .WillByDefault(Return(&mock_writable_parcel)); + WireWriterImpl wire_writer(std::move(mock_binder)); + + auto ExpectWriteByteArray = [&](const std::string& target) { + // length + EXPECT_CALL(mock_writable_parcel, WriteInt32(target.size())); + if (!target.empty()) { + // content + EXPECT_CALL(mock_writable_parcel, + WriteByteArray(StrEqInt8Ptr(target), target.size())); + } + }; + + ::testing::InSequence sequence; + int sequence_number = 0; + int tx_code = kFirstCallId; + + { + // flag + EXPECT_CALL(mock_writable_parcel, WriteInt32(0)); + // sequence number + EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number)); + + EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code))); + + Transaction tx(tx_code, sequence_number, /*is_client=*/true); + EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + sequence_number++; + tx_code++; + } + { + // flag + EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagPrefix)); + // sequence number + EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number)); + + EXPECT_CALL(mock_writable_parcel, + WriteString(absl::string_view("/example/method/ref"))); + + const std::vector> kMetadata = { + {"", ""}, + {"", "value"}, + {"key", ""}, + {"key", "value"}, + {"another-key", "another-value"}, + }; + + // Number of metadata + EXPECT_CALL(mock_writable_parcel, WriteInt32(kMetadata.size())); + + for (const auto& md : kMetadata) { + ExpectWriteByteArray(md.first); + ExpectWriteByteArray(md.second); + } + + EXPECT_CALL(mock_binder_ref, + Transact(BinderTransportTxCode(kFirstCallId + 1))); + + Transaction tx(kFirstCallId + 1, 1, /*is_client=*/true); + tx.SetPrefix(kMetadata); + tx.SetMethodRef("/example/method/ref"); + EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + sequence_number++; + tx_code++; + } + { + // flag + EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagMessageData)); + // sequence number + EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number)); + + ExpectWriteByteArray("data"); + EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code))); + + Transaction tx(tx_code, sequence_number, /*is_client=*/true); + tx.SetData("data"); + EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + } + { + // flag + EXPECT_CALL(mock_writable_parcel, WriteInt32(kFlagSuffix)); + // sequence number + EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number)); + + EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code))); + + Transaction tx(tx_code, sequence_number, /*is_client=*/true); + tx.SetSuffix({}); + EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + sequence_number++; + tx_code++; + } + { + // flag + EXPECT_CALL(mock_writable_parcel, + WriteInt32(kFlagPrefix | kFlagMessageData | kFlagSuffix)); + // sequence number + EXPECT_CALL(mock_writable_parcel, WriteInt32(sequence_number)); + + EXPECT_CALL(mock_writable_parcel, + WriteString(absl::string_view("/example/method/ref"))); + + const std::vector> kMetadata = { + {"", ""}, + {"", "value"}, + {"key", ""}, + {"key", "value"}, + {"another-key", "another-value"}, + }; + + // Number of metadata + EXPECT_CALL(mock_writable_parcel, WriteInt32(kMetadata.size())); + + for (const auto& md : kMetadata) { + ExpectWriteByteArray(md.first); + ExpectWriteByteArray(md.second); + } + + // Empty message data + ExpectWriteByteArray(""); + + EXPECT_CALL(mock_binder_ref, Transact(BinderTransportTxCode(tx_code))); + + Transaction tx(tx_code, sequence_number, /*is_client=*/true); + // TODO(waynetu): Implement a helper function that automatically creates + // EXPECT_CALL based on the tx object. + tx.SetPrefix(kMetadata); + tx.SetMethodRef("/example/method/ref"); + tx.SetData(""); + tx.SetSuffix({}); + EXPECT_TRUE(wire_writer.RpcCall(tx).ok()); + sequence_number++; + tx_code++; + } +} + +} // namespace grpc_binder + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 0a4bbe0f2b5..1e8724ccf39 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3468,7 +3468,7 @@ "flaky": false, "gtest": true, "language": "c++", - "name": "binder_smoke_test", + "name": "binder_transport_test", "platforms": [ "linux", "mac", @@ -4537,6 +4537,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "end2end_binder_transport_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, @@ -4679,6 +4703,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "fake_binder_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -6985,6 +7033,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "transport_stream_receiver_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false, @@ -7105,6 +7177,54 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "wire_reader_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "wire_writer_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,