| #!/usr/bin/env python3 |
| |
| """ |
| Script to automate the manual juno tests that used to require the Juno board |
| to be manually power cycled. |
| |
| """ |
| |
| import argparse |
| import datetime |
| import enum |
| import logging |
| import os |
| import pexpect |
| import re |
| import shutil |
| import sys |
| import time |
| import zipfile |
| from pexpect import pxssh |
| |
| ################################################################################ |
| # Classes # |
| ################################################################################ |
| |
| class CriticalError(Exception): |
| """ |
| Raised when a serious issue occurs that will likely mean abort. |
| """ |
| pass |
| |
| class TestStatus(enum.Enum): |
| """ |
| This is an enum to describe possible return values from test handlers. |
| """ |
| SUCCESS = 0 |
| FAILURE = 1 |
| CONTINUE = 2 |
| |
| class JunoBoardManager(object): |
| """ |
| Manage Juno board reservation and mounts with support for context |
| management. |
| Parameters |
| ssh (pxssh object): SSH connection to remote machine |
| password (string): sudo password for mounting/unmounting |
| """ |
| |
| def __init__(self, ssh, password): |
| self.ssh = ssh |
| self.path = "" |
| self.password = password |
| self.mounted = False |
| |
| def reserve(self): |
| """ |
| Try to reserve a Juno board. |
| """ |
| for path in ["/home/login/generaljuno1", "/home/login/generaljuno2"]: |
| logging.info("Trying %s...", path) |
| self.ssh.before = "" |
| self.ssh.sendline("%s/reserve.sh" % path) |
| res = self.ssh.expect(["RESERVE_SCRIPT_SUCCESS", "RESERVE_SCRIPT_FAIL", pexpect.EOF, \ |
| pexpect.TIMEOUT], timeout=10) |
| if res == 0: |
| self.path = path |
| return |
| if res == 1: |
| continue |
| else: |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| raise CriticalError("Could not reserve a Juno board.") |
| |
| def release(self): |
| """ |
| Release a previously reserved Juno board. |
| """ |
| if self.mounted: |
| self.unmount() |
| logging.info("Unmounted Juno storage device.") |
| if self.path == "": |
| raise CriticalError("No Juno board reserved.") |
| self.ssh.before = "" |
| self.ssh.sendline("%s/release.sh" % self.path) |
| res = self.ssh.expect(["RELEASE_SCRIPT_SUCCESS", "RELEASE_SCRIPT_FAIL", pexpect.EOF, \ |
| pexpect.TIMEOUT], timeout=10) |
| if res == 0: |
| return |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| |
| def mount(self): |
| """ |
| Mount the reserved Juno board storage device. |
| """ |
| if self.path == "": |
| raise CriticalError("No Juno board reserved.") |
| if self.mounted: |
| return |
| self.ssh.before = "" |
| self.ssh.sendline("%s/mount.sh" % self.path) |
| res = self.ssh.expect(["password for", "MOUNT_SCRIPT_SUCCESS", pexpect.TIMEOUT, \ |
| pexpect.EOF, "MOUNT_SCRIPT_FAIL"], timeout=10) |
| if res == 0: |
| self.ssh.before = "" |
| self.ssh.sendline("%s" % self.password) |
| res = self.ssh.expect(["MOUNT_SCRIPT_SUCCESS", "Sorry, try again.", pexpect.TIMEOUT, \ |
| pexpect.EOF, "MOUNT_SCRIPT_FAIL"], timeout=10) |
| if res == 0: |
| self.mounted = True |
| return |
| elif res == 1: |
| raise CriticalError("Incorrect sudo password.") |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| elif res == 1: |
| self.mounted = True |
| return |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| |
| def unmount(self): |
| """ |
| Unmount the reserved Juno board storage device. |
| """ |
| if self.path == "": |
| raise CriticalError("No Juno board reserved.") |
| if not self.mounted: |
| return |
| self.ssh.before = "" |
| self.ssh.sendline("%s/unmount.sh" % self.path) |
| # long timeout here since linux likes to queue file IO operations |
| res = self.ssh.expect(["password for", "UNMOUNT_SCRIPT_SUCCESS", pexpect.TIMEOUT, \ |
| pexpect.EOF, "UNMOUNT_SCRIPT_FAIL"], timeout=600) |
| if res == 0: |
| self.ssh.before = "" |
| self.ssh.sendline("%s" % self.password) |
| res = self.ssh.expect(["UNMOUNT_SCRIPT_SUCCESS", "Sorry, try again.", pexpect.TIMEOUT, \ |
| pexpect.EOF, "UNMOUNT_SCRIPT_FAIL"], timeout=600) |
| if res == 0: |
| self.mounted = False |
| return |
| elif res == 1: |
| raise CriticalError("Incorrect sudo password.") |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| elif res == 1: |
| self.mounted = False |
| return |
| elif res == 2: |
| raise CriticalError("Timed out waiting for unmount.") |
| logging.error(self.ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| |
| def get_path(self): |
| """ |
| Get the path to the reserved Juno board. |
| """ |
| if self.path == "": |
| raise CriticalError("No Juno board reserved.") |
| return self.path |
| |
| def __enter__(self): |
| # Attempt to reserve if it hasn't been done already |
| if self.path == "": |
| self.reserve() |
| |
| def __exit__(self, exc_type, exc_value, exc_traceback): |
| self.release() |
| |
| ################################################################################ |
| # Helper Functions # |
| ################################################################################ |
| |
| def recover_juno(ssh, juno_board, uart): |
| """ |
| If mount fails, this function attempts to power cycle the juno board, cancel |
| auto-boot, and manually enable debug USB before any potentially bad code |
| can run. |
| Parameters |
| ssh (pxssh object): ssh connection to remote machine |
| juno_board (JunoBoardManager): Juno instance to attempt to recover |
| uart (pexpect): Connection to juno uart |
| """ |
| power_off(ssh, juno_board.get_path()) |
| time.sleep(10) |
| power_on(ssh, juno_board.get_path()) |
| # Wait for auto boot message thens end an enter press |
| res = uart.expect(["Press Enter to stop auto boot", pexpect.EOF, pexpect.TIMEOUT], timeout=60) |
| if res != 0: |
| raise CriticalError("Juno auto boot prompt not detected, recovery failed.") |
| uart.sendline("") |
| # Wait for MCC command prompt then send "usb_on" |
| res = uart.expect(["Cmd>", pexpect.EOF, pexpect.TIMEOUT], timeout=10) |
| if res != 0: |
| raise CriticalError("Juno MCC prompt not detected, recovery failed.") |
| uart.sendline("usb_on") |
| # Wait for debug usb confirmation |
| res = uart.expect(["Enabling debug USB...", pexpect.EOF, pexpect.TIMEOUT], timeout=10) |
| if res != 0: |
| raise CriticalError("Debug usb not enabled, recovery failed.") |
| # Dead wait for linux to detect the USB device then try to mount again |
| time.sleep(10) |
| juno_board.mount() |
| |
| def copy_file_to_remote(source, dest, remote, user, password): |
| """ |
| Uses SCP to copy a file to a remote machine. |
| Parameters |
| source (string): Source path |
| dest (string): Destination path |
| remote (string): Name or IP address of remote machine |
| user (string): Username to login with |
| password (string): Password to login/sudo with |
| """ |
| scp = "scp -r %s %s@%s:%s" % (source, user, remote, dest) |
| copy = pexpect.spawn(scp) |
| copy.expect("password:") |
| copy.sendline(password) |
| res = copy.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=600) |
| if res == 0: |
| copy.close() |
| if copy.exitstatus == 0: |
| return |
| raise CriticalError("Unexpected error occurred during SCP: %d" % copy.exitstatus) |
| elif res == 1: |
| raise CriticalError("SCP operation timed out.") |
| raise CriticalError("Unexpected pexpect result: %d" % res) |
| |
| def extract_zip_file(source, dest): |
| """ |
| Extracts a zip file on the local machine. |
| Parameters |
| source (string): Path to input zip file |
| dest (string): Path to output directory |
| """ |
| try: |
| with zipfile.ZipFile(source, 'r') as src: |
| src.extractall(dest) |
| return |
| except Exception: |
| raise CriticalError("Could not extract boardfiles.") |
| |
| def remote_copy(ssh, source, dest): |
| """ |
| Copy files from remote workspace to Juno directory using rsync |
| Parameters |
| ssh (pxssh object): Connection to remote system |
| source (string): Source file path |
| dest (string): Destination file path |
| """ |
| ssh.before = "" |
| ssh.sendline("rsync -rt %s %s" % (source, dest)) |
| res = ssh.expect(["$", pexpect.EOF, pexpect.TIMEOUT], timeout=60) |
| if res != 0: |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("Unexpected error occurred during rsync operation.") |
| ssh.before = "" |
| ssh.sendline("echo $?") |
| res = ssh.expect(["0", pexpect.EOF, pexpect.TIMEOUT], timeout=10) |
| if res != 0: |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("rsync failed") |
| return |
| |
| def connect_juno_uart(host, port): |
| """ |
| Spawn a pexpect object for the Juno UART |
| Parameters |
| host (string): Telnet host name or IP addres |
| port (int): Telnet port number |
| Returns |
| pexpect object if successful |
| """ |
| uart = pexpect.spawn("telnet %s %d" % (host, port)) |
| result = uart.expect(["Escape character is", pexpect.EOF, pexpect.TIMEOUT], timeout=10) |
| if result == 0: |
| return uart |
| raise CriticalError("Could not connect to Juno UART.") |
| |
| def get_uart_port(ssh, juno): |
| """ |
| Get the telnet port for the Juno UART |
| Parameters |
| ssh (pxssh object): SSH session to remote machine |
| Returns |
| int: Telnet port number |
| """ |
| ssh.before = "" |
| ssh.sendline("cat %s/telnetport" % juno) |
| res = ssh.expect([pexpect.TIMEOUT], timeout=1) |
| if res == 0: |
| match = re.search(r"port: (\d+)", ssh.before.decode("utf-8")) |
| if match: |
| return int(match.group(1)) |
| raise CriticalError("Could not get telnet port.") |
| |
| def power_off(ssh, juno): |
| """ |
| Power off the Juno board |
| Parameters |
| ssh (pxssh object): SSH session to remote machine |
| juno (string): Path to Juno directory on remote |
| """ |
| ssh.before = "" |
| ssh.sendline("%s/poweroff.sh" % juno) |
| res = ssh.expect(["POWEROFF_SCRIPT_SUCCESS", pexpect.EOF, pexpect.TIMEOUT, \ |
| "POWEROFF_SCRIPT_FAIL"], timeout=10) |
| if res == 0: |
| return |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("Could not power off the Juno board.") |
| |
| def power_on(ssh, juno): |
| """ |
| Power on the Juno board |
| Parameters |
| ssh (pxssh object): SSH session to remote machine |
| juno (string): Path to Juno directory on remote |
| """ |
| ssh.before = "" |
| ssh.sendline("%s/poweron.sh" % juno) |
| res = ssh.expect(["POWERON_SCRIPT_SUCCESS", pexpect.EOF, pexpect.TIMEOUT, \ |
| "POWERON_SCRIPT_FAIL"], timeout=10) |
| if res == 0: |
| return |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("Could not power on the Juno board.") |
| |
| def erase_juno(ssh, juno): |
| """ |
| Erase the mounted Juno storage device |
| Parameters |
| ssh (pxssh object): SSH session to remote machine |
| juno (string): Path to Juno directory on remote |
| """ |
| ssh.before = "" |
| ssh.sendline("%s/erasejuno.sh" % juno) |
| res = ssh.expect(["ERASEJUNO_SCRIPT_SUCCESS", "ERASEJUNO_SCRIPT_FAIL", pexpect.EOF, \ |
| pexpect.TIMEOUT], timeout=30) |
| if res == 0: |
| return |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("Could not erase the Juno storage device.") |
| |
| def erase_juno_workspace(ssh, juno): |
| """ |
| Erase the Juno workspace |
| Parameters |
| ssh (pxssh object): SSH session to remote machine |
| juno (string): Path to Juno directory on remote |
| """ |
| ssh.before = "" |
| ssh.sendline("%s/eraseworkspace.sh" % juno) |
| res = ssh.expect(["ERASEWORKSPACE_SCRIPT_SUCCESS", "ERASEWORKSPACE_SCRIPT_FAIL", pexpect.EOF, \ |
| pexpect.TIMEOUT], timeout=30) |
| if res == 0: |
| return |
| logging.error(ssh.before.decode("utf-8")) |
| raise CriticalError("Could not erase the remote workspace.") |
| |
| def process_uart_output(uart, timeout, handler, telnethost, telnetport): |
| """ |
| This function receives UART data from the Juno board, creates a full line |
| of text, then passes it to a test handler function. |
| Parameters |
| uart (pexpect): Pexpect process containing UART telnet session |
| timeout (int): How long to wait for test completion. |
| handler (function): Function to pass each line of test output to. |
| telnethost (string): Telnet host to use if uart connection fails. |
| telnetport (int): Telnet port to use if uart connection fails. |
| """ |
| # Start timeout counter |
| timeout_start = datetime.datetime.now() |
| |
| line = "" |
| while True: |
| try: |
| # Check if timeout has expired |
| elapsed = datetime.datetime.now() - timeout_start |
| if elapsed.total_seconds() > timeout: |
| raise CriticalError("Test timed out, see log file.") |
| |
| # Read next character from Juno |
| char = uart.read_nonblocking(size=1, timeout=1).decode("utf-8") |
| if '\n' in char: |
| logging.info("JUNO: %s", line) |
| |
| result = handler(uart, line) |
| if result == TestStatus.SUCCESS: |
| return |
| elif result == TestStatus.FAILURE: |
| raise CriticalError("Test manager returned TestStatus.FAILURE") |
| |
| line = "" |
| else: |
| line = line + char |
| |
| # uart.read_nonblocking will throw timeouts a lot by design so catch and ignore |
| except pexpect.TIMEOUT: |
| continue |
| except pexpect.EOF: |
| logging.warning("Connection lost unexpectedly, attempting to restart.") |
| try: |
| uart = connect_juno_uart(telnethost, telnetport) |
| except CriticalError: |
| raise CriticalError("Could not reopen Juno UART") |
| continue |
| except OSError: |
| raise CriticalError("Unexpected OSError occurred.") |
| except UnicodeDecodeError: |
| continue |
| except Exception as e: |
| # This case exists to catch any weird or rare exceptions. |
| raise CriticalError("Unexpected exception occurred: %s" % str(e)) |
| |
| ################################################################################ |
| # Test Handlers # |
| ################################################################################ |
| |
| TEST_CASE_TFTF_MANUAL_PASS_COUNT = 0 |
| TEST_CASE_TFTF_MANUAL_CRASH_COUNT = 0 |
| TEST_CASE_TFTF_MANUAL_FAIL_COUNT = 0 |
| def test_case_tftf_manual(uart, line): |
| """ |
| This function handles TFTF tests and parses the output into a pass or fail |
| result. Any crashes or fails result in an overall test failure but skips |
| and passes are fine. |
| """ |
| global TEST_CASE_TFTF_MANUAL_PASS_COUNT |
| global TEST_CASE_TFTF_MANUAL_CRASH_COUNT |
| global TEST_CASE_TFTF_MANUAL_FAIL_COUNT |
| |
| # This test needs to be powered back on a few times |
| if "Board powered down, use REBOOT to restart." in line: |
| # time delay to let things finish up |
| time.sleep(3) |
| uart.sendline("reboot") |
| return TestStatus.CONTINUE |
| |
| elif "Tests Passed" in line: |
| match = re.search(r"Tests Passed : (\d+)", line) |
| if match: |
| TEST_CASE_TFTF_MANUAL_PASS_COUNT = int(match.group(1)) |
| return TestStatus.CONTINUE |
| logging.error(r"Error parsing line: %s", line) |
| return TestStatus.FAILURE |
| |
| elif "Tests Failed" in line: |
| match = re.search(r"Tests Failed : (\d+)", line) |
| if match: |
| TEST_CASE_TFTF_MANUAL_FAIL_COUNT = int(match.group(1)) |
| return TestStatus.CONTINUE |
| logging.error("Error parsing line: %s", line) |
| return TestStatus.FAILURE |
| |
| elif "Tests Crashed" in line: |
| match = re.search(r"Tests Crashed : (\d+)", line) |
| if match: |
| TEST_CASE_TFTF_MANUAL_CRASH_COUNT = int(match.group(1)) |
| return TestStatus.CONTINUE |
| logging.error("Error parsing line: %s", line) |
| return TestStatus.FAILURE |
| |
| elif "Total tests" in line: |
| if TEST_CASE_TFTF_MANUAL_PASS_COUNT == 0: |
| return TestStatus.FAILURE |
| if TEST_CASE_TFTF_MANUAL_CRASH_COUNT > 0: |
| return TestStatus.FAILURE |
| if TEST_CASE_TFTF_MANUAL_FAIL_COUNT > 0: |
| return TestStatus.FAILURE |
| return TestStatus.SUCCESS |
| |
| return TestStatus.CONTINUE |
| |
| TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT = False |
| def test_case_linux_manual_shutdown(uart, line): |
| """ |
| This handler performs a linux manual shutdown test by waiting for the linux |
| prompt and sending the appropriate halt command, then waiting for the |
| expected output. |
| """ |
| global TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT |
| |
| # Look for Linux prompt |
| if "/ #" in line and TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT is False: |
| time.sleep(3) |
| uart.sendline("halt -f") |
| TEST_CASE_LINUX_MANUAL_SHUTDOWN_HALT_SENT = True |
| return TestStatus.CONTINUE |
| |
| # Once halt command has been issued, wait for confirmation |
| elif "reboot: System halted" in line: |
| return TestStatus.SUCCESS |
| |
| # For any other result, continue. |
| return TestStatus.CONTINUE |
| |
| ################################################################################ |
| # Script Main # |
| ################################################################################ |
| |
| def main(): |
| """ |
| Main function, handles the initial set up and test dispatch to Juno board. |
| """ |
| parser = argparse.ArgumentParser(description="Launch a Juno manual test.") |
| parser.add_argument("host", type=str, help="Name or IP address of Juno host system.") |
| parser.add_argument("username", type=str, help="Username to login to host system.") |
| parser.add_argument("password", type=str, help="Password to login to host system.") |
| parser.add_argument("boardfiles", type=str, help="ZIP file containing Juno boardfiles.") |
| parser.add_argument("workspace", type=str, help="Directory for scratch files.") |
| parser.add_argument("testname", type=str, help="Name of test to run.") |
| parser.add_argument("timeout", type=int, help="Time to wait for test completion.") |
| parser.add_argument("logfile", type=str, help="Path to log file to create.") |
| parser.add_argument("-l", "--list", action='store_true', help="List supported test cases.") |
| args = parser.parse_args() |
| |
| # Print list if requested |
| if args.list: |
| print("Supported Tests") |
| print(" tftf-manual-generic - Should work for all TFTF tests.") |
| print(" linux-manual-shutdown - Waits for Linux prompt and sends halt command.") |
| exit(0) |
| |
| # Start logging |
| print("Creating log file: %s" % args.logfile) |
| logging.basicConfig(filename=args.logfile, level=logging.DEBUG, \ |
| format="[%(asctime)s] %(message)s", datefmt="%I:%M:%S") |
| logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) |
| |
| # Make sure test name is supported so we don't waste time if it isn't. |
| if args.testname == "tftf-manual-generic": |
| test_handler = test_case_tftf_manual |
| elif args.testname == "linux-manual-shutdown": |
| test_handler = test_case_linux_manual_shutdown |
| else: |
| logging.error("Test name \"%s\" invalid or not supported.", args.testname) |
| exit(1) |
| logging.info("Selected test \"%s\"", args.testname) |
| |
| # Helper functions either succeed or raise CriticalError so no error checking is done here |
| try: |
| # Start SSH session to host machine |
| logging.info("Starting SSH session to remote machine.") |
| ssh = pxssh.pxssh() |
| if not ssh.login(args.host, args.username, args.password): |
| raise CriticalError("Could not start SSH session.") |
| # Disable character echo |
| ssh.sendline("stty -echo") |
| with ssh: |
| |
| # Try to reserve a juno board |
| juno_board = JunoBoardManager(ssh, args.password) |
| juno_board.reserve() |
| juno = juno_board.get_path() |
| logging.info("Reserved %s", juno) |
| with juno_board: |
| |
| # Get UART port and start telnet session |
| logging.info("Opening Juno UART") |
| port = get_uart_port(ssh, juno) |
| logging.info("Using telnet port %d", port) |
| uart = connect_juno_uart(args.host, port) |
| with uart: |
| |
| # Extract boardfiles locally |
| logging.info("Extracting boardfiles.") |
| local_boardfiles = os.path.join(args.workspace, "boardfiles") |
| if os.path.exists(local_boardfiles): |
| shutil.rmtree(local_boardfiles) |
| os.mkdir(local_boardfiles) |
| extract_zip_file(args.boardfiles, local_boardfiles) |
| |
| # Clear out the workspace directory on the remote system |
| logging.info("Erasing remote workspace.") |
| erase_juno_workspace(ssh, juno) |
| |
| # SCP boardfiles to juno host |
| logging.info("Copying boardfiles to remote system.") |
| copy_file_to_remote(os.path.join(local_boardfiles), \ |
| os.path.join(juno, "workspace"), args.host, args.username, args.password) |
| |
| # Try to mount the storage device |
| logging.info("Mounting the Juno storage device.") |
| try: |
| juno_board.mount() |
| except CriticalError: |
| logging.info("Mount failed, attempting to recover Juno board.") |
| recover_juno(ssh, juno_board, uart) |
| logging.info("Juno board recovered.") |
| |
| # Move boardfiles from temp directory to juno storage |
| logging.info("Copying new boardfiles to storage device.") |
| remote_copy(ssh, os.path.join(juno, "workspace", "boardfiles", "*"), \ |
| os.path.join(juno, "juno")) |
| |
| # Unmounting the juno board. |
| logging.info("Unmounting Juno storage device and finishing pending I/O.") |
| juno_board.unmount() |
| |
| # Power cycle the juno board to reboot it. */ |
| logging.info("Rebooting the Juno board.") |
| power_off(ssh, juno) |
| # dead wait to let the power supply do its thing |
| time.sleep(10) |
| power_on(ssh, juno) |
| |
| # dead wait to let the power supply do its thing |
| time.sleep(10) |
| |
| # Process UART output and wait for test completion |
| process_uart_output(uart, args.timeout, test_handler, args.host, port) |
| logging.info("Tests Passed!") |
| |
| except CriticalError as exception: |
| logging.error(str(exception)) |
| exit(1) |
| |
| # Exit with 0 on successful finish |
| exit(0) |
| |
| ################################################################################ |
| # Script Entry Point # |
| ################################################################################ |
| |
| if __name__ == "__main__": |
| main() |