Saul Romero | ee43b16 | 2024-05-02 10:53:50 +0000 | [diff] [blame^] | 1 | import argparse |
| 2 | import time |
| 3 | from enum import Enum |
| 4 | from typing import Dict, List, Any |
| 5 | |
| 6 | import cc_logger |
| 7 | import os |
| 8 | import json |
| 9 | import requests as requests |
| 10 | from parsel import Selector |
| 11 | |
| 12 | |
| 13 | class Metrics(Enum): |
| 14 | LINES = 1 |
| 15 | FUNCTIONS = 2 |
| 16 | BRANCHES = 3 |
| 17 | FILES = 4 |
| 18 | |
| 19 | @staticmethod |
| 20 | def like(s: str): |
| 21 | s = s or '' |
| 22 | for m in Metrics: |
| 23 | if m.name.startswith(s.strip().upper()): |
| 24 | return m |
| 25 | return None |
| 26 | |
| 27 | |
| 28 | logger = cc_logger.logger |
| 29 | |
| 30 | |
| 31 | def to_(f, s, pos=0, default=None): |
| 32 | """ |
| 33 | Function to return a conversion from string to a type given by function f |
| 34 | |
| 35 | :param f: Function used to convert the string |
| 36 | :param s: String to be converted |
| 37 | :param pos: The string is split and this is the position within the |
| 38 | resulting array where resides the string |
| 39 | :param default: Default value if conversion cannot be made |
| 40 | :return: Converted string value |
| 41 | """ |
| 42 | r = None |
| 43 | try: |
| 44 | r = f(s.split()[pos]) |
| 45 | except (ValueError, IndexError): |
| 46 | if default is not None: |
| 47 | return default |
| 48 | return r |
| 49 | |
| 50 | |
| 51 | class ParseCodeCoverageHTMLReport(object): |
| 52 | """ |
| 53 | Class used to scrape information from a LCOV report to be written to a |
| 54 | JSON file in a flat structure to be read and uploaded to a custom DB |
| 55 | """ |
| 56 | |
| 57 | def __init__(self, args): |
| 58 | self.args = args |
| 59 | self.ci_url = args.ci_url |
| 60 | self.lcov_path = args.lcov_path |
| 61 | self.url = f'{self.ci_url}{self.lcov_path}' |
| 62 | self.ci_type = args.ci_type |
| 63 | self.json_file = args.json_file |
| 64 | self.report = None |
| 65 | |
| 66 | def get(self): |
| 67 | logger.info(f'Collecting from {self.url}...') |
| 68 | self.report = ParseCodeCoverageHTMLReport.process(self.url) |
| 69 | if not self.report: |
| 70 | return None |
| 71 | _metadata = self._metadata() |
| 72 | self.report['metadata'].update(_metadata) |
| 73 | return self.report |
| 74 | |
| 75 | def save_json(self, report=None): |
| 76 | if report is not None: |
| 77 | self.report = report |
| 78 | if self.report is None: |
| 79 | self.report = self.get() |
| 80 | if self.report: |
| 81 | with open(self.json_file, 'w', encoding='utf-8') as f: |
| 82 | json.dump(self.report, f, ensure_ascii=False, indent=4) |
| 83 | return True |
| 84 | return False |
| 85 | |
| 86 | def _metadata(self): |
| 87 | metadata = {'uri': self.ci_url, 'ci_type': self.ci_type} |
| 88 | if self.args.metadata: |
| 89 | metadata.update(json.loads(self.args.metadata)) |
| 90 | return metadata |
| 91 | |
| 92 | FIRST_LEVEL = True |
| 93 | LCOV_VERSION = "1.15" |
| 94 | |
| 95 | @staticmethod |
| 96 | def process(url, parent=""): |
| 97 | """ |
| 98 | Static method used to extract the summary and table information from |
| 99 | the LCOV report deployed at the given url |
| 100 | |
| 101 | :param url: URL where the LCOV report resides |
| 102 | :param parent: Parent folder for the LCOV report. Empty if at the |
| 103 | first/root level |
| 104 | :return: List containing dictionaries for every file with the |
| 105 | corresponding metrics/results |
| 106 | """ |
| 107 | |
| 108 | def _metadata() -> {}: |
| 109 | date_time = selector. \ |
| 110 | xpath("//td[contains(@class, 'headerItem') and text() = " |
| 111 | "'Date:']/following-sibling::td[1 and contains(" |
| 112 | "@class, 'headerValue')]/text()").get() |
| 113 | lcov_version = selector. \ |
| 114 | xpath("//td[contains(@class, 'versionInfo')]/a/text()").get() |
| 115 | metadata = {'datetime': date_time, |
| 116 | 'lcov_version': lcov_version.split()[-1], |
| 117 | 'root_url_report': url} |
| 118 | return metadata |
| 119 | |
| 120 | def _summary() -> [{}]: |
| 121 | summary = {"Directory": "", "Parent": parent} |
| 122 | result_cols = selector. \ |
| 123 | xpath('//td[@class="headerCovTableHead"]/text()').getall() |
| 124 | for metric in Metrics: |
| 125 | metric_sel = selector. \ |
| 126 | xpath(f"//td[contains(@class, 'headerItem') " |
| 127 | f"and text() = '{metric.name.title()}:']") |
| 128 | if not metric_sel: |
| 129 | continue |
| 130 | results = metric_sel.xpath( |
| 131 | "./following-sibling::td[1 and contains" |
| 132 | "(@class, 'headerCovTableEntry')]/text()").getall() |
| 133 | for index, result_col in enumerate(result_cols): |
| 134 | summary[f'{metric.name.title()}{result_col}'] = \ |
| 135 | to_(float, results[index], default=-1) |
| 136 | return [summary] |
| 137 | |
| 138 | def _table() -> [{}]: |
| 139 | table = [] |
| 140 | arr = {} |
| 141 | headers = selector. \ |
| 142 | xpath('//td[@class="tableHead"]/text()').getall() |
| 143 | sub_headers = [j for i in headers if (j := i.title().strip()) in [ |
| 144 | 'Total', 'Hit']] |
| 145 | file_type = headers[0].strip() |
| 146 | metric_headers = [metric.name.title() for h in headers |
| 147 | if (metric := Metrics.like(h.split()[0]))] |
| 148 | rows = selector.xpath("//td[contains(@class, 'coverFile')]") |
| 149 | for row in rows: |
| 150 | record = {file_type: row.xpath("./a/text()").get(), |
| 151 | 'Parent': parent} |
| 152 | percentage = row.xpath( |
| 153 | "./following-sibling::td[1 and " |
| 154 | "contains(@class, 'coverPer')]/text()").getall() |
| 155 | hit_total = [v.root.text or '' for v in |
| 156 | row.xpath("./following-sibling::td[1 and " |
| 157 | "contains(@class, 'coverNum')]")] |
| 158 | for index, header in enumerate(metric_headers): |
| 159 | record[f'{header}Coverage'] = to_(float, percentage[index], |
| 160 | default=-1) |
| 161 | if ParseCodeCoverageHTMLReport.LCOV_VERSION \ |
| 162 | in ["1.14", "1.15", "1.16"]: |
| 163 | arr['Hit'], arr['Total'] = ( |
| 164 | hit_total[index].split("/")) |
| 165 | else: |
| 166 | arr[sub_headers[2 * index]], arr[ |
| 167 | sub_headers[2 * index + 1]], *rest = ( |
| 168 | hit_total[2 * index:]) |
| 169 | record[f'{header}Hit'] = to_(int, arr['Hit'], default=0) |
| 170 | record[f'{header}Total'] = to_(int, arr['Total'], default=0) |
| 171 | table.append(record) |
| 172 | if file_type.upper().strip() == "DIRECTORY": |
| 173 | table += ParseCodeCoverageHTMLReport. \ |
| 174 | process(f'{os.path.dirname(url)}' |
| 175 | f'/{row.xpath("./a/@href").get()}', |
| 176 | parent=record[file_type]) |
| 177 | return table |
| 178 | |
| 179 | url = url |
| 180 | parent = parent |
| 181 | req = requests.get(url) |
| 182 | if req.status_code != 200: |
| 183 | logger.warning(f"Url '{url}' return status code " |
| 184 | f"{req.status_code}, returning without collecting " |
| 185 | f"data...") |
| 186 | return [] |
| 187 | text = req.text |
| 188 | selector = Selector(text=text) |
| 189 | metadata = None |
| 190 | if ParseCodeCoverageHTMLReport.FIRST_LEVEL: |
| 191 | ParseCodeCoverageHTMLReport.FIRST_LEVEL = False |
| 192 | metadata = _metadata() |
| 193 | if 'lcov_version' in metadata: |
| 194 | ParseCodeCoverageHTMLReport.LCOV_VERSION = \ |
| 195 | metadata['lcov_version'] |
| 196 | data: [{}] = _summary() + _table() |
| 197 | if metadata is not None: |
| 198 | ParseCodeCoverageHTMLReport.FIRST_LEVEL = True |
| 199 | return {'metadata': metadata, 'records': data} |
| 200 | else: |
| 201 | return data |
| 202 | |
| 203 | |
| 204 | help = """ |
| 205 | Collects data (metrics and results) from lcov report and write it to a json |
| 206 | file. |
| 207 | |
| 208 | The data might be collected in two levels: |
| 209 | - Directory level |
| 210 | - Filename level |
| 211 | """ |
| 212 | |
| 213 | |
| 214 | def main(): |
| 215 | parser = argparse. \ |
| 216 | ArgumentParser(epilog=help, |
| 217 | formatter_class=argparse.RawTextHelpFormatter) |
| 218 | parser.add_argument('--ci-url', help='CI url path including job', |
| 219 | required=True) |
| 220 | parser.add_argument('--lcov-path', help='LCOV report path', required=True) |
| 221 | parser.add_argument('--ci-type', |
| 222 | help='CI type, either Jenkins (default) or Gitlab', |
| 223 | default='Jenkins', |
| 224 | choices=["Jenkins", "Gitlab"]) |
| 225 | parser.add_argument('--json-file', |
| 226 | help='Path and filename of the output JSON file', |
| 227 | default="data.json") |
| 228 | parser.add_argument("--metadata", |
| 229 | metavar="KEY=VALUE", |
| 230 | nargs='*', |
| 231 | help="Set a number of key-value pairs as metadata " |
| 232 | "If a value contains spaces, you should define " |
| 233 | "it with double quotes: " + 'key="value with ' |
| 234 | 'spaces".') |
| 235 | args = parser.parse_args() |
| 236 | return ParseCodeCoverageHTMLReport(args).save_json() |
| 237 | |
| 238 | |
| 239 | if __name__ == '__main__': |
| 240 | start_time = time.time() |
| 241 | main() |
| 242 | elapsed_time = time.time() - start_time |
| 243 | print("Elapsed time: {}s".format(elapsed_time)) |