# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Library containing functions to access a remote test device."""

import functools
import glob
import logging
import os
import pathlib
import re
import shutil
import socket
import stat
import subprocess
import tempfile
import time
from typing import List, Optional, Union

from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import timeout_util
from chromite.scripts import cros_set_lsb_release
from chromite.utils import memoize


_path = os.path.dirname(os.path.realpath(__file__))
TEST_PRIVATE_KEY = os.path.normpath(
    os.path.join(_path, "../ssh_keys/testing_rsa")
)
TEST_PRIVATE_PARTNER_KEY = os.path.normpath(
    os.path.join(_path, "../../sshkeys/partner_testing_rsa")
)
del _path

CHUNK_SIZE = 50 * 1024 * 1024
DEGREE_OF_PARALLELISM = 8
LOCALHOST = "localhost"
LOCALHOST_IP = "127.0.0.1"
ROOT_ACCOUNT = "root"

# IP used for testing that is a valid IP address, but would fail quickly if
# actually used for any real operation (e.g. pinging or making connections).
# https://en.wikipedia.org/wiki/IPv4#Special-use_addresses
TEST_IP = "0.1.2.3"

REBOOT_MAX_WAIT = 180
REBOOT_SSH_CONNECT_TIMEOUT = 4
REBOOT_SSH_CONNECT_ATTEMPTS = 2
CHECK_INTERVAL = 5
DEFAULT_SSH_PORT = 22
# Ssh returns status 255 when it encounters errors in its own code.  Otherwise,
# it returns the status of the command that it ran on the host, including
# possibly 255.  Here we assume that 255 indicates only ssh errors.  This may
# be a reasonable guess for our purposes.
SSH_ERROR_CODE = 255

# SSH default known_hosts filepath.
KNOWN_HOSTS_PATH = os.path.expanduser("~/.ssh/known_hosts")

# Dev/test packages are installed in these paths.
DEV_BIN_PATHS = "/usr/local/bin:/usr/local/sbin"


class RemoteAccessException(Exception):
    """Base exception for this module."""


class SSHConnectionError(RemoteAccessException):
    """Raised when SSH connection has failed."""

    def IsKnownHostsMismatch(self):
        """Returns True if this error was caused by a known_hosts mismatch.

        Will only check for a mismatch, this will return False if the host
        didn't exist in known_hosts at all.
        """
        # Checking for string output is brittle, but there's no exit code that
        # indicates why SSH failed so this might be the best we can do.
        # RemoteAccess.RemoteSh() sets LC_MESSAGES=C, so we only need to check
        # for the English error message.
        # Verified for OpenSSH_6.6.1p1.
        return "REMOTE HOST IDENTIFICATION HAS CHANGED" in str(self)


class DeviceNotPingableError(RemoteAccessException):
    """Raised when device is not pingable."""


class DefaultDeviceError(RemoteAccessException):
    """Raised when a default ChromiumOSDevice can't be found."""


class CatFileError(RemoteAccessException):
    """Raised when error occurs while trying to cat a remote file."""


class RunningPidsError(RemoteAccessException):
    """Raised when unable to get running pids on the device."""


class RebootError(RemoteAccessException):
    """Raised when a device fails to reboot."""


class ProgramNotFoundError(RemoteAccessException):
    """Raised when a program on device is not found."""


def NormalizePort(port, str_ok=True):
    """Checks if |port| is a valid port number and returns the number.

    Args:
        port: The port to normalize.
        str_ok: Accept |port| in string. If set False, only accepts
            an integer. Defaults to True.

    Returns:
        A port number (integer).
    """
    err_msg = "%s is not a valid port number." % port

    if not str_ok and not isinstance(port, int):
        raise ValueError(err_msg)

    port = int(port)
    if port <= 0 or port >= 65536:
        raise ValueError(err_msg)

    return port


def GetUnusedPort(
    ip=LOCALHOST, family=socket.AF_INET, stype=socket.SOCK_STREAM
):
    """Returns a currently unused port.

    Examples:
        Note: Since this does not guarantee the port remains unused when you
        attempt to bind it, your code should retry in a loop like so:
        while True:
            try:
                port = remote_access.GetUnusedPort()
                <attempt to bind the port>
                break
            except socket.error as e:
                if e.errno == errno.EADDRINUSE:
                    continue
                <fallback/raise>

    Args:
        ip: IP to use to bind the port.
        family: Address family.
        stype: Socket type.

    Returns:
        A port number (integer).
    """
    s = None
    try:
        s = socket.socket(family, stype)
        s.bind((ip, 0))
        return s.getsockname()[1]
    # TODO(vapier): Drop socket.error when we're Python 3-only.
    # pylint: disable=overlapping-except
    except (socket.error, OSError):
        pass
    finally:
        if s is not None:
            s.close()


def RunCommandFuncWrapper(func, msg, *args, **kwargs) -> None:
    """Wraps a function that invokes cros_build_lib.run.

    If the command failed, logs warning |msg| if check is not set;
    logs error |msg| if check is set.

    Args:
        func: The function to call.
        msg: The message to display if the command failed.
        ignore_failures: If True, ignore failures during the command.
        *args: Arguments to pass to |func|.
        **kwargs: Keyword arguments to pass to |func|.

    Returns:
        The result of |func|.

    Raises:
        cros_build_lib.RunCommandError if the command failed and check is set.
    """
    check = kwargs.pop("check", True)
    ignore_failures = kwargs.pop("ignore_failures", False)
    result = func(*args, check=False, **kwargs)

    if not ignore_failures:
        if result.returncode != 0 and check:
            raise cros_build_lib.RunCommandError(msg, result)

        if result.returncode != 0:
            logging.warning(msg)


def CompileSSHConnectSettings(**kwargs):
    """Creates a list of SSH connection options.

    Any ssh_config option can be specified in |kwargs|, in addition,
    several options are set to default values if not specified. Any
    option can be set to None to prevent this function from assigning
    a value so that the SSH default value will be used.

    This function doesn't check to make sure the |kwargs| options are
    valid, so a typo or invalid setting won't be caught until the
    resulting arguments are passed into an SSH call.

    Args:
        **kwargs: A dictionary of ssh_config settings.

    Returns:
        A list of arguments to pass to SSH.
    """
    settings = {
        "ConnectTimeout": 60,
        "ConnectionAttempts": 4,
        "NumberOfPasswordPrompts": 0,
        "Protocol": 2,
        "ServerAliveInterval": 10,
        "ServerAliveCountMax": 3,
        "StrictHostKeyChecking": "no",
        "UserKnownHostsFile": "/dev/null",
    }
    settings.update(kwargs)
    return ["-o%s=%s" % (k, v) for k, v in settings.items() if v is not None]


