Minos Galanakis | f4ca6ac | 2017-12-11 02:39:21 +0100 | [diff] [blame^] | 1 | #!/usr/bin/env python3 |
| 2 | |
| 3 | """ utils.py: |
| 4 | |
| 5 | various simple and commonly used methods and classes shared by the scripts |
| 6 | in the CI environment """ |
| 7 | |
| 8 | from __future__ import print_function |
| 9 | |
| 10 | __copyright__ = """ |
| 11 | /* |
| 12 | * Copyright (c) 2018-2019, Arm Limited. All rights reserved. |
| 13 | * |
| 14 | * SPDX-License-Identifier: BSD-3-Clause |
| 15 | * |
| 16 | */ |
| 17 | """ |
| 18 | __author__ = "Minos Galanakis" |
| 19 | __email__ = "minos.galanakis@linaro.org" |
| 20 | __project__ = "Trusted Firmware-M Open CI" |
| 21 | __status__ = "stable" |
| 22 | __version__ = "1.0" |
| 23 | |
| 24 | import os |
| 25 | import sys |
| 26 | import yaml |
| 27 | import argparse |
| 28 | import json |
| 29 | import itertools |
| 30 | from collections import OrderedDict, namedtuple |
| 31 | from subprocess import Popen, PIPE, STDOUT |
| 32 | |
| 33 | |
| 34 | def detect_python3(): |
| 35 | """ Return true if script is run with Python3 interpreter """ |
| 36 | |
| 37 | return sys.version_info > (3, 0) |
| 38 | |
| 39 | |
| 40 | def print_test_dict(data_dict, |
| 41 | pad_space=80, |
| 42 | identation=5, |
| 43 | titl="Summary", |
| 44 | pad_char="*"): |
| 45 | |
| 46 | """ Configurable print formatter aimed for dictionaries of the type |
| 47 | {"TEST NAME": "RESULT"} used in CI systems. It will also return |
| 48 | the string which is printing """ |
| 49 | |
| 50 | # Calculate pad space bewteen variables x, y t achieve alignment on y |
| 51 | # taking into consideration a maximum aligment boundary p and |
| 52 | # possible indentation i |
| 53 | def flex_pad(x, y, p, i): |
| 54 | return " " * (p - i * 2 - len(x) - len(y)) + "-> " |
| 55 | |
| 56 | # Calculate the padding for the dataset |
| 57 | tests = [k + flex_pad(k, |
| 58 | v, |
| 59 | pad_space, |
| 60 | identation) + v for k, v in data_dict.items()] |
| 61 | |
| 62 | # Add the identation |
| 63 | tests = map(lambda x: " " * identation + x, tests) |
| 64 | |
| 65 | # Convert to string |
| 66 | tests = "\n".join(tests) |
| 67 | |
| 68 | # Calcuate the top header padding ceiling any rounding errors |
| 69 | hdr_pad = (pad_space - len(titl) - 3) / 2 |
| 70 | |
| 71 | if detect_python3(): |
| 72 | hdr_pad = int(hdr_pad) |
| 73 | |
| 74 | # Generate a print formatting dictionary |
| 75 | print_dict = {"pad0": pad_char * (hdr_pad), |
| 76 | "pad1": pad_char * (hdr_pad + 1 if len(titl) % 2 |
| 77 | else hdr_pad), |
| 78 | "sumry": tests, |
| 79 | "pad2": pad_char * pad_space, |
| 80 | "titl": titl} |
| 81 | |
| 82 | # Compose & print the report |
| 83 | r = "\n%(pad0)s %(titl)s %(pad1)s\n\n%(sumry)s\n\n%(pad2)s\n" % print_dict |
| 84 | print(r) |
| 85 | return r |
| 86 | |
| 87 | |
| 88 | def print_test(t_name=None, t_list=None, status="failed", tname="Tests"): |
| 89 | """ Print a list of tests in a stuctured ascii table format """ |
| 90 | |
| 91 | gfx_line1 = "=" * 80 |
| 92 | gfx_line2 = "\t" + "-" * 70 |
| 93 | if t_name: |
| 94 | print("%(line)s\n%(name)s\n%(line)s" % {"line": gfx_line1, |
| 95 | "name": t_name}) |
| 96 | print("%s %s:" % (tname, status)) |
| 97 | print(gfx_line2 + "\n" + |
| 98 | "\n".join(["\t| %(key)s%(pad)s|\n%(line)s" % { |
| 99 | "key": n, |
| 100 | "pad": (66 - len(n)) * " ", |
| 101 | "line": gfx_line2} for n in t_list])) |
| 102 | |
| 103 | |
| 104 | def test(test_list, |
| 105 | test_dict, |
| 106 | test_name="TF-M Test", |
| 107 | pass_text=["PASSED", "PRESENT"], |
| 108 | error_on_failed=True, |
| 109 | summary=True): |
| 110 | |
| 111 | """ Using input of a test_lst and a test results dictionary in the format |
| 112 | of test_name: resut key-value pairs, test() method will verify that Every |
| 113 | single method in the test_list has been tested and passed. Pass and Failed, |
| 114 | status tests can be overriden and error_on_failed flag, exits the script |
| 115 | with failure if a single test fails or is not detected. Returns a json |
| 116 | containing status and fields for each test passed/failed/missing, if error |
| 117 | on failed is not set. |
| 118 | """ |
| 119 | |
| 120 | t_report = {"name": test_name, |
| 121 | "success": None, |
| 122 | "passed": [], |
| 123 | "failed": [], |
| 124 | "missing": []} |
| 125 | # Clean-up tests that are not requested by test_list |
| 126 | test_dict = {k: v for k, v in test_dict.items() if k in test_list} |
| 127 | |
| 128 | # Calculate the difference of the two sets to find missing tests |
| 129 | t_report["missing"] = list(set(test_list) - set(test_dict.keys())) |
| 130 | |
| 131 | # Sor the items into the apropriate lists (failed or passed) |
| 132 | # based on their status. |
| 133 | for k, v in test_dict.items(): |
| 134 | # print(k, v) |
| 135 | key = "passed" if v in pass_text else "failed" |
| 136 | t_report[key] += [k] |
| 137 | |
| 138 | # For the test to pass every singe test in test_list needs to be present |
| 139 | # and be in the passed list |
| 140 | if len(test_list) == len(t_report["passed"]): |
| 141 | t_report["success"] = True |
| 142 | else: |
| 143 | t_report["success"] = False |
| 144 | |
| 145 | # Print a summary |
| 146 | if summary: |
| 147 | if t_report["passed"]: |
| 148 | print_test(test_name, t_report["passed"], status="passed") |
| 149 | if t_report["missing"]: |
| 150 | print_test(test_name, t_report["missing"], status="missing") |
| 151 | if t_report["failed"]: |
| 152 | print_test(test_name, t_report["failed"], status="Failed") |
| 153 | |
| 154 | print("\nTest %s has %s!" % (t_report["name"], |
| 155 | " been successful" if t_report["success"] |
| 156 | else "failed")) |
| 157 | print("-" * 80) |
| 158 | if error_on_failed: |
| 159 | syscode = 0 if t_report["success"] else 1 |
| 160 | sys.exit(syscode) |
| 161 | return t_report |
| 162 | |
| 163 | |
| 164 | def save_json(f_name, data_object): |
| 165 | """ Save object to json file """ |
| 166 | |
| 167 | with open(f_name, "w") as F: |
| 168 | F.write(json.dumps(data_object, indent=2)) |
| 169 | |
| 170 | |
| 171 | def save_dict_json(f_name, data_dict, sort_list=None): |
| 172 | """ Save a dictionary object to file with optional sorting """ |
| 173 | |
| 174 | if sort_list: |
| 175 | data_object = (sort_dict(data_dict, sort_list)) |
| 176 | save_json(f_name, data_object) |
| 177 | |
| 178 | |
| 179 | def sort_dict(config_dict, sort_order_list=None): |
| 180 | """ Create a fixed order disctionary out of a config dataset """ |
| 181 | |
| 182 | if sort_order_list: |
| 183 | ret = OrderedDict([(k, config_dict[k]) for k in sort_order_list]) |
| 184 | else: |
| 185 | ret = OrderedDict([(k, config_dict[k]) for k in sorted(config_dict)]) |
| 186 | return ret |
| 187 | |
| 188 | |
| 189 | def load_json(f_name): |
| 190 | """ Load object from json file """ |
| 191 | |
| 192 | with open(f_name, "r") as F: |
| 193 | try: |
| 194 | return json.loads(F.read()) |
| 195 | except ValueError as exc: |
| 196 | print("No JSON object could be decoded from file: %s" % f_name) |
| 197 | except IOError: |
| 198 | print("Error opening file: %s" % f_name) |
| 199 | raise Exception("Failed to load file") |
| 200 | |
| 201 | |
| 202 | def load_yaml(f_name): |
| 203 | |
| 204 | # Parse command line arguments to override config |
| 205 | with open(f_name, "r") as F: |
| 206 | try: |
| 207 | return yaml.load(F.read()) |
| 208 | except yaml.YAMLError as exc: |
| 209 | print("Error parsing file: %s" % f_name) |
| 210 | except IOError: |
| 211 | print("Error opening file: %s" % f_name) |
| 212 | raise Exception("Failed to load file") |
| 213 | |
| 214 | |
| 215 | def subprocess_log(cmd, log_f, prefix=None, append=False, silent=False): |
| 216 | """ Run a command as subproccess an log the output to stdout and fileself. |
| 217 | If prefix is spefified it will be added as the first line in file """ |
| 218 | |
| 219 | with open(log_f, 'a' if append else "w") as F: |
| 220 | if prefix: |
| 221 | F.write(prefix + "\n") |
| 222 | pcss = Popen(cmd, |
| 223 | stdout=PIPE, |
| 224 | stderr=STDOUT, |
| 225 | shell=True, |
| 226 | env=os.environ) |
| 227 | for line in pcss.stdout: |
| 228 | if detect_python3(): |
| 229 | line = line.decode("utf-8") |
| 230 | if not silent: |
| 231 | sys.stdout.write(line) |
| 232 | F.write(line) |
| 233 | pcss.communicate() |
| 234 | return pcss.returncode |
| 235 | return |
| 236 | |
| 237 | |
| 238 | def run_proccess(cmd): |
| 239 | """ Run a command as subproccess an log the output to stdout and file. |
| 240 | If prefix is spefified it will be added as the first line in file """ |
| 241 | |
| 242 | pcss = Popen(cmd, |
| 243 | stdout=PIPE, |
| 244 | stderr=PIPE, |
| 245 | shell=True, |
| 246 | env=os.environ) |
| 247 | pcss.communicate() |
| 248 | return pcss.returncode |
| 249 | |
| 250 | |
| 251 | def list_chunks(l, n): |
| 252 | """ Yield successive n-sized chunks from l. """ |
| 253 | |
| 254 | for i in range(0, len(l), n): |
| 255 | yield l[i:i + n] |
| 256 | |
| 257 | |
| 258 | def export_config_map(config_m, dir=None): |
| 259 | """ Will export a dictionary of configurations to a group of JSON files """ |
| 260 | |
| 261 | _dir = dir if dir else os.getcwd() |
| 262 | for _cname, _cfg in config_m.items(): |
| 263 | _cname = _cname.lower() |
| 264 | _fname = os.path.join(_dir, _cname + ".json") |
| 265 | print("Exporting config %s" % _fname) |
| 266 | save_json(_fname, _cfg) |
| 267 | |
| 268 | |
| 269 | def gen_cfg_combinations(name, categories, *args): |
| 270 | """ Create a list of named tuples of `name`, with elements defined in a |
| 271 | space separated string `categories` and equal ammount of lists for said |
| 272 | categories provided as arguments. Order of arguments should match the |
| 273 | order of the categories lists """ |
| 274 | |
| 275 | build_config = namedtuple(name, categories) |
| 276 | return [build_config(*x) for x in itertools.product(*args)] |
| 277 | |
| 278 | |
| 279 | def get_cmd_args(descr="", parser=None): |
| 280 | """ Parse command line arguments """ |
| 281 | # Parse command line arguments to override config |
| 282 | |
| 283 | if not parser: |
| 284 | parser = argparse.ArgumentParser(description=descr) |
| 285 | return parser.parse_args() |