#!/usr/bin/env python
###############################################################################
# Copyright (c) 2020-2022, ARM Limited and Contributors. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
###############################################################################
###############################################################################
# FILE: intermediate_layer.py
#
# DESCRIPTION: Creates an intermediate json file with information provided
# by the configuration json file, dwarf signatures and trace
# files.
#
###############################################################################
import os
import re
import glob
import argparse
import subprocess
import json
from argparse import RawTextHelpFormatter
import logging
import time
from typing import Dict
from typing import List
__version__ = "7.0"
# Static map that defines the elf file source type in the intermediate json
ELF_MAP = {
"bl1": 0,
"bl2": 1,
"bl31": 2,
"bl32": 3,
"scp_ram": 10,
"scp_rom": 11,
"mcp_rom": 12,
"mcp_ram": 13,
"secure_hafnium": 14,
"hafium": 15,
"custom_offset": 100
}
def os_command(command, show_command=False):
"""
    Function that executes an os command; raises an exception on failure
:param command: OS command as string
:param show_command: Optional argument to print the command in stdout
:return: The string output of the os command
"""
try:
if show_command:
print("OS command: {}".format(command))
out = subprocess.check_output(
command, stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError as ex:
raise Exception(
"Exception running command '{}': {}({})".format(
command, ex.output, ex.returncode))
return out.decode("utf8")
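# Illustrative usage of os_command (hypothetical command, not part of the
# tool's flow): a successful command returns its stdout as a string, e.g.
#   out = os_command("echo hello")      # -> "hello\n"
# A non-zero exit status raises an Exception carrying the command output and
# return code.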
def load_stats_from_traces(trace_globs):
"""
Function to process and consolidate statistics from trace files
:param trace_globs: List of trace file patterns
:return: Dictionary with stats from trace files i.e.
{mem address in decimal}=(times executed, inst size)
"""
stats = {}
stat_size = {}
# Make a list of unique trace files
trace_files = []
for tg in trace_globs:
trace_files.extend(glob.glob(tg))
trace_files = set(trace_files)
if not trace_files:
raise Exception("No trace files found for '{}'".format(trace_globs))
# Load stats from the trace files
for trace_file in trace_files:
try:
with open(trace_file, 'r') as f:
for line in f:
data = line.split()
address = int(data[0], 16)
stat = int(data[1])
size = int(data[2])
stat_size[address] = size
if address in stats:
stats[address] += stat
else:
stats[address] = stat
except Exception as ex:
logger.error("@Loading stats from trace files:{}".format(ex))
# Merge the two dicts
for address in stats:
stats[address] = (stats[address], stat_size[address])
return stats
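# Illustrative trace-file content assumed by the parser above: each line has
# at least three whitespace-separated fields (hex address, times executed,
# instruction size), e.g. a file containing the single line
#   0001200c 5 4
# would produce stats == {73740: (5, 4)}, with the address keyed in decimal.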
def get_code_sections_for_binary(elf_name):
"""
    Function to return the memory address ranges of the code sections
in the elf file
:param elf_name: Elf binary file name
    :return: List of code section tuples, i.e. (section name, start
    address, section size)
"""
command = """%s -h %s | grep -B 1 CODE | grep -v CODE \
| awk '{print $2" "$4" "$3}'""" % (OBJDUMP, elf_name)
text_out = os_command(command)
sections = text_out.split('\n')
sections.pop()
secs = []
for sec in sections:
try:
d = sec.split()
secs.append((d[0], int(d[1], 16), int(d[2], 16)))
except Exception as ex:
logger.error(
"@Returning memory address code sections:".format(ex))
return secs
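# Illustrative 'objdump -h' fragment matched by the command above (header
# columns: Idx Name Size VMA LMA File-off Algn, with CODE on the flags line):
#   2 .text  00000a50  0000000004001000  0000000004001000  00011000  2**3
#                  CONTENTS, ALLOC, LOAD, READONLY, CODE
# would yield ('.text', 0x4001000, 0xa50), i.e. (name, start address, size)
# with the addresses stored as integers.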
def get_executable_ranges_for_binary(elf_name):
"""
Get function ranges from an elf file
:param elf_name: Elf binary file name
:return: List of tuples for ranges i.e. (range start, range end)
"""
    # Parse all the mapping symbols: $x/$a/$t (code) and $d (data)
symbol_table = []
address = None
_type = None
command = r"""%s -s %s | awk '/\$[xatd]/ {print $2" "$8}'""" % (
READELF, elf_name)
text_out = os_command(command)
lines = text_out.split('\n')
lines.pop()
for line in lines:
try:
data = line.split()
address = int(data[0], 16)
_type = 'X' if data[1] in ['$x', '$t', '$a'] else 'D'
except Exception as ex:
logger.error("@Getting executable ranges:".format(ex))
symbol_table.append((address, _type))
# Add markers for end of code sections
sections = get_code_sections_for_binary(elf_name)
for sec in sections:
symbol_table.append((sec[1] + sec[2], 'S'))
# Sort by address
symbol_table = sorted(symbol_table, key=lambda tup: tup[0])
# Create ranges (list of START/END tuples)
ranges = []
range_start = symbol_table[0][0]
rtype = symbol_table[0][1]
for sym in symbol_table:
if sym[1] != rtype:
if rtype == 'X':
# Subtract one because the first address of the
# next range belongs to the next range.
ranges.append((range_start, sym[0] - 1))
range_start = sym[0]
rtype = sym[1]
return ranges
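# The $x/$a/$t and $d entries parsed above are the ARM/AArch64 ELF mapping
# symbols that mark where code and literal data start within a section. A
# hypothetical symbol sequence $x@0x1000, $d@0x1040, $x@0x1050 plus a section
# end marker at 0x1100 would produce the executable ranges
#   [(0x1000, 0x103f), (0x1050, 0x10ff)].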
def list_of_functions_for_binary(elf_name: str) -> Dict[str, Dict[str, any]]:
"""
    Get a dictionary of the functions in the elf file
    :param elf_name: Elf binary file name
    :return: Dictionary, indexed by function name, with the function start
    address, end address and dwarf signature (sources) flag
"""
_functions = {}
command = "%s -t %s | awk 'NR>4' | sed /^$/d" % (OBJDUMP, elf_name)
symbols_output = os_command(command)
rex = r'([0-9a-fA-F]+) (.{7}) ([^ ]+)[ \t]([0-9a-fA-F]+) (.*)'
symbols = symbols_output.split('\n')[:-1]
for sym in symbols:
try:
symbol_details = re.findall(rex, sym)
symbol_details = symbol_details[0]
if 'F' not in symbol_details[1]:
continue
function_name = symbol_details[4]
# We don't want the .hidden for hidden functions
if function_name.startswith('.hidden '):
function_name = function_name[len('.hidden '):]
if function_name not in _functions:
_functions[function_name] = {'start': symbol_details[0],
'end': symbol_details[3],
'sources': False}
else:
logger.warning("'{}' duplicated in '{}'".format(
function_name,
elf_name))
except Exception as ex:
logger.error("@Listing functions at file {}: {}".format(
elf_name,
ex))
return _functions
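# Illustrative 'objdump -t' symbol line matched by the regex above (address,
# 7-character flag field containing 'F' for functions, section, size, name):
#   0000000004001000 g     F .text 000000000000002c bl31_main
# would be stored as:
#   _functions["bl31_main"] = {'start': '0000000004001000',
#                              'end': '000000000000002c', 'sources': False}
# Note that 'end' holds the fourth objdump column, which objdump reports as
# the symbol size.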
def apply_functions_exclude(elf_config, functions):
"""
Remove excluded functions from the list of functions
:param elf_config: Config for elf binary file
:param functions: Array of functions in the binary elf file
:return: Tuple with included and excluded functions
"""
if 'exclude_functions' not in elf_config:
return functions, []
incl = {}
excl = {}
for fname in functions:
exclude = False
for rex in elf_config['exclude_functions']:
if re.match(rex, fname):
exclude = True
excl[fname] = functions[fname]
break
if not exclude:
incl[fname] = functions[fname]
return incl, excl
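# Illustrative (hypothetical) exclusion config for an elf entry:
#   elf = {'name': 'bl31.elf', 'traces': [...],
#          'exclude_functions': ['^__asan', '.*_unreachable$']}
# apply_functions_exclude(elf, functions) returns two dictionaries: the
# functions kept for coverage and those whose names match any regex above.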
def remove_workspace(path, workspace):
"""
    Get the path relative to a given workspace
    :param path: Path to be made relative to the workspace
    :param workspace: Workspace path to remove from 'path'; if None, 'path'
    is returned unchanged
"""
ret = path if workspace is None else os.path.relpath(path, workspace)
return ret
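# Illustrative behaviour (hypothetical paths):
#   remove_workspace("/work/tf-a/bl31/bl31_main.c", "/work/tf-a")
#       -> "bl31/bl31_main.c"
#   remove_workspace("/work/tf-a/bl31/bl31_main.c", None)
#       -> "/work/tf-a/bl31/bl31_main.c"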
def get_function_line_numbers(source_file: str) -> Dict[str, int]:
"""
    Use ctags to get all the function names with their line numbers
    within the source_file
    :param source_file: Source code file to be parsed with ctags
    :return: Dictionary with function name as key and line number as value
"""
command = "ctags -x --c-kinds=f {}".format(source_file)
fln = {}
try:
function_lines = os_command(command).split("\n")
for line in function_lines:
cols = line.split()
if len(cols) < 3:
continue
if cols[1] == "function":
fln[cols[0]] = int(cols[2])
elif cols[1] == "label" and cols[0] == "func":
fln[cols[-1]] = int(cols[2])
    # Malformed ctags output lines are not fatal
    except (ValueError, IndexError):
        logger.warning("Warning: Can't get all function line numbers from %s" %
                       source_file)
    except Exception as ex:
        logger.warning(f"Warning: Unknown error '{ex}' when executing command "
                       f"'{command}'")
        return {}
    return fln
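# Illustrative 'ctags -x --c-kinds=f' output line parsed above (columns are
# name, kind, line, file, source text):
#   bl31_main        function     78 bl31/bl31_main.c void bl31_main(void)
# would produce fln == {"bl31_main": 78}.  The 'label'/'func' branch appears
# to handle assembly functions declared as 'func <name>', where ctags reports
# the macro name as a label and cols[-1] is the function name after it.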
class FunctionLineNumbers(object):
"""Helper class used to get a function start line number within
a source code file"""
def __init__(self, workspace: str):
"""
        Initialise dictionary mapping source code files to their
        corresponding function start line numbers.
:param workspace: The folder where the source files are deployed
"""
self.filenames = {}
self.workspace = workspace
def get_line_number(self, filename: str, function_name: str) -> int:
if not FUNCTION_LINES_ENABLED:
return 0
if filename not in self.filenames:
newp = os.path.join(self.workspace, filename)
self.filenames[filename] = get_function_line_numbers(newp)
return 0 if function_name not in self.filenames[filename] else \
self.filenames[filename][function_name]
class BinaryParser(object):
"""Class used to create an instance to parse the binary files with a
dwarf signature in order to produce logical information to be matched with
traces and produce a code coverage report"""
def __init__(self, dump: str, function_list: Dict[str, Dict[str, any]],
prefix: str, function_line_numbers: FunctionLineNumbers):
"""
Initialisation of the instance to parse binary files.
:param dump: Binary dump (string) containing assembly code and source
code metadata, i.e. source code location and line number.
:param function_list: Dictionary of functions defined in the binary
dump.
:param prefix: Prefix for every source code file contained in the
binary dump file, usually the workspace (folders) where the source code
        files were built.
:param function_line_numbers: Object instance to get a function line
number within a source code file.
"""
self.dump = dump
self.function_list = function_list
self.prefix = prefix
self.function_definition = None
self.function_line_numbers = function_line_numbers
class FunctionBlock(object):
"""Class used to parse and obtain a function block from the
binary dump file that corresponds to a function declaration within
the binary assembly code.
The function block has the following components:
- Function start address in memory (hexadecimal).
- Function name.
- Function code.
"""
def __init__(self, function_group: List[str]):
"""
Create an instance of a function block within a binary dump.
:param function_group: List containing the function start
address, name and code in the function block.
"""
self.start, self.name, self.code = function_group
self.source_file = None
self.function_line_number = None
@staticmethod
def get(dump: str):
"""
Static method generator to extract a function block from the binary
dump.
:param dump: Binary dump (string) that contains the binary file
information.
:return: A FunctionBlock object that is a logical representation
of a function declaration within the binary dump.
"""
function_groups = re.findall(
r"(?s)([0-9a-fA-F]+) <([a-zA-Z0-9_]+)>:\n(.+?)(?=[A-Fa-f0-9]* "
r"<[a-zA-Z0-9_]+>:)", dump, re.DOTALL | re.MULTILINE)
for group in function_groups:
if len(group) != 3:
continue
function_group = list(group)
function_group[-1] += "\n"
yield BinaryParser.FunctionBlock(function_group)
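    # Illustrative disassembly fragment matched by the regex above: each
    # function block starts with "<address> <name>:" and runs until the next
    # such header, e.g.
    #   0000000004001000 <bl31_main>:
    #      4001000:   a9be7bfd    stp x29, x30, [sp, #-32]!
    #      ...
    #   0000000004001100 <bl31_next>:
    # yields FunctionBlock(start="0000000004001000", name="bl31_main",
    # code=<the lines in between>).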
class SourceCodeBlock(object):
"""Class used to represent a source code block of information within
a function block in a binary dump file.
The source code block contains the following components:
- Optional function name where the source code/assembly code is defined.
- Source code file that contains the source code corresponding
to the assembly code.
- Line number within the source code file corresponding to the source
code.
- Assembly code block.
"""
def __init__(self, source_code_block):
"""
Create an instance of a source code block within a function block.
:param source_code_block: Tuple of 4 elements that contains the
components of a source code block.
"""
self.function_name, self.source_file, self.line, self.asm_code \
= source_code_block
def get_assembly_line(self):
"""Getter to return and AssemblyLine instance that corresponds to
a logical representation of an assembly code line contained
within a source code block (assembly code block)"""
return BinaryParser.AssemblyLine.get(self)
class AssemblyLine(object):
"""Class used to represent an assembly code line within an
assembly code block.
The assembly line instruction is formed by the following components:
- Hexadecimal address of the assembly instruction.
- Assembly instruction.
"""
def __init__(self, line):
"""
Create an instance representing an assembly code line within an
assembly code block.
:param line: Tuple of 2 elements [Hexadecimal number,
and assembly code]
"""
self.hex_line_number, self.opcode = line
self.dec_address = int(self.hex_line_number, 16)
@staticmethod
def get(source_code_block):
"""
            Static method generator to extract an assembly code line from an
            assembly code block.
:param source_code_block: Object that contains the assembly code
within the source code block.
:return: AssemblyLine object.
"""
lines = re.findall(
r"^[\s]+([a-fA-F0-9]+):\t(.+?)\n",
source_code_block.asm_code, re.DOTALL | re.MULTILINE)
for line in lines:
if len(line) != 2:
continue
yield BinaryParser.AssemblyLine(line)
class FunctionDefinition(object):
"""
Class used to handle a function definition i.e. function name, source
        code filename and line number where it is declared.
"""
def __init__(self, function_name):
"""
Create an instance representing a function definition within a
function code block.
:param function_name: Initial function name
"""
self.function_line_number = None
self.function_name = function_name
self.source_file: str = None
def update_sources(self, source_files, function_line_numbers):
"""
Method to update source files dictionary
:param source_files: Dictionary that contains the representation
of the intermediate layer.
:param function_line_numbers: Object that obtains the start line
            number for a function definition inside its source file.
            :return: Nothing
"""
source_files.setdefault(self.source_file, {"functions": {},
"lines": {}})
if self.function_name not in \
source_files[self.source_file]["functions"]:
self.function_line_number = \
function_line_numbers.get_line_number(
self.source_file,
self.function_name)
source_files[self.source_file]["functions"][
self.function_name] = {"covered": False,
"line_number":
self.function_line_number}
def get_source_code_block(self, function_block: FunctionBlock):
"""
Generator method to obtain all the source code blocks within a
function block.
        :param function_block: FunctionBlock object whose code contains
        the source code blocks.
:return: A SourceCodeBlock object.
"""
        # When a source block has no function name, the block's name applies
self.function_definition = BinaryParser.FunctionDefinition(
function_block.name)
pattern = r'(?s)(^[a-zA-Z0-9_]+)?(?:\(\):\n)?(^{0}.+?):([0-9]+)[' \
r'^\n]*\n(.+?)(?={0}.+?:[0-9]+.+\n|^[a-zA-Z0-9_]+\(' \
r'\):\n)'.format(self.prefix)
source_code_blocks = re.findall(pattern,
"{}\n{}/:000".format(
function_block.code,
self.prefix),
re.DOTALL |
re.MULTILINE)
for block in source_code_blocks:
if len(block) != 4:
continue
source_code_block = BinaryParser.SourceCodeBlock(block)
if source_code_block.function_name:
# Usually in the first iteration function name is not empty
                # and is the name of the function block
self.function_definition.function_name = \
source_code_block.function_name
self.function_definition.source_file = remove_workspace(
source_code_block.source_file, self.prefix)
yield source_code_block
def get_function_block(self):
"""Generator method to obtain all the function blocks contained in
the binary dump file.
"""
for function_block in BinaryParser.FunctionBlock.get(self.dump):
# Find out if the function block has C source code filename in
# the function block code
signature_group = re.findall(
r"(?s){}\(\):\n(/.+?):[0-9]+.*(?:\r*\n\n|\n$)".format(
function_block.name), function_block.code,
re.DOTALL | re.MULTILINE)
if not signature_group:
continue # Function does not have dwarf signature (sources)
if function_block.name not in self.function_list:
print("Warning:Function '{}' not found in function list!!!".
format(function_block.name))
continue # Function not found in function list
source_code_file = signature_group[0]
function_block.source_file = remove_workspace(
source_code_file, self.prefix)
function_block.function_line_number = \
self.function_line_numbers.get_line_number(
function_block.source_file, function_block.name)
yield function_block
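    # Illustrative 'objdump -Sl' fragment consumed by the parser: function
    # headers, "name():" markers, "<source file>:<line>" locations and the
    # interleaved assembly lines (hypothetical paths), e.g.
    #   0000000004001000 <bl31_main>:
    #   bl31_main():
    #   /workspace/tf-a/bl31/bl31_main.c:78
    #      4001000:   a9be7bfd    stp x29, x30, [sp, #-32]!
    # The "/workspace/tf-a" prefix would be stripped by remove_workspace when
    # 'remove_workspace' is enabled in the configuration.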
class IntermediateCodeCoverage(object):
"""Class used to process the trace data along with the dwarf
signature files to produce an intermediate layer in json with
code coverage in assembly and c source code.
"""
def __init__(self, _config, local_workspace):
self._data = {}
self.config = _config
self.local_workspace = local_workspace
self.elfs = self.config['elfs']
# Dictionary with stats from trace files {address}=(times executed,
# inst size)
self.traces_stats = {}
# Dictionary of unique assembly line memory address against source
# file location
# {assembly address} = (opcode, source file location, line number in
# the source file, times executed)
self.asm_lines = {}
        # Dictionary of {source file location}=>{'lines': {'covered': Boolean,
        # 'elf_index': {elf index}=>{assembly address}=>(opcode,
        # times executed)},
        # 'functions': {function name}=>is covered(boolean)}
self.source_files_coverage = {}
self.functions = []
# Unique set of elf list of files
self.elf_map = {}
# For elf custom mappings
self.elf_custom = None
def process(self):
"""
Public method to process the trace files and dwarf signatures
using the information contained in the json configuration file.
This method writes the intermediate json file output linking
the trace data and c source and assembly code.
"""
self.source_files_coverage = {}
self.asm_lines = {}
# Initialize for unknown elf files
self.elf_custom = ELF_MAP["custom_offset"]
sources_config = {}
print("Generating intermediate json layer '{}'...".format(
self.config['parameters']['output_file']))
for elf in self.elfs:
# Gather information
elf_name = elf['name']
# Trace data
self.traces_stats = load_stats_from_traces(elf['traces'])
prefix = self.config['parameters']['workspace'] \
if self.config['configuration']['remove_workspace'] else \
None
functions_list = list_of_functions_for_binary(elf_name)
(functions_list, excluded_functions) = apply_functions_exclude(
elf, functions_list)
# Produce code coverage
self.process_binary(elf_name, functions_list, prefix)
sources_config = self.config['parameters']['sources']
# Now check code coverage in the functions with no dwarf signature
# (sources)
nf = {f: functions_list[f] for f in
functions_list if not
functions_list[f]["sources"]}
self.process_fn_no_sources(nf)
# Write to the intermediate json file
data = {"source_files": self.source_files_coverage,
"configuration": {
"sources": sources_config,
"metadata": "" if 'metadata' not in
self.config['parameters'] else
self.config['parameters']['metadata'],
"elf_map": self.elf_map}
}
json_data = json.dumps(data, indent=4, sort_keys=True)
with open(self.config['parameters']['output_file'], "w") as f:
f.write(json_data)
def get_elf_index(self, elf_name: str) -> int:
"""Obtains the elf index and fills the elf_map instance variable"""
if elf_name not in self.elf_map:
if elf_name in ELF_MAP:
self.elf_map[elf_name] = ELF_MAP[elf_name]
else:
self.elf_map[elf_name] = ELF_MAP["custom_offset"]
ELF_MAP["custom_offset"] += 1
return self.elf_map[elf_name]
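    # Illustrative mapping: get_elf_index("bl31") returns 2 (from ELF_MAP),
    # while an elf name not present in ELF_MAP (e.g. a hypothetical "my_fw")
    # is assigned 100, the next unknown elf 101, and so on, starting from
    # ELF_MAP["custom_offset"].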
def process_binary(self, elf_filename: str, function_list, prefix=None):
"""
Process an elf file i.e. match the source code and asm lines against
trace files (coverage).
:param elf_filename: Elf binary file name
        :param function_list: Dictionary of functions in the elf file,
        indexed by function name, i.e. {name: {'start', 'end', 'sources'}}
:param prefix: Optional path name to be removed at the start of source
file locations
"""
command = "%s -Sl %s | tee %s" % (OBJDUMP, elf_filename,
elf_filename.replace(".elf", ".dump"))
dump = os_command(command, show_command=True)
dump += "\n0 <null>:" # For pattern matching the last function
elf_name = os.path.splitext(os.path.basename(elf_filename))[0]
function_line_numbers = FunctionLineNumbers(self.local_workspace)
elf_index = self.get_elf_index(elf_name)
# Pointer to files dictionary
source_files = self.source_files_coverage
parser = BinaryParser(dump, function_list, prefix,
function_line_numbers)
for function_block in parser.get_function_block():
function_list[function_block.name]["sources"] = True
source_files.setdefault(function_block.source_file,
{"functions": {},
"lines": {}})
source_files[function_block.source_file]["functions"][
function_block.name] = {"covered": False,
"line_number":
function_block.function_line_number}
is_function_block_covered = False
source_code_block: BinaryParser.SourceCodeBlock
for source_code_block in parser.get_source_code_block(
function_block):
if parser.function_definition.function_name in function_list:
function_list[parser.function_definition.function_name][
"sources"] = True
parser.function_definition.update_sources(source_files,
function_line_numbers)
source_file_ln = \
source_files[parser.function_definition.source_file][
"lines"].setdefault(source_code_block.line,
{"covered": False, "elf_index": {}})
for asm_block in source_code_block.get_assembly_line():
times_executed = 0 if \
asm_block.dec_address not in self.traces_stats else \
self.traces_stats[asm_block.dec_address][0]
if times_executed > 0:
is_function_block_covered = True
source_file_ln["covered"] = True
source_files[parser.function_definition.source_file][
"functions"][
parser.function_definition.function_name][
"covered"] = True
source_file_ln.setdefault("elf_index", {'elf_index': {}})
if elf_index not in source_file_ln["elf_index"]:
source_file_ln["elf_index"][elf_index] = {}
if asm_block.dec_address not in \
source_file_ln["elf_index"][elf_index]:
source_file_ln["elf_index"][elf_index][
asm_block.dec_address] = (
asm_block.opcode, times_executed)
source_files[function_block.source_file]["functions"][
function_block.name]["covered"] |= is_function_block_covered
def process_fn_no_sources(self, function_list):
"""
        Checks function coverage for functions with no dwarf signature,
        i.e. sources.
:param function_list: Dictionary of functions to be checked
"""
if not FUNCTION_LINES_ENABLED:
return # No source code at the workspace
address_seq = sorted(self.traces_stats.keys())
for function_name in function_list:
# Just check if the start address is in the trace logs
            # "start" is a hex string from objdump; trace addresses are ints
            covered = int(function_list[function_name]["start"],
                          16) in address_seq
# Find the source file
files = os_command(("grep --include *.c --include *.s -nrw '{}' {}"
"| cut -d: -f1").format(function_name,
self.local_workspace))
unique_files = set(files.split())
sources = []
line_number = 0
for source_file in unique_files:
d = get_function_line_numbers(source_file)
if function_name in d:
line_number = d[function_name]
sources.append(source_file)
if len(sources) > 1:
logger.warning("'{}' declared in {} files:{}".format(
function_name, len(sources),
", ".join(sources)))
elif len(sources) == 1:
source_file = remove_workspace(sources[0],
self.local_workspace)
if source_file not in self.source_files_coverage:
self.source_files_coverage[source_file] = {"functions": {},
"lines": {}}
if function_name not in \
self.source_files_coverage[source_file]["functions"] \
or covered:
self.source_files_coverage[source_file]["functions"][
function_name] = {"covered": covered,
"line_number": line_number}
else:
logger.warning("Function '{}' not found in sources.".format(
function_name))
json_conf_help = """
Produces an intermediate json layer for code coverage reporting
using an input json configuration file.
Input json configuration file format:
{
"configuration":
{
"remove_workspace": <true if 'workspace' must be from removed from the
path of the source files>,
"include_assembly": <true to include assembly source code in the
intermediate layer>
},
"parameters":
{
"objdump": "<Path to the objdump binary to handle dwarf signatures>",
"readelf: "<Path to the readelf binary to handle dwarf signatures>",
"sources": [ <List of source code origins, one or more of the next
options>
{
"type": "git",
"URL": "<URL git repo>",
"COMMIT": "<Commit id>",
"REFSPEC": "<Refspec>",
"LOCATION": "<Folder within 'workspace' where this source
is located>"
},
{
"type": "http",
"URL": <URL link to file>",
"COMPRESSION": "xz",
"LOCATION": "<Folder within 'workspace' where this source
is located>"
}
],
"workspace": "<Workspace folder where the source code was located to
produce the elf/axf files>",
"output_file": "<Intermediate layer output file name and location>",
"metadata": {<Metadata objects to be passed to the intermediate json
files>}
},
"elfs": [ <List of elf files to be traced/parsed>
{
"name": "<Full path name to elf/axf file>",
"traces": [ <List of trace files to be parsed for this
elf/axf file>
"Full path name to the trace file,"
]
}
]
}
"""
OBJDUMP = None
READELF = None
FUNCTION_LINES_ENABLED = None
def main():
global OBJDUMP
global READELF
global FUNCTION_LINES_ENABLED
parser = argparse.ArgumentParser(epilog=json_conf_help,
formatter_class=RawTextHelpFormatter)
parser.add_argument('--config-json', metavar='PATH',
dest="config_json", default='config_file.json',
help='JSON configuration file', required=True)
parser.add_argument('--local-workspace', default="",
help=('Local workspace folder where source code files'
                              ' and folders reside'))
args = parser.parse_args()
try:
with open(args.config_json, 'r') as f:
config = json.load(f)
except Exception as ex:
print("Error at opening and processing JSON: {}".format(ex))
return
# Setting toolchain binary tools variables
OBJDUMP = config['parameters']['objdump']
READELF = config['parameters']['readelf']
    # Check that the binary utilities are installed and working
os_command("{} --version".format(OBJDUMP))
os_command("{} --version".format(READELF))
if args.local_workspace != "":
# Checking ctags installed
try:
os_command("ctags --version")
except BaseException:
print("Warning!: ctags not installed/working function line numbers\
will be set to 0. [{}]".format(
"sudo apt install exuberant-ctags"))
else:
FUNCTION_LINES_ENABLED = True
intermediate_layer = IntermediateCodeCoverage(config, args.local_workspace)
intermediate_layer.process()
if __name__ == '__main__':
logging.basicConfig(filename='intermediate_layer.log', level=logging.DEBUG,
format=('%(asctime)s %(levelname)s %(name)s '
'%(message)s'))
logger = logging.getLogger(__name__)
start_time = time.time()
main()
elapsed_time = time.time() - start_time
print("Elapsed time: {}s".format(elapsed_time))