def RemoveKnownHost(host, known_hosts_path=KNOWN_HOSTS_PATH) -> None:
    """Removes |host| from a known_hosts file.

    `ssh-keygen -R` doesn't work on bind mounted files as they can only
    be updated in place. Since we bind mount the default known_hosts file
    when entering the chroot, this function provides an alternate way
    to remove hosts from the file.

    Args:
        host: The host name to remove from the known_hosts file.
        known_hosts_path: Path to the known_hosts file to change.
            Defaults to the standard SSH known_hosts file path.

    Raises:
        cros_build_lib.RunCommandError if ssh-keygen fails.
    """
    # `ssh-keygen -R` creates a backup file to retain the old 'known_hosts'
    # content and never deletes it. Using TempDir here to make sure both the
    # temp files created by us and `ssh-keygen -R` are deleted afterwards.
    with osutils.TempDir(prefix="remote-access-") as tempdir:
        temp_file = os.path.join(tempdir, "temp_known_hosts")
        try:
            # Using shutil.copy2 to preserve the file ownership and permissions.
            shutil.copy2(known_hosts_path, temp_file)
        except IOError:
            # If |known_hosts_path| doesn't exist, neither does |host| so we're
            # done.
            return
        cros_build_lib.dbg_run(
            ["ssh-keygen", "-R", host, "-f", temp_file], capture_output=True
        )
        shutil.copy2(temp_file, known_hosts_path)


class PortForwardSpec:
    """Represent the information required to define an SSH tunnel."""

    def __init__(
        self,
        local_port,
        remote_host="localhost",
        remote_port=None,
        local_host="localhost",
    ) -> None:
        if remote_port is None:
            remote_port = local_port
        self.local_port = NormalizePort(local_port)
        self.remote_port = NormalizePort(remote_port)
        self.local_host = local_host
        self.remote_host = remote_host

    @property
    def command_line_spec(self):
        """Return the port forwarding spec for the `ssh` command."""
        if not self.remote_host:
            return "%d:%s:%d" % (
                self.remote_port,
                self.local_host,
                self.local_port,
            )
        return "%s:%d:%s:%d" % (
            self.remote_host,
            self.remote_port,
            self.local_host,
            self.local_port,
        )


