diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 70464ea5a2d..66d87f4f508 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -2,7 +2,7 @@ name: Report a bug about: Create a report to help us improve labels: kind/bug, priority/P2 -assignees: veblush +assignees: karthikravis --- diff --git a/.github/ISSUE_TEMPLATE/cleanup_request.md b/.github/ISSUE_TEMPLATE/cleanup_request.md index 6e5c5aec48a..72a5c267257 100644 --- a/.github/ISSUE_TEMPLATE/cleanup_request.md +++ b/.github/ISSUE_TEMPLATE/cleanup_request.md @@ -2,7 +2,7 @@ name: Request a cleanup about: Suggest a cleanup in our repository labels: kind/internal cleanup, priority/P2 -assignees: veblush +assignees: karthikravis --- diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md index ba5d92da901..12315263911 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.md +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -2,7 +2,7 @@ name: Request a feature about: Suggest an idea for this project labels: kind/enhancement, priority/P2 -assignees: veblush +assignees: karthikravis --- diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 366b68604df..681d2f819ce 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -8,4 +8,4 @@ If you know who should review your pull request, please remove the mentioning be --> -@veblush +@karthikravis diff --git a/CMakeLists.txt b/CMakeLists.txt index cac83849d08..9f644900723 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -152,6 +152,14 @@ if(WIN32) set(_gRPC_PLATFORM_WINDOWS ON) endif() + # Use C99 standard +set(CMAKE_C_STANDARD 99) + +# Add c++11 flags +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + set(CMAKE_POSITION_INDEPENDENT_CODE TRUE) set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules") @@ -201,11 +209,6 @@ include(cmake/ssl.cmake) include(cmake/upb.cmake) include(cmake/zlib.cmake) -if(NOT MSVC) - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") -endif() - if(_gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_IOS) set(_gRPC_ALLTARGETS_LIBRARIES ${CMAKE_DL_LIBS} m pthread) elseif(_gRPC_PLATFORM_ANDROID) @@ -822,6 +825,8 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx xds_end2end_test) endif() + add_dependencies(buildtests_cxx xds_interop_client) + add_dependencies(buildtests_cxx xds_interop_server) add_dependencies(buildtests_cxx alts_credentials_fuzzer_one_entry) add_dependencies(buildtests_cxx client_fuzzer_one_entry) add_dependencies(buildtests_cxx hpack_parser_fuzzer_test_one_entry) @@ -14509,6 +14514,110 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(xds_interop_client + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h + test/cpp/interop/xds_interop_client.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(xds_interop_client + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_interop_client + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc++ + grpc++_test_config + grpc + gpr + address_sorting + upb + ${_gRPC_GFLAGS_LIBRARIES} +) + + +endif() +if(gRPC_BUILD_TESTS) + +add_executable(xds_interop_server + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/empty.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/messages.grpc.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.pb.h + ${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/test.grpc.pb.h + test/cpp/interop/xds_interop_server.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(xds_interop_server + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(xds_interop_server + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc++ + grpc++_test_config + grpc + gpr + address_sorting + upb + ${_gRPC_GFLAGS_LIBRARIES} +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(alts_credentials_fuzzer_one_entry test/core/security/alts_credentials_fuzzer.cc test/core/util/one_corpus_entry_fuzzer.cc diff --git a/Makefile b/Makefile index 243053da687..61bf806226a 100644 --- a/Makefile +++ b/Makefile @@ -1307,6 +1307,8 @@ work_serializer_test: $(BINDIR)/$(CONFIG)/work_serializer_test writes_per_rpc_test: $(BINDIR)/$(CONFIG)/writes_per_rpc_test xds_bootstrap_test: $(BINDIR)/$(CONFIG)/xds_bootstrap_test xds_end2end_test: $(BINDIR)/$(CONFIG)/xds_end2end_test +xds_interop_client: $(BINDIR)/$(CONFIG)/xds_interop_client +xds_interop_server: $(BINDIR)/$(CONFIG)/xds_interop_server boringssl_ssl_test: $(BINDIR)/$(CONFIG)/boringssl_ssl_test boringssl_crypto_test: $(BINDIR)/$(CONFIG)/boringssl_crypto_test alts_credentials_fuzzer_one_entry: $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry @@ -1666,6 +1668,8 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ $(BINDIR)/$(CONFIG)/xds_bootstrap_test \ $(BINDIR)/$(CONFIG)/xds_end2end_test \ + $(BINDIR)/$(CONFIG)/xds_interop_client \ + $(BINDIR)/$(CONFIG)/xds_interop_server \ $(BINDIR)/$(CONFIG)/boringssl_ssl_test \ $(BINDIR)/$(CONFIG)/boringssl_crypto_test \ $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry \ @@ -1822,6 +1826,8 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/writes_per_rpc_test \ $(BINDIR)/$(CONFIG)/xds_bootstrap_test \ $(BINDIR)/$(CONFIG)/xds_end2end_test \ + $(BINDIR)/$(CONFIG)/xds_interop_client \ + $(BINDIR)/$(CONFIG)/xds_interop_server \ $(BINDIR)/$(CONFIG)/alts_credentials_fuzzer_one_entry \ $(BINDIR)/$(CONFIG)/client_fuzzer_one_entry \ $(BINDIR)/$(CONFIG)/hpack_parser_fuzzer_test_one_entry \ @@ -19152,6 +19158,112 @@ $(OBJDIR)/$(CONFIG)/test/cpp/end2end/test_service_impl.o: $(GENDIR)/src/proto/gr $(OBJDIR)/$(CONFIG)/test/cpp/end2end/xds_end2end_test.o: $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.pb.cc $(GENDIR)/src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/simple_messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/ads_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/cds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/eds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lds_rds_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lds_rds_for_test.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.pb.cc $(GENDIR)/src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.cc +XDS_INTEROP_CLIENT_SRC = \ + $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc \ + $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \ + $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc \ + test/cpp/interop/xds_interop_client.cc \ + +XDS_INTEROP_CLIENT_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(XDS_INTEROP_CLIENT_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/xds_interop_client: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/xds_interop_client: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/xds_interop_client: $(PROTOBUF_DEP) $(XDS_INTEROP_CLIENT_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(XDS_INTEROP_CLIENT_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/xds_interop_client + +endif + +endif + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/empty.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/messages.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/test/cpp/interop/xds_interop_client.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +deps_xds_interop_client: $(XDS_INTEROP_CLIENT_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(XDS_INTEROP_CLIENT_OBJS:.o=.dep) +endif +endif +$(OBJDIR)/$(CONFIG)/test/cpp/interop/xds_interop_client.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc + + +XDS_INTEROP_SERVER_SRC = \ + $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc \ + $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc \ + $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc \ + test/cpp/interop/xds_interop_server.cc \ + +XDS_INTEROP_SERVER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(XDS_INTEROP_SERVER_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/xds_interop_server: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.5.0+. + +$(BINDIR)/$(CONFIG)/xds_interop_server: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/xds_interop_server: $(PROTOBUF_DEP) $(XDS_INTEROP_SERVER_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(XDS_INTEROP_SERVER_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/xds_interop_server + +endif + +endif + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/empty.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/messages.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/src/proto/grpc/testing/test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +$(OBJDIR)/$(CONFIG)/test/cpp/interop/xds_interop_server.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_config.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LIBDIR)/$(CONFIG)/libaddress_sorting.a $(LIBDIR)/$(CONFIG)/libupb.a + +deps_xds_interop_server: $(XDS_INTEROP_SERVER_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(XDS_INTEROP_SERVER_OBJS:.o=.dep) +endif +endif +$(OBJDIR)/$(CONFIG)/test/cpp/interop/xds_interop_server.o: $(GENDIR)/src/proto/grpc/testing/empty.pb.cc $(GENDIR)/src/proto/grpc/testing/empty.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.pb.cc $(GENDIR)/src/proto/grpc/testing/messages.grpc.pb.cc $(GENDIR)/src/proto/grpc/testing/test.pb.cc $(GENDIR)/src/proto/grpc/testing/test.grpc.pb.cc + + BORINGSSL_SSL_TEST_SRC = \ third_party/boringssl-with-bazel/src/crypto/test/abi_test.cc \ third_party/boringssl-with-bazel/src/crypto/test/gtest_main.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index db915e77dcc..c0261d69326 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7565,4 +7565,40 @@ targets: - linux - posix - mac +- name: xds_interop_client + build: test + run: false + language: c++ + headers: [] + src: + - src/proto/grpc/testing/empty.proto + - src/proto/grpc/testing/messages.proto + - src/proto/grpc/testing/test.proto + - test/cpp/interop/xds_interop_client.cc + deps: + - grpc_test_util + - grpc++ + - grpc++_test_config + - grpc + - gpr + - address_sorting + - upb +- name: xds_interop_server + build: test + run: false + language: c++ + headers: [] + src: + - src/proto/grpc/testing/empty.proto + - src/proto/grpc/testing/messages.proto + - src/proto/grpc/testing/test.proto + - test/cpp/interop/xds_interop_server.cc + deps: + - grpc_test_util + - grpc++ + - grpc++_test_config + - grpc + - gpr + - address_sorting + - upb tests: [] diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 8935a516f8a..2312c04b124 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -70,10 +70,16 @@ typedef set StringPairSet; class IndentScope { public: explicit IndentScope(grpc_generator::Printer* printer) : printer_(printer) { + // NOTE(rbellevi): Two-space tabs are hard-coded in the protocol compiler. + // Doubling our indents and outdents guarantees compliance with PEP8. + printer_->Indent(); printer_->Indent(); } - ~IndentScope() { printer_->Outdent(); } + ~IndentScope() { + printer_->Outdent(); + printer_->Outdent(); + } private: grpc_generator::Printer* printer_; @@ -92,8 +98,9 @@ void PrivateGenerator::PrintAllComments(StringVector comments, // smarter and more sophisticated, but at the moment, if there is // no docstring to print, we simply emit "pass" to ensure validity // of the generated code. - out->Print("# missing associated documentation comment in .proto file\n"); - out->Print("pass\n"); + out->Print( + "\"\"\"Missing associated documentation comment in .proto " + "file\"\"\"\n"); return; } out->Print("\"\"\""); @@ -570,6 +577,93 @@ bool PrivateGenerator::PrintAddServicerToServer( return true; } +/* Prints out a service class used as a container for static methods pertaining + * to a class. This class has the exact name of service written in the ".proto" + * file, with no suffixes. Since this class merely acts as a namespace, it + * should never be instantiated. + */ +bool PrivateGenerator::PrintServiceClass( + const grpc::string& package_qualified_service_name, + const grpc_generator::Service* service, grpc_generator::Printer* out) { + StringMap dict; + dict["Service"] = service->name(); + out->Print("\n\n"); + out->Print(" # This class is part of an EXPERIMENTAL API.\n"); + out->Print(dict, "class $Service$(object):\n"); + { + IndentScope class_indent(out); + StringVector service_comments = service->GetAllComments(); + PrintAllComments(service_comments, out); + for (int i = 0; i < service->method_count(); ++i) { + const auto& method = service->method(i); + grpc::string request_module_and_class; + if (!method->get_module_and_message_path_input( + &request_module_and_class, generator_file_name, + generate_in_pb2_grpc, config.import_prefix, + config.prefixes_to_filter)) { + return false; + } + grpc::string response_module_and_class; + if (!method->get_module_and_message_path_output( + &response_module_and_class, generator_file_name, + generate_in_pb2_grpc, config.import_prefix, + config.prefixes_to_filter)) { + return false; + } + out->Print("\n"); + StringMap method_dict; + method_dict["Method"] = method->name(); + out->Print("@staticmethod\n"); + out->Print(method_dict, "def $Method$("); + grpc::string request_parameter( + method->ClientStreaming() ? "request_iterator" : "request"); + StringMap args_dict; + args_dict["RequestParameter"] = request_parameter; + { + IndentScope args_indent(out); + IndentScope args_double_indent(out); + out->Print(args_dict, "$RequestParameter$,\n"); + out->Print("target,\n"); + out->Print("options=(),\n"); + out->Print("channel_credentials=None,\n"); + out->Print("call_credentials=None,\n"); + out->Print("compression=None,\n"); + out->Print("wait_for_ready=None,\n"); + out->Print("timeout=None,\n"); + out->Print("metadata=None):\n"); + } + { + IndentScope method_indent(out); + grpc::string arity_method_name = + grpc::string(method->ClientStreaming() ? "stream" : "unary") + "_" + + grpc::string(method->ServerStreaming() ? "stream" : "unary"); + args_dict["ArityMethodName"] = arity_method_name; + args_dict["PackageQualifiedService"] = package_qualified_service_name; + args_dict["Method"] = method->name(); + out->Print(args_dict, + "return " + "grpc.experimental.$ArityMethodName$($RequestParameter$, " + "target, '/$PackageQualifiedService$/$Method$',\n"); + { + IndentScope continuation_indent(out); + StringMap serializer_dict; + serializer_dict["RequestModuleAndClass"] = request_module_and_class; + serializer_dict["ResponseModuleAndClass"] = response_module_and_class; + out->Print(serializer_dict, + "$RequestModuleAndClass$.SerializeToString,\n"); + out->Print(serializer_dict, "$ResponseModuleAndClass$.FromString,\n"); + out->Print("options, channel_credentials,\n"); + out->Print( + "call_credentials, compression, wait_for_ready, timeout, " + "metadata)\n"); + } + } + } + } + // TODO(rbellevi): Add methods pertinent to the server side as well. + return true; +} + bool PrivateGenerator::PrintBetaPreamble(grpc_generator::Printer* out) { StringMap var; var["Package"] = config.beta_package_root; @@ -646,7 +740,9 @@ bool PrivateGenerator::PrintGAServices(grpc_generator::Printer* out) { if (!(PrintStub(package_qualified_service_name, service.get(), out) && PrintServicer(service.get(), out) && PrintAddServicerToServer(package_qualified_service_name, - service.get(), out))) { + service.get(), out) && + PrintServiceClass(package_qualified_service_name, service.get(), + out))) { return false; } } diff --git a/src/compiler/python_private_generator.h b/src/compiler/python_private_generator.h index ee5cbbd9ae3..4d27839ad47 100644 --- a/src/compiler/python_private_generator.h +++ b/src/compiler/python_private_generator.h @@ -59,6 +59,9 @@ struct PrivateGenerator { const grpc_generator::Service* service, grpc_generator::Printer* out); + bool PrintServiceClass(const grpc::string& package_qualified_service_name, + const grpc_generator::Service* service, + grpc_generator::Printer* out); bool PrintBetaServicer(const grpc_generator::Service* service, grpc_generator::Printer* out); bool PrintBetaServerFactory( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi index 67848cadaf8..24c79533012 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/callback_common.pyx.pxi @@ -72,7 +72,7 @@ cdef CallbackFailureHandler CQ_SHUTDOWN_FAILURE_HANDLER = CallbackFailureHandler cdef class CallbackCompletionQueue: def __cinit__(self): - self._shutdown_completed = asyncio.get_event_loop().create_future() + self._shutdown_completed = grpc_aio_loop().create_future() self._wrapper = CallbackWrapper( self._shutdown_completed, CQ_SHUTDOWN_FAILURE_HANDLER) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi index 4b38779ab63..da3f976f117 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi @@ -13,16 +13,32 @@ # limitations under the License. -cdef bint _grpc_aio_initialized = 0 +cdef bint _grpc_aio_initialized = False +# NOTE(lidiz) Theoretically, applications can run in multiple event loops as +# long as they are in the same thread with same magic. However, I don't think +# we should support this use case. So, the gRPC Python Async Stack should use +# a single event loop picked by "init_grpc_aio". +cdef object _grpc_aio_loop def init_grpc_aio(): global _grpc_aio_initialized + global _grpc_aio_loop if _grpc_aio_initialized: return + else: + _grpc_aio_initialized = True + # Anchors the event loop that the gRPC library going to use. + _grpc_aio_loop = asyncio.get_event_loop() + + # Activates asyncio IO manager install_asyncio_iomgr() + + # TODO(https://github.com/grpc/grpc/issues/22244) we need a the + # grpc_shutdown_blocking() counterpart for this call. Otherwise, the gRPC + # library won't shutdown cleanly. grpc_init() # Timers are triggered by the Asyncio loop. We disable @@ -34,4 +50,9 @@ def init_grpc_aio(): # event loop, as it is being done by the other Asyncio callbacks. Executor.SetThreadingAll(False) - _grpc_aio_initialized = 1 + _grpc_aio_initialized = False + + +def grpc_aio_loop(): + """Returns the one-and-only gRPC Aio event loop.""" + return _grpc_aio_loop diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi index 639cb92d0c5..37ba5f0d346 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/iomgr.pyx.pxi @@ -49,7 +49,6 @@ cdef void asyncio_socket_connect( const grpc_sockaddr* addr, size_t addr_len, grpc_custom_connect_callback connect_cb) with gil: - host, port = sockaddr_to_tuple(addr, addr_len) socket = <_AsyncioSocket>grpc_socket.impl socket.connect(host, port, connect_cb) @@ -185,14 +184,15 @@ cdef void asyncio_resolve_async( cdef void asyncio_timer_start(grpc_custom_timer* grpc_timer) with gil: timer = _AsyncioTimer.create(grpc_timer, grpc_timer.timeout_ms / 1000.0) - Py_INCREF(timer) grpc_timer.timer = timer cdef void asyncio_timer_stop(grpc_custom_timer* grpc_timer) with gil: - timer = <_AsyncioTimer>grpc_timer.timer - timer.stop() - Py_DECREF(timer) + if grpc_timer.timer == NULL: + return + else: + timer = <_AsyncioTimer>grpc_timer.timer + timer.stop() cdef void asyncio_init_loop() with gil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi index d533d6e3a37..7d47fa77b00 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/resolver.pyx.pxi @@ -29,34 +29,27 @@ cdef class _AsyncioResolver: id_ = id(self) return f"<{class_name} {id_}>" - def _resolve_cb(self, future): - error = False + async def _async_resolve(self, bytes host, bytes port): + self._task_resolve = None try: - res = future.result() + resolved = await grpc_aio_loop().getaddrinfo(host, port) except Exception as e: - error = True - error_msg = str(e) - finally: - self._task_resolve = None - - if not error: grpc_custom_resolve_callback( self._grpc_resolver, - tuples_to_resolvaddr(res), - 0 + NULL, + grpc_socket_error("Resolve address [{}:{}] failed: {}: {}".format( + host, port, type(e), str(e)).encode()) ) else: grpc_custom_resolve_callback( self._grpc_resolver, - NULL, - grpc_socket_error("getaddrinfo {}".format(error_msg).encode()) + tuples_to_resolvaddr(resolved), + 0 ) cdef void resolve(self, char* host, char* port): assert not self._task_resolve - loop = asyncio.get_event_loop() - self._task_resolve = asyncio.ensure_future( - loop.getaddrinfo(host, port) + self._task_resolve = grpc_aio_loop().create_task( + self._async_resolve(host, port) ) - self._task_resolve.add_done_callback(self._resolve_cb) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi index 1664ef7e35a..65ee6e24e59 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/socket.pyx.pxi @@ -35,7 +35,6 @@ cdef class _AsyncioSocket: self._server = None self._py_socket = None self._peername = None - self._loop = asyncio.get_event_loop() @staticmethod cdef _AsyncioSocket create(grpc_custom_socket * grpc_socket, @@ -62,27 +61,37 @@ cdef class _AsyncioSocket: connected = self.is_connected() return f"<{class_name} {id_} connected={connected}>" - def _connect_cb(self, future): + async def _async_connect(self, object host, object port,): + self._task_connect = None try: - self._reader, self._writer = future.result() + self._reader, self._writer = await asyncio.open_connection(host, port) except Exception as e: self._grpc_connect_cb( self._grpc_socket, - grpc_socket_error("Socket connect failed: {}".format(e).encode()) + grpc_socket_error("Socket connect failed: {}: {}".format(type(e), str(e)).encode()) ) - return - finally: - self._task_connect = None + else: + # gRPC default posix implementation disables nagle + # algorithm. + sock = self._writer.transport.get_extra_info('socket') + sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) - # gRPC default posix implementation disables nagle - # algorithm. - sock = self._writer.transport.get_extra_info('socket') - sock.setsockopt(native_socket.IPPROTO_TCP, native_socket.TCP_NODELAY, True) + self._grpc_connect_cb( + self._grpc_socket, + 0 + ) - self._grpc_connect_cb( - self._grpc_socket, - 0 + cdef void connect(self, + object host, + object port, + grpc_custom_connect_callback grpc_connect_cb): + assert not self._reader + assert not self._task_connect + + self._task_connect = grpc_aio_loop().create_task( + self._async_connect(host, port) ) + self._grpc_connect_cb = grpc_connect_cb async def _async_read(self, size_t length): self._task_read = None @@ -106,25 +115,12 @@ cdef class _AsyncioSocket: 0 ) - cdef void connect(self, - object host, - object port, - grpc_custom_connect_callback grpc_connect_cb): - assert not self._reader - assert not self._task_connect - - self._task_connect = asyncio.ensure_future( - asyncio.open_connection(host, port) - ) - self._grpc_connect_cb = grpc_connect_cb - self._task_connect.add_done_callback(self._connect_cb) - cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback grpc_read_cb): assert not self._task_read self._grpc_read_cb = grpc_read_cb self._read_buffer = buffer_ - self._task_read = self._loop.create_task(self._async_read(length)) + self._task_read = grpc_aio_loop().create_task(self._async_read(length)) async def _async_write(self, bytearray outbound_buffer): self._writer.write(outbound_buffer) @@ -157,7 +153,7 @@ cdef class _AsyncioSocket: outbound_buffer.extend(start[:length]) self._grpc_write_cb = grpc_write_cb - self._task_write = self._loop.create_task(self._async_write(outbound_buffer)) + self._task_write = grpc_aio_loop().create_task(self._async_write(outbound_buffer)) cdef bint is_connected(self): return self._reader and not self._reader._transport.is_closing() @@ -201,7 +197,7 @@ cdef class _AsyncioSocket: sock=self._py_socket, ) - self._loop.create_task(create_asyncio_server()) + grpc_aio_loop().create_task(create_asyncio_server()) cdef accept(self, grpc_custom_socket* grpc_socket_client, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi index 5af5dcd9282..d2979c86b49 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pxd.pxi @@ -15,11 +15,10 @@ cdef class _AsyncioTimer: cdef: grpc_custom_timer * _grpc_timer - object _deadline - object _timer_handler - int _active + object _timer_future + bint _active @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline) + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout) cdef stop(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi index e8edb4a5cf8..51145116e21 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/iomgr/timer.pyx.pxi @@ -16,21 +16,22 @@ cdef class _AsyncioTimer: def __cinit__(self): self._grpc_timer = NULL - self._timer_handler = None - self._active = 0 + self._timer_future = None + self._active = False + cpython.Py_INCREF(self) @staticmethod - cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, deadline): + cdef _AsyncioTimer create(grpc_custom_timer * grpc_timer, float timeout): timer = _AsyncioTimer() timer._grpc_timer = grpc_timer - timer._deadline = deadline - timer._timer_handler = asyncio.get_event_loop().call_later(deadline, timer._on_deadline) - timer._active = 1 + timer._timer_future = grpc_aio_loop().call_later(timeout, timer.on_time_up) + timer._active = True return timer - def _on_deadline(self): - self._active = 0 + def on_time_up(self): + self._active = False grpc_custom_timer_callback(self._grpc_timer, 0) + cpython.Py_DECREF(self) def __repr__(self): class_name = self.__class__.__name__ @@ -38,8 +39,9 @@ cdef class _AsyncioTimer: return f"<{class_name} {id_} deadline={self._deadline} active={self._active}>" cdef stop(self): - if self._active == 0: + if not self._active: return - self._timer_handler.cancel() - self._active = 0 + self._timer_future.cancel() + self._active = False + cpython.Py_DECREF(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 01baa5288ef..74c7f6c1405 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -256,6 +256,8 @@ cdef void _call( on_success(started_tags) else: raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) + + cdef void _process_integrated_call_tag( _ChannelState state, _BatchOperationTag tag) except *: cdef _CallState call_state = state.integrated_call_states.pop(tag) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 00311b5ea2a..eff95c4f299 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -148,8 +148,9 @@ cdef class Server: # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) - grpc_server_destroy(self.c_server) - self.c_server = NULL + with nogil: + grpc_server_destroy(self.c_server) + self.c_server = NULL def __dealloc__(self): if self.c_server == NULL: diff --git a/src/python/grpcio/grpc/_simple_stubs.py b/src/python/grpcio/grpc/_simple_stubs.py index 72872e163c7..44308657d58 100644 --- a/src/python/grpcio/grpc/_simple_stubs.py +++ b/src/python/grpcio/grpc/_simple_stubs.py @@ -53,8 +53,10 @@ else: def _create_channel(target: str, options: Sequence[Tuple[str, str]], channel_credentials: Optional[grpc.ChannelCredentials], compression: Optional[grpc.Compression]) -> grpc.Channel: - channel_credentials = channel_credentials or grpc.local_channel_credentials( - ) + # TODO(rbellevi): Revisit the default value for this. + if channel_credentials is None: + raise NotImplementedError( + "channel_credentials must be supplied explicitly.") if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials: _LOGGER.debug(f"Creating insecure channel with options '{options}' " + f"and compression '{compression}'") @@ -156,26 +158,13 @@ class ChannelCache: return len(self._mapping) -# TODO(rbellevi): Consider a credential type that has the -# following functionality matrix: -# -# +----------+-------+--------+ -# | | local | remote | -# |----------+-------+--------+ -# | secure | o | o | -# | insecure | o | x | -# +----------+-------+--------+ -# -# Make this the default option. - - @experimental_api def unary_unary( request: RequestType, target: str, method: str, request_serializer: Optional[Callable[[Any], bytes]] = None, - request_deserializer: Optional[Callable[[bytes], Any]] = None, + response_deserializer: Optional[Callable[[bytes], Any]] = None, options: Sequence[Tuple[AnyStr, AnyStr]] = (), channel_credentials: Optional[grpc.ChannelCredentials] = None, call_credentials: Optional[grpc.CallCredentials] = None, @@ -232,7 +221,7 @@ def unary_unary( channel = ChannelCache.get().get_channel(target, options, channel_credentials, compression) multicallable = channel.unary_unary(method, request_serializer, - request_deserializer) + response_deserializer) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -246,7 +235,7 @@ def unary_stream( target: str, method: str, request_serializer: Optional[Callable[[Any], bytes]] = None, - request_deserializer: Optional[Callable[[bytes], Any]] = None, + response_deserializer: Optional[Callable[[bytes], Any]] = None, options: Sequence[Tuple[AnyStr, AnyStr]] = (), channel_credentials: Optional[grpc.ChannelCredentials] = None, call_credentials: Optional[grpc.CallCredentials] = None, @@ -302,7 +291,7 @@ def unary_stream( channel = ChannelCache.get().get_channel(target, options, channel_credentials, compression) multicallable = channel.unary_stream(method, request_serializer, - request_deserializer) + response_deserializer) return multicallable(request, metadata=metadata, wait_for_ready=wait_for_ready, @@ -316,7 +305,7 @@ def stream_unary( target: str, method: str, request_serializer: Optional[Callable[[Any], bytes]] = None, - request_deserializer: Optional[Callable[[bytes], Any]] = None, + response_deserializer: Optional[Callable[[bytes], Any]] = None, options: Sequence[Tuple[AnyStr, AnyStr]] = (), channel_credentials: Optional[grpc.ChannelCredentials] = None, call_credentials: Optional[grpc.CallCredentials] = None, @@ -372,7 +361,7 @@ def stream_unary( channel = ChannelCache.get().get_channel(target, options, channel_credentials, compression) multicallable = channel.stream_unary(method, request_serializer, - request_deserializer) + response_deserializer) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, @@ -386,7 +375,7 @@ def stream_stream( target: str, method: str, request_serializer: Optional[Callable[[Any], bytes]] = None, - request_deserializer: Optional[Callable[[bytes], Any]] = None, + response_deserializer: Optional[Callable[[bytes], Any]] = None, options: Sequence[Tuple[AnyStr, AnyStr]] = (), channel_credentials: Optional[grpc.ChannelCredentials] = None, call_credentials: Optional[grpc.CallCredentials] = None, @@ -442,7 +431,7 @@ def stream_stream( channel = ChannelCache.get().get_channel(target, options, channel_credentials, compression) multicallable = channel.stream_stream(method, request_serializer, - request_deserializer) + response_deserializer) return multicallable(request_iterator, metadata=metadata, wait_for_ready=wait_for_ready, diff --git a/src/python/grpcio/grpc/experimental/aio/_channel.py b/src/python/grpcio/grpc/experimental/aio/_channel.py index fcc61338ca3..300a1b3490d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_channel.py +++ b/src/python/grpcio/grpc/experimental/aio/_channel.py @@ -228,7 +228,7 @@ class Channel(_base_channel.Channel): "UnaryUnaryClientInterceptors, the following are invalid: {}"\ .format(invalid_interceptors)) - self._loop = asyncio.get_event_loop() + self._loop = cygrpc.grpc_aio_loop() self._channel = cygrpc.AioChannel( _common.encode(target), _augment_channel_arguments(options, compression), credentials, @@ -240,7 +240,7 @@ class Channel(_base_channel.Channel): async def __aexit__(self, exc_type, exc_val, exc_tb): await self._close(None) - async def _close(self, grace): + async def _close(self, grace): # pylint: disable=too-many-branches if self._channel.closed(): return @@ -252,7 +252,27 @@ class Channel(_base_channel.Channel): calls = [] call_tasks = [] for task in tasks: - stack = task.get_stack(limit=1) + try: + stack = task.get_stack(limit=1) + except AttributeError as attribute_error: + # NOTE(lidiz) tl;dr: If the Task is created with a CPython + # object, it will trigger AttributeError. + # + # In the global finalizer, the event loop schedules + # a CPython PyAsyncGenAThrow object. + # https://github.com/python/cpython/blob/00e45877e33d32bb61aa13a2033e3bba370bda4d/Lib/asyncio/base_events.py#L484 + # + # However, the PyAsyncGenAThrow object is written in C and + # failed to include the normal Python frame objects. Hence, + # this exception is a false negative, and it is safe to ignore + # the failure. It is fixed by https://github.com/python/cpython/pull/18669, + # but not available until 3.9 or 3.8.3. So, we have to keep it + # for a while. + # TODO(lidiz) drop this hack after 3.8 deprecation + if 'frame' in str(attribute_error): + continue + else: + raise # If the Task is created by a C-extension, the stack will be empty. if not stack: diff --git a/src/python/grpcio/grpc/experimental/aio/_interceptor.py b/src/python/grpcio/grpc/experimental/aio/_interceptor.py index 04f2f72e275..9e99a1b125d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_interceptor.py +++ b/src/python/grpcio/grpc/experimental/aio/_interceptor.py @@ -160,10 +160,10 @@ class InterceptedUnaryUnaryCall(_base_call.UnaryUnaryCall): loop: asyncio.AbstractEventLoop) -> None: self._channel = channel self._loop = loop - self._interceptors_task = asyncio.ensure_future(self._invoke( - interceptors, method, timeout, metadata, credentials, - wait_for_ready, request, request_serializer, response_deserializer), - loop=loop) + self._interceptors_task = loop.create_task( + self._invoke(interceptors, method, timeout, metadata, credentials, + wait_for_ready, request, request_serializer, + response_deserializer)) self._pending_add_done_callbacks = [] self._interceptors_task.add_done_callback( self._fire_pending_add_done_callbacks) diff --git a/src/python/grpcio/grpc/experimental/aio/_server.py b/src/python/grpcio/grpc/experimental/aio/_server.py index 587d8096c69..18e2bf65553 100644 --- a/src/python/grpcio/grpc/experimental/aio/_server.py +++ b/src/python/grpcio/grpc/experimental/aio/_server.py @@ -13,7 +13,6 @@ # limitations under the License. """Server-side implementation of gRPC Asyncio Python.""" -import asyncio from concurrent.futures import Executor from typing import Any, Optional, Sequence @@ -41,7 +40,7 @@ class Server(_base_server.Server): options: ChannelArgumentType, maximum_concurrent_rpcs: Optional[int], compression: Optional[grpc.Compression]): - self._loop = asyncio.get_event_loop() + self._loop = cygrpc.grpc_aio_loop() if interceptors: invalid_interceptors = [ interceptor for interceptor in interceptors diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index a8f119fd46b..53999c3caa4 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -193,6 +193,7 @@ class TestGevent(setuptools.Command): 'unit._server_ssl_cert_config_test', # TODO(https://github.com/grpc/grpc/issues/14901) enable this test 'protoc_plugin._python_plugin_test.PythonPluginTest', + 'protoc_plugin._python_plugin_test.SimpleStubsPluginTest', # Beta API is unsupported for gevent 'protoc_plugin.beta_python_plugin_test', 'unit.beta._beta_features_test', diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 963dc766822..94ffecc63f0 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -27,6 +27,7 @@ import unittest from six import moves import grpc +import grpc.experimental from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -503,5 +504,118 @@ class PythonPluginTest(unittest.TestCase): service.server.stop(None) +@unittest.skipIf(sys.version_info[0] < 3, "Unsupported on Python 2.") +class SimpleStubsPluginTest(unittest.TestCase): + servicer_methods = _ServicerMethods() + + class Servicer(service_pb2_grpc.TestServiceServicer): + + def UnaryCall(self, request, context): + return SimpleStubsPluginTest.servicer_methods.UnaryCall( + request, context) + + def StreamingOutputCall(self, request, context): + return SimpleStubsPluginTest.servicer_methods.StreamingOutputCall( + request, context) + + def StreamingInputCall(self, request_iterator, context): + return SimpleStubsPluginTest.servicer_methods.StreamingInputCall( + request_iterator, context) + + def FullDuplexCall(self, request_iterator, context): + return SimpleStubsPluginTest.servicer_methods.FullDuplexCall( + request_iterator, context) + + def HalfDuplexCall(self, request_iterator, context): + return SimpleStubsPluginTest.servicer_methods.HalfDuplexCall( + request_iterator, context) + + def setUp(self): + super(SimpleStubsPluginTest, self).setUp() + self._server = test_common.test_server() + service_pb2_grpc.add_TestServiceServicer_to_server( + self.Servicer(), self._server) + self._port = self._server.add_insecure_port('[::]:0') + self._server.start() + self._target = 'localhost:{}'.format(self._port) + + def tearDown(self): + self._server.stop(None) + super(SimpleStubsPluginTest, self).tearDown() + + def testUnaryCall(self): + request = request_pb2.SimpleRequest(response_size=13) + response = service_pb2_grpc.TestService.UnaryCall( + request, + self._target, + channel_credentials=grpc.experimental.insecure_channel_credentials( + ), + wait_for_ready=True) + expected_response = self.servicer_methods.UnaryCall( + request, 'not a real context!') + self.assertEqual(expected_response, response) + + def testStreamingOutputCall(self): + request = _streaming_output_request() + expected_responses = self.servicer_methods.StreamingOutputCall( + request, 'not a real RpcContext!') + responses = service_pb2_grpc.TestService.StreamingOutputCall( + request, + self._target, + channel_credentials=grpc.experimental.insecure_channel_credentials( + ), + wait_for_ready=True) + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + def testStreamingInputCall(self): + response = service_pb2_grpc.TestService.StreamingInputCall( + _streaming_input_request_iterator(), + self._target, + channel_credentials=grpc.experimental.insecure_channel_credentials( + ), + wait_for_ready=True) + expected_response = self.servicer_methods.StreamingInputCall( + _streaming_input_request_iterator(), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testFullDuplexCall(self): + responses = service_pb2_grpc.TestService.FullDuplexCall( + _full_duplex_request_iterator(), + self._target, + channel_credentials=grpc.experimental.insecure_channel_credentials( + ), + wait_for_ready=True) + expected_responses = self.servicer_methods.FullDuplexCall( + _full_duplex_request_iterator(), 'not a real RpcContext!') + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + def testHalfDuplexCall(self): + + def half_duplex_request_iterator(): + request = request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + responses = service_pb2_grpc.TestService.HalfDuplexCall( + half_duplex_request_iterator(), + self._target, + channel_credentials=grpc.experimental.insecure_channel_credentials( + ), + wait_for_ready=True) + expected_responses = self.servicer_methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index eb702f6c8b0..196e9f08b0a 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -7,6 +7,7 @@ "interop._insecure_intraop_test.InsecureIntraopTest", "interop._secure_intraop_test.SecureIntraopTest", "protoc_plugin._python_plugin_test.PythonPluginTest", + "protoc_plugin._python_plugin_test.SimpleStubsPluginTest", "protoc_plugin._split_definitions_test.SameProtoGrpcBeforeProtoProtocStyleTest", "protoc_plugin._split_definitions_test.SameProtoMid2016ProtocStyleTest", "protoc_plugin._split_definitions_test.SameProtoProtoBeforeGrpcProtocStyleTest", diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 74bbc04be2f..f845e078684 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -457,16 +457,16 @@ class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): # Should be around the same as the timeout remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 2) + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT * 3 / 2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 5 / 2) response = await call.read() self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) # Should be around the timeout minus a unit of wait time remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT // 2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT / 2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 3 / 2) self.assertEqual(grpc.StatusCode.OK, await call.code()) diff --git a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py index 76de5232d35..45ed9691bcc 100644 --- a/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py +++ b/src/python/grpcio_tests/tests_py3_only/unit/_simple_stubs_test.py @@ -174,13 +174,6 @@ class SimpleStubsTest(unittest.TestCase): channel_credentials=grpc.local_channel_credentials()) self.assertEqual(_REQUEST, response) - def test_channel_credentials_default(self): - with _server(grpc.local_server_credentials()) as port: - target = f'localhost:{port}' - response = grpc.experimental.unary_unary(_REQUEST, target, - _UNARY_UNARY) - self.assertEqual(_REQUEST, response) - def test_channels_cached(self): with _server(grpc.local_server_credentials()) as port: target = f'localhost:{port}' diff --git a/templates/CMakeLists.txt.template b/templates/CMakeLists.txt.template index b5b1ae30811..1a5c81f5357 100644 --- a/templates/CMakeLists.txt.template +++ b/templates/CMakeLists.txt.template @@ -242,6 +242,14 @@ set(_gRPC_PLATFORM_WINDOWS ON) endif() + # Use C99 standard + set(CMAKE_C_STANDARD 99) + + # Add c++11 flags + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) + set(CMAKE_CXX_EXTENSIONS OFF) + ## Some libraries are shared even with BUILD_SHARED_LIBRARIES=OFF set(CMAKE_POSITION_INDEPENDENT_CODE TRUE) set(CMAKE_MODULE_PATH "<%text>${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules") @@ -292,11 +300,6 @@ include(cmake/upb.cmake) include(cmake/zlib.cmake) - if(NOT MSVC) - set(CMAKE_C_FLAGS "<%text>${CMAKE_C_FLAGS} -std=c99") - set(CMAKE_CXX_FLAGS "<%text>${CMAKE_CXX_FLAGS} -std=c++11") - endif() - if(_gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_IOS) set(_gRPC_ALLTARGETS_LIBRARIES <%text>${CMAKE_DL_LIBS} m pthread) elseif(_gRPC_PLATFORM_ANDROID) diff --git a/test/cpp/end2end/xds_end2end_test.cc b/test/cpp/end2end/xds_end2end_test.cc index 1fbdbad953e..e8430e3b01c 100644 --- a/test/cpp/end2end/xds_end2end_test.cc +++ b/test/cpp/end2end/xds_end2end_test.cc @@ -400,7 +400,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, std::pair>; // A struct representing a client's subscription to a particular resource. - struct SubscriberState { + struct SubscriptionState { // Version that the client currently knows about. int current_version = 0; // The queue upon which to place updates when the resource is updated. @@ -408,23 +408,25 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, }; // A struct representing the a client's subscription to all the resources. + using SubscriptionNameMap = + std::map; using SubscriptionMap = - std::map>; + std::map; // A struct representing the current state for a resource: // - the version of the resource that is set by the SetResource() methods. - // - a list of subscribers interested in this resource. + // - a list of subscriptions interested in this resource. struct ResourceState { int version = 0; absl::optional resource; - std::set subscribers; + std::set subscriptions; }; // A struct representing the current state for all resources: // LDS, CDS, EDS, and RDS for the class as a whole. - using ResourcesMap = - std::map>; + using ResourceNameMap = + std::map; + using ResourceMap = std::map; AdsServiceImpl(bool enable_load_reporting) { // Construct RDS response data. @@ -475,101 +477,61 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, } // Checks whether the client needs to receive a newer version of - // the resource. - bool ClientNeedsResourceUpdate(const string& resource_type, - const string& name, - SubscriptionMap* subscription_map) { - auto subscriber_it = (*subscription_map)[resource_type].find(name); - if (subscriber_it == (*subscription_map)[resource_type].end()) { - gpr_log(GPR_INFO, - "ADS[%p]: Skipping an unsubscribed update for resource %s and " - "name %s", - this, resource_type.c_str(), name.c_str()); - return false; - } - const auto& resource_state = resources_map_[resource_type][name]; - if (subscriber_it->second.current_version < resource_state.version) { - subscriber_it->second.current_version = resource_state.version; - gpr_log(GPR_INFO, - "ADS[%p]: Need to process new %s update %s, bring current to %d", - this, resource_type.c_str(), name.c_str(), - subscriber_it->second.current_version); + // the resource. If so, updates subscription_state->current_version and + // returns true. + bool ClientNeedsResourceUpdate(const ResourceState& resource_state, + SubscriptionState* subscription_state) { + if (subscription_state->current_version < resource_state.version) { + subscription_state->current_version = resource_state.version; return true; - } else { - gpr_log(GPR_INFO, - "ADS[%p]: Skipping an old %s update %s, current is at %d", this, - resource_type.c_str(), name.c_str(), - subscriber_it->second.current_version); - return false; } return false; } - // Resource subscription: - // 1. inserting an entry into the subscription map indexed by resource - // type/name pair. - // 2. inserting or updating an entry into the resources map indexed - // by resource type/name pair about this subscription. - void ResourceSubscribe(const std::string& resource_type, - const std::string& name, UpdateQueue* update_queue, - SubscriptionMap* subscription_map) { - SubscriberState& subscriber_state = - (*subscription_map)[resource_type][name]; - subscriber_state.update_queue = update_queue; - ResourceState& resource_state = resources_map_[resource_type][name]; - resource_state.subscribers.emplace(&subscriber_state); - gpr_log( - GPR_INFO, - "ADS[%p]: subscribe to resource type %s name %s version %d state %p", - this, resource_type.c_str(), name.c_str(), resource_state.version, - &subscriber_state); - } - - // Resource unsubscription: - // 1. update the entry in the resources map indexed - // by resource type/name pair to remove this subscription - // 2. remove this entry from the subscription map. - // 3. remove this resource type from the subscription map if there are no more - // resources subscribed for the resource type. - void ResourceUnsubscribe(const std::string& resource_type, - const std::string& name, - SubscriptionMap* subscription_map) { - auto subscription_by_type_it = subscription_map->find(resource_type); - if (subscription_by_type_it == subscription_map->end()) { - gpr_log(GPR_INFO, "ADS[%p]: resource type %s not subscribed", this, - resource_type.c_str()); - return; - } - auto& subscription_by_type_map = subscription_by_type_it->second; - auto subscription_it = subscription_by_type_map.find(name); - if (subscription_it == subscription_by_type_map.end()) { - gpr_log(GPR_INFO, "ADS[%p]: resource name %s of type %s not subscribed", - this, name.c_str(), resource_type.c_str()); - return; - } - gpr_log(GPR_INFO, - "ADS[%p]: Unsubscribe to resource type %s name %s state %p", this, - resource_type.c_str(), name.c_str(), &subscription_it->second); - auto resource_by_type_it = resources_map_.find(resource_type); - GPR_ASSERT(resource_by_type_it != resources_map_.end()); - auto& resource_by_type_map = resource_by_type_it->second; - auto resource_it = resource_by_type_map.find(name); - GPR_ASSERT(resource_it != resource_by_type_map.end()); - resource_it->second.subscribers.erase(&subscription_it->second); - if (resource_it->second.subscribers.empty() && - !resource_it->second.resource.has_value()) { - gpr_log(GPR_INFO, - "ADS[%p]: Erasing resource type %s name %s from resource map " - "since there are no more subscribers for this unset resource", - this, resource_type.c_str(), name.c_str()); - resource_by_type_map.erase(resource_it); - } - subscription_by_type_map.erase(subscription_it); - if (subscription_by_type_map.empty()) { - gpr_log(GPR_INFO, - "ADS[%p]: Erasing resource type %s from subscription_map", this, - resource_type.c_str()); - subscription_map->erase(subscription_by_type_it); + // Subscribes to a resource if not already subscribed: + // 1. Sets the update_queue field in subscription_state. + // 2. Adds subscription_state to resource_state->subscriptions. + void MaybeSubscribe(const std::string& resource_type, + const std::string& resource_name, + SubscriptionState* subscription_state, + ResourceState* resource_state, + UpdateQueue* update_queue) { + if (subscription_state->update_queue != nullptr) return; + subscription_state->update_queue = update_queue; + resource_state->subscriptions.emplace(subscription_state); + gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + } + + // Removes subscriptions for resources no longer present in the + // current request. + void ProcessUnsubscriptions( + const std::string& resource_type, + const std::set& resources_in_current_request, + SubscriptionNameMap* subscription_name_map, + ResourceNameMap* resource_name_map) { + for (auto it = subscription_name_map->begin(); + it != subscription_name_map->end();) { + const std::string& resource_name = it->first; + SubscriptionState& subscription_state = it->second; + if (resources_in_current_request.find(resource_name) != + resources_in_current_request.end()) { + ++it; + continue; + } + gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", + this, resource_type.c_str(), resource_name.c_str(), + &subscription_state); + auto resource_it = resource_name_map->find(resource_name); + GPR_ASSERT(resource_it != resource_name_map->end()); + auto& resource_state = resource_it->second; + resource_state.subscriptions.erase(&subscription_state); + if (resource_state.subscriptions.empty() && + !resource_state.resource.has_value()) { + resource_name_map->erase(resource_it); + } + it = subscription_name_map->erase(it); } } @@ -577,7 +539,7 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // for all resources and by adding all subscribed resources for LDS and CDS. void CompleteBuildingDiscoveryResponse( const std::string& resource_type, const int version, - const SubscriptionMap& subscription_map, + const SubscriptionNameMap& subscription_name_map, const std::set& resources_added_to_response, DiscoveryResponse* response) { resource_type_response_state_[resource_type] = SENT; @@ -587,18 +549,15 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) { // For LDS and CDS we must send back all subscribed resources // (even the unchanged ones) - auto subscription_map_by_type_it = subscription_map.find(resource_type); - GPR_ASSERT(subscription_map_by_type_it != subscription_map.end()); - for (const auto& subscription : subscription_map_by_type_it->second) { - if (resources_added_to_response.find(subscription.first) == + for (const auto& p : subscription_name_map) { + const std::string& resource_name = p.first; + if (resources_added_to_response.find(resource_name) == resources_added_to_response.end()) { - absl::optional& resource = - resources_map_[resource_type][subscription.first].resource; - if (resource.has_value()) { - response->add_resources()->CopyFrom(resource.value()); - } else { - gpr_log(GPR_INFO, "ADS[%p]: Unknown resource type %s and name %s", - this, resource_type.c_str(), subscription.first.c_str()); + const ResourceState& resource_state = + resource_map_[resource_type][resource_name]; + if (resource_state.resource.has_value()) { + response->add_resources()->CopyFrom( + resource_state.resource.value()); } } } @@ -622,7 +581,6 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Resources that the client will be subscribed to keyed by resource type // url. SubscriptionMap subscription_map; - std::map subscriber_map; // Current Version map keyed by resource type url. std::map resource_type_version; // Creating blocking thread to read from stream. @@ -647,7 +605,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, DiscoveryRequest request = std::move(requests.front()); requests.pop_front(); did_work = true; - gpr_log(GPR_INFO, "ADS[%p]: Handling request %s with content %s", + gpr_log(GPR_INFO, + "ADS[%p]: Received request for type %s with content %s", this, request.type_url().c_str(), request.DebugString().c_str()); // Identify ACK and NACK by looking for version information and @@ -667,58 +626,51 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // 3. unsubscribe if necessary if (resource_types_to_ignore_.find(request.type_url()) == resource_types_to_ignore_.end()) { + auto& subscription_name_map = + subscription_map[request.type_url()]; + auto& resource_name_map = resource_map_[request.type_url()]; std::set resources_in_current_request; std::set resources_added_to_response; for (const std::string& resource_name : request.resource_names()) { resources_in_current_request.emplace(resource_name); - auto subscriber_it = - subscription_map[request.type_url()].find(resource_name); - if (subscriber_it == - subscription_map[request.type_url()].end()) { - ResourceSubscribe(request.type_url(), resource_name, - &update_queue, &subscription_map); - } - if (ClientNeedsResourceUpdate(request.type_url(), resource_name, - &subscription_map)) { + auto& subscription_state = subscription_name_map[resource_name]; + auto& resource_state = resource_name_map[resource_name]; + MaybeSubscribe(request.type_url(), resource_name, + &subscription_state, &resource_state, + &update_queue); + if (ClientNeedsResourceUpdate(resource_state, + &subscription_state)) { + gpr_log( + GPR_INFO, + "ADS[%p]: Sending update for type=%s name=%s version=%d", + this, request.type_url().c_str(), resource_name.c_str(), + resource_state.version); resources_added_to_response.emplace(resource_name); - gpr_log(GPR_INFO, - "ADS[%p]: Handling resource type %s and name %s", - this, request.type_url().c_str(), - resource_name.c_str()); - auto resource = - resources_map_[request.type_url()][resource_name]; - GPR_ASSERT(resource.resource.has_value()); - response.add_resources()->CopyFrom(resource.resource.value()); - } - } - // Remove subscriptions no longer requested: build a list of - // unsubscriber names first while iterating the subscription_map - // and then erase from the subscription_map in - // ResourceUnsubscribe. - std::set unsubscriber_list; - for (const auto& subscription : - subscription_map[request.type_url()]) { - if (resources_in_current_request.find(subscription.first) == - resources_in_current_request.end()) { - unsubscriber_list.emplace(subscription.first); + if (resource_state.resource.has_value()) { + response.add_resources()->CopyFrom( + resource_state.resource.value()); + } } } - for (const auto& name : unsubscriber_list) { - ResourceUnsubscribe(request.type_url(), name, - &subscription_map); - } - if (!response.resources().empty()) { + // Process unsubscriptions for any resource no longer + // present in the request's resource list. + ProcessUnsubscriptions( + request.type_url(), resources_in_current_request, + &subscription_name_map, &resource_name_map); + // Send response if needed. + if (!resources_added_to_response.empty()) { CompleteBuildingDiscoveryResponse( request.type_url(), ++resource_type_version[request.type_url()], - subscription_map, resources_added_to_response, &response); + subscription_name_map, resources_added_to_response, + &response); } } } } if (!response.resources().empty()) { - gpr_log(GPR_INFO, "ADS[%p]: sending request response '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this, response.DebugString().c_str()); stream->Write(response); } @@ -727,32 +679,40 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, { grpc_core::MutexLock lock(&ads_mu_); if (!update_queue.empty()) { - std::pair update = - std::move(update_queue.front()); + const std::string resource_type = + std::move(update_queue.front().first); + const std::string resource_name = + std::move(update_queue.front().second); update_queue.pop_front(); did_work = true; - gpr_log(GPR_INFO, "ADS[%p]: Handling update type %s name %s", this, - update.first.c_str(), update.second.c_str()); - auto subscriber_it = - subscription_map[update.first].find(update.second); - if (subscriber_it != subscription_map[update.first].end()) { - if (ClientNeedsResourceUpdate(update.first, update.second, - &subscription_map)) { - gpr_log(GPR_INFO, - "ADS[%p]: Updating resource type %s and name %s", this, - update.first.c_str(), update.second.c_str()); - auto resource = resources_map_[update.first][update.second]; - GPR_ASSERT(resource.resource.has_value()); - response.add_resources()->CopyFrom(resource.resource.value()); - CompleteBuildingDiscoveryResponse( - update.first, ++resource_type_version[update.first], - subscription_map, {update.second}, &response); + gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", + this, resource_type.c_str(), resource_name.c_str()); + auto& subscription_name_map = subscription_map[resource_type]; + auto& resource_name_map = resource_map_[resource_type]; + auto it = subscription_name_map.find(resource_name); + if (it != subscription_name_map.end()) { + SubscriptionState& subscription_state = it->second; + ResourceState& resource_state = resource_name_map[resource_name]; + if (ClientNeedsResourceUpdate(resource_state, + &subscription_state)) { + gpr_log( + GPR_INFO, + "ADS[%p]: Sending update for type=%s name=%s version=%d", + this, resource_type.c_str(), resource_name.c_str(), + resource_state.version); + if (resource_state.resource.has_value()) { + response.add_resources()->CopyFrom( + resource_state.resource.value()); + CompleteBuildingDiscoveryResponse( + resource_type, ++resource_type_version[resource_type], + subscription_name_map, {resource_name}, &response); + } } } } } if (!response.resources().empty()) { - gpr_log(GPR_INFO, "ADS[%p]: sending update response '%s'", this, + gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this, response.DebugString().c_str()); stream->Write(response); } @@ -808,13 +768,13 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, void SetResource(google::protobuf::Any resource, const std::string& type_url, const std::string& name) { grpc_core::MutexLock lock(&ads_mu_); - ResourceState& state = resources_map_[type_url][name]; + ResourceState& state = resource_map_[type_url][name]; ++state.version; state.resource = std::move(resource); gpr_log(GPR_INFO, "ADS[%p]: Updating %s resource %s to version %u", this, type_url.c_str(), name.c_str(), state.version); - for (SubscriberState* subscriber : state.subscribers) { - subscriber->update_queue->emplace_back(type_url, name); + for (SubscriptionState* subscription : state.subscriptions) { + subscription->update_queue->emplace_back(type_url, name); } } @@ -873,15 +833,17 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, { grpc_core::MutexLock lock(&ads_mu_); NotifyDoneWithAdsCallLocked(); - resources_map_.clear(); + resource_map_.clear(); resource_type_response_state_.clear(); } gpr_log(GPR_INFO, "ADS[%p]: shut down", this); } - static ClusterLoadAssignment BuildEdsResource(const EdsResourceArgs& args) { + static ClusterLoadAssignment BuildEdsResource( + const EdsResourceArgs& args, + const char* cluster_name = kDefaultResourceName) { ClusterLoadAssignment assignment; - assignment.set_cluster_name(kDefaultResourceName); + assignment.set_cluster_name(cluster_name); for (const auto& locality : args.locality_list) { auto* endpoints = assignment.add_endpoints(); endpoints->mutable_load_balancing_weight()->set_value(locality.lb_weight); @@ -946,8 +908,8 @@ class AdsServiceImpl : public AggregatedDiscoveryService::Service, // Note that an entry will exist whenever either of the following is true: // - The resource exists (i.e., has been created by SetResource() and has not // yet been destroyed by UnsetResource()). - // - There is at least one subscriber for the resource. - ResourcesMap resources_map_; + // - There is at least one subscription for the resource. + ResourceMap resource_map_; }; class LrsServiceImpl : public LrsService, diff --git a/test/cpp/interop/xds_interop_client.cc b/test/cpp/interop/xds_interop_client.cc index 30811267a90..6cb32b2b0e8 100644 --- a/test/cpp/interop/xds_interop_client.cc +++ b/test/cpp/interop/xds_interop_client.cc @@ -124,8 +124,6 @@ class TestClient { void AsyncUnaryCall() { SimpleResponse response; - ClientContext context; - int saved_request_id; { std::lock_guard lk(mu); @@ -134,9 +132,8 @@ class TestClient { std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(FLAGS_rpc_timeout_sec); - context.set_deadline(deadline); - AsyncClientCall* call = new AsyncClientCall; + call->context.set_deadline(deadline); call->saved_request_id = saved_request_id; call->response_reader = stub_->PrepareAsyncUnaryCall( &call->context, SimpleRequest::default_instance(), &cq_); diff --git a/tools/buildgen/extract_metadata_from_bazel_xml.py b/tools/buildgen/extract_metadata_from_bazel_xml.py index 9676b143e24..4b647ef56c0 100755 --- a/tools/buildgen/extract_metadata_from_bazel_xml.py +++ b/tools/buildgen/extract_metadata_from_bazel_xml.py @@ -753,6 +753,20 @@ _BUILD_EXTRA_METADATA = { '_TYPE': 'target', '_RENAME': 'interop_server' }, + 'test/cpp/interop:xds_interop_client': { + 'language': 'c++', + 'build': 'test', + 'run': False, + '_TYPE': 'target', + '_RENAME': 'xds_interop_client' + }, + 'test/cpp/interop:xds_interop_server': { + 'language': 'c++', + 'build': 'test', + 'run': False, + '_TYPE': 'target', + '_RENAME': 'xds_interop_server' + }, 'test/cpp/interop:http2_client': { 'language': 'c++', 'build': 'test', diff --git a/tools/internal_ci/linux/grpc_xds.cfg b/tools/internal_ci/linux/grpc_xds.cfg index d547e7c0e08..888be05cde5 100644 --- a/tools/internal_ci/linux/grpc_xds.cfg +++ b/tools/internal_ci/linux/grpc_xds.cfg @@ -16,7 +16,7 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/linux/grpc_bazel.sh" -timeout_mins: 60 +timeout_mins: 90 env_vars { key: "BAZEL_SCRIPT" value: "tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh" diff --git a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh index 188aff018ab..9129e259eef 100755 --- a/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh +++ b/tools/internal_ci/linux/grpc_xds_bazel_test_in_docker.sh @@ -52,4 +52,4 @@ bazel build test/cpp/interop:xds_interop_client --project_id=grpc-testing \ --gcp_suffix=$(date '+%s') \ --verbose \ - --client_cmd='bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{service_host}:{service_port} --stats_port={stats_port} --qps={qps}' + --client_cmd='GRPC_VERBOSITY=debug GRPC_TRACE=xds,xds_client bazel-bin/test/cpp/interop/xds_interop_client --server=xds-experimental:///{server_uri} --stats_port={stats_port} --qps={qps}' diff --git a/tools/run_tests/run_xds_tests.py b/tools/run_tests/run_xds_tests.py index 1b5a771474d..c713a35a347 100755 --- a/tools/run_tests/run_xds_tests.py +++ b/tools/run_tests/run_xds_tests.py @@ -34,6 +34,8 @@ from src.proto.grpc.testing import test_pb2_grpc logger = logging.getLogger() console_handler = logging.StreamHandler() +formatter = logging.Formatter(fmt='%(asctime)s: %(levelname)-8s %(message)s') +console_handler.setFormatter(formatter) logger.addHandler(console_handler) @@ -51,24 +53,38 @@ argp.add_argument('--project_id', help='GCP project id') argp.add_argument( '--gcp_suffix', default='', - help='Optional suffix for all generated GCP resource names. Useful to ensure ' - 'distinct names across test runs.') -argp.add_argument('--test_case', - default=None, - choices=['all', 'ping_pong', 'round_robin']) + help='Optional suffix for all generated GCP resource names. Useful to ' + 'ensure distinct names across test runs.') +argp.add_argument( + '--test_case', + default='ping_pong', + choices=[ + 'all', + 'backends_restart', + 'change_backend_service', + 'new_instance_group_receives_traffic', + 'ping_pong', + 'remove_instance_group', + 'round_robin', + 'secondary_locality_gets_no_requests_on_partial_primary_failure', + 'secondary_locality_gets_requests_on_primary_failure', + ]) argp.add_argument( '--client_cmd', default=None, help='Command to launch xDS test client. This script will fill in ' - '{service_host}, {service_port},{stats_port} and {qps} parameters using ' - 'str.format(), and generate the GRPC_XDS_BOOTSTRAP file.') + '{server_uri}, {stats_port} and {qps} parameters using str.format(), and ' + 'generate the GRPC_XDS_BOOTSTRAP file.') argp.add_argument('--zone', default='us-central1-a') +argp.add_argument('--secondary_zone', + default='us-west1-b', + help='Zone to use for secondary TD locality tests') argp.add_argument('--qps', default=10, help='Client QPS') argp.add_argument( '--wait_for_backend_sec', - default=900, - help='Time limit for waiting for created backend services to report healthy ' - 'when launching test suite') + default=600, + help='Time limit for waiting for created backend services to report ' + 'healthy when launching or updated GCP resources') argp.add_argument( '--keep_gcp_resources', default=False, @@ -81,18 +97,22 @@ argp.add_argument( default=None, type=str, help= - 'If provided, uses this file instead of retrieving via the GCP discovery API' -) + 'If provided, uses this file instead of retrieving via the GCP discovery ' + 'API') argp.add_argument('--network', default='global/networks/default', help='GCP network to use') argp.add_argument('--service_port_range', - default='8080:8180', + default='80', type=parse_port_range, help='Listening port for created gRPC backends. Specified as ' 'either a single int or as a range in the format min:max, in ' 'which case an available port p will be chosen s.t. min <= p ' '<= max') +argp.add_argument('--forwarding_rule_ip_prefix', + default='172.16.0.', + help='If set, an available IP with this prefix followed by ' + '0-255 will be used for the generated forwarding rule.') argp.add_argument( '--stats_port', default=8079, @@ -115,35 +135,19 @@ argp.add_argument( argp.add_argument('--verbose', help='verbose log output', default=False, - action="store_true") + action='store_true') args = argp.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) -PROJECT_ID = args.project_id -ZONE = args.zone -QPS = args.qps -TEST_CASE = args.test_case -CLIENT_CMD = args.client_cmd -WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec -TEMPLATE_NAME = 'test-template' + args.gcp_suffix -INSTANCE_GROUP_NAME = 'test-ig' + args.gcp_suffix -HEALTH_CHECK_NAME = 'test-hc' + args.gcp_suffix -FIREWALL_RULE_NAME = 'test-fw-rule' + args.gcp_suffix -BACKEND_SERVICE_NAME = 'test-backend-service' + args.gcp_suffix -URL_MAP_NAME = 'test-map' + args.gcp_suffix -SERVICE_HOST = 'grpc-test' + args.gcp_suffix -TARGET_PROXY_NAME = 'test-target-proxy' + args.gcp_suffix -FORWARDING_RULE_NAME = 'test-forwarding-rule' + args.gcp_suffix -KEEP_GCP_RESOURCES = args.keep_gcp_resources -TOLERATE_GCP_ERRORS = args.tolerate_gcp_errors -STATS_PORT = args.stats_port -INSTANCE_GROUP_SIZE = 2 -WAIT_FOR_OPERATION_SEC = 60 -NUM_TEST_RPCS = 10 * QPS -WAIT_FOR_STATS_SEC = 30 -BOOTSTRAP_TEMPLATE = """ +_DEFAULT_SERVICE_PORT = 80 +_WAIT_FOR_BACKEND_SEC = args.wait_for_backend_sec +_WAIT_FOR_OPERATION_SEC = 60 +_INSTANCE_GROUP_SIZE = 2 +_NUM_TEST_RPCS = 10 * args.qps +_WAIT_FOR_STATS_SEC = 60 +_BOOTSTRAP_TEMPLATE = """ {{ "node": {{ "id": "{node_id}" @@ -158,10 +162,20 @@ BOOTSTRAP_TEMPLATE = """ ] }}] }}""" % args.xds_server +_PATH_MATCHER_NAME = 'path-matcher' +_BASE_TEMPLATE_NAME = 'test-template' +_BASE_INSTANCE_GROUP_NAME = 'test-ig' +_BASE_HEALTH_CHECK_NAME = 'test-hc' +_BASE_FIREWALL_RULE_NAME = 'test-fw-rule' +_BASE_BACKEND_SERVICE_NAME = 'test-backend-service' +_BASE_URL_MAP_NAME = 'test-map' +_BASE_SERVICE_HOST = 'grpc-test' +_BASE_TARGET_PROXY_NAME = 'test-target-proxy' +_BASE_FORWARDING_RULE_NAME = 'test-forwarding-rule' def get_client_stats(num_rpcs, timeout_sec): - with grpc.insecure_channel('localhost:%d' % STATS_PORT) as channel: + with grpc.insecure_channel('localhost:%d' % args.stats_port) as channel: stub = test_pb2_grpc.LoadBalancerStatsServiceStub(channel) request = messages_pb2.LoadBalancerStatsRequest() request.num_rpcs = num_rpcs @@ -177,12 +191,15 @@ def get_client_stats(num_rpcs, timeout_sec): raise Exception('GetClientStats RPC failed') -def wait_until_only_given_backends_receive_load(backends, timeout_sec): +def _verify_rpcs_to_given_backends(backends, timeout_sec, num_rpcs, + allow_failures): start_time = time.time() error_msg = None + logger.debug('Waiting for %d sec until backends %s receive load' % + (timeout_sec, backends)) while time.time() - start_time <= timeout_sec: error_msg = None - stats = get_client_stats(max(len(backends), 1), timeout_sec) + stats = get_client_stats(num_rpcs, timeout_sec) rpcs_by_peer = stats.rpcs_by_peer for backend in backends: if backend not in rpcs_by_peer: @@ -190,52 +207,230 @@ def wait_until_only_given_backends_receive_load(backends, timeout_sec): break if not error_msg and len(rpcs_by_peer) > len(backends): error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer + if not allow_failures and stats.num_failures > 0: + error_msg = '%d RPCs failed' % stats.num_failures if not error_msg: return raise Exception(error_msg) -def test_ping_pong(backends, num_rpcs, stats_timeout_sec): - start_time = time.time() - error_msg = None - while time.time() - start_time <= stats_timeout_sec: - error_msg = None - stats = get_client_stats(num_rpcs, stats_timeout_sec) - rpcs_by_peer = stats.rpcs_by_peer - for backend in backends: - if backend not in rpcs_by_peer: - error_msg = 'Backend %s did not receive load' % backend - break - if not error_msg and len(rpcs_by_peer) > len(backends): - error_msg = 'Unexpected backend received load: %s' % rpcs_by_peer - if not error_msg: - return - raise Exception(error_msg) +def wait_until_all_rpcs_go_to_given_backends_or_fail(backends, + timeout_sec, + num_rpcs=100): + _verify_rpcs_to_given_backends(backends, + timeout_sec, + num_rpcs, + allow_failures=True) + + +def wait_until_all_rpcs_go_to_given_backends(backends, + timeout_sec, + num_rpcs=100): + _verify_rpcs_to_given_backends(backends, + timeout_sec, + num_rpcs, + allow_failures=False) -def test_round_robin(backends, num_rpcs, stats_timeout_sec): +def test_backends_restart(gcp, backend_service, instance_group): + logger.info('Running test_backends_restart') + instance_names = get_instance_names(gcp, instance_group) + num_instances = len(instance_names) + start_time = time.time() + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + try: + resize_instance_group(gcp, instance_group, 0) + wait_until_all_rpcs_go_to_given_backends_or_fail([], + _WAIT_FOR_BACKEND_SEC) + finally: + resize_instance_group(gcp, instance_group, num_instances) + wait_for_healthy_backends(gcp, backend_service, instance_group) + new_instance_names = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + new_stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + original_distribution = list(stats.rpcs_by_peer.values()) + original_distribution.sort() + new_distribution = list(new_stats.rpcs_by_peer.values()) + new_distribution.sort() + threshold = 3 + for i in range(len(original_distribution)): + if abs(original_distribution[i] - new_distribution[i]) > threshold: + raise Exception('Distributions do not match: ', stats, new_stats) + + +def test_change_backend_service(gcp, original_backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group): + logger.info('Running test_change_backend_service') + original_backend_instances = get_instance_names(gcp, instance_group) + alternate_backend_instances = get_instance_names(gcp, + same_zone_instance_group) + patch_backend_instances(gcp, alternate_backend_service, + [same_zone_instance_group]) + wait_for_healthy_backends(gcp, original_backend_service, instance_group) + wait_for_healthy_backends(gcp, alternate_backend_service, + same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(original_backend_instances, + _WAIT_FOR_STATS_SEC) + try: + patch_url_map_backend_service(gcp, alternate_backend_service) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) + if stats.num_failures > 0: + raise Exception('Unexpected failure: %s', stats) + wait_until_all_rpcs_go_to_given_backends(alternate_backend_instances, + _WAIT_FOR_STATS_SEC) + finally: + patch_url_map_backend_service(gcp, original_backend_service) + patch_backend_instances(gcp, alternate_backend_service, []) + + +def test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group): + logger.info('Running test_new_instance_group_receives_traffic') + instance_names = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + combined_instance_names = instance_names + get_instance_names( + gcp, same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(combined_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + + +def test_ping_pong(gcp, backend_service, instance_group): + logger.info('Running test_ping_pong') + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names = get_instance_names(gcp, instance_group) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + + +def test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group): + logger.info('Running test_remove_instance_group') + try: + patch_backend_instances(gcp, + backend_service, + [instance_group, same_zone_instance_group], + balancing_mode='RATE') + wait_for_healthy_backends(gcp, backend_service, instance_group) + wait_for_healthy_backends(gcp, backend_service, + same_zone_instance_group) + instance_names = get_instance_names(gcp, instance_group) + same_zone_instance_names = get_instance_names(gcp, + same_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends( + instance_names + same_zone_instance_names, _WAIT_FOR_BACKEND_SEC) + patch_backend_instances(gcp, + backend_service, [same_zone_instance_group], + balancing_mode='RATE') + wait_until_all_rpcs_go_to_given_backends(same_zone_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [instance_group]) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_BACKEND_SEC) + + +def test_round_robin(gcp, backend_service, instance_group): + logger.info('Running test_round_robin') + wait_for_healthy_backends(gcp, backend_service, instance_group) + instance_names = get_instance_names(gcp, instance_group) threshold = 1 - wait_until_only_given_backends_receive_load(backends, stats_timeout_sec) - stats = get_client_stats(num_rpcs, stats_timeout_sec) + wait_until_all_rpcs_go_to_given_backends(instance_names, + _WAIT_FOR_STATS_SEC) + stats = get_client_stats(_NUM_TEST_RPCS, _WAIT_FOR_STATS_SEC) requests_received = [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer] - total_requests_received = sum( - [stats.rpcs_by_peer[x] for x in stats.rpcs_by_peer]) - if total_requests_received != num_rpcs: + total_requests_received = sum(requests_received) + if total_requests_received != _NUM_TEST_RPCS: raise Exception('Unexpected RPC failures', stats) - expected_requests = total_requests_received / len(backends) - for backend in backends: - if abs(stats.rpcs_by_peer[backend] - expected_requests) > threshold: + expected_requests = total_requests_received / len(instance_names) + for instance in instance_names: + if abs(stats.rpcs_by_peer[instance] - expected_requests) > threshold: raise Exception( - 'RPC peer distribution differs from expected by more than %d for backend %s (%s)', - threshold, backend, stats) + 'RPC peer distribution differs from expected by more than %d ' + 'for instance %s (%s)', threshold, instance, stats) -def create_instance_template(compute, project, name, grpc_port): +def test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + logger.info( + 'Running test_secondary_locality_gets_no_requests_on_partial_primary_failure' + ) + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_STATS_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, original_size - 1) + remaining_instance_names = get_instance_names(gcp, + primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(remaining_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + resize_instance_group(gcp, primary_instance_group, original_size) + + +def test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, primary_instance_group, + secondary_zone_instance_group): + logger.info( + 'Running test_secondary_locality_gets_requests_on_primary_failure') + try: + patch_backend_instances( + gcp, backend_service, + [primary_instance_group, secondary_zone_instance_group]) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, + secondary_zone_instance_group) + primary_instance_names = get_instance_names(gcp, instance_group) + secondary_instance_names = get_instance_names( + gcp, secondary_zone_instance_group) + wait_until_all_rpcs_go_to_given_backends(primary_instance_names, + _WAIT_FOR_BACKEND_SEC) + original_size = len(primary_instance_names) + resize_instance_group(gcp, primary_instance_group, 0) + wait_until_all_rpcs_go_to_given_backends(secondary_instance_names, + _WAIT_FOR_BACKEND_SEC) + + resize_instance_group(gcp, primary_instance_group, original_size) + new_instance_names = get_instance_names(gcp, primary_instance_group) + wait_for_healthy_backends(gcp, backend_service, primary_instance_group) + wait_until_all_rpcs_go_to_given_backends(new_instance_names, + _WAIT_FOR_BACKEND_SEC) + finally: + patch_backend_instances(gcp, backend_service, [primary_instance_group]) + + +def create_instance_template(gcp, name, network, source_image): config = { 'name': name, 'properties': { 'tags': { - 'items': ['grpc-allow-healthcheck'] + 'items': ['allow-health-checks'] }, 'machineType': 'e2-standard-2', 'serviceAccounts': [{ @@ -246,12 +441,12 @@ def create_instance_template(compute, project, name, grpc_port): 'accessConfigs': [{ 'type': 'ONE_TO_ONE_NAT' }], - 'network': args.network + 'network': network }], 'disks': [{ 'boot': True, 'initializeParams': { - 'sourceImage': args.source_image + 'sourceImage': source_image } }], 'metadata': { @@ -260,7 +455,6 @@ def create_instance_template(compute, project, name, grpc_port): 'startup-script', 'value': """#!/bin/bash - sudo apt update sudo apt install -y git default-jdk mkdir java_server @@ -271,40 +465,45 @@ pushd interop-testing ../gradlew installDist -x test -PskipCodegen=true -PskipAndroid=true nohup build/install/grpc-interop-testing/bin/xds-test-server --port=%d 1>/dev/null &""" - % grpc_port + % gcp.service_port }] } } } - result = compute.instanceTemplates().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.instanceTemplates().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.instance_template = GcpResource(config['name'], result['targetLink']) -def create_instance_group(compute, project, zone, name, size, grpc_port, - template_url): +def add_instance_group(gcp, zone, name, size): config = { 'name': name, - 'instanceTemplate': template_url, + 'instanceTemplate': gcp.instance_template.url, 'targetSize': size, 'namedPorts': [{ 'name': 'grpc', - 'port': grpc_port + 'port': gcp.service_port }] } - result = compute.instanceGroupManagers().insert(project=project, - zone=zone, - body=config).execute() - wait_for_zone_operation(compute, project, zone, result['name']) - result = compute.instanceGroupManagers().get( - project=PROJECT_ID, zone=ZONE, instanceGroupManager=name).execute() - return result['instanceGroup'] - - -def create_health_check(compute, project, name): + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.instanceGroupManagers().insert(project=gcp.project, + zone=zone, + body=config).execute() + wait_for_zone_operation(gcp, zone, result['name']) + result = gcp.compute.instanceGroupManagers().get( + project=gcp.project, zone=zone, + instanceGroupManager=config['name']).execute() + instance_group = InstanceGroup(config['name'], result['instanceGroup'], + zone) + gcp.instance_groups.append(instance_group) + return instance_group + + +def create_health_check(gcp, name): config = { 'name': name, 'type': 'TCP', @@ -312,13 +511,14 @@ def create_health_check(compute, project, name): 'portName': 'grpc' } } - result = compute.healthChecks().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.healthChecks().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check = GcpResource(config['name'], result['targetLink']) -def create_health_check_firewall_rule(compute, project, name): +def create_health_check_firewall_rule(gcp, name): config = { 'name': name, 'direction': 'INGRESS', @@ -326,169 +526,227 @@ def create_health_check_firewall_rule(compute, project, name): 'IPProtocol': 'tcp' }], 'sourceRanges': ['35.191.0.0/16', '130.211.0.0/22'], - 'targetTags': ['grpc-allow-healthcheck'], + 'targetTags': ['allow-health-checks'], } - result = compute.firewalls().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.firewalls().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.health_check_firewall_rule = GcpResource(config['name'], + result['targetLink']) -def create_backend_service(compute, project, name, health_check): +def add_backend_service(gcp, name): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'healthChecks': [health_check], + 'healthChecks': [gcp.health_check.url], 'portName': 'grpc', 'protocol': 'HTTP2' } - result = compute.backendServices().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.backendServices().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + backend_service = GcpResource(config['name'], result['targetLink']) + gcp.backend_services.append(backend_service) + return backend_service -def create_url_map(compute, project, name, backend_service_url, host_name): - path_matcher_name = 'path-matcher' +def create_url_map(gcp, name, backend_service, host_name): config = { 'name': name, - 'defaultService': backend_service_url, + 'defaultService': backend_service.url, 'pathMatchers': [{ - 'name': path_matcher_name, - 'defaultService': backend_service_url, + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, }], 'hostRules': [{ 'hosts': [host_name], - 'pathMatcher': path_matcher_name + 'pathMatcher': _PATH_MATCHER_NAME }] } - result = compute.urlMaps().insert(project=project, body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.urlMaps().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.url_map = GcpResource(config['name'], result['targetLink']) -def create_target_http_proxy(compute, project, name, url_map_url): +def create_target_http_proxy(gcp, name): config = { 'name': name, - 'url_map': url_map_url, + 'url_map': gcp.url_map.url, } - result = compute.targetHttpProxies().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) - return result['targetLink'] + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.targetHttpProxies().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.target_http_proxy = GcpResource(config['name'], result['targetLink']) -def create_global_forwarding_rule(compute, project, name, grpc_port, - target_http_proxy_url): +def create_global_forwarding_rule(gcp, name, ip, port): config = { 'name': name, 'loadBalancingScheme': 'INTERNAL_SELF_MANAGED', - 'portRange': str(grpc_port), - 'IPAddress': '0.0.0.0', + 'portRange': str(port), + 'IPAddress': ip, 'network': args.network, - 'target': target_http_proxy_url, + 'target': gcp.target_http_proxy.url, } - result = compute.globalForwardingRules().insert(project=project, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.globalForwardingRules().insert(project=gcp.project, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + gcp.global_forwarding_rule = GcpResource(config['name'], + result['targetLink']) -def delete_global_forwarding_rule(compute, project, forwarding_rule): +def delete_global_forwarding_rule(gcp): try: - result = compute.globalForwardingRules().delete( - project=project, forwardingRule=forwarding_rule).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.globalForwardingRules().delete( + project=gcp.project, + forwardingRule=gcp.global_forwarding_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_target_http_proxy(compute, project, target_http_proxy): +def delete_target_http_proxy(gcp): try: - result = compute.targetHttpProxies().delete( - project=project, targetHttpProxy=target_http_proxy).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.targetHttpProxies().delete( + project=gcp.project, + targetHttpProxy=gcp.target_http_proxy.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_url_map(compute, project, url_map): +def delete_url_map(gcp): try: - result = compute.urlMaps().delete(project=project, - urlMap=url_map).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.urlMaps().delete( + project=gcp.project, urlMap=gcp.url_map.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_backend_service(compute, project, backend_service): - try: - result = compute.backendServices().delete( - project=project, backendService=backend_service).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) +def delete_backend_services(gcp): + for backend_service in gcp.backend_services: + try: + result = gcp.compute.backendServices().delete( + project=gcp.project, + backendService=backend_service.name).execute() + wait_for_global_operation(gcp, result['name']) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) -def delete_firewall(compute, project, firewall_rule): +def delete_firewall(gcp): try: - result = compute.firewalls().delete(project=project, - firewall=firewall_rule).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.firewalls().delete( + project=gcp.project, + firewall=gcp.health_check_firewall_rule.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_health_check(compute, project, health_check): +def delete_health_check(gcp): try: - result = compute.healthChecks().delete( - project=project, healthCheck=health_check).execute() - wait_for_global_operation(compute, project, result['name']) + result = gcp.compute.healthChecks().delete( + project=gcp.project, healthCheck=gcp.health_check.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_group(compute, project, zone, instance_group): +def delete_instance_groups(gcp): + for instance_group in gcp.instance_groups: + try: + result = gcp.compute.instanceGroupManagers().delete( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name).execute() + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=_WAIT_FOR_BACKEND_SEC) + except googleapiclient.errors.HttpError as http_error: + logger.info('Delete failed: %s', http_error) + + +def delete_instance_template(gcp): try: - result = compute.instanceGroupManagers().delete( - project=project, zone=zone, - instanceGroupManager=instance_group).execute() - timeout_sec = 180 # Deleting an instance group can be slow - wait_for_zone_operation(compute, - project, - ZONE, - result['name'], - timeout_sec=timeout_sec) + result = gcp.compute.instanceTemplates().delete( + project=gcp.project, + instanceTemplate=gcp.instance_template.name).execute() + wait_for_global_operation(gcp, result['name']) except googleapiclient.errors.HttpError as http_error: logger.info('Delete failed: %s', http_error) -def delete_instance_template(compute, project, instance_template): - try: - result = compute.instanceTemplates().delete( - project=project, instanceTemplate=instance_template).execute() - wait_for_global_operation(compute, project, result['name']) - except googleapiclient.errors.HttpError as http_error: - logger.info('Delete failed: %s', http_error) +def patch_backend_instances(gcp, + backend_service, + instance_groups, + balancing_mode='UTILIZATION'): + config = { + 'backends': [{ + 'group': instance_group.url, + 'balancingMode': balancing_mode, + 'maxRate': 1 if balancing_mode == 'RATE' else None + } for instance_group in instance_groups], + } + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.backendServices().patch( + project=gcp.project, backendService=backend_service.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) + + +def resize_instance_group(gcp, instance_group, new_size, timeout_sec=120): + result = gcp.compute.instanceGroupManagers().resize( + project=gcp.project, + zone=instance_group.zone, + instanceGroupManager=instance_group.name, + size=new_size).execute() + wait_for_zone_operation(gcp, + instance_group.zone, + result['name'], + timeout_sec=360) + start_time = time.time() + while True: + current_size = len(get_instance_names(gcp, instance_group)) + if current_size == new_size: + break + if time.time() - start_time > timeout_sec: + raise Exception('Failed to resize primary instance group') + time.sleep(1) -def add_instances_to_backend(compute, project, backend_service, instance_group): +def patch_url_map_backend_service(gcp, backend_service): config = { - 'backends': [{ - 'group': instance_group, - }], + 'defaultService': + backend_service.url, + 'pathMatchers': [{ + 'name': _PATH_MATCHER_NAME, + 'defaultService': backend_service.url, + }] } - result = compute.backendServices().patch(project=project, - backendService=backend_service, - body=config).execute() - wait_for_global_operation(compute, project, result['name']) + logger.debug('Sending GCP request with body=%s', config) + result = gcp.compute.urlMaps().patch(project=gcp.project, + urlMap=gcp.url_map.name, + body=config).execute() + wait_for_global_operation(gcp, result['name']) -def wait_for_global_operation(compute, - project, +def wait_for_global_operation(gcp, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.globalOperations().get(project=project, - operation=operation).execute() + result = gcp.compute.globalOperations().get( + project=gcp.project, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -498,16 +756,14 @@ def wait_for_global_operation(compute, timeout_sec) -def wait_for_zone_operation(compute, - project, +def wait_for_zone_operation(gcp, zone, operation, - timeout_sec=WAIT_FOR_OPERATION_SEC): + timeout_sec=_WAIT_FOR_OPERATION_SEC): start_time = time.time() while time.time() - start_time <= timeout_sec: - result = compute.zoneOperations().get(project=project, - zone=zone, - operation=operation).execute() + result = gcp.compute.zoneOperations().get( + project=gcp.project, zone=zone, operation=operation).execute() if result['status'] == 'DONE': if 'error' in result: raise Exception(result['error']) @@ -517,13 +773,16 @@ def wait_for_zone_operation(compute, timeout_sec) -def wait_for_healthy_backends(compute, project_id, backend_service, - instance_group_url, timeout_sec): +def wait_for_healthy_backends(gcp, + backend_service, + instance_group, + timeout_sec=_WAIT_FOR_BACKEND_SEC): start_time = time.time() - config = {'group': instance_group_url} + config = {'group': instance_group.url} while time.time() - start_time <= timeout_sec: - result = compute.backendServices().getHealth( - project=project_id, backendService=backend_service, + result = gcp.compute.backendServices().getHealth( + project=gcp.project, + backendService=backend_service.name, body=config).execute() if 'healthStatus' in result: healthy = True @@ -538,15 +797,32 @@ def wait_for_healthy_backends(compute, project_id, backend_service, (timeout_sec, result)) -def start_xds_client(service_port): - cmd = CLIENT_CMD.format(service_host=SERVICE_HOST, - service_port=service_port, - stats_port=STATS_PORT, - qps=QPS) +def get_instance_names(gcp, instance_group): + instance_names = [] + result = gcp.compute.instanceGroups().listInstances( + project=gcp.project, + zone=instance_group.zone, + instanceGroup=instance_group.name, + body={ + 'instanceState': 'ALL' + }).execute() + if 'items' not in result: + return [] + for item in result['items']: + # listInstances() returns the full URL of the instance, which ends with + # the instance name. compute.instances().get() requires using the + # instance name (not the full URL) to look up instance details, so we + # just extract the name manually. + instance_name = item['instance'].split('/')[-1] + instance_names.append(instance_name) + return instance_names + + +def start_xds_client(cmd): bootstrap_path = None with tempfile.NamedTemporaryFile(delete=False) as bootstrap_file: bootstrap_file.write( - BOOTSTRAP_TEMPLATE.format( + _BOOTSTRAP_TEMPLATE.format( node_id=socket.gethostname()).encode('utf-8')) bootstrap_path = bootstrap_file.name @@ -557,6 +833,54 @@ def start_xds_client(service_port): return client_process +def clean_up(gcp): + if gcp.global_forwarding_rule: + delete_global_forwarding_rule(gcp) + if gcp.target_http_proxy: + delete_target_http_proxy(gcp) + if gcp.url_map: + delete_url_map(gcp) + delete_backend_services(gcp) + if gcp.health_check_firewall_rule: + delete_firewall(gcp) + if gcp.health_check: + delete_health_check(gcp) + delete_instance_groups(gcp) + if gcp.instance_template: + delete_instance_template(gcp) + + +class InstanceGroup(object): + + def __init__(self, name, url, zone): + self.name = name + self.url = url + self.zone = zone + + +class GcpResource(object): + + def __init__(self, name, url): + self.name = name + self.url = url + + +class GcpState(object): + + def __init__(self, compute, project): + self.compute = compute + self.project = project + self.health_check = None + self.health_check_firewall_rule = None + self.backend_services = [] + self.url_map = None + self.target_http_proxy = None + self.global_forwarding_rule = None + self.service_port = None + self.instance_template = None + self.instance_groups = [] + + if args.compute_discovery_document: with open(args.compute_discovery_document, 'r') as discovery_doc: compute = googleapiclient.discovery.build_from_document( @@ -564,107 +888,185 @@ if args.compute_discovery_document: else: compute = googleapiclient.discovery.build('compute', 'v1') -service_port = None client_process = None try: - instance_group_url = None + gcp = GcpState(compute, args.project_id) + health_check_name = _BASE_HEALTH_CHECK_NAME + args.gcp_suffix + firewall_name = _BASE_FIREWALL_RULE_NAME + args.gcp_suffix + backend_service_name = _BASE_BACKEND_SERVICE_NAME + args.gcp_suffix + alternate_backend_service_name = _BASE_BACKEND_SERVICE_NAME + '-alternate' + args.gcp_suffix + url_map_name = _BASE_URL_MAP_NAME + args.gcp_suffix + service_host_name = _BASE_SERVICE_HOST + args.gcp_suffix + target_http_proxy_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + forwarding_rule_name = _BASE_FORWARDING_RULE_NAME + args.gcp_suffix + template_name = _BASE_TARGET_PROXY_NAME + args.gcp_suffix + instance_group_name = _BASE_INSTANCE_GROUP_NAME + args.gcp_suffix + same_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-same-zone' + args.gcp_suffix + secondary_zone_instance_group_name = _BASE_INSTANCE_GROUP_NAME + '-secondary-zone' + args.gcp_suffix try: - health_check_url = create_health_check(compute, PROJECT_ID, - HEALTH_CHECK_NAME) - create_health_check_firewall_rule(compute, PROJECT_ID, - FIREWALL_RULE_NAME) - backend_service_url = create_backend_service(compute, PROJECT_ID, - BACKEND_SERVICE_NAME, - health_check_url) - url_map_url = create_url_map(compute, PROJECT_ID, URL_MAP_NAME, - backend_service_url, SERVICE_HOST) - target_http_proxy_url = create_target_http_proxy( - compute, PROJECT_ID, TARGET_PROXY_NAME, url_map_url) + create_health_check(gcp, health_check_name) + create_health_check_firewall_rule(gcp, firewall_name) + backend_service = add_backend_service(gcp, backend_service_name) + alternate_backend_service = add_backend_service( + gcp, alternate_backend_service_name) + create_url_map(gcp, url_map_name, backend_service, service_host_name) + create_target_http_proxy(gcp, target_http_proxy_name) potential_service_ports = list(args.service_port_range) random.shuffle(potential_service_ports) + if args.forwarding_rule_ip_prefix == '': + potential_ips = ['0.0.0.0'] + else: + potential_ips = [ + args.forwarding_rule_ip_prefix + str(x) for x in range(256) + ] + random.shuffle(potential_ips) for port in potential_service_ports: - try: - create_global_forwarding_rule( - compute, - PROJECT_ID, - FORWARDING_RULE_NAME, - port, - target_http_proxy_url, - ) - service_port = port - break - except googleapiclient.errors.HttpError as http_error: - logger.warning( - 'Got error %s when attempting to create forwarding rule to port %d. Retrying with another port.' - % (http_error, port)) - if not service_port: - raise Exception('Failed to pick a service port in the range %s' % - args.service_port_range) - template_url = create_instance_template(compute, PROJECT_ID, - TEMPLATE_NAME, service_port) - instance_group_url = create_instance_group(compute, PROJECT_ID, ZONE, - INSTANCE_GROUP_NAME, - INSTANCE_GROUP_SIZE, - service_port, template_url) - add_instances_to_backend(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url) + for ip in potential_ips: + try: + create_global_forwarding_rule(gcp, forwarding_rule_name, ip, + port) + gcp.service_port = port + break + except googleapiclient.errors.HttpError as http_error: + logger.warning( + 'Got error %s when attempting to create forwarding rule to ' + '%s:%d. Retrying with another ip:port.' % + (http_error, ip, port)) + if not gcp.service_port: + raise Exception( + 'Failed to find a valid ip:port for the forwarding rule') + create_instance_template(gcp, template_name, args.network, + args.source_image) + instance_group = add_instance_group(gcp, args.zone, instance_group_name, + _INSTANCE_GROUP_SIZE) + patch_backend_instances(gcp, backend_service, [instance_group]) + same_zone_instance_group = add_instance_group( + gcp, args.zone, same_zone_instance_group_name, _INSTANCE_GROUP_SIZE) + secondary_zone_instance_group = add_instance_group( + gcp, args.secondary_zone, secondary_zone_instance_group_name, + _INSTANCE_GROUP_SIZE) except googleapiclient.errors.HttpError as http_error: - if TOLERATE_GCP_ERRORS: + if args.tolerate_gcp_errors: logger.warning( - 'Failed to set up backends: %s. Continuing since ' + 'Failed to set up backends: %s. Attempting to continue since ' '--tolerate_gcp_errors=true', http_error) + if not gcp.instance_template: + result = compute.instanceTemplates().get( + project=args.project_id, + instanceTemplate=template_name).execute() + gcp.instance_template = GcpResource(template_name, + result['selfLink']) + if not gcp.backend_services: + result = compute.backendServices().get( + project=args.project_id, + backendService=backend_service_name).execute() + backend_service = GcpResource(backend_service_name, + result['selfLink']) + gcp.backend_services.append(backend_service) + result = compute.backendServices().get( + project=args.project_id, + backendService=alternate_backend_service_name).execute() + alternate_backend_service = GcpResource( + alternate_backend_service_name, result['selfLink']) + gcp.backend_services.append(alternate_backend_service) + if not gcp.instance_groups: + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=instance_group_name).execute() + instance_group = InstanceGroup(instance_group_name, + result['selfLink'], args.zone) + gcp.instance_groups.append(instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.zone, + instanceGroup=same_zone_instance_group_name).execute() + same_zone_instance_group = InstanceGroup( + same_zone_instance_group_name, result['selfLink'], + args.zone) + gcp.instance_groups.append(same_zone_instance_group) + result = compute.instanceGroups().get( + project=args.project_id, + zone=args.secondary_zone, + instanceGroup=secondary_zone_instance_group_name).execute() + secondary_zone_instance_group = InstanceGroup( + secondary_zone_instance_group_name, result['selfLink'], + args.secondary_zone) + gcp.instance_groups.append(secondary_zone_instance_group) + if not gcp.health_check: + result = compute.healthChecks().get( + project=args.project_id, + healthCheck=health_check_name).execute() + gcp.health_check = GcpResource(health_check_name, + result['selfLink']) + if not gcp.url_map: + result = compute.urlMaps().get(project=args.project_id, + urlMap=url_map_name).execute() + gcp.url_map = GcpResource(url_map_name, result['selfLink']) + if not gcp.service_port: + gcp.service_port = args.service_port_range[0] + logger.warning('Using arbitrary service port in range: %d' % + gcp.service_port) else: raise http_error - if instance_group_url is None: - # Look up the instance group URL, which may be unset if we are running - # with --tolerate_gcp_errors=true. - result = compute.instanceGroups().get( - project=PROJECT_ID, zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME).execute() - instance_group_url = result['selfLink'] - wait_for_healthy_backends(compute, PROJECT_ID, BACKEND_SERVICE_NAME, - instance_group_url, WAIT_FOR_BACKEND_SEC) - - backends = [] - result = compute.instanceGroups().listInstances( - project=PROJECT_ID, - zone=ZONE, - instanceGroup=INSTANCE_GROUP_NAME, - body={ - 'instanceState': 'ALL' - }).execute() - for item in result['items']: - # listInstances() returns the full URL of the instance, which ends with - # the instance name. compute.instances().get() requires using the - # instance name (not the full URL) to look up instance details, so we - # just extract the name manually. - instance_name = item['instance'].split('/')[-1] - backends.append(instance_name) + wait_for_healthy_backends(gcp, backend_service, instance_group) - client_process = start_xds_client(service_port) - - if TEST_CASE == 'all': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'ping_pong': - test_ping_pong(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) - elif TEST_CASE == 'round_robin': - test_round_robin(backends, NUM_TEST_RPCS, WAIT_FOR_STATS_SEC) + if gcp.service_port == _DEFAULT_SERVICE_PORT: + server_uri = service_host_name + else: + server_uri = service_host_name + ':' + str(gcp.service_port) + cmd = args.client_cmd.format(server_uri=server_uri, + stats_port=args.stats_port, + qps=args.qps) + client_process = start_xds_client(cmd) + + if args.test_case == 'all': + test_backends_restart(gcp, backend_service, instance_group) + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group) + test_ping_pong(gcp, backend_service, instance_group) + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) + test_round_robin(gcp, backend_service, instance_group) + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'backends_restart': + test_backends_restart(gcp, backend_service, instance_group) + elif args.test_case == 'change_backend_service': + test_change_backend_service(gcp, backend_service, instance_group, + alternate_backend_service, + same_zone_instance_group) + elif args.test_case == 'new_instance_group_receives_traffic': + test_new_instance_group_receives_traffic(gcp, backend_service, + instance_group, + same_zone_instance_group) + elif args.test_case == 'ping_pong': + test_ping_pong(gcp, backend_service, instance_group) + elif args.test_case == 'remove_instance_group': + test_remove_instance_group(gcp, backend_service, instance_group, + same_zone_instance_group) + elif args.test_case == 'round_robin': + test_round_robin(gcp, backend_service, instance_group) + elif args.test_case == 'secondary_locality_gets_no_requests_on_partial_primary_failure': + test_secondary_locality_gets_no_requests_on_partial_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) + elif args.test_case == 'secondary_locality_gets_requests_on_primary_failure': + test_secondary_locality_gets_requests_on_primary_failure( + gcp, backend_service, instance_group, secondary_zone_instance_group) else: - logger.error('Unknown test case: %s', TEST_CASE) + logger.error('Unknown test case: %s', args.test_case) sys.exit(1) finally: if client_process: client_process.terminate() - if not KEEP_GCP_RESOURCES: + if not args.keep_gcp_resources: logger.info('Cleaning up GCP resources. This may take some time.') - delete_global_forwarding_rule(compute, PROJECT_ID, FORWARDING_RULE_NAME) - delete_target_http_proxy(compute, PROJECT_ID, TARGET_PROXY_NAME) - delete_url_map(compute, PROJECT_ID, URL_MAP_NAME) - delete_backend_service(compute, PROJECT_ID, BACKEND_SERVICE_NAME) - delete_firewall(compute, PROJECT_ID, FIREWALL_RULE_NAME) - delete_health_check(compute, PROJECT_ID, HEALTH_CHECK_NAME) - delete_instance_group(compute, PROJECT_ID, ZONE, INSTANCE_GROUP_NAME) - delete_instance_template(compute, PROJECT_ID, TEMPLATE_NAME) + clean_up(gcp)