| import argparse |
| import time |
| from enum import Enum |
| from typing import Dict, List, Any |
| |
| import cc_logger |
| import os |
| import json |
| import requests as requests |
| from parsel import Selector |
| |
| |
class Metrics(Enum):
    """
    Metric kinds looked up by name in the report headers
    """

    LINES = 1
    FUNCTIONS = 2
    BRANCHES = 3
    FILES = 4

    @staticmethod
    def like(s: str):
        """
        Return the first metric whose name starts with the given text

        The comparison ignores case and surrounding whitespace, so "Line",
        " lines " and "LIN" all resolve to ``Metrics.LINES``.

        :param s: Text to match against the metric names; may be None
        :return: Matching member, or None when the text is empty/blank or
                 when no metric name starts with it
        """
        prefix = (s or '').strip().upper()
        if not prefix:
            # Every name starts with the empty string, so without this guard
            # a blank input would be reported as Metrics.LINES
            return None
        for m in Metrics:
            if m.name.startswith(prefix):
                return m
        return None
| |
| |
# Module-wide logger, taken from the project's cc_logger module
logger = cc_logger.logger
| |
| |
def to_(f, s, pos=0, default=None):
    """
    Function to return a conversion from string to a type given by function f

    The string is split on whitespace and only the token at position ``pos``
    is converted, e.g. ``to_(float, "85.7 %")`` gives ``85.7``.

    :param f: Function used to convert the string
    :param s: String to be converted; None is treated as "no value"
    :param pos: The string is split and this is the position within the
    resulting array where resides the string
    :param default: Default value if conversion cannot be made
    :return: Converted string value, or ``default`` when the string is None,
    has no token at ``pos`` or the token cannot be converted
    """
    if s is None:
        # A missing value is just another "cannot be converted" case
        return default
    try:
        return f(s.split()[pos])
    except (ValueError, IndexError):
        return default