class RemoteAccess:
    """Provides access to a remote test machine."""

    DEFAULT_USERNAME = ROOT_ACCOUNT

    def __init__(
        self,
        remote_host,
        tempdir: Union[str, os.PathLike],
        port: int = None,
        username=None,
        private_key=None,
        debug_level=logging.DEBUG,
        interactive=True,
    ) -> None:
        """Construct the object.

        Args:
            remote_host: The ip or hostname of the remote test machine.  The
                test machine should be running a ChromeOS test image.
            tempdir: A directory that RemoteAccess can use to store temporary
                files. It's the responsibility of the caller to remove it.
            port: The ssh port of the test machine to connect to.
            username: The ssh login username (default: root).
            private_key: The identify file to pass to `ssh -i` (default:
                testing_rsa).
            debug_level: Logging level to use for all run invocations.
            interactive: If set to False, pass /dev/null into stdin for the sh
                cmd.
        """
        # TODO(vapier): Convert this to Path.
        self.tempdir = str(tempdir)
        self.remote_host = remote_host
        self.port = port
        self.username = username if username else self.DEFAULT_USERNAME
        self.debug_level = debug_level
        self.interactive = interactive

        private_key_src = private_key if private_key else TEST_PRIVATE_KEY
        private_key_copy = os.path.join(
            tempdir, os.path.basename(private_key_src)
        )
        self.private_keys = [private_key_copy]

        shutil.copyfile(private_key_src, private_key_copy)
        os.chmod(private_key_copy, stat.S_IRUSR)

        if not private_key and os.path.exists(TEST_PRIVATE_PARTNER_KEY):
            # Try both external and partner private keys if private_key is not
            # specified explicitly.
            partner_key_src = TEST_PRIVATE_PARTNER_KEY
            partner_key_copy = os.path.join(
                tempdir, os.path.basename(partner_key_src)
            )
            self.private_keys.append(partner_key_copy)
            shutil.copyfile(partner_key_src, partner_key_copy)
            os.chmod(partner_key_copy, stat.S_IRUSR)

    @staticmethod
    def _mockable_popen(*args, **kwargs):
        """This wraps subprocess.Popen so it can be mocked in unit tests."""
        # pylint: disable=consider-using-with
        return subprocess.Popen(*args, **kwargs)

    @property
    def target_ssh_url(self):
        return "%s@%s" % (self.username, self.remote_host)

    def _GetSSHCmd(self, connect_settings=None):
        if connect_settings is None:
            connect_settings = CompileSSHConnectSettings()

        cmd = ["ssh"]
        if self.port:
            cmd += ["-p", str(self.port)]
        cmd += connect_settings
        cmd += ["-oIdentitiesOnly=yes"]
        for private_key in self.private_keys:
            cmd.extend(["-i", private_key])
        if not self.interactive:
            cmd.append("-n")

        return cmd

    def GetSSHCommand(self, connect_settings=None):
        """Returns the ssh command that can be used to connect to the device

        Args:
            connect_settings: dict of additional ssh options

        Returns:
            ['ssh', '...', 'user@host']
        """
        ssh_cmd = self._GetSSHCmd(connect_settings=connect_settings)
        ssh_cmd.append(self.target_ssh_url)

        return ssh_cmd

    def RemoteSh(
        self,
        cmd,
        connect_settings=None,
        check=True,
        remote_sudo=False,
        remote_user=None,
        ssh_error_ok=False,
        **kwargs,
    ):
        """Run a sh command on the remote device through ssh.

        Args:
            cmd: The command string or list to run. None or empty string/list
                will start an interactive session.
            connect_settings: The SSH connect settings to use.
            check: Throw an exception when the command exits with a non-zero
                returncode.  This does not cover the case where the ssh command
                itself fails (return code 255).  See ssh_error_ok.
            ssh_error_ok: Does not throw an exception when the ssh command
                itself fails (return code 255).
            remote_sudo: If set, run the command in remote shell with sudo.
            remote_user: If set, run the command as the specified user.
            **kwargs: See cros_build_lib.run documentation.

        Returns:
            A CompletedProcess object.  The returncode is the returncode of the
            command, or 255 if ssh encountered an error (could not connect,
            connection interrupted, etc.)

        Raises:
            RunCommandError when error is not ignored through the check flag.
            SSHConnectionError when ssh command error is not ignored through
            the ssh_error_ok flag.
        """
        if "capture_output" not in kwargs:
            kwargs.setdefault("stdout", True)
            kwargs.setdefault("stderr", True)
        kwargs.setdefault("encoding", "utf-8")
        kwargs.setdefault("debug_level", self.debug_level)
        # Force English SSH messages. SSHConnectionError.IsKnownHostsMismatch()
        # requires English errors to detect a known_hosts key mismatch error.
        kwargs.setdefault("extra_env", {})["LC_MESSAGES"] = "C"

        prev_user = self.username
        if remote_user:
            self.username = remote_user

        ssh_cmd = self.GetSSHCommand(connect_settings=connect_settings)

        if cmd:
            ssh_cmd.append("--")

            if remote_sudo and self.username != ROOT_ACCOUNT:
                # Prepend sudo to cmd.
                ssh_cmd.append("sudo")

            if isinstance(cmd, str):
                if kwargs.get("shell"):
                    ssh_cmd = "%s %s" % (
                        " ".join(ssh_cmd),
                        cros_build_lib.ShellQuote(cmd),
                    )
                else:
                    ssh_cmd += [cmd]
            else:
                ssh_cmd += cmd

        try:
            return cros_build_lib.run(ssh_cmd, **kwargs)
        except cros_build_lib.RunCommandError as e:
            if (e.returncode == SSH_ERROR_CODE and ssh_error_ok) or (
                e.returncode and e.returncode != SSH_ERROR_CODE and not check
            ):
                return e.result
            elif e.returncode == SSH_ERROR_CODE:
                raise SSHConnectionError(e.stderr)
            else:
                raise
        finally:
            # Restore the previous user if we temporarily changed it earlier.
            self.username = prev_user

    def CreateTunnel(
        self, to_local=None, to_remote=None, connect_settings=None
    ):
        """Establishes SSH tunnel to the remote device as a background process.

        Args:
            to_local: A list of PortForwardSpec objects to forward from the
                local machine to the remote machine.
            to_remote: A list of PortForwardSpec to forward from the remote
                machine to the local machine.
            connect_settings: The SSH connect settings to use.

        Returns:
            A Popen object. Note that it represents an already started
            background process. Calling poll() on the return value can be used
            to check that the tunnel is still running. To close the tunnel call
            terminate().
        """

        ssh_cmd = self._GetSSHCmd(connect_settings=connect_settings)
        if to_local is not None:
            ssh_cmd.extend(
                token
                for spec in to_local
                for token in ("-L", spec.command_line_spec)
            )
        if to_remote is not None:
            ssh_cmd.extend(
                token
                for spec in to_remote
                for token in ("-R", spec.command_line_spec)
            )
        ssh_cmd.append("-N")
        ssh_cmd.append(self.target_ssh_url)

        logging.log(self.debug_level, "%s", cros_build_lib.CmdToStr(ssh_cmd))

        return RemoteAccess._mockable_popen(ssh_cmd)

    def _GetBootId(self, rebooting=False) -> None:
        """Obtains unique boot session identifier.

        If rebooting is True, uses a SSH connection with a short timeout,
        which will wait for at most about ten seconds. If the network returns
        an error (e.g. host unreachable) the delay can be shorter.
        If rebooting is True and an ssh error occurs, None is returned.
        """
        if rebooting:
            # In tests SSH seems to be waiting rather longer than would be
            # expected from these parameters. These values produce a ~5 second
            # wait.
            connect_settings = CompileSSHConnectSettings(
                ConnectTimeout=REBOOT_SSH_CONNECT_TIMEOUT,
                ConnectionAttempts=REBOOT_SSH_CONNECT_ATTEMPTS,
            )
            result = self.RemoteSh(
                ["cat", "/proc/sys/kernel/random/boot_id"],
                connect_settings=connect_settings,
                check=False,
                ssh_error_ok=True,
                log_output=True,
            )
            if result.returncode == SSH_ERROR_CODE:
                return None
            elif result.returncode == 0:
                return result.stdout.rstrip()
            else:
                raise Exception(
                    "Unexpected error code %s getting boot ID."
                    % result.returncode
                )
        else:
            result = self.RemoteSh(
                ["cat", "/proc/sys/kernel/random/boot_id"], log_output=True
            )
            return result.stdout.rstrip()

    def CheckIfRebooted(self, old_boot_id):
        """Checks if the remote device has successfully rebooted

        This compares the remote device old and current boot IDs.  If
        ssh errors occur, the device has likely not booted and False is
        returned.  Basically only returns True if it is proven that the
        device has rebooted.  May throw exceptions.

        Returns:
            True if the device has successfully rebooted, False otherwise.
        """
        new_boot_id = self._GetBootId(rebooting=True)
        if new_boot_id is None:
            logging.debug(
                "Unable to get new boot_id after reboot from boot_id %s",
                old_boot_id,
            )
            return False
        elif new_boot_id == old_boot_id:
            logging.debug(
                "Checking if rebooted from boot_id %s, still running %s",
                old_boot_id,
                new_boot_id,
            )
            return False
        else:
            logging.debug(
                "Checking if rebooted from boot_id %s, now running %s",
                old_boot_id,
                new_boot_id,
            )
            return True

    def AwaitReboot(self, old_boot_id, timeout_sec=REBOOT_MAX_WAIT):
        """Await reboot away from old_boot_id.

        Args:
            old_boot_id: The boot_id that must be transitioned away from for
                success.
            timeout_sec: How long to wait for reboot.

        Returns:
            True if the device has successfully rebooted.
        """
        try:
            timeout_util.WaitForReturnTrue(
                lambda: self.CheckIfRebooted(old_boot_id),
                timeout_sec,
                period=CHECK_INTERVAL,
            )
        except timeout_util.TimeoutError:
            return False
        return True

    def RemoteReboot(self, timeout_sec=REBOOT_MAX_WAIT) -> None:
        """Reboot the remote device."""
        logging.info("Rebooting %s...", self.remote_host)
        old_boot_id = self._GetBootId()
        # Use ssh_error_ok=True in the remote shell invocations because the
        # reboot might kill sshd before the connection completes normally.
        self.RemoteSh(["reboot"], ssh_error_ok=True, remote_sudo=True)
        time.sleep(CHECK_INTERVAL)
        if not self.AwaitReboot(old_boot_id, timeout_sec):
            raise RebootError(
                "Reboot has not completed after %s seconds; giving up."
                % (timeout_sec,)
            )

    def Rsync(
        self,
        src: Union[str, os.PathLike],
        dest: Union[str, os.PathLike],
        to_local: bool = False,
        follow_symlinks: bool = False,
        recursive: bool = True,
        inplace: bool = False,
        verbose: bool = False,
        sudo: bool = False,
        remote_sudo: bool = False,
        compress: bool = True,
        files_from: Optional[Union[str, os.PathLike]] = None,
        chmod: str = None,
        chown: str = None,
        relative: bool = False,
        mkpath: bool = False,
        **kwargs,
    ):
        """Rsync a path to the remote device.

        Rsync a path to the remote device. If |to_local| is set True, it
        rsyncs the path from the remote device to the local machine.

        Args:
            src: The local src directory.
            dest: The remote dest directory.
            to_local: If set, rsync remote path to local path.
            follow_symlinks: If set, transform symlinks into referent
                path. Otherwise, copy symlinks as symlinks.
            recursive: Whether to recursively copy entire directories.
            inplace: Cause rsync to overwrite the dest files in place. This
                conserves space, but has some side effects - see rsync man page.
            verbose: Print more verbose output during rsync file transfer.
            sudo: If set, invoke the command via sudo.
            remote_sudo: If set, run the command in remote shell with sudo.
            compress: If set, compress file data during the transfer.
            files_from: Read paths from this file (plus some other changes to
                behaviour per rsync's --files-from).
            chmod: Change file permission on remote device.
            chown: Change file owner and group on remote device. The user/group
                are resolved on the remote device.
            relative: If set, pass relative path to rsync (rsync's --relative
                option).
            mkpath: If set, creates all the missing path on remote device.
            **kwargs: See cros_build_lib.run documentation.
        """
        kwargs.setdefault("debug_level", self.debug_level)

        ssh_cmd = " ".join(self._GetSSHCmd())
        rsync_cmd = [
            "rsync",
            "--perms",
            "--verbose",
            "--times",
            "--omit-dir-times",
            "--exclude",
            ".svn",
        ]
        rsync_cmd.append("--copy-links" if follow_symlinks else "--links")
        if files_from:
            rsync_cmd.extend(["--files-from", files_from])
        rsync_sudo = (
            "sudo" if (remote_sudo and self.username != ROOT_ACCOUNT) else ""
        )
        rsync_cmd += [
            "--rsync-path",
            "PATH=%s:$PATH %s rsync" % (DEV_BIN_PATHS, rsync_sudo),
        ]

        if verbose:
            rsync_cmd.append("--progress")
        if recursive:
            rsync_cmd.append("--recursive")
        if inplace:
            rsync_cmd.append("--inplace")
        if compress:
            rsync_cmd.append("--compress")
        logging.info("Using rsync compression: %s", compress)

        chmod = chmod if chmod else kwargs.pop("chmod", None)
        chown = chown if chown else kwargs.pop("chown", None)
        if chmod:
            rsync_cmd.append(f"--chmod={chmod}")
        if chown:
            rsync_cmd.extend(["--owner", "--group", f"--chown={chown}"])
        if relative:
            rsync_cmd.append("--relative")
        if mkpath:
            rsync_cmd.append("--mkpath")

        if to_local:
            rsync_cmd += [
                "--rsh",
                ssh_cmd,
                "[%s]:%s" % (self.target_ssh_url, src),
                dest,
            ]
        else:
            rsync_cmd += [
                "--rsh",
                ssh_cmd,
                src,
                "[%s]:%s" % (self.target_ssh_url, dest),
            ]

        rc_func = cros_build_lib.run
        if sudo:
            rc_func = cros_build_lib.sudo_run
        return rc_func(rsync_cmd, print_cmd=verbose, **kwargs)

    def RsyncToLocal(self, *args, **kwargs):
        """Rsync a path from the remote device to the local machine."""
        return self.Rsync(
            *args, to_local=kwargs.pop("to_local", True), **kwargs
        )

    def Scp(
        self,
        src: Union[str, os.PathLike],
        dest: Union[str, os.PathLike],
        to_local: bool = False,
        recursive: bool = True,
        verbose: bool = False,
        sudo: bool = False,
        **kwargs,
    ):
        """Scp a file or directory to the remote device.

        Args:
            src: The local src file or directory.
            dest: The remote dest location.
            to_local: If set, scp remote path to local path.
            recursive: Whether to recursively copy entire directories.
            verbose: If set, print more verbose output during scp file transfer.
            sudo: If set, invoke the command via sudo.
            remote_sudo: If set, run the command in remote shell with sudo.
            compress: If set, passes the -C flag to scp to enable compression.
            **kwargs: See cros_build_lib.run documentation.

        Returns:
            A CompletedProcess object containing the information and return code
            of the scp command.
        """
        remote_sudo = kwargs.pop("remote_sudo", False)
        if remote_sudo and self.username != ROOT_ACCOUNT:
            # TODO: Implement scp with remote sudo.
            raise NotImplementedError("Cannot run scp with sudo!")

        compress = kwargs.pop("compress", False)

        kwargs.setdefault("debug_level", self.debug_level)
        # scp relies on 'scp' being in the $PATH of the non-interactive,
        # SSH login shell.
        scp_cmd = ["scp"]
        if self.port:
            scp_cmd += ["-P", str(self.port)]
        scp_cmd += CompileSSHConnectSettings(ConnectTimeout=60)
        for private_key in self.private_keys:
            scp_cmd.extend(["-i", private_key])

        if not self.interactive:
            scp_cmd.append("-n")

        if recursive:
            scp_cmd.append("-r")
        if verbose:
            scp_cmd.append("-v")

        if compress:
            scp_cmd.append("-C")

        # Check for an IPv6 address
        if ":" in self.remote_host:
            target_ssh_url = "%s@[%s]" % (self.username, self.remote_host)
        else:
            target_ssh_url = self.target_ssh_url

        if to_local:
            scp_cmd += ["%s:%s" % (target_ssh_url, src), dest]
        else:
            scp_cmd += [src, "%s:%s" % (target_ssh_url, dest)]

        rc_func = cros_build_lib.run
        if sudo:
            rc_func = cros_build_lib.sudo_run

        return rc_func(scp_cmd, print_cmd=verbose, **kwargs)

    def ScpToLocal(self, *args, **kwargs):
        """Scp a path from the remote device to the local machine."""
        return self.Scp(*args, to_local=kwargs.pop("to_local", True), **kwargs)

    def PipeToRemoteSh(self, producer_cmd, cmd, **kwargs):
        """Run a local command and pipe it to a remote sh command over ssh.

        Args:
            producer_cmd: Command to run locally with its results piped to
                |cmd|.
            cmd: Command to run on the remote device.
            **kwargs: See RemoteSh for documentation.
        """
        result = cros_build_lib.run(
            producer_cmd, print_cmd=False, capture_output=True
        )
        return self.RemoteSh(
            cmd, input=kwargs.pop("input", result.stdout), **kwargs
        )


