From 1010d3a6199a02c966e762488cea46b1227ef498 Mon Sep 17 00:00:00 2001
From: Richard Belleville
Date: Tue, 23 Jun 2020 14:30:30 -0700
Subject: [PATCH] Batch download jobs

---
 .../run_interop_matrix_tests.py | 36 ++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
 mode change 100755 => 100644 tools/interop_matrix/run_interop_matrix_tests.py

diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py
old mode 100755
new mode 100644
index a68b703f1fd..d6a1489a3eb
--- a/tools/interop_matrix/run_interop_matrix_tests.py
+++ b/tools/interop_matrix/run_interop_matrix_tests.py
@@ -203,23 +203,17 @@ def _generate_test_case_jobspecs(lang, runtime, release, suite_name):
     return job_spec_list
 
 
-# TODO: Return a spec and then parallelize.
 def _pull_image_for_lang(lang, image, release):
     """Pull an image for a given language form the image registry."""
     cmdline = [
         'time gcloud docker -- pull %s && time docker run --rm=true %s /bin/true'
         % (image, image)
     ]
-    spec = jobset.JobSpec(cmdline=cmdline,
+    return jobset.JobSpec(cmdline=cmdline,
                           shortname='pull_image_{}'.format(image),
                           timeout_seconds=_PULL_IMAGE_TIMEOUT_SECONDS,
                           shell=True,
-                          # TODO: Pull out to constant.
                           flake_retries=2)
-    num_failures, resultset = jobset.run([spec],
-                                         newline_on_success=True,
-                                         maxjobs=1)
-    return not num_failures
 
 
 def _test_release(lang, runtime, release, image, xml_report_tree, skip_tests):
@@ -263,17 +257,35 @@ def _run_tests_for_lang(lang, runtime, images, xml_report_tree):
     skip_tests = False
     total_num_failures = 0
 
-    # TODO: Do more intelligent chunking.
-    for release, image in images:
-        if not skip_tests and not _pull_image_for_lang(lang, image, release):
+    max_pull_jobs = min(args.jobs, _MAX_PARALLEL_DOWNLOADS)
+    max_chunk_size = max_pull_jobs
+    chunk_count = (len(images) + max_chunk_size) // max_chunk_size
+
+    for chunk_index in range(chunk_count):
+        chunk_start = chunk_index * max_chunk_size
+        chunk_size = min(max_chunk_size, len(images) - chunk_start)
+        chunk_end = chunk_start + chunk_size
+        pull_specs = []
+        if not skip_tests:
+            for release, image in images[chunk_start:chunk_end]:
+                pull_specs.append(_pull_image_for_lang(lang, image, release))
+
+        # NOTE(rbellevi): We batch docker pull operations to maximize
+        # parallelism, without letting the disk usage grow unbounded.
+        pull_failures, _ = jobset.run(pull_specs,
+                                      newline_on_success=True,
+                                      maxjobs=max_pull_jobs)
+        if pull_failures:
             jobset.message(
                 'FAILED',
                 'Image download failed. Skipping tests for language "%s"' % lang,
                 do_newline=True)
             skip_tests = True
-        total_num_failures += _test_release(lang, runtime, release, image, xml_report_tree, skip_tests)
+        for release, image in images[chunk_start:chunk_end]:
+            total_num_failures += _test_release(lang, runtime, release, image, xml_report_tree, skip_tests)
         if not args.keep:
-            _cleanup_docker_image(image)
+            for _, image in images[chunk_start:chunk_end]:
+                _cleanup_docker_image(image)
     if not total_num_failures:
         jobset.message('SUCCESS', 'All {} tests passed'.format(lang), do_newline=True)
     else: