Open CI Scripts: Initial Commit
* build_helper: Python script which builds sets
of configurations from a JSON file input
* checkpatch: Bash scripts helping with running checkpatch
* cppcheck: Bash script helping with running cppcheck
* lava_helper: Python script which generates a LAVA job
definition and parses the output of a LAVA dispatcher
* tfm_ci_pylib: Generic Python module for Open CI
* configs: Directory storing reference configurations
Change-Id: Ibda0cbfeb5b004b35fef3c2af4cb5c012f2672b4
Signed-off-by: Galanakis, Minos <minos.galanakis@linaro.org>
diff --git a/tfm_ci_pylib/__init__.py b/tfm_ci_pylib/__init__.py
new file mode 100644
index 0000000..0ea14cc
--- /dev/null
+++ b/tfm_ci_pylib/__init__.py
@@ -0,0 +1,12 @@
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+
+__all__ = ["tfm_builder",
+ "tfm_build_manager",
+ "utils"]
diff --git a/tfm_ci_pylib/lava_rpc_connector.py b/tfm_ci_pylib/lava_rpc_connector.py
new file mode 100644
index 0000000..269cbbf
--- /dev/null
+++ b/tfm_ci_pylib/lava_rpc_connector.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+
+""" lava_rpc_connector.py:
+
+    Class that extends xmlrpc.client in order to add LAVA-specific
+    functionality. Used to manage communication with the back-end. """
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+__author__ = "Minos Galanakis"
+__email__ = "minos.galanakis@linaro.org"
+__project__ = "Trusted Firmware-M Open CI"
+__status__ = "stable"
+__version__ = "1.0"
+
+import xmlrpc.client
+import time
+
+
+class LAVA_RPC_connector(xmlrpc.client.ServerProxy, object):
+
+ def __init__(self,
+ username,
+ token,
+ hostname,
+ rest_prefix="RPC2",
+ https=False):
+
+ # If user provides hostname with http/s prefix
+ if "://" in hostname:
+ htp_pre, hostname = hostname.split("://")
+ server_addr = "%s://%s:%s@%s/%s" % (htp_pre,
+ username,
+ token,
+ hostname,
+ rest_prefix)
+ self.server_url = "%s://%s" % (htp_pre, hostname)
+ else:
+ server_addr = "%s://%s:%s@%s/%s" % ("https" if https else "http",
+ username,
+ token,
+ hostname,
+ rest_prefix)
+ self.server_url = "%s://%s" % ("https" if https else "http",
+ hostname)
+
+ self.server_job_prefix = "%s/scheduler/job/%%s" % self.server_url
+ super(LAVA_RPC_connector, self).__init__(server_addr)
+
+ def _rpc_cmd_raw(self, cmd, params=None):
+ """ Run a remote comand and return the result. There is no constrain
+ check on the syntax of the command. """
+
+ cmd = "self.%s(%s)" % (cmd, params if params else "")
+ return eval(cmd)
+
+ def ls_cmd(self):
+ """ Return a list of supported commands """
+
+ print("\n".join(self.system.listMethods()))
+
+ def get_job_results(self, job_id, yaml_out_file=None):
+ results = self.results.get_testjob_results_yaml(job_id)
+ if yaml_out_file:
+ with open(yaml_out_file, "w") as F:
+ F.write(results)
+ return results
+
+ def get_job_state(self, job_id):
+ return self.scheduler.job_state(job_id)["job_state"]
+
+ def get_job_status(self, job_id):
+ return self.scheduler.job_status(job_id)["job_status"]
+
+ def cancel_job(self, job_id):
+ """ Cancell job with id=job_id. Returns True if successfull """
+
+ return self.scheduler.jobs.cancel(job_id)
+
+ def validate_job_yaml(self, job_definition, print_err=False):
+ """ Validate a job definition syntax. Returns true is server considers
+ the syntax valid """
+
+ try:
+ with open(job_definition) as F:
+ input_yaml = F.read()
+ self.scheduler.validate_yaml(input_yaml)
+ return True
+ except Exception as E:
+ if print_err:
+ print(E)
+ return False
+
+ def submit_job(self, job_definition):
+ """ Will submit a yaml definition pointed by job_definition after
+ validating it againist the remote backend. Returns resulting job id,
+ and server url for job"""
+
+ try:
+ if not self.validate_job_yaml(job_definition):
+ print("Served rejected job's syntax")
+ raise Exception("Invalid job")
+ with open(job_definition, "r") as F:
+ job_data = F.read()
+ except Exception as e:
+ print("Cannot submit invalid job. Check %s's content" %
+ job_definition)
+ print(e)
+ return None, None
+
+ job_id = self.scheduler.submit_job(job_data)
+ job_url = self.server_job_prefix % job_id
+        return job_id, job_url
+
+ def resubmit_job(self, job_id):
+ """ Re-submit job with provided id. Returns resulting job id,
+ and server url for job"""
+
+ job_id = self.scheduler.resubmit_job(job_id)
+ job_url = self.server_job_prefix % job_id
+        return job_id, job_url
+
+ def block_wait_for_job(self, job_id, timeout, poll_freq=1):
+ """ Will block code execution and wait for the job to submit.
+ Returns job status on completion """
+
+ start_t = int(time.time())
+        while True:
+ cur_t = int(time.time())
+ if cur_t - start_t >= timeout:
+ print("Breaking because of timeout")
+ break
+ # Check if the job is not running
+ cur_status = self.get_job_status(job_id)
+ # If in queue or running wait
+ if cur_status == "Running" or cur_status == "Submitted":
+ time.sleep(poll_freq)
+ else:
+ break
+ return self.get_job_status(job_id)
+
+ def test_credentials(self):
+ """ Attempt to querry the back-end and verify that the user provided
+ authentication is valid """
+
+ try:
+ self._rpc_cmd_raw("system.listMethods")
+ return True
+ except Exception as e:
+ print(e)
+ print("Credential validation failed")
+ return False
+
+
+if __name__ == "__main__":
+ pass
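+    # Illustrative usage sketch (credentials, hostname and file names are
+    # placeholders, not part of any real deployment):
+    #
+    #   lava = LAVA_RPC_connector("user", "secret-token",
+    #                             "lava.example.com", https=True)
+    #   if lava.test_credentials():
+    #       job_id, job_url = lava.submit_job("tfm_lava_job.yaml")
+    #       status = lava.block_wait_for_job(job_id, timeout=3600)
+    #       lava.get_job_results(job_id, yaml_out_file="results.yaml")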
diff --git a/tfm_ci_pylib/structured_task.py b/tfm_ci_pylib/structured_task.py
new file mode 100644
index 0000000..b97cae9
--- /dev/null
+++ b/tfm_ci_pylib/structured_task.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python3
+
+""" structured_task.py:
+
+    A generic abstraction class for executing a task with prerequisites and
+    post-execution actions """
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+__author__ = "Minos Galanakis"
+__email__ = "minos.galanakis@linaro.org"
+__project__ = "Trusted Firmware-M Open CI"
+__status__ = "stable"
+__version__ = "1.0"
+
+import abc
+import time
+import multiprocessing
+
+
+class structuredTask(multiprocessing.Process):
+ """ A class that defined well structured chained execution of commands """
+
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, name):
+
+ self._stopevent = multiprocessing.Event()
+ self._exec_sleep_period = 1.0
+ self._join_timeout = 1.0
+ self._exec_timeout = 0.0
+ self._task_name = name
+
+ # multiprocessing safe shared memory variables
+ self._mprc_manager = multiprocessing.Manager()
+
+ # Dictionary used to store objects between stages
+ self._mprc_stash = self._mprc_manager.dict()
+
+ # Integer variable that stores status of flow
+ self._mprc_status = multiprocessing.Value('i', False)
+ super(structuredTask, self).__init__(name=name)
+
+ # Perform initialization
+ # If user code raises exception, class memory will not be allocated
+ # Variables can be safely shared in the pre stages, use stash for
+ # next stages
+ self.pre_exec(self.pre_eval())
+
+ # Class API/Interface
+
+ @abc.abstractmethod
+ def pre_eval(self):
+ """ Tests that need to be run in set-up state """
+
+ @abc.abstractmethod
+ def pre_exec(self, eval_ret):
+ """ Tasks that set-up execution enviroment """
+
+ @abc.abstractmethod
+ def task_exec(self):
+ """ Main tasks """
+
+ @abc.abstractmethod
+ def post_eval(self, eval_ret):
+ """ Tests that need to be run after main task """
+
+ @abc.abstractmethod
+ def post_exec(self):
+ """ Tasks that are run after main task """
+
+ def stash(self, key, data):
+ """ Store object in a shared memory interface """
+
+ self._mprc_stash[key] = data
+
+ def unstash(self, key):
+ """ Retrieve object from a shared memory interface """
+
+ try:
+ return self._mprc_stash[key]
+ except KeyError:
+ return None
+
+ def get_name(self):
+ """" Return name label of class """
+ return self._task_name
+
+ def get_status(self):
+ """ Return the status of the execution flow """
+ with self._mprc_status.get_lock():
+ return self._mprc_status.value
+
+ def set_status(self, status):
+ """ Return the status of the execution flow """
+ with self._mprc_status.get_lock():
+ self._mprc_status.value = status
+
+ def run(self):
+ try:
+
+ # Run Core code
+ while not self._stopevent.is_set():
+ self.task_exec()
+ time.sleep(self._exec_sleep_period)
+ break
+ # print("Stop Event Detected")
+ # TODO Upgrade reporting to a similar format
+ print("%s ==> Stop Event Detected" % self.get_name())
+
+ # Post stage
+            # If something fails in post, the user should set the correct status
+ self.set_status(0)
+ print("%s ==> Stop Event Set OK Status" % self.get_name())
+ except Exception as exc:
+ print(("ERROR: Stopping %s "
+ "with Exception: \"%s\"") % (self.get_name(), exc))
+ self.set_status(1)
+ # Always call post, and determine success failed by get_status
+ self.post_exec(self.post_eval())
+
+ def _t_stop(self):
+ """ Internal class stop to be called through thread """
+ print("Thead is alive0 %s" % self.is_alive())
+ if(self.is_alive()):
+ print("%s =========> STOP" % self.get_name())
+ self._stopevent.set()
+ print("Thead is alive %s" % self.is_alive())
+ print("Stop Event Triggered")
+
+ def stop(self):
+ """ External stop to be called by user code """
+
+ self._t_stop()
+ super(structuredTask, self).join(self._join_timeout)
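+
+
+# Illustrative subclass sketch (hypothetical, for documentation only): every
+# abstract stage must be implemented. pre_eval/pre_exec are invoked from
+# __init__, while run() drives task_exec and the post stages.
+#
+#   class EchoTask(structuredTask):
+#
+#       def pre_eval(self):
+#           return True
+#
+#       def pre_exec(self, eval_ret):
+#           pass
+#
+#       def task_exec(self):
+#           self.stash("msg", "hello")
+#           self._t_stop()
+#
+#       def post_eval(self):
+#           return self.unstash("msg") == "hello"
+#
+#       def post_exec(self, eval_ret):
+#           self.set_status(0 if eval_ret else 1)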
diff --git a/tfm_ci_pylib/tfm_build_manager.py b/tfm_ci_pylib/tfm_build_manager.py
new file mode 100644
index 0000000..dcf75de
--- /dev/null
+++ b/tfm_ci_pylib/tfm_build_manager.py
@@ -0,0 +1,260 @@
+#!/usr/bin/env python3
+
+""" tfm_build_manager.py:
+
+    Controlling class managing multiple build configurations for tfm """
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+__author__ = "Minos Galanakis"
+__email__ = "minos.galanakis@linaro.org"
+__project__ = "Trusted Firmware-M Open CI"
+__status__ = "stable"
+__version__ = "1.0"
+
+import os
+import sys
+from pprint import pprint
+from copy import deepcopy
+from .utils import gen_cfg_combinations, list_chunks, load_json,\
+ save_json, print_test
+from .structured_task import structuredTask
+from .tfm_builder import TFM_Builder
+
+
+class TFM_Build_Manager(structuredTask):
+ """ Class that will load a configuration out of a json file, schedule
+ the builds, and produce a report """
+
+ def __init__(self,
+ tfm_dir, # TFM root directory
+                 work_dir, # Current working directory (i.e. logs)
+ cfg_dict, # Input config dictionary of the following form
+ # input_dict = {"PROJ_CONFIG": "ConfigRegression",
+ # "TARGET_PLATFORM": "MUSCA_A",
+ # "COMPILER": "ARMCLANG",
+ # "CMAKE_BUILD_TYPE": "Debug"}
+ report=None, # File to produce report
+ parallel_builds=3, # Number of builds to run in parallel
+ build_threads=4, # Number of threads used per build
+ markdown=True, # Create markdown report
+ html=True, # Create html report
+ ret_code=True, # Set ret_code of script if build failed
+ install=False): # Install libraries after build
+
+ self._tbm_build_threads = build_threads
+ self._tbm_conc_builds = parallel_builds
+ self._tbm_install = install
+ self._tbm_markdown = markdown
+ self._tbm_html = html
+ self._tbm_ret_code = ret_code
+
+ # Required by other methods, always set working directory first
+ self._tbm_work_dir = os.path.abspath(os.path.expanduser(work_dir))
+
+ self._tbm_tfm_dir = os.path.abspath(os.path.expanduser(tfm_dir))
+
+        # Entries will be filled after sanity test on cfg_dict during pre_exec
+ self._tbm_build_dir = None
+ self._tbm_report = report
+
+ # TODO move them to pre_eval
+ self._tbm_cfg = self.load_config(cfg_dict, self._tbm_work_dir)
+ self._tbm_build_cfg_list = self.parse_config(self._tbm_cfg)
+
+ super(TFM_Build_Manager, self).__init__(name="TFM_Build_Manager")
+
+ def pre_eval(self):
+ """ Tests that need to be run in set-up state """
+ return True
+
+ def pre_exec(self, eval_ret):
+ """ """
+
+ def task_exec(self):
+ """ Create a build pool and execute them in parallel """
+
+ build_pool = []
+ for i in self._tbm_build_cfg_list:
+
+ name = "%s_%s_%s_%s_%s" % (i.TARGET_PLATFORM,
+ i.COMPILER,
+ i.PROJ_CONFIG,
+ i.CMAKE_BUILD_TYPE,
+ "BL2" if i.WITH_MCUBOOT else "NOBL2")
+ print("Loading config %s" % name)
+ build_pool.append(TFM_Builder(name,
+ self._tbm_tfm_dir,
+ self._tbm_work_dir,
+ dict(i._asdict()),
+ self._tbm_install,
+ self._tbm_build_threads))
+
+ status_rep = {}
+ full_rep = {}
+ print("Build: Running %d parallel build jobs" % self._tbm_conc_builds)
+ for build_pool_slice in list_chunks(build_pool, self._tbm_conc_builds):
+
+ # Start the builds
+ for build in build_pool_slice:
+ # Only produce output for the first build
+ if build_pool_slice.index(build) != 0:
+ build.mute()
+ print("Build: Starting %s" % build.get_name())
+ build.start()
+
+ # Wait for the builds to complete
+ for build in build_pool_slice:
+ # Wait for build to finish
+ build.join()
+ # Similarly print the logs of the other builds as they complete
+ if build_pool_slice.index(build) != 0:
+ build.log()
+ print("Build: Finished %s" % build.get_name())
+
+ # Store status in report
+ status_rep[build.get_name()] = build.get_status()
+ full_rep[build.get_name()] = build.report()
+ # Store the report
+ self.stash("Build Status", status_rep)
+ self.stash("Build Report", full_rep)
+
+ if self._tbm_report:
+ print("Exported build report to file:", self._tbm_report)
+ save_json(self._tbm_report, full_rep)
+
+ def post_eval(self):
+ """ If a single build failed fail the test """
+ try:
+ retcode_sum = sum(self.unstash("Build Status").values())
+ if retcode_sum != 0:
+ raise Exception()
+ return True
+        except Exception:
+ return False
+
+ def post_exec(self, eval_ret):
+ """ Generate a report and fail the script if build == unsuccessfull"""
+
+ self.print_summary()
+ if not eval_ret:
+ print("ERROR: ====> Build Failed! %s" % self.get_name())
+ self.set_status(1)
+ else:
+ print("SUCCESS: ====> Build Complete!")
+ self.set_status(0)
+
+ def get_report(self):
+ """ Expose the internal report to a new object for external classes """
+ return deepcopy(self.unstash("Build Report"))
+
+ def print_summary(self):
+ """ Print an comprehensive list of the build jobs with their status """
+
+ full_rep = self.unstash("Build Report")
+
+ # Filter out build jobs based on status
+ fl = ([k for k, v in full_rep.items() if v['status'] == 'Failed'])
+ ps = ([k for k, v in full_rep.items() if v['status'] == 'Success'])
+
+ print_test(t_list=fl, status="failed", tname="Builds")
+ print_test(t_list=ps, status="passed", tname="Builds")
+
+ def gen_cfg_comb(self, platform_l, compiler_l, config_l, build_l, boot_l):
+ """ Generate all possible configuration combinations from a group of
+ lists of compiler options"""
+ return gen_cfg_combinations("TFM_Build_CFG",
+ ("TARGET_PLATFORM COMPILER PROJ_CONFIG"
+ " CMAKE_BUILD_TYPE WITH_MCUBOOT"),
+ platform_l,
+ compiler_l,
+ config_l,
+ build_l,
+ boot_l)
+
+ def load_config(self, config, work_dir):
+ try:
+            # passing the config_name param supersedes the file param
+ if isinstance(config, dict):
+ ret_cfg = deepcopy(config)
+ elif isinstance(config, str):
+                # If the string does not describe a file, try to look for
+                # it in the work directory
+ if not os.path.isfile(config):
+ # remove path from file
+ config_2 = os.path.split(config)[-1]
+ # look in the current working directory
+ config_2 = os.path.join(work_dir, config_2)
+ if not os.path.isfile(config_2):
+ m = "Could not find cfg in %s or %s " % (config,
+ config_2)
+ raise Exception(m)
+                    # If the file exists in the working directory
+ else:
+ config = config_2
+ ret_cfg = load_json(config)
+
+ else:
+ raise Exception("Need to provide a valid config name or file."
+ "Please use --config/--config-file parameter.")
+ except Exception as e:
+ print("Error:%s \nCould not load a valid config" % e)
+ sys.exit(1)
+
+ pprint(ret_cfg)
+ return ret_cfg
+
+ def parse_config(self, cfg):
+ """ Parse a valid configuration file into a set of build dicts """
+
+        # Generate a list of all possible configuration combinations
+ full_cfg = self.gen_cfg_comb(cfg["platform"],
+ cfg["compiler"],
+ cfg["config"],
+ cfg["build"],
+ cfg["with_mcuboot"])
+
+ # Generate a list of all invalid combinations
+ rejection_cfg = []
+
+ for k in cfg["invalid"]:
+ # Pad the omitted values with wildcard char *
+ res_list = list(k) + ["*"] * (5 - len(k))
+
+ print("Working on rejection input: %s" % (res_list))
+
+            # Key order matters. Use index to retrieve default values when
+            # the wildcard * char is present
+ _cfg_keys = ["platform",
+ "compiler",
+ "config",
+ "build",
+ "with_mcuboot"]
+
+ # Replace wildcard ( "*") entries with every inluded in cfg variant
+ for n in range(len(res_list)):
+ res_list[n] = [res_list[n]] if res_list[n] != "*" \
+ else cfg[_cfg_keys[n]]
+
+ rejection_cfg += self.gen_cfg_comb(*res_list)
+
+        # Notify the user of the rejected configurations
+ for i in rejection_cfg:
+
+ name = "%s_%s_%s_%s_%s" % (i.TARGET_PLATFORM,
+ i.COMPILER,
+ i.PROJ_CONFIG,
+ i.CMAKE_BUILD_TYPE,
+ "BL2" if i.WITH_MCUBOOT else "NOBL2")
+ print("Rejecting config %s" % name)
+
+ # Subtract the two lists and convert to dictionary
+ return list(set(full_cfg) - set(rejection_cfg))
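+
+
+# Illustrative usage sketch (paths, platform and compiler names are
+# placeholders). The config dictionary is expected to provide the keys
+# consumed by parse_config():
+#
+#   cfg = {"platform": ["MUSCA_A"],
+#          "compiler": ["GNUARM"],
+#          "config": ["ConfigRegression"],
+#          "build": ["Debug"],
+#          "with_mcuboot": [True],
+#          "invalid": []}
+#   bm = TFM_Build_Manager("~/tf-m", "~/ci_logs", cfg, report="report.json")
+#   bm.start()
+#   bm.join()
+#   sys.exit(bm.get_status())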
diff --git a/tfm_ci_pylib/tfm_builder.py b/tfm_ci_pylib/tfm_builder.py
new file mode 100644
index 0000000..07ed776
--- /dev/null
+++ b/tfm_ci_pylib/tfm_builder.py
@@ -0,0 +1,378 @@
+#!/usr/bin/env python3
+
+""" tfm_builder.py:
+
+ Build wrapping class that builds a specific tfm configuration """
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+__author__ = "Minos Galanakis"
+__email__ = "minos.galanakis@linaro.org"
+__project__ = "Trusted Firmware-M Open CI"
+__status__ = "stable"
+__version__ = "1.0"
+
+import os
+from .utils import *
+import shutil
+from .structured_task import structuredTask
+
+
+class TFM_Builder(structuredTask):
+ """ Wrap around tfm cmake system and spawn a thread to build the project.
+ """
+ _tfb_build_params = ["TARGET_PLATFORM",
+ "COMPILER",
+ "PROJ_CONFIG",
+ "CMAKE_BUILD_TYPE",
+ "WITH_MCUBOOT"
+ ]
+
+ _tfb_build_template = ("cmake -G \"Unix Makefiles\" -DPROJ_CONFIG=`"
+ "readlink -f %(PROJ_CONFIG)s.cmake` "
+ "-DTARGET_PLATFORM=%(TARGET_PLATFORM)s "
+ "-DCOMPILER=%(COMPILER)s "
+ "-DCMAKE_BUILD_TYPE=%(CMAKE_BUILD_TYPE)s "
+ "-DBL2=%(WITH_MCUBOOT)s "
+ "%(TFM_ROOT)s")
+
+ def __init__(self,
+                 name, # Process name
+ tfm_dir, # TFM root directory
+                 work_dir, # Current working directory (i.e. logs)
+ cfg_dict, # Input config dictionary of the following form
+ # input_dict = {"PROJ_CONFIG": "ConfigRegression",
+ # "TARGET_PLATFORM": "MUSCA_A",
+ # "COMPILER": "ARMCLANG",
+ # "CMAKE_BUILD_TYPE": "Debug"}
+ install=False, # Install library after build
+                 build_threads=4, # Number of CPU threads used in build
+                 silent=False): # Silence stdout output
+
+ self._tfb_cfg = cfg_dict
+ self._tfb_build_threads = build_threads
+ self._tfb_install = install
+ self._tfb_silent = silent
+ self._tfb_binaries = []
+
+ # Required by other methods, always set working directory first
+ self._tfb_work_dir = os.path.abspath(os.path.expanduser(work_dir))
+
+ self._tfb_tfm_dir = os.path.abspath(os.path.expanduser(tfm_dir))
+        # Entries will be filled after sanity test on cfg_dict during pre_exec
+ self._tfb_build_dir = None
+ self._tfb_log_f = None
+ super(TFM_Builder, self).__init__(name=name)
+
+ def mute(self):
+ self._tfb_silent = True
+
+ def log(self):
+ """ Print and return the contents of log file """
+ with open(self._tfb_log_f, "r") as F:
+ log = F.read()
+ print(log)
+ return log
+
+ def report(self):
+ """Return the report on the job """
+ return self.unstash("Build Report")
+
+ def pre_eval(self):
+ """ Tests that need to be run in set-up state """
+
+ # Test that all required entries exist in config
+ diff = list(set(self._tfb_build_params) - set(self._tfb_cfg.keys()))
+ if diff:
+ print("Cound't find require build entry: %s in config" % diff)
+ return False
+ # TODO check validity of passed config values
+ # TODO test detection of srec
+ # self.srec_path = shutil.which("srec_cat")
+ return True
+
+ def pre_exec(self, eval_ret):
+ """ Create all required directories, files if they do not exist """
+
+ self._tfb_build_dir = os.path.join(self._tfb_work_dir,
+ self.get_name())
+ # Ensure we have a clean build directory
+ shutil.rmtree(self._tfb_build_dir, ignore_errors=True)
+
+ self._tfb_cfg["TFM_ROOT"] = self._tfb_tfm_dir
+
+ # Append the path for the config
+ self._tfb_cfg["PROJ_CONFIG"] = os.path.join(self._tfb_tfm_dir,
+ self._tfb_cfg[("PROJ_"
+ "CONFIG")])
+
+ # Log will be placed in work directory, named as the build dir
+ self._tfb_log_f = "%s.log" % self._tfb_build_dir
+
+ # Confirm that the work/build directory exists
+ for p in [self._tfb_work_dir, self._tfb_build_dir]:
+ if not os.path.exists(p):
+ os.makedirs(p)
+
+        # Calculate a list of expected binaries
+ binaries = []
+
+        # If install is asserted, pick the items from the appropriate location
+ if self._tfb_install:
+
+ fvp_path = os.path.join(self._tfb_build_dir,
+ "install", "outputs", "fvp")
+ platform_path = os.path.join(self._tfb_build_dir,
+ "install",
+ "outputs",
+ self._tfb_cfg["TARGET_PLATFORM"])
+
+ # Generate a list of binaries included in both directories
+ common_bin_list = ["tfm_%s.%s" % (s, e) for s in ["s", "ns"]
+ for e in ["bin", "axf"]]
+ if self._tfb_cfg["WITH_MCUBOOT"]:
+ common_bin_list += ["mcuboot.%s" % e for e in ["bin", "axf"]]
+
+ # When building with bootloader extra binaries are expected
+ binaries += [os.path.join(platform_path, b) for b in
+ ["tfm_sign.bin",
+ "tfm_full.bin"]]
+ binaries += [os.path.join(fvp_path, b) for b in
+ ["tfm_s_ns_concatenated.bin",
+ "tfm_s_ns_signed.bin"]]
+
+ binaries += [os.path.join(p, b) for p in [fvp_path, platform_path]
+ for b in common_bin_list]
+
+ # Add Musca required binaries
+ if self._tfb_cfg["TARGET_PLATFORM"] == "MUSCA_A":
+ binaries += [os.path.join(platform_path,
+ "musca_firmware.hex")]
+
+ self._tfb_binaries = binaries
+
+ else:
+ binaries += [os.path.join(self._tfb_build_dir, "app", "tfm_ns")]
+ if "ConfigCoreTest" in self._tfb_build_dir:
+ binaries += [os.path.join(self._tfb_build_dir,
+ "unit_test", "tfm_s")]
+ else:
+ binaries += [os.path.join(self._tfb_build_dir, "app",
+ "secure_fw", "tfm_s")]
+ if self._tfb_cfg["WITH_MCUBOOT"]:
+ binaries += [os.path.join(self._tfb_build_dir,
+ "bl2", "ext", "mcuboot", "mcuboot")]
+
+ ext = ['.bin', '.axf']
+ self._tfb_binaries = ["%s%s" % (n, e) for n in binaries
+ for e in ext]
+
+ # Add Musca required binaries
+ if self._tfb_cfg["TARGET_PLATFORM"] == "MUSCA_A":
+ self._tfb_binaries += [os.path.join(self._tfb_build_dir,
+ "tfm_sign.bin")]
+ self._tfb_binaries += [os.path.join(self._tfb_build_dir,
+ "musca_firmware.hex")]
+
+ def get_binaries(self,
+ bootl=None,
+ bin_s=None,
+ bin_ns=None,
+ bin_sign=None,
+ filt=None):
+ """ Return the absolute location of binaries (from config)
+ if they exist. Can add a filter parameter which will only
+ consider entries with /filter/ in their path as a directory """
+ ret_boot = None
+ ret_bin_ns = None
+ ret_bin_s = None
+ ret_bin_sign = None
+
+ # Apply filter as a /filter/ string to the binary list
+ filt = "/" + filt + "/" if filter else None
+ binaries = list(filter(lambda x: filt in x, self._tfb_binaries)) \
+ if filt else self._tfb_binaries
+
+ for obj_file in binaries:
+ fname = os.path.split(obj_file)[-1]
+ if bootl:
+ if fname == bootl:
+ ret_boot = obj_file
+ continue
+ if bin_s:
+ if fname == bin_s:
+ ret_bin_s = obj_file
+ continue
+
+ if bin_ns:
+ if fname == bin_ns:
+ ret_bin_ns = obj_file
+ continue
+ if bin_sign:
+ if fname == bin_sign:
+ ret_bin_sign = obj_file
+ continue
+ return [ret_boot, ret_bin_s, ret_bin_ns, ret_bin_sign]
+
+ def task_exec(self):
+ """ Main tasks """
+
+        # Mark process as running in the status
+ self.set_status(-1)
+ # Go to build directory
+ os.chdir(self._tfb_build_dir)
+ # Compile the build commands
+ cmake_cmd = self._tfb_build_template % self._tfb_cfg
+ build_cmd = "cmake --build ./ -- -j %s" % self._tfb_build_threads
+
+ # Pass the report to later stages
+ rep = {"build_cmd": "%s" % build_cmd,
+ "cmake_cmd": "%s" % cmake_cmd}
+ self.stash("Build Report", rep)
+
+        # Call cmake to configure the project
+ if not subprocess_log(cmake_cmd,
+ self._tfb_log_f,
+ prefix=cmake_cmd,
+ silent=self._tfb_silent):
+ # Build it
+ if subprocess_log(build_cmd,
+ self._tfb_log_f,
+ append=True,
+ prefix=build_cmd,
+ silent=self._tfb_silent):
+ raise Exception("Build Failed please check log: %s" %
+ self._tfb_log_f)
+ else:
+ raise Exception("Cmake Failed please check log: %s" %
+ self._tfb_log_f)
+
+ if self._tfb_install:
+ install_cmd = "cmake --build ./ -- -j install"
+ if subprocess_log(install_cmd,
+ self._tfb_log_f,
+ append=True,
+ prefix=install_cmd,
+ silent=self._tfb_silent):
+ raise Exception(("Make install Failed."
+ " please check log: %s") % self._tfb_log_f)
+ if self._tfb_cfg["TARGET_PLATFORM"] == "MUSCA_A":
+ boot_f, s_bin, ns_bin, sns_signed_bin = self.get_binaries(
+ bootl="mcuboot.bin",
+ bin_s="tfm_s.bin",
+ bin_ns="tfm_ns.bin",
+ bin_sign="tfm_sign.bin",
+ filt="MUSCA_A")
+ self.convert_to_hex(boot_f, sns_signed_bin)
+ self._t_stop()
+
+ def sign_img(self, secure_bin, non_secure_bin):
+ """Join a secure and non secure image and sign them"""
+
+ imgtool_dir = os.path.join(self._tfb_tfm_dir,
+ "bl2/ext/mcuboot/scripts/")
+ flash_layout = os.path.join(self._tfb_tfm_dir,
+ "platform/ext/target/musca_a/"
+ "partition/flash_layout.h")
+ sign_cert = os.path.join(self._tfb_tfm_dir,
+ "bl2/ext/mcuboot/root-rsa-2048.pem")
+ sns_unsigned_bin = os.path.join(self._tfb_build_dir,
+ "sns_unsigned.bin")
+ sns_signed_bin = os.path.join(self._tfb_build_dir, "sns_signed.bin")
+
+        # Early versions of the tool had relative imports, so run from its dir
+ os.chdir(imgtool_dir)
+ assemble_cmd = ("python3 assemble.py -l %(layout)s -s %(s)s "
+ "-n %(ns)s -o %(sns)s") % {"layout": flash_layout,
+ "s": secure_bin,
+ "ns": non_secure_bin,
+ "sns": sns_unsigned_bin
+ }
+ sign_cmd = ("python3 imgtool.py sign -k %(cert)s --align 1 -v "
+ "1.0 -H 0x400 --pad 0x30000 "
+ "%(sns)s %(sns_signed)s") % {"cert": sign_cert,
+ "sns": sns_unsigned_bin,
+ "sns_signed": sns_signed_bin
+ }
+ run_proccess(assemble_cmd)
+ run_proccess(sign_cmd)
+ # Return to build directory
+ os.chdir(self._tfb_build_dir)
+ return sns_signed_bin
+
+ def convert_to_hex(self,
+ boot_bin,
+ sns_signed_bin,
+ qspi_base=0x200000,
+ boot_size=0x10000):
+ """Convert a signed image to an intel hex format with mcuboot """
+ if self._tfb_install:
+ platform_path = os.path.join(self._tfb_build_dir,
+ "install",
+ "outputs",
+ self._tfb_cfg["TARGET_PLATFORM"])
+ firmware_hex = os.path.join(platform_path, "musca_firmware.hex")
+ else:
+ firmware_hex = os.path.join(self._tfb_build_dir,
+ "musca_firmware.hex")
+
+ img_offset = qspi_base + boot_size
+ merge_cmd = ("srec_cat %(boot)s -Binary -offset 0x%(qspi_offset)x "
+ "%(sns_signed)s -Binary -offset 0x%(img_offset)x "
+ "-o %(hex)s -Intel") % {"boot": boot_bin,
+ "sns_signed": sns_signed_bin,
+ "hex": firmware_hex,
+ "qspi_offset": qspi_base,
+ "img_offset": img_offset
+ }
+ run_proccess(merge_cmd)
+ return
+
+ def post_eval(self):
+ """ Verify that the artefacts exist """
+ print("%s Post eval" % self.get_name())
+
+ ret_eval = False
+ rep = self.unstash("Build Report")
+ missing_binaries = list(filter(lambda x: not os.path.isfile(x),
+ self._tfb_binaries))
+
+ if len(missing_binaries):
+ print("ERROR: Could not locate the following binaries:")
+ print("\n".join(missing_binaries))
+
+            # Update the artefacts to not include missing ones
+ artf = [n for n in self._tfb_binaries if n not in missing_binaries]
+ # TODO update self._tfb_binaries
+ ret_eval = False
+ else:
+ print("SUCCESS: Produced binaries:")
+ print("\n".join(self._tfb_binaries))
+ ret_eval = True
+
+ artf = self._tfb_binaries
+
+ # Add artefact related information to report
+ rep["log"] = self._tfb_log_f
+ rep["missing_artefacts"] = missing_binaries
+ rep["artefacts"] = artf
+
+ rep["status"] = "Success" if ret_eval else "Failed"
+ self.stash("Build Report", rep)
+ return ret_eval
+
+ def post_exec(self, eval_ret):
+ """ """
+
+        if eval_ret:
+            print("TFM Builder %s was successful" % self.get_name())
+        else:
+            print("TFM Builder %s was unsuccessful" % self.get_name())
diff --git a/tfm_ci_pylib/utils.py b/tfm_ci_pylib/utils.py
new file mode 100755
index 0000000..7d1ca46
--- /dev/null
+++ b/tfm_ci_pylib/utils.py
@@ -0,0 +1,285 @@
+#!/usr/bin/env python3
+
+""" utils.py:
+
+ various simple and commonly used methods and classes shared by the scripts
+ in the CI environment """
+
+from __future__ import print_function
+
+__copyright__ = """
+/*
+ * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *
+ */
+ """
+__author__ = "Minos Galanakis"
+__email__ = "minos.galanakis@linaro.org"
+__project__ = "Trusted Firmware-M Open CI"
+__status__ = "stable"
+__version__ = "1.0"
+
+import os
+import sys
+import yaml
+import argparse
+import json
+import itertools
+from collections import OrderedDict, namedtuple
+from subprocess import Popen, PIPE, STDOUT
+
+
+def detect_python3():
+ """ Return true if script is run with Python3 interpreter """
+
+ return sys.version_info > (3, 0)
+
+
+def print_test_dict(data_dict,
+ pad_space=80,
+ identation=5,
+ titl="Summary",
+ pad_char="*"):
+
+ """ Configurable print formatter aimed for dictionaries of the type
+ {"TEST NAME": "RESULT"} used in CI systems. It will also return
+ the string which is printing """
+
+    # Calculate the pad space between variables x, y to achieve alignment
+    # on y, taking into consideration a maximum alignment boundary p and
+    # possible indentation i
+ def flex_pad(x, y, p, i):
+ return " " * (p - i * 2 - len(x) - len(y)) + "-> "
+
+ # Calculate the padding for the dataset
+ tests = [k + flex_pad(k,
+ v,
+ pad_space,
+ identation) + v for k, v in data_dict.items()]
+
+    # Add the indentation
+ tests = map(lambda x: " " * identation + x, tests)
+
+ # Convert to string
+ tests = "\n".join(tests)
+
+    # Calculate the top header padding, truncating any rounding errors
+ hdr_pad = (pad_space - len(titl) - 3) / 2
+
+ if detect_python3():
+ hdr_pad = int(hdr_pad)
+
+ # Generate a print formatting dictionary
+ print_dict = {"pad0": pad_char * (hdr_pad),
+ "pad1": pad_char * (hdr_pad + 1 if len(titl) % 2
+ else hdr_pad),
+ "sumry": tests,
+ "pad2": pad_char * pad_space,
+ "titl": titl}
+
+ # Compose & print the report
+ r = "\n%(pad0)s %(titl)s %(pad1)s\n\n%(sumry)s\n\n%(pad2)s\n" % print_dict
+ print(r)
+ return r
+
+
+def print_test(t_name=None, t_list=None, status="failed", tname="Tests"):
+ """ Print a list of tests in a stuctured ascii table format """
+
+ gfx_line1 = "=" * 80
+ gfx_line2 = "\t" + "-" * 70
+ if t_name:
+ print("%(line)s\n%(name)s\n%(line)s" % {"line": gfx_line1,
+ "name": t_name})
+ print("%s %s:" % (tname, status))
+ print(gfx_line2 + "\n" +
+ "\n".join(["\t| %(key)s%(pad)s|\n%(line)s" % {
+ "key": n,
+ "pad": (66 - len(n)) * " ",
+ "line": gfx_line2} for n in t_list]))
+
+
+def test(test_list,
+ test_dict,
+ test_name="TF-M Test",
+ pass_text=["PASSED", "PRESENT"],
+ error_on_failed=True,
+ summary=True):
+
+ """ Using input of a test_lst and a test results dictionary in the format
+ of test_name: resut key-value pairs, test() method will verify that Every
+ single method in the test_list has been tested and passed. Pass and Failed,
+ status tests can be overriden and error_on_failed flag, exits the script
+ with failure if a single test fails or is not detected. Returns a json
+ containing status and fields for each test passed/failed/missing, if error
+ on failed is not set.
+ """
+
+ t_report = {"name": test_name,
+ "success": None,
+ "passed": [],
+ "failed": [],
+ "missing": []}
+ # Clean-up tests that are not requested by test_list
+ test_dict = {k: v for k, v in test_dict.items() if k in test_list}
+
+ # Calculate the difference of the two sets to find missing tests
+ t_report["missing"] = list(set(test_list) - set(test_dict.keys()))
+
+    # Sort the items into the appropriate lists (failed or passed)
+    # based on their status.
+ for k, v in test_dict.items():
+ # print(k, v)
+ key = "passed" if v in pass_text else "failed"
+ t_report[key] += [k]
+
+    # For the test to pass, every single test in test_list needs to be
+    # present and be in the passed list
+ if len(test_list) == len(t_report["passed"]):
+ t_report["success"] = True
+ else:
+ t_report["success"] = False
+
+ # Print a summary
+ if summary:
+ if t_report["passed"]:
+ print_test(test_name, t_report["passed"], status="passed")
+ if t_report["missing"]:
+ print_test(test_name, t_report["missing"], status="missing")
+ if t_report["failed"]:
+ print_test(test_name, t_report["failed"], status="Failed")
+
+ print("\nTest %s has %s!" % (t_report["name"],
+ " been successful" if t_report["success"]
+ else "failed"))
+ print("-" * 80)
+ if error_on_failed:
+ syscode = 0 if t_report["success"] else 1
+ sys.exit(syscode)
+ return t_report
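+
+
+# Illustrative example (hypothetical test names): verify that two expected
+# tests ran and passed, without exiting the script on failure.
+#
+#   report = test(["Test_A", "Test_B"],
+#                 {"Test_A": "PASSED", "Test_B": "FAILED"},
+#                 error_on_failed=False)
+#   # report["success"] is False and report["failed"] == ["Test_B"]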
+
+
+def save_json(f_name, data_object):
+ """ Save object to json file """
+
+ with open(f_name, "w") as F:
+ F.write(json.dumps(data_object, indent=2))
+
+
+def save_dict_json(f_name, data_dict, sort_list=None):
+ """ Save a dictionary object to file with optional sorting """
+
+ if sort_list:
+ data_object = (sort_dict(data_dict, sort_list))
+ save_json(f_name, data_object)
+
+
+def sort_dict(config_dict, sort_order_list=None):
+ """ Create a fixed order disctionary out of a config dataset """
+
+ if sort_order_list:
+ ret = OrderedDict([(k, config_dict[k]) for k in sort_order_list])
+ else:
+ ret = OrderedDict([(k, config_dict[k]) for k in sorted(config_dict)])
+ return ret
+
+
+def load_json(f_name):
+ """ Load object from json file """
+
+ with open(f_name, "r") as F:
+ try:
+ return json.loads(F.read())
+ except ValueError as exc:
+ print("No JSON object could be decoded from file: %s" % f_name)
+ except IOError:
+ print("Error opening file: %s" % f_name)
+ raise Exception("Failed to load file")
+
+
+def load_yaml(f_name):
+    """ Load object from a yaml file """
+
+ with open(f_name, "r") as F:
+ try:
+            return yaml.safe_load(F.read())
+ except yaml.YAMLError as exc:
+ print("Error parsing file: %s" % f_name)
+ except IOError:
+ print("Error opening file: %s" % f_name)
+ raise Exception("Failed to load file")
+
+
+def subprocess_log(cmd, log_f, prefix=None, append=False, silent=False):
+ """ Run a command as subproccess an log the output to stdout and fileself.
+ If prefix is spefified it will be added as the first line in file """
+
+ with open(log_f, 'a' if append else "w") as F:
+ if prefix:
+ F.write(prefix + "\n")
+ pcss = Popen(cmd,
+ stdout=PIPE,
+ stderr=STDOUT,
+ shell=True,
+ env=os.environ)
+ for line in pcss.stdout:
+ if detect_python3():
+ line = line.decode("utf-8")
+ if not silent:
+ sys.stdout.write(line)
+ F.write(line)
+ pcss.communicate()
+ return pcss.returncode
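+
+# Illustrative subprocess_log usage (command and file names are
+# placeholders):
+#
+#   ret = subprocess_log("make -j 4", "build.log", prefix="make -j 4")
+#   # ret is the exit code; the output was echoed to stdout and written to
+#   # build.log, with the command itself as the first line.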
+
+
+def run_proccess(cmd):
+ """ Run a command as subproccess an log the output to stdout and file.
+ If prefix is spefified it will be added as the first line in file """
+
+ pcss = Popen(cmd,
+ stdout=PIPE,
+ stderr=PIPE,
+ shell=True,
+ env=os.environ)
+ pcss.communicate()
+ return pcss.returncode
+
+
+def list_chunks(l, n):
+ """ Yield successive n-sized chunks from l. """
+
+ for i in range(0, len(l), n):
+ yield l[i:i + n]
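+
+# Illustrative example:
+#   list(list_chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]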
+
+
+def export_config_map(config_m, dir=None):
+ """ Will export a dictionary of configurations to a group of JSON files """
+
+ _dir = dir if dir else os.getcwd()
+ for _cname, _cfg in config_m.items():
+ _cname = _cname.lower()
+ _fname = os.path.join(_dir, _cname + ".json")
+ print("Exporting config %s" % _fname)
+ save_json(_fname, _cfg)
+
+
+def gen_cfg_combinations(name, categories, *args):
+ """ Create a list of named tuples of `name`, with elements defined in a
+    space-separated string `categories` and an equal number of lists for said
+ categories provided as arguments. Order of arguments should match the
+ order of the categories lists """
+
+ build_config = namedtuple(name, categories)
+ return [build_config(*x) for x in itertools.product(*args)]
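+
+# Illustrative example (platform/compiler names are placeholders):
+#
+#   combs = gen_cfg_combinations("CFG", "PLATFORM COMPILER",
+#                                ["MUSCA_A", "AN521"], ["GNUARM"])
+#   # -> [CFG(PLATFORM='MUSCA_A', COMPILER='GNUARM'),
+#   #     CFG(PLATFORM='AN521', COMPILER='GNUARM')]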
+
+
+def get_cmd_args(descr="", parser=None):
+ """ Parse command line arguments """
+ # Parse command line arguments to override config
+
+ if not parser:
+ parser = argparse.ArgumentParser(description=descr)
+ return parser.parse_args()