class RemoteDeviceHandler:
    """A wrapper of RemoteDevice."""

    def __init__(self, *args, **kwargs) -> None:
        """Creates a RemoteDevice object."""
        self.device = RemoteDevice(*args, **kwargs)

    def __enter__(self):
        """Return the temporary directory."""
        return self.device

    def __exit__(self, _type, _value, _traceback) -> None:
        """Cleans up the device."""
        self.device.Cleanup()


class ChromiumOSDeviceHandler:
    """A wrapper of ChromiumOSDevice."""

    def __init__(self, *args, **kwargs) -> None:
        """Creates a RemoteDevice object."""
        self.device = ChromiumOSDevice(*args, **kwargs)

    def __enter__(self):
        """Return the temporary directory."""
        return self.device

    def __exit__(self, _type, _value, _traceback) -> None:
        """Cleans up the device."""
        self.device.Cleanup()


class RemoteDevice:
    """Handling basic SSH communication with a remote device."""

    DEFAULT_BASE_DIR = "/tmp/remote-access"

    def __init__(
        self,
        hostname,
        port=None,
        username=None,
        base_dir=DEFAULT_BASE_DIR,
        connect_settings=None,
        private_key=None,
        debug_level=logging.DEBUG,
        ping=False,
        connect=True,
    ) -> None:
        """Initializes a RemoteDevice object.

        Args:
            hostname: The hostname of the device.
            port: The ssh port of the device.
            username: The ssh login username.
            base_dir: The base work directory to create on the device, or None.
            connect_settings: Default SSH connection settings.
            private_key: The identify file to pass to `ssh -i`.
            debug_level: Setting debug level for logging.
            ping: Whether to ping the device before attempting to connect.
            connect: True to set up the connection, otherwise set up will
            be automatically deferred until device use.
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        # The tempdir is for storing the rsa key and/or some temp files.
        self.tempdir = osutils.TempDir(prefix="ssh-tmp")
        self.connect_settings = (
            connect_settings
            if connect_settings
            else CompileSSHConnectSettings()
        )
        self.private_key = private_key
        self.debug_level = debug_level
        # The temporary work directories on the device.
        self._base_dir = base_dir
        self._work_dir = None
        # Use agent instead of accessing this directly for deferred connect.
        self._agent = None
        self.cleanup_cmds = []

        if ping and not self.Pingable():
            raise DeviceNotPingableError(
                "Device %s is not pingable." % self.hostname
            )

        if connect:
            self._Connect()

    def Pingable(self, timeout=20):
        """Returns True if the device is pingable.

        Args:
            timeout: Timeout in seconds (default: 20 seconds).

        Returns:
            True if the device responded to the ping before |timeout|.
        """
        try:
            addrlist = socket.getaddrinfo(self.hostname, 22)
        except socket.gaierror:
            # If the hostname is the name of a "Host" entry in ~/.ssh/config,
            # it might be ssh-able but not pingable.
            # If the hostname is truly bogus, ssh will fail immediately, so
            # we can safely skip the ping step.
            logging.info(
                'Hostname "%s" not found, falling through to ssh', self.hostname
            )
            return True

        if addrlist[0][0] == socket.AF_INET6:
            ping_command = "ping6"
        else:
            ping_command = "ping"

        result = cros_build_lib.run(
            [ping_command, "-c", "1", "-w", str(timeout), self.hostname],
            check=False,
            capture_output=True,
        )
        return result.returncode == 0

    @property
    def agent(self):
        """Agent accessor; connects the agent if necessary."""
        if not self._agent:
            self._Connect()
        return self._agent

    def _Connect(self) -> None:
        """Sets up the SSH connection and internal state."""
        self._agent = RemoteAccess(
            self.hostname,
            self.tempdir.tempdir,
            port=self.port,
            username=self.username,
            private_key=self.private_key,
        )

    @property
    def work_dir(self):
        """The work directory to create on the device.

        This property exists so we can create the remote paths on demand.  For
        some use cases, it'll never be needed, so skipping creation is faster.
        """
        if self._base_dir is None:
            return None

        if self._work_dir is None:
            self._work_dir = self.run(
                [
                    "mkdir",
                    "-p",
                    self._base_dir,
                    "&&",
                    "mktemp",
                    "-d",
                    "--tmpdir=%s" % self._base_dir,
                ],
                capture_output=True,
            ).stdout.strip()
            logging.debug(
                "The temporary working directory on the device is %s",
                self._work_dir,
            )
            self.RegisterCleanupCmd(["rm", "-rf", self._work_dir])

        return self._work_dir

    @functools.lru_cache(maxsize=256)
    def HasProgramInPath(self, binary: str) -> bool:
        """Checks if the given |binary| exists on the device.

        This will cache the result and assume that $PATH does not have entries
        added or removed for the life of the connection.
        """
        result = self.agent.RemoteSh(
            f"PATH={cros_build_lib.ShellQuote(DEV_BIN_PATHS)}:$PATH which "
            f"{cros_build_lib.ShellQuote(binary)}",
            check=False,
            shell=True,
        )
        return result.returncode == 0

    def HasRsync(self):
        """Checks if rsync exists on the device."""
        return self.HasProgramInPath("rsync")

    @memoize.MemoizedSingleCall
    def HasGigabitEthernet(self):
        """Checks if the device has a gigabit ethernet port.

        The function checks the device's first ethernet interface (eth0).
        """
        result = self.agent.RemoteSh(
            ["ethtool", "eth0"], check=False, capture_output=True
        )
        return re.search(r"Speed: \d+000Mb/s", result.stdout)

    @memoize.MemoizedSingleCall
    def IsSELinuxAvailable(self):
        """Check whether the device has SELinux compiled in."""
        # Note that SELinux can be enabled for some devices that lack SELinux
        # tools, so we need to check for the existence of the restorecon bin
        # along with the sysfs check.
        return self.HasProgramInPath("restorecon") and self.IfFileExists(
            "/sys/fs/selinux/enforce"
        )

    def IsSELinuxEnforced(self):
        """Check whether the device has SELinux-enforced."""
        if not self.IsSELinuxAvailable():
            return False
        return (
            self.CatFile("/sys/fs/selinux/enforce", max_size=None).strip()
            == "1"
        )

    def RegisterCleanupCmd(self, cmd, **kwargs) -> None:
        """Register a cleanup command to be run on the device in Cleanup().

        Args:
            cmd: command to run. See RemoteAccess.RemoteSh documentation.
            **kwargs: keyword arguments to pass along with cmd. See
                RemoteAccess.RemoteSh documentation.
        """
        self.cleanup_cmds.append((cmd, kwargs))

    def Cleanup(self) -> None:
        """Remove work/temp dirs and run all registered cleanup commands."""
        for cmd, kwargs in self.cleanup_cmds:
            # We want to run through all cleanup commands even if there are
            # errors.
            kwargs.setdefault("check", False)
            try:
                self.run(cmd, **kwargs)
            except SSHConnectionError:
                logging.error(
                    "Failed to connect to host in Cleanup, so "
                    "SSHConnectionError will not be raised."
                )

        self.tempdir.Cleanup()

    def _CopyToDeviceInParallel(self, src, dest) -> None:
        """Chop source file in chunks, send them to destination in parallel.

        Transfer chunks of file in parallel and assemble in destination if the
        file size is larger than chunk size. Fall back to scp mode otherwise.

        Args:
            src: Local path as a string.
            dest: rsync/scp path of the form <host>:/<path> as a string.
        """
        src_filename = os.path.basename(src)
        chunk_prefix = src_filename + "_"
        with osutils.TempDir() as tempdir:
            chunk_path = os.path.join(tempdir, chunk_prefix)
            try:
                cmd = ["split", "-b", str(CHUNK_SIZE), src, chunk_path]
                cros_build_lib.run(cmd)
                input_list = [
                    [chunk_file, dest, "scp"]
                    for chunk_file in glob.glob(chunk_path + "*")
                ]
                parallel.RunTasksInProcessPool(
                    self.CopyToDevice,
                    input_list,
                    processes=DEGREE_OF_PARALLELISM,
                )
                logging.info("Assembling these chunks now.....")
                chunks = "%s/%s*" % (dest, chunk_prefix)
                final_dest = "%s/%s" % (dest, src_filename)
                assemble_cmd = ["cat", chunks, ">", final_dest]
                self.run(assemble_cmd)
                cleanup_cmd = ["rm", "-f", chunks]
                self.run(cleanup_cmd)
            except IOError:
                logging.error("Could not complete the payload transfer...")
                raise
        logging.info(
            "Successfully copy %s to %s in chunks in parallel", src, dest
        )

    def CopyToDevice(self, src, dest, mode, **kwargs):
        """Copy path to device.

        Args:
            src: Local path as a string.
            dest: rsync/scp path of the form <host>:/<path> as a string.
            mode: must be one of 'rsync', 'scp', or 'parallel'.
                * Use rsync --compress when copying compressible (factor > 2,
                    text/log) files. This uses a quite a bit of CPU but
                    preserves bandwidth.
                * Use rsync without compression when delta transferring a whole
                    directory tree which exists at the destination and changed
                    very little (say telemetry directory or unpacked stateful
                    or unpacked rootfs). It also often works well for an
                    uncompressed archive, copied over a previous copy (which
                    must exist at the destination) needing minor updates.
                * Use scp when we have incompressible files (say already
                    compressed), especially if we know no previous version
                    exist at the destination.
                * Use parallel when we want to transfer a large file with chunks
                    and transfer them in degree of parallelism for speed
                    especially for slow network (congested, long haul, worse
                    SNR).
        """
        assert mode in ["rsync", "scp", "parallel"]
        logging.info(
            "[mode:%s] copy: %s -> %s:%s", mode, src, self.hostname, dest
        )
        if mode == "parallel":
            # Chop and send chunks in parallel only if the file size is larger
            # than CHUNK_SIZE.
            if os.stat(src).st_size > CHUNK_SIZE:
                self._CopyToDeviceInParallel(src, dest)
                return
            else:
                logging.info(
                    "%s is too small for parallelism, fall back to scp", src
                )
                mode = "scp"
        msg = "Could not copy %s to device." % src
        # Fall back to scp if device has no rsync. Happens when stateful is
        # cleaned.
        if mode == "scp" or not self.HasRsync():
            # scp always follow symlinks
            kwargs.pop("follow_symlinks", None)
            func = self.agent.Scp
        else:
            func = self.agent.Rsync

        return RunCommandFuncWrapper(func, msg, src, dest, **kwargs)

    def CopyFromDevice(self, src, dest, mode="scp", **kwargs):
        """Copy path from device.

        Adding --compress recommended for text like log files.

        Args:
            src: rsync/scp path of the form <host>:/<path> as a string.
            dest: Local path as a string.
            mode: See mode on CopyToDevice.
        """
        msg = "Could not copy %s from device." % src
        # Fall back to scp if device has no rsync. Happens when stateful is
        # cleaned.
        if mode == "scp" or not self.HasRsync():
            # scp always follow symlinks
            kwargs.pop("follow_symlinks", None)
            func = self.agent.ScpToLocal
        else:
            func = self.agent.RsyncToLocal

        return RunCommandFuncWrapper(func, msg, src, dest, **kwargs)

    def CopyFromWorkDir(self, src, dest, **kwargs):
        """Copy path from working directory on the device."""
        return self.CopyFromDevice(
            os.path.join(self.work_dir, src), dest, **kwargs
        )

    def CopyToWorkDir(self, src, dest="", **kwargs):
        """Copy path to working directory on the device."""
        return self.CopyToDevice(
            src, os.path.join(self.work_dir, dest), **kwargs
        )

    def _TestPath(self, path, option, **kwargs):
        """Tests a given path for specific options."""
        kwargs.setdefault("check", False)
        result = self.run(["test", option, path], **kwargs)
        return result.returncode == 0

    def IfFileExists(self, path, **kwargs):
        """Check if the given file exists on the device."""
        return self._TestPath(path, "-f", **kwargs)

    def IfPathExists(self, path, **kwargs):
        """Check if the given path exists on the device."""
        return self._TestPath(path, "-e", **kwargs)

    def IsDirWritable(self, path):
        """Checks if the given directory is writable on the device.

        Args:
            path: Directory on the device to check.
        """
        tmp_file = os.path.join(path, ".tmp.remote_access.is.writable")
        result = self.agent.RemoteSh(
            f"touch {cros_build_lib.ShellQuote(tmp_file)} && "
            f"rm {cros_build_lib.ShellQuote(tmp_file)}",
            check=False,
            remote_sudo=True,
            capture_output=True,
            shell=True,
        )
        return result.returncode == 0

    def IsFileExecutable(self, path):
        """Check if the given file is executable on the device.

        Args:
            path: full path to the file on the device to check.

        Returns:
            True if the file is executable, and false if the file does not exist
            or is not executable.
        """
        cmd = [
            "test",
            "-f",
            path,
            "-a",
            "-x",
            path,
        ]
        result = self.agent.RemoteSh(
            cmd, remote_sudo=True, check=False, capture_output=True
        )
        return result.returncode == 0

    def GetSize(self, path):
        """Gets the size of the given file on the device.

        Args:
            path: full path to the file on the device.

        Returns:
            Size of the file in number of bytes.

        Raises:
            ValueError: if failed to get file size from the remote output.
            cros_build_lib.RunCommandError: if |path| does not exist or the
                remote command to get file size has failed.
        """
        cmd = ["du", "-Lb", "--max-depth=0", path]
        result = self.run(cmd, remote_sudo=True, capture_output=True)
        return int(result.stdout.split()[0])

    def CatFile(self, path, max_size=1000000, encoding="utf-8"):
        """Reads the file on device to string if its size is < |max_size|.

        Args:
            path: The full path to the file on the device to read.
            max_size: Read the file only if its size is less than |max_size| in
                bytes. If None, do not check its size and always cat the path.
            encoding: Encoding for return value.  Use None to get bytes.

        Returns:
            A string of the file content.

        Raises:
            CatFileError: if failed to read the remote file or the file size is
                larger than |max_size|.
        """
        if max_size is not None:
            try:
                file_size = self.GetSize(path)
            except (ValueError, cros_build_lib.RunCommandError) as e:
                raise CatFileError(
                    'Failed to get size of file "%s": %s' % (path, e)
                )
            if file_size > max_size:
                raise CatFileError(
                    'File "%s" is larger than %d bytes' % (path, max_size)
                )

        result = self.run(
            ["cat", path],
            remote_sudo=True,
            check=False,
            capture_output=True,
            encoding=encoding,
        )
        if result.returncode:
            raise CatFileError('Failed to read file "%s" on the device' % path)
        return result.stdout

    def DeletePath(
        self, path, relative_to_work_dir=False, recursive=False
    ) -> None:
        """Deletes a path on the remote device.

        Args:
            path: The path on the remote device that should be deleted.
            relative_to_work_dir: If true, the path is relative to
                |self.work_dir|.
            recursive: If true, the |path| is deleted recursively.

        Raises:
            cros_build_lib.RunCommandError: if |path| does not exist or the
                remote command to delete the |path| has failed.
        """
        if relative_to_work_dir:
            path = os.path.join(self.work_dir, path)

        cmd = ["rm", "-f"]
        if recursive:
            cmd += ["-r"]
        cmd += [path]

        self.run(cmd)

    def chmod(
        self,
        path: Union[Union[str, os.PathLike], List[Union[str, os.PathLike]]],
        mode: Union[int, str],
        check: bool = True,
        recursive: bool = False,
    ) -> None:
        """Changing file modes on paths on the remote device."""
        if isinstance(mode, int):
            mode = f"{mode:o}"
        cmd = ["chmod"]
        if recursive:
            cmd += ["--recursive"]
        cmd += [mode, "--"]
        if isinstance(path, (str, os.PathLike)):
            cmd += [path]
        else:
            cmd += path
        self.run(cmd, check=check)

    def mkdir(
        self,
        path: Union[Union[str, os.PathLike], List[Union[str, os.PathLike]]],
        mode: int = 0o755,
    ) -> None:
        """Create a directory on the remote device."""
        # Always specify the mode to avoid umask confusion.
        cmd = ["mkdir", "-p", f"--mode={mode:o}", "--"]
        if isinstance(path, (str, os.PathLike)):
            cmd += [path]
        else:
            cmd += path
        self.run(cmd)

    def PipeOverSSH(self, filepath, cmd, **kwargs):
        """Cat a file and pipe over SSH."""
        producer_cmd = ["cat", filepath]
        return self.agent.PipeToRemoteSh(producer_cmd, cmd, **kwargs)

    def GetRunningPids(self, exe, full_path=True):
        """Get all the running pids on the device with the executable path.

        Args:
            exe: The executable path to get pids for.
            full_path: Whether |exe| is a full executable path.

        Raises:
            RunningPidsError when failing to parse out pids from command output.
            SSHConnectionError when error occurs during SSH connection.
        """
        try:
            cmd = ["pgrep", exe]
            if full_path:
                cmd.append("-f")
            result = self.agent.RemoteSh(cmd, check=False, capture_output=True)
            try:
                return [int(pid) for pid in result.stdout.splitlines()]
            except ValueError:
                logging.error("Parsing output failed:\n%s", result.stdout)
                raise RunningPidsError("Unable to get running pids of %s" % exe)
        except SSHConnectionError:
            logging.error("Error connecting to device %s", self.hostname)
            raise

    def Reboot(self, timeout_sec=REBOOT_MAX_WAIT):
        """Reboot the device."""
        return self.agent.RemoteReboot(timeout_sec=timeout_sec)

    def run(self, cmd, **kwargs):
        """Executes a shell command on the device.

        Output captured by default.

        Also sets environment variables using dictionary provided by
        keyword argument |extra_env|.

        Args:
            cmd: command to run. See RemoteAccess.RemoteSh documentation.
            **kwargs: keyword arguments to pass along with cmd. See
                RemoteAccess.RemoteSh documentation.
        """
        kwargs.setdefault("debug_level", self.debug_level)
        kwargs.setdefault("connect_settings", self.connect_settings)

        # Handle setting environment variables on the device by copying
        # and sourcing a temporary environment file.
        extra_env = kwargs.pop("extra_env", None)
        if extra_env:
            remote_sudo = kwargs.pop("remote_sudo", False)
            if remote_sudo and self.agent.username == ROOT_ACCOUNT:
                remote_sudo = False

            new_cmd = []
            flat_vars = [
                "%s=%s" % (k, cros_build_lib.ShellQuote(v))
                for k, v in extra_env.items()
            ]

            # If the vars are too large for the command line, do it indirectly.
            # We pick 32k somewhat arbitrarily -- the kernel should accept this
            # and rarely should remote commands get near that size.
            ARG_MAX = 32 * 1024

            # What the command line would generally look like on the remote.
            if isinstance(cmd, str):
                if not kwargs.get("shell", False):
                    raise ValueError(
                        "'shell' must be True when 'cmd' is a string."
                    )
                shell_cmd = cmd
            else:
                if kwargs.get("shell", False):
                    raise ValueError(
                        "'shell' must be False when 'cmd' is a list."
                    )
                # Support pathlib & strings to match subprocess, but allow other
                # types through, so they still fail.
                shell_cmd = " ".join(
                    str(x) if isinstance(x, pathlib.PurePath) else x
                    for x in cmd
                )
            cmdline = " ".join(flat_vars) + shell_cmd

            if len(cmdline) > ARG_MAX:
                env_list = ["export %s" % x for x in flat_vars]
                with tempfile.NamedTemporaryFile(
                    dir=self.tempdir.tempdir, prefix="env"
                ) as f:
                    logging.debug(
                        "Environment variables: %s", " ".join(env_list)
                    )
                    osutils.WriteFile(f.name, "\n".join(env_list))
                    self.CopyToWorkDir(f.name)
                    env_file = os.path.join(
                        self.work_dir, os.path.basename(f.name)
                    )
                    new_cmd += [".", "%s;" % env_file]
                    if remote_sudo:
                        new_cmd += ["sudo", "-E"]
            else:
                if remote_sudo:
                    new_cmd += ["sudo"]
                new_cmd += flat_vars

            if isinstance(cmd, str):
                cmd = " ".join(new_cmd) + " " + shell_cmd
            else:
                cmd = new_cmd + cmd

        try:
            return self.agent.RemoteSh(cmd, **kwargs)
        except SSHConnectionError:
            logging.error("Error connecting to device %s", self.hostname)
            raise

    def CheckIfRebooted(self, old_boot_id):
        """Checks if the remote device has successfully rebooted

        This compares the remote device old and current boot IDs.  If
        ssh errors occur, the device has likely not booted and False is
        returned.  Basically only returns True if it is proven that the
        device has rebooted.  May throw exceptions.

        Returns:
            True if the device has successfully rebooted, false otherwise.
        """
        return self.agent.CheckIfRebooted(old_boot_id)

    def AwaitReboot(self, old_boot_id):
        """Await reboot away from old_boot_id.

        Args:
            old_boot_id: The boot_id that must be transitioned away from for
                success.

        Returns:
            True if the device has successfully rebooted.
        """
        return self.agent.AwaitReboot(old_boot_id)

    def GetDecompressor(self, compression):
        """Returns a decompressor command on a remote device.

        Args:
            compression: The type of compression desired. See
                cros_build_lib.CompressionType.*.

        Returns:
            command to a decompressor as a string list.

        Raises:
            ValueError: If compression is unknown.
        """

        if compression == cros_build_lib.CompressionType.XZ:
            prog = "xz"
        elif compression == cros_build_lib.CompressionType.GZIP:
            prog = "gzip"
        elif compression == cros_build_lib.CompressionType.BZIP2:
            prog = "bzip2"
        elif compression == cros_build_lib.CompressionType.ZSTD:
            prog = "zstd"
        elif compression == cros_build_lib.CompressionType.NONE:
            return ["cat"]
        else:
            raise ValueError(f"Unknown compression: {compression}")

        if self.HasProgramInPath(prog):
            return [prog, "--decompress", "--stdout"]

        raise ProgramNotFoundError(
            f"No decompressor found for compression: {compression}"
        )


class ChromiumOSDevice(RemoteDevice):
    """Basic commands to interact with a ChromiumOS device over SSH."""

    MAKE_DEV_SSD_BIN = "/usr/share/vboot/bin/make_dev_ssd.sh"
    MOUNT_ROOTFS_RW_CMD = ["mount", "-o", "remount,rw", "/"]
    LIST_MOUNTS_CMD = ["cat", "/proc/mounts"]

    def __init__(self, hostname, include_dev_paths=True, **kwargs) -> None:
        """Initializes this object.

        Args:
            hostname: A network hostname.
            include_dev_paths: If true, add DEV_BIN_PATHS to $PATH for all
                commands.
            **kwargs: Args to pass to the parent constructor.
        """
        super().__init__(hostname, **kwargs)
        self._orig_path = None
        self._path = None
        self._include_dev_paths = include_dev_paths
        self._lsb_release = {}

    @property
    def orig_path(self):
        """The $PATH variable on the device."""
        if not self._orig_path:
            try:
                # We can't use self.run since it calls us.
                result = super().run(["echo", "${PATH}"])
            except cros_build_lib.RunCommandError as e:
                logging.error("Failed to get $PATH on the device: %s", e.stderr)
                raise

            self._orig_path = result.stdout.strip()

        return self._orig_path

    @property
    def path(self):
        """The $PATH variable on the device prepended with DEV_BIN_PATHS."""
        if not self._path:
            # If the remote path already has our dev paths (which is common),
            # then there is no need for us to prepend.
            orig_paths = self.orig_path.split(":")
            for path in reversed(DEV_BIN_PATHS.split(":")):
                if path not in orig_paths:
                    orig_paths.insert(0, path)

            self._path = ":".join(orig_paths)

        return self._path

    @property
    def lsb_release(self):
        """The /etc/lsb-release content on the device.

        Returns a dict of entries in /etc/lsb-release file. If multiple entries
        have the same key, only the first entry is recorded. Returns an empty
        dict if the reading command failed or the file is corrupted (i.e., does
        not have the format of <key>=<value> for every line).
        """
        if not self._lsb_release:
            try:
                content = self.CatFile(
                    constants.LSB_RELEASE_PATH, max_size=None
                )
            except CatFileError as e:
                logging.debug(
                    'Failed to read "%s" on the device: %s',
                    constants.LSB_RELEASE_PATH,
                    e,
                )
            else:
                try:
                    self._lsb_release = dict(
                        e.split("=", 1) for e in reversed(content.splitlines())
                    )
                except ValueError:
                    logging.error(
                        'File "%s" on the device is mal-formatted.',
                        constants.LSB_RELEASE_PATH,
                    )

        return self._lsb_release

    @property
    def board(self):
        """The board name of the device."""
        return self.lsb_release.get(cros_set_lsb_release.LSB_KEY_BOARD, "")

    @property
    def version(self):
        """The OS version of the device."""
        return self.lsb_release.get(cros_set_lsb_release.LSB_KEY_VERSION, "")

    @property
    def app_id(self):
        """The App ID of the device."""
        return self.lsb_release.get(
            cros_set_lsb_release.LSB_KEY_APPID_RELEASE, ""
        )

    @property
    def root_dev(self):
        """The current root device path."""
        return self.run(["rootdev", "-s"], capture_output=True).stdout.strip()

    def _RemountRootfsAsWritable(self) -> None:
        """Attempts to Remount the root partition."""
        logging.info("Remounting '/' with rw...")
        self.run(self.MOUNT_ROOTFS_RW_CMD, check=False, remote_sudo=True)

    def _RootfsIsReadOnly(self):
        """Returns True if rootfs on is mounted as read-only."""
        r = self.run(self.LIST_MOUNTS_CMD, capture_output=True)
        for line in r.stdout.splitlines():
            if not line:
                continue

            chunks = line.split()
            if chunks[1] == "/" and "ro" in chunks[3].split(","):
                return True

        return False

    def DisableRootfsVerification(self, timeout_sec=REBOOT_MAX_WAIT) -> None:
        """Disables device rootfs verification."""
        logging.info("Disabling rootfs verification on device...")
        self.run(
            [self.MAKE_DEV_SSD_BIN, "--remove_rootfs_verification", "--force"],
            check=False,
            remote_sudo=True,
        )
        # TODO(yjhong): Make sure an update is not pending.
        logging.info("Need to reboot to actually disable the verification.")
        self.Reboot(timeout_sec=timeout_sec)
        # After reboot, the rootfs is mounted read-only, so remount as
        # read-write.
        self._RemountRootfsAsWritable()

    def MountRootfsReadWrite(self):
        """Checks mount types and remounts them as read-write if needed.

        Returns:
            True if rootfs is mounted as read-write. False otherwise.
        """
        if not self._RootfsIsReadOnly():
            return True

        # If the image on the device is built with rootfs verification
        # disabled, we can simply remount '/' as read-write.
        self._RemountRootfsAsWritable()

        if not self._RootfsIsReadOnly():
            return True

        logging.info(
            "Unable to remount rootfs as rw (normal w/verified rootfs)."
        )
        # If the image is built with rootfs verification, turn it off.
        self.DisableRootfsVerification()

        return not self._RootfsIsReadOnly()

    def ClearTpmOwner(self) -> None:
        """Clears the TPM owner flag."""
        logging.info("Clearing TPM owner.")
        self.run(["crossystem", "clear_tpm_owner_request=1"])

    def BootstrapDevTools(self) -> None:
        """Installs basic dev tools, including Portage (emerge command).

        This also invalidates HasProgramInPath cache because some programs may
        become available after the installation.
        """
        self.run(["dev_install", "--reinstall", "--only_bootstrap", "--yes"])
        self.HasProgramInPath.cache_clear()

    def Reboot(self, timeout_sec=REBOOT_MAX_WAIT):
        """Reboot the device."""
        # A reboot in developer mode takes a while (and has delays), so the user
        # will have time to read and act on the USB boot instructions below.
        logging.info(
            "Please remember to press Ctrl-U if you are booting from USB."
        )
        return super().Reboot(timeout_sec=timeout_sec)

    def run(self, cmd, **kwargs):
        """Executes a shell command on the device.

        Output captured by default.

        Also makes sure $PATH is set correctly by adding DEV_BIN_PATHS to
        'PATH' in |extra_env| if self._include_dev_paths is True.

        Args:
            cmd: command to run. See RemoteAccess.RemoteSh documentation.
            **kwargs: keyword arguments to pass along with cmd. See
                RemoteAccess.RemoteSh documentation.
        """
        if self._include_dev_paths:
            extra_env = kwargs.pop("extra_env", {})
            extra_env.setdefault("PATH", self.path)
            kwargs["extra_env"] = extra_env
        return super().run(cmd, **kwargs)