| |
| |
class ParseCodeCoverageHTMLReport(object):
    """
    Class used to scrape information from a LCOV report to be written to a
    JSON file in a flat structure to be read and uploaded to a custom DB
    """

    # True while the next call to process() is the outermost one; process()
    # clears it while it recurses into sub-pages and restores it afterwards
    FIRST_LEVEL = True
    # LCOV version used to pick the table layout; replaced by the version
    # found in the root page of the report when that page declares one
    LCOV_VERSION = "1.15"
    # Seconds to wait for the server before a page request is given up
    REQUEST_TIMEOUT = 60

    def __init__(self, args):
        """
        :param args: Parsed command line arguments; the attributes read are
                     ci_url, lcov_path, ci_type, json_file and metadata
        """
        self.args = args
        self.ci_url = args.ci_url
        self.lcov_path = args.lcov_path
        self.url = f'{self.ci_url}{self.lcov_path}'
        self.ci_type = args.ci_type
        self.json_file = args.json_file
        self.report = None

    def get(self):
        """
        Collect the report found at ``self.url`` and add the user metadata

        :return: Dictionary with the keys 'metadata' and 'records', or None
                 when nothing could be collected
        """
        logger.info(f'Collecting from {self.url}...')
        self.report = ParseCodeCoverageHTMLReport.process(self.url)
        if not self.report:
            return None
        self.report['metadata'].update(self._metadata())
        return self.report

    def save_json(self, report=None):
        """
        Write the report to ``self.json_file``

        :param report: Report to be written; when None the stored report is
                       used, collecting it first if there is none yet
        :return: True if the file was written, False if there was no report
        """
        if report is not None:
            self.report = report
        if self.report is None:
            self.report = self.get()
        if self.report:
            with open(self.json_file, 'w', encoding='utf-8') as f:
                json.dump(self.report, f, ensure_ascii=False, indent=4)
            return True
        return False

    @staticmethod
    def _parse_metadata_entry(entry):
        """
        Convert one user supplied metadata entry into a dictionary

        :param entry: Either a dictionary, a string holding a JSON object or
                      a string in the form KEY=VALUE
        :return: Dictionary with the parsed pairs; empty if the entry is
                 malformed
        """
        if isinstance(entry, dict):
            return entry
        text = str(entry).strip()
        try:
            parsed = json.loads(text)
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError); try KEY=VALUE
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        # Only the first '=' separates key and value, so values may hold '='
        key, separator, value = text.partition('=')
        if separator and key.strip():
            return {key.strip(): value}
        logger.warning(f"Ignoring malformed metadata entry '{entry}', "
                       f"expected KEY=VALUE or a JSON object")
        return {}

    def _metadata(self):
        """
        Build the metadata contributed by the command line arguments

        ``args.metadata`` may be a single string or a list of strings (the
        latter is what an argparse option declared with nargs='*' yields);
        every entry is either a JSON object or a KEY=VALUE pair.

        :return: Dictionary with 'uri', 'ci_type' and the user defined pairs
        """
        metadata = {'uri': self.ci_url, 'ci_type': self.ci_type}
        extra = self.args.metadata
        if not extra:
            return metadata
        entries = extra if isinstance(extra, (list, tuple)) else [extra]
        for entry in entries:
            metadata.update(self._parse_metadata_entry(entry))
        return metadata

    @staticmethod
    def process(url, parent=""):
        """
        Static method used to extract the summary and table information from
        the LCOV report deployed at the given url

        Pages listing directories are followed recursively, one request per
        linked page.

        :param url: URL where the LCOV report resides
        :param parent: Parent folder for the LCOV report. Empty if at the
        first/root level
        :return: For the outermost call a dictionary with the keys
        'metadata' and 'records'; for the nested calls the list of records
        (one dictionary per summary/table row). An empty list is returned
        if the page does not answer with status code 200
        """

        def _metadata() -> Dict[str, Any]:
            # In the XPath predicates of this method "1 and contains(...)"
            # is a boolean expression, not a position test: every following
            # sibling cell with the class matches and .get() takes the first
            date_time = selector. \
                xpath("//td[contains(@class, 'headerItem') and text() = "
                      "'Date:']/following-sibling::td[1 and contains("
                      "@class, 'headerValue')]/text()").get()
            lcov_version = selector. \
                xpath("//td[contains(@class, 'versionInfo')]/a/text()").get()
            metadata = {'datetime': date_time}
            # The version is the last word of the footer link text; the key
            # is left out when the page has no such link
            version_words = (lcov_version or '').split()
            if version_words:
                metadata['lcov_version'] = version_words[-1]
            metadata['root_url_report'] = url
            return metadata

        def _summary() -> List[Dict[str, Any]]:
            summary = {"Directory": "", "Parent": parent}
            result_cols = selector. \
                xpath('//td[@class="headerCovTableHead"]/text()').getall()
            for metric in Metrics:
                metric_sel = selector. \
                    xpath(f"//td[contains(@class, 'headerItem') "
                          f"and text() = '{metric.name.title()}:']")
                if not metric_sel:
                    # The page has no summary row for this metric
                    continue
                results = metric_sel.xpath(
                    "./following-sibling::td[1 and contains"
                    "(@class, 'headerCovTableEntry')]/text()").getall()
                for index, result_col in enumerate(result_cols):
                    summary[f'{metric.name.title()}{result_col}'] = \
                        to_(float, results[index], default=-1)
            return [summary]

        def _table() -> List[Dict[str, Any]]:
            table = []
            arr = {}
            headers = selector. \
                xpath('//td[@class="tableHead"]/text()').getall()
            if not headers:
                logger.warning(f"Url '{url}' has no coverage table, "
                               f"skipping it...")
                return table
            sub_headers = [j for i in headers if (j := i.title().strip()) in [
                'Total', 'Hit']]
            # Text of the first header cell; used as the key of the name
            # column and compared with "DIRECTORY" to decide on recursion
            file_type = headers[0].strip()
            metric_headers = [metric.name.title() for h in headers
                              if (words := h.split())
                              and (metric := Metrics.like(words[0]))]
            # For these versions each count cell is parsed as "hit/total";
            # the version is fixed before the table is read, so test it once
            hit_and_total_share_cell = \
                ParseCodeCoverageHTMLReport.LCOV_VERSION \
                in ("1.14", "1.15", "1.16")
            rows = selector.xpath("//td[contains(@class, 'coverFile')]")
            for row in rows:
                record = {file_type: row.xpath("./a/text()").get(),
                          'Parent': parent}
                percentage = row.xpath(
                    "./following-sibling::td[1 and "
                    "contains(@class, 'coverPer')]/text()").getall()
                hit_total = [v.root.text or '' for v in
                             row.xpath("./following-sibling::td[1 and "
                                       "contains(@class, 'coverNum')]")]
                for index, header in enumerate(metric_headers):
                    record[f'{header}Coverage'] = to_(float,
                                                      percentage[index],
                                                      default=-1)
                    if hit_and_total_share_cell:
                        # partition() copes with an empty cell, which then
                        # falls back to the defaults used below
                        arr['Hit'], _, arr['Total'] = \
                            hit_total[index].partition("/")
                    else:
                        # Otherwise every metric has two count cells whose
                        # meaning follows the order of the sub headers
                        arr[sub_headers[2 * index]], arr[
                            sub_headers[2 * index + 1]], *rest = (
                            hit_total[2 * index:])
                    record[f'{header}Hit'] = to_(int, arr['Hit'], default=0)
                    record[f'{header}Total'] = to_(int, arr['Total'],
                                                   default=0)
                table.append(record)
                if file_type.upper().strip() == "DIRECTORY":
                    table += ParseCodeCoverageHTMLReport. \
                        process(f'{os.path.dirname(url)}'
                                f'/{row.xpath("./a/@href").get()}',
                                parent=record[file_type])
            return table

        # Without a timeout an unresponsive server blocks the run forever
        req = requests.get(
            url, timeout=ParseCodeCoverageHTMLReport.REQUEST_TIMEOUT)
        if req.status_code != 200:
            logger.warning(f"Url '{url}' return status code "
                           f"{req.status_code}, returning without collecting "
                           f"data...")
            return []
        selector = Selector(text=req.text)
        if not ParseCodeCoverageHTMLReport.FIRST_LEVEL:
            # Nested call: only the records are of interest
            return _summary() + _table()
        ParseCodeCoverageHTMLReport.FIRST_LEVEL = False
        try:
            metadata = _metadata()
            if 'lcov_version' in metadata:
                ParseCodeCoverageHTMLReport.LCOV_VERSION = \
                    metadata['lcov_version']
            data = _summary() + _table()
        finally:
            # Restore the flag even on failure so that a later call is
            # again recognised as an outermost one
            ParseCodeCoverageHTMLReport.FIRST_LEVEL = True
        return {'metadata': metadata, 'records': data}
