lava_wait_jobs.py: Fetch artifacts for finished jobs immediately
The intention is to interleave waiting for job completion with fetching
artifacts: as soon as a job is detected as finished, fetch its artifacts
right away, instead of first spending time waiting for the other jobs to
complete.
Signed-off-by: Paul Sokolovsky <paul.sokolovsky@linaro.org>
Change-Id: I7bbfb31516ad53fe119f1775b5d795ffd07378bc
diff --git a/lava_helper/lava_wait_jobs.py b/lava_helper/lava_wait_jobs.py
index d9a1d70..c3f7e06 100755
--- a/lava_helper/lava_wait_jobs.py
+++ b/lava_helper/lava_wait_jobs.py
@@ -51,7 +51,20 @@
def get_finished_jobs(job_list, user_args, lava):
_log.info("Waiting for %d LAVA/Tux jobs", len(job_list))
- finished_jobs = lava.block_wait_for_jobs(job_list, user_args.dispatch_timeout, 5)
+
+ fetched_artifacts = set()
+
+ def inline_fetch_artifacts(job_id, info):
+ try:
+ if user_args.artifacts_path:
+ info['job_dir'] = os.path.join(user_args.artifacts_path, "{}_{}".format(job_id, info['description']))
+ fetch_artifacts_for_job(job_id, info, user_args, lava)
+ fetched_artifacts.add(job_id)
+ except Exception as e:
+ _log.exception("")
+ _log.warning("Failed to fetch artifacts for job %s inline, will retry later", job_id)
+
+ finished_jobs = lava.block_wait_for_jobs(job_list, user_args.dispatch_timeout, 5, callback=inline_fetch_artifacts)
unfinished_jobs = [item for item in job_list if item not in finished_jobs]
for job in unfinished_jobs:
_log.info("Cancelling unfinished job %d because of timeout.", job)
@@ -61,8 +74,9 @@
if user_args.artifacts_path:
for job, info in finished_jobs.items():
info['job_dir'] = os.path.join(user_args.artifacts_path, "{}_{}".format(str(job), info['description']))
- finished_jobs[job] = info
- finished_jobs = fetch_artifacts(finished_jobs, user_args, lava)
+ to_fetch = {job_id: info for job_id, info in finished_jobs.items() if job_id not in fetched_artifacts}
+ _log.info("Fetching artifacts for remaining jobs: %s", to_fetch.keys())
+ fetch_artifacts(to_fetch, user_args, lava)
return finished_jobs
def resubmit_failed_jobs(jobs, user_args):
@@ -130,8 +144,6 @@
for job_id, info in jobs.items():
fetch_artifacts_for_job(job_id, info, user_args, lava)
- return(jobs)
-
def lava_id_to_url(id, user_args):
if LAVA_RPC_connector.is_tux_id(id):
diff --git a/tfm_ci_pylib/lava_rpc_connector.py b/tfm_ci_pylib/lava_rpc_connector.py
index f9547e5..1e27932 100644
--- a/tfm_ci_pylib/lava_rpc_connector.py
+++ b/tfm_ci_pylib/lava_rpc_connector.py
@@ -296,7 +296,7 @@
break
return self.scheduler.job_health(job_id)["job_health"]
- def block_wait_for_jobs(self, job_ids, timeout, poll_freq=10):
+ def block_wait_for_jobs(self, job_ids, timeout, poll_freq=10, callback=None):
""" Wait for multiple LAVA job ids to finish and return finished list """
start_t = int(time.time())
@@ -331,6 +331,8 @@
cur_status['health'],
len(job_ids) - len(finished_jobs)
)
+ if callback:
+ callback(job_id, cur_status)
if len(job_ids) == len(finished_jobs):
break
else: