Sync scripts in 'script' directory with internal CI

Sync scripts with platform-ci commit:
539c151d0cd99a5e6ca6c0e6966f6d8579fe864e

Signed-off-by: Zelalem <zelalem.aweke@arm.com>
Change-Id: I455770dea2e3974f652de317b21e53cfc0b9199e
diff --git a/script/juno_manual.py b/script/juno_manual.py
new file mode 100755
index 0000000..4dcbead
--- /dev/null
+++ b/script/juno_manual.py
@@ -0,0 +1,629 @@
+#!/usr/bin/env python3
+
+"""
+Script to automate the manual juno tests that used to require the Juno board
+to be manually power cycled.
+
+"""
+
+import argparse
+import datetime
+import enum
+import logging
+import os
+import pexpect
+import re
+import shutil
+import sys
+import time
+import zipfile
+from pexpect import pxssh
+
+################################################################################
+# Classes                                                                      #
+################################################################################
+
+class CriticalError(Exception):
+    """
+    Raised when a serious issue occurs; main() logs it and exits with status 1.
+    """
+    pass
+
+class TestStatus(enum.Enum):
+    """
+    This is an enum to describe possible return values from test handlers.
+    """
+    SUCCESS = 0   # test finished and passed; UART processing stops
+    FAILURE = 1   # test failed; process_uart_output raises CriticalError
+    CONTINUE = 2  # no verdict yet; keep reading UART lines
+
+class JunoBoardManager(object):
+    """
+    Manage Juno board reservation and mounts with support for context
+    management.
+    Parameters
+      ssh (pxssh object): SSH connection to remote machine
+      password (string): sudo password for mounting/unmounting
+    """
+
+    def __init__(self, ssh, password):
+        self.ssh = ssh
+        self.path = ""
+        self.password = password
+        self.mounted = False
+
+    def reserve(self):
+        """
+        Try to reserve a Juno board.
+        """
+        for path in ["/home/login/generaljuno1", "/home/login/generaljuno2"]:
+            logging.info("Trying %s...", path)
+            self.ssh.before = ""
+            self.ssh.sendline("%s/reserve.sh" % path)
+            res = self.ssh.expect(["RESERVE_SCRIPT_SUCCESS", "RESERVE_SCRIPT_FAIL", pexpect.EOF, \
+                pexpect.TIMEOUT], timeout=10)
+            if res == 0:
+                self.path = path
+                return
+            if res == 1:
+                continue
+            else:
+                logging.error(self.ssh.before.decode("utf-8"))
+                raise CriticalError("Unexpected pexpect result: %d" % res)
+        raise CriticalError("Could not reserve a Juno board.")
+
+    def release(self):
+        """
+        Release a previously reserved Juno board.
+        """
+        if self.mounted:
+            self.unmount()
+            logging.info("Unmounted Juno storage device.")
+        if self.path == "":
+            raise CriticalError("No Juno board reserved.")
+        self.ssh.before = ""
+        self.ssh.sendline("%s/release.sh" % self.path)
+        res = self.ssh.expect(["RELEASE_SCRIPT_SUCCESS", "RELEASE_SCRIPT_FAIL", pexpect.EOF, \
+            pexpect.TIMEOUT], timeout=10)
+        if res == 0:
+            return
+        logging.error(self.ssh.before.decode("utf-8"))
+        raise CriticalError("Unexpected pexpect result: %d" % res)
+
+    def mount(self):
+        """
+        Mount the reserved Juno board storage device.
+        """
+        if self.path == "":
+            raise CriticalError("No Juno board reserved.")
+        if self.mounted:
+            return
+        self.ssh.before = ""
+        self.ssh.sendline("%s/mount.sh" % self.path)
+        res = self.ssh.expect(["password for", "MOUNT_SCRIPT_SUCCESS", pexpect.TIMEOUT, \
+            pexpect.EOF, "MOUNT_SCRIPT_FAIL"], timeout=10)
+        if res == 0:
+            self.ssh.before = ""
+            self.ssh.sendline("%s" % self.password)
+            res = self.ssh.expect(["MOUNT_SCRIPT_SUCCESS", "Sorry, try again.", pexpect.TIMEOUT, \
+                pexpect.EOF, "MOUNT_SCRIPT_FAIL"], timeout=10)
+            if res == 0:
+                self.mounted = True
+                return
+            elif res == 1:
+                raise CriticalError("Incorrect sudo password.")
+            logging.error(self.ssh.before.decode("utf-8"))
+            raise CriticalError("Unexpected pexpect result: %d" % res)
+        elif res == 1:
+            self.mounted = True
+            return
+        logging.error(self.ssh.before.decode("utf-8"))
+        raise CriticalError("Unexpected pexpect result: %d" % res)
+
+    def unmount(self):
+        """
+        Unmount the reserved Juno board storage device.
+        """
+        if self.path == "":
+            raise CriticalError("No Juno board reserved.")
+        if not self.mounted:
+            return
+        self.ssh.before = ""
+        self.ssh.sendline("%s/unmount.sh" % self.path)
+        # long timeout here since linux likes to queue file IO operations
+        res = self.ssh.expect(["password for", "UNMOUNT_SCRIPT_SUCCESS", pexpect.TIMEOUT, \
+            pexpect.EOF, "UNMOUNT_SCRIPT_FAIL"], timeout=600)
+        if res == 0:
+            self.ssh.before = ""
+            self.ssh.sendline("%s" % self.password)
+            res = self.ssh.expect(["UNMOUNT_SCRIPT_SUCCESS", "Sorry, try again.", pexpect.TIMEOUT, \
+                pexpect.EOF, "UNMOUNT_SCRIPT_FAIL"], timeout=600)
+            if res == 0:
+                self.mounted = False
+                return
+            elif res == 1:
+                raise CriticalError("Incorrect sudo password.")
+            logging.error(self.ssh.before.decode("utf-8"))
+            raise CriticalError("Unexpected pexpect result: %d" % res)
+        elif res == 1:
+            self.mounted = False
+            return
+        elif res == 2:
+            raise CriticalError("Timed out waiting for unmount.")
+        logging.error(self.ssh.before.decode("utf-8"))
+        raise CriticalError("Unexpected pexpect result: %d" % res)
+
+    def get_path(self):
+        """
+        Get the path to the reserved Juno board.
+        """
+        if self.path == "":
+            raise CriticalError("No Juno board reserved.")
+        return self.path
+
+    def __enter__(self):
+        # Attempt to reserve if it hasn't been done already
+        if self.path == "":
+            self.reserve()
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self.release()
+
+################################################################################
+# Helper Functions                                                             #
+################################################################################
+
+def recover_juno(ssh, juno_board, uart):
+    """
+    If mount fails, this function attempts to power cycle the juno board, cancel
+    auto-boot, and manually enable debug USB before any potentially bad code
+    can run.
+    Parameters
+      ssh (pxssh object): ssh connection to remote machine
+      juno_board (JunoBoardManager): Juno instance to attempt to recover
+      uart (pexpect): Connection to juno uart
+    """
+    power_off(ssh, juno_board.get_path())
+    time.sleep(10)
+    power_on(ssh, juno_board.get_path())
+    # Wait for auto boot message thens end an enter press
+    res = uart.expect(["Press Enter to stop auto boot", pexpect.EOF, pexpect.TIMEOUT], timeout=60)
+    if res != 0:
+        raise CriticalError("Juno auto boot prompt not detected, recovery failed.")
+    uart.sendline("")
+    # Wait for MCC command prompt then send "usb_on"
+    res = uart.expect(["Cmd>", pexpect.EOF, pexpect.TIMEOUT], timeout=10)
+    if res != 0:
+        raise CriticalError("Juno MCC prompt not detected, recovery failed.")
+    uart.sendline("usb_on")
+    # Wait for debug usb confirmation
+    res = uart.expect(["Enabling debug USB...", pexpect.EOF, pexpect.TIMEOUT], timeout=10)
+    if res != 0:
+        raise CriticalError("Debug usb not enabled, recovery failed.")
+    # Dead wait for linux to detect the USB device then try to mount again
+    time.sleep(10)
+    juno_board.mount()
+
+def copy_file_to_remote(source, dest, remote, user, password):
+    """
+    Uses SCP to copy a file to a remote machine.
+    Parameters
+      source (string): Source path
+      dest (string): Destination path
+      remote (string): Name or IP address of remote machine
+      user (string): Username to login with
+      password (string): Password to login/sudo with
+    """
+    scp = "scp -r %s %s@%s:%s" % (source, user, remote, dest)
+    copy = pexpect.spawn(scp)
+    copy.expect("password:")
+    copy.sendline(password)
+    res = copy.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=600)
+    if res == 0:
+        copy.close()
+        if copy.exitstatus == 0:
+            return
+        raise CriticalError("Unexpected error occurred during SCP: %d" % copy.exitstatus)
+    elif res == 1:
+        raise CriticalError("SCP operation timed out.")
+    raise CriticalError("Unexpected pexpect result: %d" % res)
+
+def extract_zip_file(source, dest):
+    """
+    Extracts a zip file on the local machine.
+    Parameters
+      source (string): Path to input zip file
+      dest (string): Path to output directory
+    """
+    try:
+        with zipfile.ZipFile(source, 'r') as src:
+            src.extractall(dest)
+        return
+    except Exception:
+        raise CriticalError("Could not extract boardfiles.")
+
+def remote_copy(ssh, source, dest):
+    """
+    Copy files from remote workspace to Juno directory using rsync
+    Parameters
+      ssh (pxssh object): Connection to remote system
+      source (string): Source file path
+      dest (string): Destination file path
+    """
+    ssh.before = ""
+    ssh.sendline("rsync -rt %s %s" % (source, dest))
+    res = ssh.expect(["$", pexpect.EOF, pexpect.TIMEOUT], timeout=60)
+    if res != 0:
+        logging.error(ssh.before.decode("utf-8"))
+        raise CriticalError("Unexpected error occurred during rsync operation.")
+    ssh.before = ""
+    ssh.sendline("echo $?")
+    res = ssh.expect(["0", pexpect.EOF, pexpect.TIMEOUT], timeout=10)
+    if res != 0:
+        logging.error(ssh.before.decode("utf-8"))
+        raise CriticalError("rsync failed")
+    return
+
+def connect_juno_uart(host, port):
+    """
+    Spawn a pexpect object for the Juno UART
+    Parameters
+      host (string): Telnet host name or IP addres
+      port (int): Telnet port number
+    Returns
+        pexpect object if successful
+    """
+    uart = pexpect.spawn("telnet %s %d" % (host, port))
+    result = uart.expect(["Escape character is", pexpect.EOF, pexpect.TIMEOUT], timeout=10)
+    if result == 0:
+        return uart
+    raise CriticalError("Could not connect to Juno UART.")
+
+def get_uart_port(ssh, juno):
+    """
+    Get the telnet port for the Juno UART
+    Parameters
+      ssh (pxssh object): SSH session to remote machine
+    Returns
+      int: Telnet port number
+    """
+    ssh.before = ""
+    ssh.sendline("cat %s/telnetport" % juno)
+    res = ssh.expect([pexpect.TIMEOUT], timeout=1)
+    if res == 0:
+        match = re.search(r"port: (\d+)", ssh.before.decode("utf-8"))
+        if match:
+            return int(match.group(1))
+    raise CriticalError("Could not get telnet port.")
+
+def power_off(ssh, juno):
+    """
+    Power off the Juno board
+    Parameters
+      ssh (pxssh object): SSH session to remote machine
+      juno (string): Path to Juno directory on remote
+    """
+    ssh.before = ""
+    ssh.sendline("%s/poweroff.sh" % juno)
+    res = ssh.expect(["POWEROFF_SCRIPT_SUCCESS", pexpect.EOF, pexpect.TIMEOUT, \
+        "POWEROFF_SCRIPT_FAIL"], timeout=10)
+    if res == 0:
+        return
+    logging.error(ssh.before.decode("utf-8"))
+    raise CriticalError("Could not power off the Juno board.")
+
+def power_on(ssh, juno):
+    """
+    Power on the Juno board
+    Parameters
+      ssh (pxssh object): SSH session to remote machine
+      juno (string): Path to Juno directory on remote
+    """
+    ssh.before = ""
+    ssh.sendline("%s/poweron.sh" % juno)
+    res = ssh.expect(["POWERON_SCRIPT_SUCCESS", pexpect.EOF, pexpect.TIMEOUT, \
+        "POWERON_SCRIPT_FAIL"], timeout=10)
+    if res == 0:
+        return
+    logging.error(ssh.before.decode("utf-8"))
+    raise CriticalError("Could not power on the Juno board.")
+
+def erase_juno(ssh, juno):
+    """
+    Erase the mounted Juno storage device
+    Parameters
+      ssh (pxssh object): SSH session to remote machine
+      juno (string): Path to Juno directory on remote
+    """
+    ssh.before = ""
+    ssh.sendline("%s/erasejuno.sh" % juno)
+    res = ssh.expect(["ERASEJUNO_SCRIPT_SUCCESS", "ERASEJUNO_SCRIPT_FAIL", pexpect.EOF, \
+        pexpect.TIMEOUT], timeout=30)
+    if res == 0:
+        return
+    logging.error(ssh.before.decode("utf-8"))
+    raise CriticalError("Could not erase the Juno storage device.")
+
+def erase_juno_workspace(ssh, juno):
+    """
+    Erase the Juno workspace
+    Parameters
+      ssh (pxssh object): SSH session to remote machine
+      juno (string): Path to Juno directory on remote
+    """
+    ssh.before = ""
+    ssh.sendline("%s/eraseworkspace.sh" % juno)
+    res = ssh.expect(["ERASEWORKSPACE_SCRIPT_SUCCESS", "ERASEWORKSPACE_SCRIPT_FAIL", pexpect.EOF, \
+        pexpect.TIMEOUT], timeout=30)
+    if res == 0:
+        return
+    logging.error(ssh.before.decode("utf-8"))
+    raise CriticalError("Could not erase the remote workspace.")
+
+def process_uart_output(uart, timeout, handler, telnethost, telnetport):
+    """
+    This function receives UART data from the Juno board, creates a full line
+    of text, then passes it to a test handler function.
+    Parameters
+      uart (pexpect): Pexpect process containing UART telnet session
+      timeout (int): How long to wait for test completion.
+      handler (function): Function to pass each line of test output to.
+      telnethost (string): Telnet host to use if uart connection fails.
+      telnetport (int): Telnet port to use if uart connection fails.
+    """
+    # Start timeout counter
+    timeout_start = datetime.datetime.now()
+
+    line = ""
+    while True:
+        try:
+            # Check if timeout has expired
+            elapsed = datetime.datetime.now() - timeout_start
+            if elapsed.total_seconds() > timeout:
+                raise CriticalError("Test timed out, see log file.")
+
+            # Read next character from Juno
+            char = uart.read_nonblocking(size=1, timeout=1).decode("utf-8")
+            if '\n' in char:
+                logging.info("JUNO: %s", line)
+
+                result = handler(uart, line)
+                if result == TestStatus.SUCCESS:
+                    return
+                elif result == TestStatus.FAILURE:
+                    raise CriticalError("Test manager returned TestStatus.FAILURE")
+
+                line = ""
+            else:
+                line = line + char
+
+        # uart.read_nonblocking will throw timeouts a lot by design so catch and ignore
+        except pexpect.TIMEOUT:
+            continue
+        except pexpect.EOF:
+            logging.warning("Connection lost unexpectedly, attempting to restart.")
+            try:
+                uart = connect_juno_uart(telnethost, telnetport)
+            except CriticalError:
+                raise CriticalError("Could not reopen Juno UART")
+            continue
+        except OSError:
+            raise CriticalError("Unexpected OSError occurred.")
+        except UnicodeDecodeError:
+            continue
+        except Exception as e:
+            # This case exists to catch any weird or rare exceptions.
+            raise CriticalError("Unexpected exception occurred: %s" % str(e))
+
+################################################################################
+# Test Handlers                                                                #
+################################################################################
+
+TEST_CASE_TFTF_MANUAL_PASS_COUNT = 0
+TEST_CASE_TFTF_MANUAL_CRASH_COUNT = 0
+TEST_CASE_TFTF_MANUAL_FAIL_COUNT = 0
+def test_case_tftf_manual(uart, line):
+    """
+    This function handles TFTF tests and parses the output into a pass or fail
+    result.  Any crashes or fails result in an overall test failure but skips
+    and passes are fine.
+    """
+    global TEST_CASE_TFTF_MANUAL_PASS_COUNT
+    global TEST_CASE_TFTF_MANUAL_CRASH_COUNT
+    global TEST_CASE_TFTF_MANUAL_FAIL_COUNT
+
+    # This test needs to be powered back on a few times
+    if "Board powered down, use REBOOT to restart." in line:
+        # time delay to let things finish up
+        time.sleep(3)
+        uart.sendline("reboot")
+        return TestStatus.CONTINUE
+
+    elif "Tests Passed" in line:
+        match = re.search(r"Tests Passed  : (\d+)", line)
+        if match:
+            TEST_CASE_TFTF_MANUAL_PASS_COUNT = int(match.group(1))
+            return TestStatus.CONTINUE
+        logging.error(r"Error parsing line: %s", line)
+        return TestStatus.FAILURE
+
+    elif "Tests Failed" in line:
+        match = re.search(r"Tests Failed  : (\d+)", line)
+        if match:
+            TEST_CASE_TFTF_MANUAL_FAIL_COUNT = int(match.group(1))
+            return TestStatus.CONTINUE
+        logging.error("Error parsing line: %s", line)
+        return TestStatus.FAILURE
+
+    elif "Tests Crashed" in line:
+        match = re.search(r"Tests Crashed : (\d+)", line)
+        if match:
+            TEST_CASE_TFTF_MANUAL_CRASH_COUNT = int(match.group(1))
+            return TestStatus.CONTINUE
+        logging.error("Error parsing line: %s", line)
+        return TestStatus.FAILURE
+
+    elif "Total tests" in line:
+        if TEST_CASE_TFTF_MANUAL_PASS_COUNT == 0:
+            return TestStatus.FAILURE
+        if TEST_CASE_TFTF_MANUAL_CRASH_COUNT > 0:
+            return TestStatus.FAILURE
+        if TEST_CASE_TFTF_MANUAL_FAIL_COUNT > 0:
+            return TestStatus.FAILURE
+        return TestStatus.SUCCESS
+
+    return TestStatus.CONTINUE
+
+TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT = False
+def test_case_linux_manual_shutdown(uart, line):
+    """
+    This handler performs a linux manual shutdown test by waiting for the linux
+    prompt and sending the appropriate halt command, then waiting for the
+    expected output.
+    """
+    global TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT
+
+    # Look for Linux prompt
+    if "/ #" in line and TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT is False:
+        time.sleep(3)
+        uart.sendline("halt -f")
+        TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT = True
+        return TestStatus.CONTINUE
+
+    # Once halt command has been issued, wait for confirmation
+    elif "reboot: System halted" in line:
+        return TestStatus.SUCCESS
+
+    # For any other result, continue.
+    return TestStatus.CONTINUE
+
+################################################################################
+# Script Main                                                                  #
+################################################################################
+
+def main():
+    """
+    Main function, handles the initial set up and test dispatch to Juno board.
+    """
+    parser = argparse.ArgumentParser(description="Launch a Juno manual test.")
+    parser.add_argument("host", type=str, help="Name or IP address of Juno host system.")
+    parser.add_argument("username", type=str, help="Username to login to host system.")
+    parser.add_argument("password", type=str, help="Password to login to host system.")
+    parser.add_argument("boardfiles", type=str, help="ZIP file containing Juno boardfiles.")
+    parser.add_argument("workspace", type=str, help="Directory for scratch files.")
+    parser.add_argument("testname", type=str, help="Name of test to run.")
+    parser.add_argument("timeout", type=int, help="Time to wait for test completion.")
+    parser.add_argument("logfile", type=str, help="Path to log file to create.")
+    parser.add_argument("-l", "--list", action='store_true', help="List supported test cases.")
+    args = parser.parse_args()
+
+    # Print list if requested
+    if args.list:
+        print("Supported Tests")
+        print("  tftf-manual-generic - Should work for all TFTF tests.")
+        print("  linux-manual-shutdown - Waits for Linux prompt and sends halt command.")
+        exit(0)
+
+    # Start logging
+    print("Creating log file: %s" % args.logfile)
+    logging.basicConfig(filename=args.logfile, level=logging.DEBUG, \
+        format="[%(asctime)s] %(message)s", datefmt="%I:%M:%S")
+    logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
+
+    # Make sure test name is supported so we don't waste time if it isn't.
+    if args.testname == "tftf-manual-generic":
+        test_handler = test_case_tftf_manual
+    elif args.testname == "linux-manual-shutdown":
+        test_handler = test_case_linux_manual_shutdown
+    else:
+        logging.error("Test name \"%s\" invalid or not supported.", args.testname)
+        exit(1)
+    logging.info("Selected test \"%s\"", args.testname)
+
+    # Helper functions either succeed or raise CriticalError so no error checking is done here
+    try:
+        # Start SSH session to host machine
+        logging.info("Starting SSH session to remote machine.")
+        ssh = pxssh.pxssh()
+        if not ssh.login(args.host, args.username, args.password):
+            raise CriticalError("Could not start SSH session.")
+        # Disable character echo
+        ssh.sendline("stty -echo")
+        with ssh:
+
+            # Try to reserve a juno board
+            juno_board = JunoBoardManager(ssh, args.password)
+            juno_board.reserve()
+            juno = juno_board.get_path()
+            logging.info("Reserved %s", juno)
+            with juno_board:
+
+                # Get UART port and start telnet session
+                logging.info("Opening Juno UART")
+                port = get_uart_port(ssh, juno)
+                logging.info("Using telnet port %d", port)
+                uart = connect_juno_uart(args.host, port)
+                with uart:
+
+                    # Extract boardfiles locally
+                    logging.info("Extracting boardfiles.")
+                    local_boardfiles = os.path.join(args.workspace, "boardfiles")
+                    if os.path.exists(local_boardfiles):
+                        shutil.rmtree(local_boardfiles)
+                    os.mkdir(local_boardfiles)
+                    extract_zip_file(args.boardfiles, local_boardfiles)
+
+                    # Clear out the workspace directory on the remote system
+                    logging.info("Erasing remote workspace.")
+                    erase_juno_workspace(ssh, juno)
+
+                    # SCP boardfiles to juno host
+                    logging.info("Copying boardfiles to remote system.")
+                    copy_file_to_remote(os.path.join(local_boardfiles), \
+                        os.path.join(juno, "workspace"), args.host, args.username, args.password)
+
+                    # Try to mount the storage device
+                    logging.info("Mounting the Juno storage device.")
+                    try:
+                        juno_board.mount()
+                    except CriticalError:
+                        logging.info("Mount failed, attempting to recover Juno board.")
+                        recover_juno(ssh, juno_board, uart)
+                        logging.info("Juno board recovered.")
+
+                    # Move boardfiles from temp directory to juno storage
+                    logging.info("Copying new boardfiles to storage device.")
+                    remote_copy(ssh, os.path.join(juno, "workspace", "boardfiles", "*"), \
+                        os.path.join(juno, "juno"))
+
+                    # Unmounting the juno board.
+                    logging.info("Unmounting Juno storage device and finishing pending I/O.")
+                    juno_board.unmount()
+
+                    # Power cycle the juno board to reboot it. */
+                    logging.info("Rebooting the Juno board.")
+                    power_off(ssh, juno)
+                    # dead wait to let the power supply do its thing
+                    time.sleep(10)
+                    power_on(ssh, juno)
+
+                    # dead wait to let the power supply do its thing
+                    time.sleep(10)
+
+                    # Process UART output and wait for test completion
+                    process_uart_output(uart, args.timeout, test_handler, args.host, port)
+                    logging.info("Tests Passed!")
+
+    except CriticalError as exception:
+        logging.error(str(exception))
+        exit(1)
+
+    # Exit with 0 on successful finish
+    exit(0)
+
+################################################################################
+# Script Entry Point                                                           #
+################################################################################
+
+if __name__ == "__main__":
+    main()  # main() terminates the process itself with exit status 0 or 1