| |
| |
# Epilog of the command line help.
# NOTE: the name shadows the builtin help(); it is kept as it is because it
# is a module-level name that other code may refer to.
help = """
Collects data (metrics and results) from lcov report and write it to a json
file.

The data might be collected in two levels:
- Directory level
- Filename level
"""


def main(argv=None):
    """
    Parse the command line and write the collected report to a JSON file

    :param argv: List of arguments to parse; None (default) lets argparse
                 read them from sys.argv
    :return: Result of ParseCodeCoverageHTMLReport.save_json() for the
             parsed arguments
    """
    parser = argparse.ArgumentParser(
        epilog=help, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--ci-url', help='CI url path including job',
                        required=True)
    parser.add_argument('--lcov-path', help='LCOV report path', required=True)
    parser.add_argument('--ci-type',
                        help='CI type, either Jenkins (default) or Gitlab',
                        default='Jenkins',
                        choices=["Jenkins", "Gitlab"])
    parser.add_argument('--json-file',
                        help='Path and filename of the output JSON file',
                        default="data.json")
    parser.add_argument("--metadata",
                        metavar="KEY=VALUE",
                        nargs='*',
                        help="Set a number of key-value pairs as metadata. "
                             "If a value contains spaces, you should define "
                             "it with double quotes: "
                             'key="value with spaces".')
    args = parser.parse_args(argv)
    return ParseCodeCoverageHTMLReport(args).save_json()
| |
| |
if __name__ == '__main__':
    # Run the collector and print how long the whole run took
    started_at = time.time()
    main()
    print(f"Elapsed time: {time.time() - started_at}s")