blob: 99c80abc86cc278f0ae2d9ab8ceb60b3656cb7b5 [file] [log] [blame]
# !/usr/bin/env python
###############################################################################
# Copyright (c) 2020, ARM Limited and Contributors. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
###############################################################################
# FILE: clone_sources.py
#
# DESCRIPTION: Clone the source files for code coverage
###############################################################################
import os
import subprocess
import json
import time
from random import random
def call_cmd(cmd, print_cmd=True):
"""
Function that execute an os command and returns its output
:param cmd: OS command as string
:param print_cmd: Optional argument to print the command in stdout
:return: The string output of the os command
"""
if print_cmd:
print("+" + cmd)
out = subprocess.check_output(cmd, shell=True)
return out
def skip_source(output_dir, source, handler=None):
"""
Function that handles overwriting source files
:param output_dir: Folder where to put the source files and folders
:param source: Dictionary with the information the source
:return: True if must skip the given source cloning False otherwise
"""
location = os.path.join(output_dir, source['LOCATION'])
# Check if exists and have files
if os.path.isdir(location):
if not os.listdir(location):
if handler is not None:
return handler(source, "Directory exists and is empty")
else:
# By default send a warning and overwrite it
print(("WARNING!: Directory {} already exists and is "
"empty. Overwriting it...'").format(location))
os.rmdir(location)
return False
commit_id = call_cmd(("cd {} && git log -1 2>/dev/null | "
"grep commit | awk '{{print $2}}'").format(
location), print_cmd=True).strip()
if source['type'] == "git":
if commit_id == "":
# is not a git
if handler is not None:
return handler(source, "Directory exists and is not git")
else:
print(("WARNING!: Directory {} already exists and is not a"
" git repo: '{}'").format(location, source['URL']))
elif commit_id != source["COMMIT"].strip():
# there are mismatching commit id's
if handler is not None:
return handler(source, "Mismatch in gits")
else:
print(("WARNING!: Mismatch in git repo {}\nExpected {}, "
"Cloned {}").format(source['URL'], source['COMMIT'],
commit_id))
return True
elif source['type'] == "http":
if handler is not None:
return handler(source,
"WARNING!: Directory already exists")
else:
print("WARNING!: Directory {} already exists".format(
location))
return True
return False
class CloneSources(object):
"""Class used to clone the source code needed to produce code coverage
reports.
"""
def __init__(self, json_file):
self.json_file = json_file
self.json_data = None
self.load_json()
def load_json(self):
with open(self.json_file, "r") as json_file:
self.json_data = json.load(json_file)
def clone_repo(self, output_dir, overwrite_handler=None):
"""
Clones or reproduces a folder with source code based in the
configuration in the json file
:param output_dir: Where to put the source files
:param overwrite_handler: Optional function to handle overwrites
"""
if self.json_data is None:
self.load_json()
sources = []
try:
if 'parameters' in self.json_data:
sources = self.json_data['parameters']['sources']
elif 'configuration' in self.json_data:
sources = self.json_data['configuration']['sources']
else:
raise Exception("No correct format for json sources!")
except Exception as ex:
raise Exception(ex)
for source in sources:
if skip_source(output_dir, source, overwrite_handler):
continue
if source['type'] == "git":
git = source
url = git["URL"]
commit_id = git["COMMIT"]
output_loc = os.path.join(output_dir, git["LOCATION"])
cmd = "rm -rf {1} || true;git clone {0} {1}".format(url, output_loc)
output = call_cmd(cmd)
if git['REFSPEC']:
call_cmd("cd {};git fetch -q origin {}".format(
output_loc, git['REFSPEC']))
if commit_id:
call_cmd("cd {};git checkout -q {}".format(
output_loc, commit_id))
else:
call_cmd("cd {};git checkout -q FETCH_HEAD".format(
output_loc))
call_cmd("cd {};git submodule update --init --recursive || true".format(output_loc))
elif source['type'] == 'http':
site = source
output_loc = os.path.join(output_dir, site["LOCATION"])
tmp_folder = os.path.join(output_dir,
"_tmp_{}_{}".format(time.time(),
random()))
call_cmd("mkdir -p {}".format(tmp_folder))
call_cmd("wget -q {} -P {}".format(
site['URL'], tmp_folder))
call_cmd("mkdir -p {}".format(output_loc))
if site['COMPRESSION'] == "xz":
call_cmd("cd {};tar -xzf $(basename {}) -C {} {}".format(
tmp_folder, site['URL'], output_loc,
source.get("EXTRA_PARAMS", "")))
call_cmd("rm -rf {}".format(tmp_folder))