From 5212dba112ca78febb7d7ea4ca230d015ec0d2b0 Mon Sep 17 00:00:00 2001
From: Jan Tattermusch
Date: Tue, 12 Jun 2018 13:35:49 +0200
Subject: [PATCH] really upload bq test results in batches

---
 .../python_utils/upload_test_results.py       | 59 ++++++++-----------
 1 file changed, 25 insertions(+), 34 deletions(-)

diff --git a/tools/run_tests/python_utils/upload_test_results.py b/tools/run_tests/python_utils/upload_test_results.py
index 63fa38b6789..cbb4c32a2af 100644
--- a/tools/run_tests/python_utils/upload_test_results.py
+++ b/tools/run_tests/python_utils/upload_test_results.py
@@ -86,6 +86,26 @@ def _get_build_metadata(test_results):
     test_results['job_name'] = job_name
 
 
+def _insert_rows_with_retries(bq, bq_table, bq_rows):
+    """Insert rows to bq table. Retry on error."""
+    # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
+    for i in range((len(bq_rows) / 1000) + 1):
+        max_retries = 3
+        for attempt in range(max_retries):
+            if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
+                                           bq_table,
+                                           bq_rows[i * 1000:(i + 1) * 1000]):
+                break
+            else:
+                if attempt < max_retries - 1:
+                    print('Error uploading result to bigquery, will retry.')
+                else:
+                    print(
+                        'Error uploading result to bigquery, all attempts failed.'
+                    )
+                    sys.exit(1)
+
+
 def upload_results_to_bq(resultset, bq_table, args, platform):
     """Upload test results to a BQ table.
 
@@ -106,6 +126,7 @@ def upload_results_to_bq(resultset, bq_table, args, platform):
         partition_type=_PARTITION_TYPE,
         expiration_ms=_EXPIRATION_MS)
 
+    bq_rows = []
     for shortname, results in six.iteritems(resultset):
         for result in results:
             test_results = {}
@@ -124,23 +145,9 @@ def upload_results_to_bq(resultset, bq_table, args, platform):
             test_results['return_code'] = result.returncode
             test_results['test_name'] = shortname
             test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
-
             row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
-
-            # TODO(jtattermusch): rows are inserted one by one, very inefficient
-            max_retries = 3
-            for attempt in range(max_retries):
-                if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
-                                               bq_table, [row]):
-                    break
-                else:
-                    if attempt < max_retries - 1:
-                        print('Error uploading result to bigquery, will retry.')
-                    else:
-                        print(
-                            'Error uploading result to bigquery, all attempts failed.'
-                        )
-                        sys.exit(1)
+            bq_rows.append(row)
+    _insert_rows_with_retries(bq, bq_table, bq_rows)
 
 
 def upload_interop_results_to_bq(resultset, bq_table, args):
     """Upload interop test results to a BQ table.
@@ -162,8 +169,8 @@ def upload_interop_results_to_bq(resultset, bq_table, args):
         partition_type=_PARTITION_TYPE,
         expiration_ms=_EXPIRATION_MS)
 
+    bq_rows = []
     for shortname, results in six.iteritems(resultset):
-        bq_rows = []
         for result in results:
             test_results = {}
             _get_build_metadata(test_results)
@@ -177,20 +184,4 @@ def upload_interop_results_to_bq(resultset, bq_table, args):
             test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
             row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
             bq_rows.append(row)
-
-        # BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
-        for i in range((len(bq_rows) / 1000) + 1):
-            max_retries = 3
-            for attempt in range(max_retries):
-                if big_query_utils.insert_rows(
-                        bq, _PROJECT_ID, _DATASET_ID, bq_table,
-                        bq_rows[i * 1000:(i + 1) * 1000]):
-                    break
-                else:
-                    if attempt < max_retries - 1:
-                        print('Error uploading result to bigquery, will retry.')
-                    else:
-                        print(
-                            'Error uploading result to bigquery, all attempts failed.'
-                        )
-                        sys.exit(1)
+    _insert_rows_with_retries(bq, bq_table, bq_rows)
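
The snippet below is a standalone sketch (not part of the patch) of the batching-and-retry pattern that _insert_rows_with_retries introduces. It reuses the big_query_utils.insert_rows(bq, project, dataset, table, rows) call shown in the diff; the insert_batches name, the batch_size/max_retries parameters, and the placeholder project/dataset constants are illustrative assumptions. Stepping the slice start by the batch size also sidesteps the Python 2-only integer division in the patch's loop bound, (len(bq_rows) / 1000) + 1, which would raise a TypeError under Python 3.

import sys

import big_query_utils  # grpc helper module used by upload_test_results.py


_PROJECT_ID = 'my-project'   # placeholder; the real constants live in the script
_DATASET_ID = 'my_dataset'   # placeholder


def insert_batches(bq, bq_table, bq_rows, batch_size=1000, max_retries=3):
    """Upload rows in fixed-size batches, retrying each batch a few times."""
    for start in range(0, len(bq_rows), batch_size):
        batch = bq_rows[start:start + batch_size]
        for attempt in range(max_retries):
            if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
                                           bq_table, batch):
                break  # this batch succeeded, move on to the next one
            if attempt < max_retries - 1:
                print('Error uploading result to bigquery, will retry.')
            else:
                print('Error uploading result to bigquery, all attempts failed.')
                sys.exit(1)

A caller would build bq_rows exactly as the patched upload_results_to_bq does (one big_query_utils.make_row per test result) and then invoke insert_batches(bq, bq_table, bq_rows) once after the loop, which is the same shape as the patch's _insert_rows_with_retries call.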