import argparse
import json
import os
import time
from enum import Enum
from typing import Any, Dict, List

import requests
from parsel import Selector

import cc_logger


class Metrics(Enum):
    LINES = 1
    FUNCTIONS = 2
    BRANCHES = 3
    FILES = 4

    @staticmethod
    def like(s: str):
        """
        Return the Metrics member whose name starts with s
        (case-insensitive), or None if there is no match.
        """
        s = s or ''
        for m in Metrics:
            if m.name.startswith(s.strip().upper()):
                return m
        return None
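
# Illustrative only: Metrics.like('Lines') -> Metrics.LINES,
# Metrics.like('func') -> Metrics.FUNCTIONS, Metrics.like('total') -> None.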


logger = cc_logger.logger


def to_(f, s, pos=0, default=None):
    """
    Convert a string to another type using the given function f.

    :param f: Function used to convert the string
    :param s: String to be converted
    :param pos: The string is split on whitespace and this is the position
        within the resulting list of the token to convert
    :param default: Default value returned if the conversion cannot be made
    :return: Converted value, or the default (or None) if conversion fails
    """
    r = None
    try:
        r = f(s.split()[pos])
    except (ValueError, IndexError):
        if default is not None:
            return default
    return r
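
# Illustrative only (values are made up): to_(float, '87.5 %') returns 87.5,
# to_(int, '12 %') returns 12, and to_(int, '', default=0) returns 0 because
# the empty string yields no tokens.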


class ParseCodeCoverageHTMLReport:
    """
    Scrapes the summary and per-file information from an LCOV HTML report and
    writes it to a JSON file in a flat structure, ready to be read and
    uploaded to a custom DB.
    """

    def __init__(self, args):
        self.args = args
        self.ci_url = args.ci_url
        self.lcov_path = args.lcov_path
        self.url = f'{self.ci_url}{self.lcov_path}'
        self.ci_type = args.ci_type
        self.json_file = args.json_file
        self.report = None

    def get(self):
        """Collect the report from self.url and merge in the run metadata."""
        logger.info(f'Collecting from {self.url}...')
        self.report = ParseCodeCoverageHTMLReport.process(self.url)
        if not self.report:
            return None
        _metadata = self._metadata()
        self.report['metadata'].update(_metadata)
        return self.report

    def save_json(self, report=None):
        """Write the report to self.json_file; return True on success."""
        if report is not None:
            self.report = report
        if self.report is None:
            self.report = self.get()
        if self.report:
            with open(self.json_file, 'w', encoding='utf-8') as f:
                json.dump(self.report, f, ensure_ascii=False, indent=4)
            return True
        return False

    def _metadata(self):
        metadata = {'uri': self.ci_url, 'ci_type': self.ci_type}
        if self.args.metadata:
            # --metadata is a list of KEY=VALUE strings (see the argparse
            # help below), so split each pair instead of parsing JSON.
            metadata.update(kv.split('=', 1) for kv in self.args.metadata)
        return metadata
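
    # Illustrative result (all values below are placeholders):
    #     {'uri': 'https://ci.example.com/job/coverage/42/',
    #      'ci_type': 'Jenkins', 'branch': 'main'}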

    # Class-level state shared by the recursive process() calls: FIRST_LEVEL
    # marks the root page of the report, LCOV_VERSION is the default version
    # assumed until the report header says otherwise.
    FIRST_LEVEL = True
    LCOV_VERSION = "1.15"

    @staticmethod
    def process(url, parent=""):
        """
        Extract the summary and table information from the LCOV report
        deployed at the given URL, recursing into sub-directories.

        :param url: URL where the LCOV report resides
        :param parent: Parent folder for the LCOV report. Empty if at the
            first/root level
        :return: At the root level, a dict with 'metadata' and 'records'
            keys; at deeper levels, a list of dictionaries, one per
            directory/file, with the corresponding metrics/results
        """

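        # Illustrative record shapes (keys depend on the report's columns;
        # values here are only examples):
        #     summary: {'Directory': '', 'Parent': '', 'LinesCoverage': 87.5, ...}
        #     row:     {'Filename': 'foo.c', 'Parent': 'src', 'LinesHit': 9,
        #               'LinesTotal': 10, 'LinesCoverage': 90.0, ...}
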
        def _metadata() -> Dict[str, Any]:
            date_time = selector.xpath(
                "//td[contains(@class, 'headerItem') and text() = "
                "'Date:']/following-sibling::td[1 and contains("
                "@class, 'headerValue')]/text()").get()
            lcov_version = selector.xpath(
                "//td[contains(@class, 'versionInfo')]/a/text()").get()
            metadata = {'datetime': date_time,
                        'lcov_version': lcov_version.split()[-1],
                        'root_url_report': url}
            return metadata

        def _summary() -> List[Dict[str, Any]]:
            # Build a single row from the summary table at the top of the page.
            summary = {"Directory": "", "Parent": parent}
            result_cols = selector.xpath(
                '//td[@class="headerCovTableHead"]/text()').getall()
            for metric in Metrics:
                metric_sel = selector.xpath(
                    f"//td[contains(@class, 'headerItem') "
                    f"and text() = '{metric.name.title()}:']")
                if not metric_sel:
                    continue
                results = metric_sel.xpath(
                    "./following-sibling::td[1 and contains"
                    "(@class, 'headerCovTableEntry')]/text()").getall()
                for index, result_col in enumerate(result_cols):
                    summary[f'{metric.name.title()}{result_col}'] = \
                        to_(float, results[index], default=-1)
            return [summary]

        def _table() -> List[Dict[str, Any]]:
            # Build one record per directory/file row, recursing into
            # sub-directories when the table lists directories.
            table = []
            arr = {}
            headers = selector.xpath(
                '//td[@class="tableHead"]/text()').getall()
            sub_headers = [j for i in headers if (j := i.title().strip()) in [
                'Total', 'Hit']]
            file_type = headers[0].strip()
            metric_headers = [metric.name.title() for h in headers
                              if (metric := Metrics.like(h.split()[0]))]
            rows = selector.xpath("//td[contains(@class, 'coverFile')]")
            for row in rows:
                record = {file_type: row.xpath("./a/text()").get(),
                          'Parent': parent}
                percentage = row.xpath(
                    "./following-sibling::td[1 and "
                    "contains(@class, 'coverPer')]/text()").getall()
                hit_total = [v.root.text or '' for v in
                             row.xpath("./following-sibling::td[1 and "
                                       "contains(@class, 'coverNum')]")]
                for index, header in enumerate(metric_headers):
                    record[f'{header}Coverage'] = to_(float, percentage[index],
                                                      default=-1)
                    if ParseCodeCoverageHTMLReport.LCOV_VERSION \
                            in ["1.14", "1.15", "1.16"]:
                        # LCOV 1.14-1.16 put hit and total in a single cell
                        # separated by '/'.
                        arr['Hit'], arr['Total'] = (
                            hit_total[index].split("/"))
                    else:
                        # Other versions use separate 'Hit' and 'Total'
                        # columns per metric.
                        arr[sub_headers[2 * index]], arr[
                            sub_headers[2 * index + 1]], *rest = (
                            hit_total[2 * index:])
                    record[f'{header}Hit'] = to_(int, arr['Hit'], default=0)
                    record[f'{header}Total'] = to_(int, arr['Total'],
                                                   default=0)
                table.append(record)
                if file_type.upper().strip() == "DIRECTORY":
                    table += ParseCodeCoverageHTMLReport.process(
                        f'{os.path.dirname(url)}'
                        f'/{row.xpath("./a/@href").get()}',
                        parent=record[file_type])
            return table

        req = requests.get(url)
        if req.status_code != 200:
            logger.warning(f"URL '{url}' returned status code "
                           f"{req.status_code}, returning without collecting "
                           f"data...")
            return []
        selector = Selector(text=req.text)
        metadata = None
        if ParseCodeCoverageHTMLReport.FIRST_LEVEL:
            # Only the root page carries the report-level metadata.
            ParseCodeCoverageHTMLReport.FIRST_LEVEL = False
            metadata = _metadata()
            if 'lcov_version' in metadata:
                ParseCodeCoverageHTMLReport.LCOV_VERSION = \
                    metadata['lcov_version']
        data: List[Dict[str, Any]] = _summary() + _table()
        if metadata is not None:
            # Back at the root level: reset the flag and wrap the records.
            ParseCodeCoverageHTMLReport.FIRST_LEVEL = True
            return {'metadata': metadata, 'records': data}
        else:
            return data


HELP = """
Collects data (metrics and results) from an LCOV report and writes it to a
JSON file.

The data may be collected at two levels:
- Directory level
- Filename level
"""
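
# Example invocation (the URL, paths and file names are placeholders):
#     python parse_code_coverage_report.py \
#         --ci-url https://ci.example.com/job/coverage/42/ \
#         --lcov-path artifact/coverage/index.html \
#         --json-file coverage.json \
#         --metadata branch=main "owner=firmware team"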


def main():
    parser = argparse.ArgumentParser(
        epilog=HELP, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--ci-url', help='CI URL including the job path',
                        required=True)
    parser.add_argument('--lcov-path', help='LCOV report path', required=True)
    parser.add_argument('--ci-type',
                        help='CI type, either Jenkins (default) or Gitlab',
                        default='Jenkins',
                        choices=["Jenkins", "Gitlab"])
    parser.add_argument('--json-file',
                        help='Path and filename of the output JSON file',
                        default="data.json")
    parser.add_argument("--metadata",
                        metavar="KEY=VALUE",
                        nargs='*',
                        help="Set a number of key-value pairs as metadata. "
                             "If a value contains spaces, define it with "
                             'double quotes: key="value with spaces".')
    args = parser.parse_args()
    return ParseCodeCoverageHTMLReport(args).save_json()


if __name__ == '__main__':
    start_time = time.time()
    main()
    elapsed_time = time.time() - start_time
    print(f"Elapsed time: {elapsed_time}s")