blob: 03500370e2b986491b65612ffa3c9f6072b027bf [file] [log] [blame]
# Copyright 2012 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Purpose of this module is to hold common script/commandline functionality.
This ranges from optparse, to a basic script wrapper setup (much like
what is used for chromite.bin.*).
"""
import argparse
import collections
import datetime
import functools
import logging
import optparse # pylint: disable=deprecated-module
import os
from pathlib import Path
import re
import signal
import sys
from typing import List, NamedTuple, Optional
import urllib.parse
from chromite.lib import constants
from chromite.lib import cros_build_lib
from chromite.lib import gs
from chromite.lib import osutils
from chromite.lib import path_util
from chromite.lib import terminal
from chromite.utils import attrs_freezer
from chromite.utils import path_filter
# TODO(build): Convert this to enum module.
DEVICE_SCHEME_FILE = "file"
DEVICE_SCHEME_SERVO = "servo"
DEVICE_SCHEME_SSH = "ssh"
DEVICE_SCHEME_USB = "usb"
class ChrootRequiredError(Exception):
"""Raised when a command must be run in the chroot
This exception is intended to be caught by code which will restart execution
in the chroot. Throwing this exception allows contexts to be exited and
general cleanup to happen before we exec an external binary.
The command to run inside the chroot, and (optionally) special cros_sdk
arguments are attached to the exception. Any adjustments to the arguments
should be done before raising the exception.
"""
def __init__(self, cmd, chroot_args=None, extra_env=None):
"""Constructor for ChrootRequiredError.
Args:
cmd: Command line to run inside the chroot as a list of strings.
chroot_args: Arguments to pass directly to cros_sdk.
extra_env: Environmental variables to set in the chroot.
"""
super().__init__()
self.cmd = cmd
self.chroot_args = chroot_args
self.extra_env = extra_env
class ExecRequiredError(Exception):
"""Raised when a command needs to exec, after cleanup.
This exception is intended to be caught by code which will exec another
command. Throwing this exception allows contexts to be exited and general
cleanup to happen before we exec an external binary.
The command to run is attached to the exception. Any adjustments to the
arguments should be done before raising the exception.
"""
def __init__(self, cmd):
"""Constructor for ExecRequiredError.
Args:
cmd: Command line to run inside the chroot as a list of strings.
"""
super().__init__()
self.cmd = cmd
def NormalizeGSPath(value):
"""Normalize GS paths."""
url = gs.CanonicalizeURL(value, strict=True)
return "%s%s" % (
gs.BASE_GS_URL,
os.path.normpath(url[len(gs.BASE_GS_URL) :]),
)
def NormalizeLocalOrGSPath(value):
"""Normalize a local or GS path."""
ptype = "gs_path" if gs.PathIsGs(value) else "path"
return VALID_TYPES[ptype](value)
def NormalizeAbUrl(value):
"""Normalize an androidbuild URL."""
if not value.startswith("ab://"):
# Give a helpful error message about the format expected. Putting this
# message in the exception is useless because argparse ignores the
# exception message and just says the value is invalid.
msg = "Invalid ab:// URL format: [%s]." % value
logging.error(msg)
raise ValueError(msg)
# If no errors, just return the unmodified value.
return value
def ValidateCipdURL(value):
"""Return plain string."""
if not value.startswith("cipd://"):
msg = "Invalid cipd:// URL format: %s" % value
logging.error(msg)
raise ValueError(msg)
return value
def ParseBool(value):
"""Parse bool argument into a bool value.
For the existing type=bool functionality, the parser uses the built-in
bool(x) function to determine the value. This function will only return
false if x is False or omitted. Even with this type specified, however,
arguments that are generated from a command line initially get parsed as a
string, and for any string value passed in to bool(x), it will always return
True.
Args:
value: String representing a boolean value.
Returns:
True or False.
"""
return cros_build_lib.BooleanShellValue(value, False)
def ParseDate(value):
"""Parse date argument into a datetime.date object.
Args:
value: String representing a single date in "YYYY-MM-DD" format.
Returns:
A datetime.date object.
"""
try:
return datetime.datetime.strptime(value, "%Y-%m-%d").date()
except ValueError:
# Give a helpful error message about the format expected. Putting this
# message in the exception is useless because argparse ignores the
# exception message and just says the value is invalid.
logging.error("Date is expected to be in format YYYY-MM-DD.")
raise
def ParseEmail(value: str) -> str:
"""Validate an e-mail address.
Args:
value: E-mail address.
Returns:
The input value.
"""
if not re.fullmatch(constants.EMAIL_REGEX, value):
raise ValueError(f"invalid e-mail address: {value}")
return value
def ParseTimedelta(value: str):
"""Parse timedelta argument into datetime.timedelta object.
Args:
value: String in seconds.
Returns:
A datetime.timedelta object.
"""
try:
seconds = int(value)
if seconds < 0:
raise ValueError("Timedelta is expected to be a positive integer.")
return datetime.timedelta(seconds=seconds)
except ValueError:
logging.error("Timedelta is expected to be a positive integer.")
raise
def NormalizeUri(value):
"""Normalize a local path or URI."""
o = urllib.parse.urlparse(value)
if o.scheme == "file":
# Trim off the file:// prefix.
return VALID_TYPES["path"](value[7:])
elif o.scheme not in ("", "gs"):
o = list(o)
o[2] = os.path.normpath(o[2])
return urllib.parse.urlunparse(o)
else:
return NormalizeLocalOrGSPath(value)
class Device(NamedTuple):
"""A Device object holds information parsed from the command line input.
For now this is a superset of all information for USB, SSH, or file devices.
If functionality diverges based on type, it may be useful to split this into
separate device classes instead.
"""
# DEVICE_SCHEME_SSH, DEVICE_SCHEME_USB, DEVICE_SCHEME_SERVO, or
# DEVICE_SCHEME_FILE.
scheme: str
# SSH username.
username: Optional[str] = None
# SSH hostname.
hostname: Optional[str] = None
# SSH or Servo port.
port: Optional[int] = None
# USB/file path.
path: Optional[str] = None
# Raw input from the command line.
raw: Optional[str] = None
# Servo serial number.
serial_number: Optional[str] = None
class DeviceParser:
"""Parses devices as an argparse argument type.
In addition to parsing user input, this class will also ensure that only
supported device schemes are accepted by the parser. For example,
`cros deploy` only makes sense with an SSH device, but `cros flash` can use
SSH, USB, or file device schemes.
If the device input is malformed or the scheme is wrong, an error message
will be printed and the program will exit.
Valid device inputs are:
- [ssh://][username@]hostname[:port].
- usb://[path].
- file://path or /absolute_path.
- servo:port[:port] to use a port via dut-control, e.g. servo:port:1234.
- servo:serial:serial-number to use the servo's serial number,
e.g. servo:serial:641220-00057 servo:serial:C1230024192.
- [ssh://]:vm:.
The last item above is an alias for ssh'ing into a virtual machine on a
localhost. It gets translated into 'localhost:9222'.
Examples:
parser = argparse.ArgumentParser()
parser.add_argument(
'ssh_device',
type=commandline.DeviceParser(commandline.DEVICE_SCHEME_SSH)
)
parser.add_argument(
"usb_or_file_device",
type=commandline.DeviceParser(
[commandline.DEVICE_SCHEME_USB, commandline.DEVICE_SCHEME_FILE]
),
)
"""
def __init__(self, schemes):
"""Initializes the parser.
See the class comments for usage examples.
Args:
schemes: A scheme or list of schemes to accept.
"""
self.schemes = [schemes] if isinstance(schemes, str) else schemes
# Provide __name__ for argparse to print on failure, or else it will use
# repr() which creates a confusing error message.
self.__name__ = type(self).__name__
def __call__(self, value):
"""Parses a device input and enforces constraints.
DeviceParser is an object so that a set of valid schemes can be
specified, but argparse expects a parsing function, so we overload
__call__() for argparse to use.
Args:
value: String representing a device target. See class comments for
valid device input formats.
Returns:
A Device object.
Raises:
ValueError: |value| is not a valid device specifier or doesn't
match the supported list of schemes.
"""
try:
device = self._ParseDevice(value)
self._EnforceConstraints(device, value)
return device
except ValueError as e:
# argparse ignores exception messages, so print the message
# manually.
logging.error(e)
raise
except Exception as e:
logging.error("Internal error while parsing device input: %s", e)
raise
def _EnforceConstraints(self, device, value):
"""Verifies that user-specified constraints are upheld.
Checks that the parsed device has a scheme that matches what the user
expects. Additional constraints can be added if needed.
Args:
device: Device object.
value: String representing a device target.
Raises:
ValueError: |device| has the wrong scheme.
"""
if device.scheme not in self.schemes:
raise ValueError(
'Unsupported scheme "%s" for device "%s"'
% (device.scheme, value)
)
def _ParseDevice(self, value):
"""Parse a device argument.
Args:
value: String representing a device target.
Returns:
A Device object.
Raises:
ValueError: |value| is not a valid device specifier.
"""
# ':vm:' is an alias for ssh'ing into a virtual machihne on localhost;
# translate it appropriately.
if value.strip().lower() == ":vm:":
value = "localhost:9222"
elif value.strip().lower() == "ssh://:vm:":
value = "ssh://localhost:9222"
parsed = urllib.parse.urlparse(value)
# crbug.com/1069325: Starting in python 3.7 urllib has different parsing
# results. 127.0.0.1:9999 parses as scheme='127.0.0.1' path='9999'
# instead of scheme='' path='127.0.0.1:9999'. We want that parsed as
# ssh. Check for '.' or 'localhost' in the scheme to catch the most
# common cases for this result.
if (
not parsed.scheme
or "." in parsed.scheme
or parsed.scheme == "localhost"
):
# Default to a file scheme for absolute paths, SSH scheme otherwise.
if value and value[0] == "/":
scheme = DEVICE_SCHEME_FILE
else:
# urlparse won't provide hostname/username/port unless a scheme
# is specified, so we need to reparse.
parsed = urllib.parse.urlparse(
"%s://%s" % (DEVICE_SCHEME_SSH, value)
)
scheme = DEVICE_SCHEME_SSH
else:
scheme = parsed.scheme.lower()
if scheme == DEVICE_SCHEME_SSH:
hostname = parsed.hostname
if not hostname and parsed.netloc.count(":") >= 2:
# Likely an IPv6 address that is missing brackets. Remind the
# user to add those brackets.
raise ValueError(
"To write an IPv6 address, you must include brackets to "
"distinguish between host and port. For example, write "
"[::1]:2222 instead of ::1:2222."
)
port = parsed.port
if hostname == "localhost" and not port:
# Use of localhost as the actual machine is uncommon enough
# relative to the use of KVM that we require users to specify
# localhost:22 if they actually want to connect to the
# localhost. Otherwise, the expectation is that they intend to
# access the VM but forget or didn't know to use port 9222.
raise ValueError(
"To connect to localhost, use ssh://localhost:22 "
"explicitly, or use ssh://localhost:9222 for the local"
" VM."
)
if not hostname:
raise ValueError('Hostname is required for device "%s"' % value)
return Device(
scheme=scheme,
username=parsed.username,
hostname=hostname,
port=port,
raw=value,
)
elif scheme == DEVICE_SCHEME_USB:
path = parsed.netloc + parsed.path
# Change path '' to None for consistency.
return Device(scheme=scheme, path=path if path else None, raw=value)
elif scheme == DEVICE_SCHEME_FILE:
path = parsed.netloc + parsed.path
if not path:
raise ValueError('Path is required for "%s"' % value)
return Device(scheme=scheme, path=path, raw=value)
elif scheme == DEVICE_SCHEME_SERVO:
# Parse the identifier type and value.
servo_type, _, servo_id = parsed.path.partition(":")
# Don't want to do the netloc before the split in case of serial
# number.
servo_type = servo_type.lower()
return self._parse_servo(servo_type, servo_id)
else:
raise ValueError(
'Unknown device scheme "%s" in "%s"' % (scheme, value)
)
@staticmethod
def _parse_servo(servo_type, servo_id):
"""Parse a servo device from the parsed servo uri info.
Args:
servo_type: The servo identifier type, either port or serial.
servo_id: The servo identifier, either the port number it is
communicating through or its serial number.
"""
servo_port = None
serial_number = None
if servo_type == "serial":
if servo_id:
serial_number = servo_id
else:
raise ValueError("No serial number given.")
elif servo_type == "port":
if servo_id:
# Parse and validate when given.
try:
servo_port = int(servo_id)
except ValueError:
raise ValueError("Invalid servo port value: %s" % servo_id)
if servo_port <= 0 or servo_port > 65535:
raise ValueError(
"Invalid port, must be 1-65535: %d given." % servo_port
)
else:
raise ValueError("Invalid servo type given: %s" % servo_type)
return Device(
scheme=DEVICE_SCHEME_SERVO,
port=servo_port,
serial_number=serial_number,
)
class _AppendOption(argparse.Action):
"""Append the command line option (with no arguments) to dest.
parser.add_argument('-b', '--barg', dest='out', action='append_option')
options = parser.parse_args(['-b', '--barg'])
options.out == ['-b', '--barg']
"""
def __init__(self, option_strings, dest, **kwargs):
if "nargs" in kwargs:
raise ValueError("nargs is not supported for append_option action")
super().__init__(option_strings, dest, nargs=0, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
getattr(namespace, self.dest).append(option_string)
class _AppendOptionValue(argparse.Action):
"""Append the command line option to dest. Useful for pass along arguments.
parser.add_argument(
"-b",
"--barg",
dest="out",
action="append_option_value",
)
options = parser.parse_args(["--barg", "foo", "-b", "bar"])
options.out == ["-barg", "foo", "-b", "bar"]
"""
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
getattr(namespace, self.dest).extend([option_string, str(values)])
class _EnumAction(argparse.Action):
"""Allows adding enums as an argument with minimal syntax.
For example:
class Size(enum.Enum):
SMALL = 0
MEDIUM = 1
LARGE = 2
...
parser.add_argument(
"--size",
action="enum",
enum=Size,
help="The size to use (either small, medium, or large)",
)
"""
def __init__(self, *args, **kwargs):
"""Init override to extract the "enum" argument."""
self.enum = kwargs.pop("enum", None)
if self.enum:
kwargs.setdefault("choices", self.enum.__members__.values())
valid_inputs = [x.lower() for x in self.enum.__members__]
kwargs.setdefault("metavar", "{%s}" % ",".join(valid_inputs))
def _parse_arg(arg):
if arg not in valid_inputs:
raise argparse.ArgumentTypeError(
f"{arg!r} is not recognized. Choose from "
f"{valid_inputs!r}"
)
return self.enum[arg.upper()]
kwargs.setdefault("type", _parse_arg)
super().__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
class _SplitExtendAction(argparse.Action):
"""Callback to split the argument and extend existing value.
We normalize whitespace before splitting. This is to support the forms:
cbuildbot -p 'proj:branch ' ...
cbuildbot -p ' proj:branch' ...
cbuildbot -p 'proj:branch proj2:branch' ...
cbuildbot -p "$(some_command_that_returns_nothing)" ...
"""
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
getattr(namespace, self.dest).extend(values.split())
def ExistingPath(value: str) -> Path:
"""Expands ~/ paths and standardizes to the real path.
Checks that the path exists.
"""
ret = osutils.ExpandPath(value)
path = Path(ret)
if not path.exists():
msg = f"Path does not exist: {value}"
logging.error(msg)
raise ValueError(msg)
return path
def ExistingDirectory(value: str) -> Path:
"""Expands ~/ paths and standardizes to the real path.
Checks that the path exists and is a file.
"""
path = ExistingPath(value)
if not path.is_dir():
msg = f"Path is not a directory: {value}"
logging.error(msg)
raise ValueError(msg)
return path
def ExistingFile(value: str) -> Path:
"""Expands ~/ paths and standardizes to the real path.
Checks that the path exists and is a directory.
"""
path = ExistingPath(value)
if not path.is_file():
msg = f"Path is not a file: {value}"
logging.error(msg)
raise ValueError(msg)
return path
VALID_TYPES = {
"ab_url": NormalizeAbUrl,
"bool": ParseBool,
"cipd": ValidateCipdURL,
"date": ParseDate,
"email": ParseEmail,
"path": osutils.ExpandPath,
"path_exists": ExistingPath,
"dir_exists": ExistingDirectory,
"file_exists": ExistingFile,
"gs_path": NormalizeGSPath,
"local_or_gs_path": NormalizeLocalOrGSPath,
"path_or_uri": NormalizeUri,
"timedelta": ParseTimedelta,
}
VALID_ACTIONS = {
"append_option": _AppendOption,
"append_option_value": _AppendOptionValue,
"enum": _EnumAction,
"split_extend": _SplitExtendAction,
}
_DEPRECATE_ACTIONS = [
None,
"store",
"store_const",
"store_true",
"store_false",
"append",
"append_const",
"count",
] + list(VALID_ACTIONS)
class _DeprecatedAction:
"""Base functionality to allow adding warnings for deprecated arguments.
To add a deprecated warning, simply include a deprecated=message argument
to the add_argument call for the deprecated argument. Beside logging the
deprecation warning, the argument will behave as normal.
"""
def __init__(self, *args, **kwargs):
"""Init override to extract the deprecated argument when it exists."""
self.deprecated_message = kwargs.pop("deprecated", None)
super().__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
"""Log the message then defer to the parent action."""
if self.deprecated_message:
logging.warning(
"Argument %s is deprecated: %s",
option_string,
self.deprecated_message,
)
return super().__call__(
parser, namespace, values, option_string=option_string
)
def OptparseWrapCheck(desc, check_f, _option, opt, value):
"""Optparse adapter for type checking functionality."""
try:
return check_f(value)
except ValueError:
raise optparse.OptionValueError(
"Invalid %s given: --%s=%s" % (desc, opt, value)
)
class Option(optparse.Option):
"""Subclass to implement path evaluation & other useful types."""
_EXTRA_TYPES = ("path", "gs_path")
TYPES = optparse.Option.TYPES + _EXTRA_TYPES
TYPE_CHECKER = optparse.Option.TYPE_CHECKER.copy()
for t in _EXTRA_TYPES:
TYPE_CHECKER[t] = functools.partial(
OptparseWrapCheck, t, VALID_TYPES[t]
)
class FilteringOption(Option):
"""Subclass that supports Option filtering for FilteringOptionParser"""
_EXTRA_ACTIONS = ("split_extend",)
ACTIONS = Option.ACTIONS + _EXTRA_ACTIONS
STORE_ACTIONS = Option.STORE_ACTIONS + _EXTRA_ACTIONS
TYPED_ACTIONS = Option.TYPED_ACTIONS + _EXTRA_ACTIONS
ALWAYS_TYPED_ACTIONS = Option.ALWAYS_TYPED_ACTIONS + _EXTRA_ACTIONS
def take_action(self, action, dest, opt, value, values, parser):
if action == "split_extend":
lvalue = value.split()
values.ensure_value(dest, []).extend(lvalue)
else:
Option.take_action(self, action, dest, opt, value, values, parser)
if value is None:
value = []
elif not self.nargs or self.nargs <= 1:
value = [value]
parser.AddParsedArg(self, opt, [str(v) for v in value])
class _PathFilterAction(argparse.Action):
"""Setup a path filter."""
def __init__(self, option_strings, dest, **kwargs):
if "nargs" in kwargs:
raise ValueError("nargs is not supported for filter action")
super().__init__(option_strings, dest, nargs=1, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, path_filter.PathFilter([]))
getattr(namespace, self.dest).rules.extend(values)
class ColoredFormatter(logging.Formatter):
"""A logging formatter that can color the messages."""
_COLOR_MAPPING = {
"WARNING": terminal.Color.YELLOW,
"ERROR": terminal.Color.RED,
}
def __init__(self, *args, **kwargs):
"""Initializes the formatter.
Args:
*args: See logging.Formatter for specifics.
**kwargs: See logging.Formatter for specifics.
enable_color: Whether to enable colored logging. Defaults
to None, where terminal.Color will set to a reasonable default.
"""
self.color = terminal.Color(enabled=kwargs.pop("enable_color", None))
super().__init__(*args, **kwargs)
def format(self, record):
"""Formats |record| with color."""
msg = super().format(record)
color = self._COLOR_MAPPING.get(record.levelname)
return msg if not color else self.color.Color(color, msg)
class ChromiteStreamHandler(logging.StreamHandler):
"""A stream handler for logging."""
class BaseParser:
"""Base parser class that includes the logic to add logging controls."""
DEFAULT_LOG_LEVELS = (
"fatal",
"critical",
"error",
"warning",
"notice",
"info",
"debug",
)
DEFAULT_LOG_LEVEL = "info"
ALLOW_LOGGING = True
def __init__(self, **kwargs):
"""Initialize this parser instance.
kwargs:
logging: Defaults to ALLOW_LOGGING from the class; if given,
add --log-level.
default_log_level: If logging is enabled, override the default
logging level. Defaults to the class's DEFAULT_LOG_LEVEL value.
log_levels: If logging is enabled, this overrides the enumeration of
allowed logging levels. If not given, defaults to the classes
DEFAULT_LOG_LEVELS value.
manual_debug: If logging is enabled and this is True, suppress
addition of a --debug alias. This option defaults to True unless
'debug' has been exempted from the allowed logging level
targets.
caching: If given, must be either a callable that discerns the cache
location if it wasn't specified (the prototype must be akin to
lambda parser, values:calculated_cache_dir_path; it may return
None to indicate that it handles setting the value on its own
later in the parsing including setting the env), or True; if
True, the machinery defaults to invoking the class's
FindCacheDir method (which can be overridden). FindCacheDir
$CROS_CACHEDIR, falling back to $REPO/.cache, finally falling
back to $TMP. Note that the cache_dir is not created, just
discerned where it should live.
If False, or caching is not given, then no --cache-dir option
will be added.
dryrun: Whether to make --dry-run available.
filter: If given, set up a filter for --include and --exclude paths.
The resulting filter is in opts.filter.
"""
self.debug_enabled = False
self.caching_group = None
self.debug_group = None
self.default_log_level = None
self.log_levels = None
self.logging_enabled = kwargs.get("logging", self.ALLOW_LOGGING)
self.default_log_level = kwargs.get(
"default_log_level", self.DEFAULT_LOG_LEVEL
)
self.log_levels = tuple(
x.lower() for x in kwargs.get("log_levels", self.DEFAULT_LOG_LEVELS)
)
self.debug_enabled = (
not kwargs.get("manual_debug", False) and "debug" in self.log_levels
)
self.caching = kwargs.get("caching", False)
self.dryrun_enabled = kwargs.get("dryrun", False)
self.filter_enabled = kwargs.get("filter", False)
self._cros_defaults = {}
@staticmethod
def PopUsedArgs(kwarg_dict):
"""Removes keys used by the base parser from the kwarg namespace."""
parser_keys = [
"logging",
"default_log_level",
"log_levels",
"manual_debug",
"caching",
"dryrun",
"filter",
]
for key in parser_keys:
kwarg_dict.pop(key, None)
def SetupOptions(self):
"""Sets up standard chromite options."""
# NB: All options here must go through add_common_argument_to_group.
# You cannot use add_argument or such helpers directly. This is to
# support default values with subparsers.
#
# You should also explicitly add default=None here when you want the
# default to be set up in the parsed option namespace.
if self.logging_enabled:
self.debug_group = self.add_argument_group("Debug options")
self.add_common_argument_to_group(
self.debug_group,
"--log-level",
choices=self.log_levels,
default=self.default_log_level,
help="Set logging level to report at.",
)
self.add_common_argument_to_group(
self.debug_group,
"--log-format",
action="store",
default=constants.LOGGER_FMT,
help="Set logging format to use.",
)
# Backwards compat name. We should delete this at some point.
self.add_common_argument_to_group(
self.debug_group,
"--log_format",
action="store",
default=constants.LOGGER_FMT,
help=argparse.SUPPRESS,
)
self.add_common_argument_to_group(
self.debug_group,
"-v",
"--verbose",
action="store_const",
const="info",
dest="log_level",
help="Alias for `--log-level=info`.",
)
if self.debug_enabled:
self.add_common_argument_to_group(
self.debug_group,
"--debug",
action="store_const",
const="debug",
dest="log_level",
help="Alias for `--log-level=debug`. "
"Useful for debugging bugs/failures.",
)
self.add_common_argument_to_group(
self.debug_group,
"--color",
action="store_true",
default=None,
help="Colorize output (default: auto-detect).",
)
self.add_common_argument_to_group(
self.debug_group,
"--no-color",
"--nocolor",
action="store_false",
dest="color",
help="Do not colorize output (or `export NOCOLOR=true`).",
)
if self.caching:
self.caching_group = self.add_argument_group("Caching Options")
self.add_common_argument_to_group(
self.caching_group,
"--cache-dir",
default=None,
type="path",
help="Override the calculated chromeos cache directory; "
"typically defaults to '$REPO/.cache' .",
)
if self.dryrun_enabled:
self.add_argument(
"-n",
"--dry-run",
dest="dryrun",
action="store_true",
help="Show what would be done, but don't do it.",
)
self.add_argument(
"--dryrun",
dest="dryrun",
action="store_true",
help=argparse.SUPPRESS,
)
if self.filter_enabled:
filter_group = self.add_argument_group(
"Path filter options",
"Filter file paths based on PATTERN (see man 3 fnmatch). "
"If multiple --exclude and --include rules are specified, "
"the first that matches takes effect. "
"If no rules are matched, the path is included by default. "
"PATTERNS apply to the full path of the file. "
"For example --exclude='*.py' matches a/foo.py and bar.py; "
"--exclude=BUILD matches BUILD but not a/BUILD.",
)
filter_group.add_argument(
"--exclude",
metavar="PATTERN",
action=_PathFilterAction,
dest="filter",
type=path_filter.exclude,
default=path_filter.PathFilter([]),
help="Exclude files matching PATTERN.",
)
filter_group.add_argument(
"--include",
metavar="PATTERN",
action=_PathFilterAction,
dest="filter",
type=path_filter.include,
help="Include files matching PATTERN.",
)
def SetupLogging(self, opts):
"""Sets up logging based on |opts|."""
value = opts.log_level.upper()
logger = logging.getLogger()
log_level = getattr(logging, value)
logger.setLevel(log_level)
# If verbose levels, include millisecond output.
log_format = opts.log_format
if log_level < logging.NOTICE:
log_format = log_format.replace(
"%(asctime)s:", "%(asctime)s.%(msecs)03d:"
)
formatter = ColoredFormatter(
fmt=log_format,
datefmt=constants.LOGGER_TIME_FMT,
enable_color=opts.color,
)
# Only set colored formatter for ChromiteStreamHandler instances,
# which could have been added by ScriptWrapperMain() below.
chromite_handlers = [
x for x in logger.handlers if isinstance(x, ChromiteStreamHandler)
]
for handler in chromite_handlers:
handler.setFormatter(formatter)
logging.captureWarnings(True)
return value
def DoPostParseSetup(self, opts, args):
"""Method called to handle post opts/args setup.
This can be anything from logging setup to positional arg count
validation.
Args:
opts: optparse.Values or argparse.Namespace instance
args: position arguments unconsumed from parsing.
Returns:
(opts, args), w/ whatever modification done.
"""
for dest, default in self._cros_defaults.items():
if not hasattr(opts, dest):
setattr(opts, dest, default)
if self.logging_enabled:
value = self.SetupLogging(opts)
if self.debug_enabled:
opts.debug = value == "DEBUG"
opts.verbose = value in ("INFO", "DEBUG")
if self.caching:
path = os.environ.get(constants.SHARED_CACHE_ENVVAR)
if path is not None and opts.cache_dir is None:
opts.cache_dir = os.path.abspath(path)
opts.cache_dir_specified = opts.cache_dir is not None
if not opts.cache_dir_specified:
func = (
self.FindCacheDir
if not callable(self.caching)
else self.caching
)
opts.cache_dir = func(self, opts)
if opts.cache_dir is not None:
self.ConfigureCacheDir(opts.cache_dir)
return opts, args
@staticmethod
def ConfigureCacheDir(cache_dir):
if cache_dir is None:
os.environ.pop(constants.SHARED_CACHE_ENVVAR, None)
logging.debug("Removed cache_dir setting")
else:
os.environ[constants.SHARED_CACHE_ENVVAR] = cache_dir
logging.debug("Configured cache_dir to %r", cache_dir)
@classmethod
def FindCacheDir(cls, _parser, _opts):
logging.debug("Cache dir lookup.")
return path_util.FindCacheDir()
class ArgumentNamespace(argparse.Namespace, metaclass=attrs_freezer.Class):
"""Class to mimic argparse.Namespace with value freezing support."""
_FROZEN_ERR_MSG = "Option values are frozen, cannot alter %s."
# Note that because optparse.Values is not a new-style class this class
# must use the mixin rather than the metaclass.
class OptionValues(attrs_freezer.Mixin, optparse.Values):
"""Class to mimic optparse.Values with value freezing support."""
_FROZEN_ERR_MSG = "Option values are frozen, cannot alter %s."
def __init__(self, defaults, *args, **kwargs):
attrs_freezer.Mixin.__init__(self)
optparse.Values.__init__(self, defaults, *args, **kwargs)
# Used by FilteringParser.
self.parsed_args = None
PassedOption = collections.namedtuple(
"PassedOption", ["opt_inst", "opt_str", "value_str"]
)
class FilteringParser(optparse.OptionParser, BaseParser):
"""Custom option parser for filtering options.
Aside from adding a couple of types (path for absolute paths,
gs_path for google storage urls, and log_level for logging level control),
this additionally exposes logging control by default; if undesired,
either derive from this class setting ALLOW_LOGGING to False, or
pass in logging=False to the constructor.
"""
DEFAULT_OPTION_CLASS = FilteringOption
def __init__(self, usage=None, **kwargs):
BaseParser.__init__(self, **kwargs)
self.PopUsedArgs(kwargs)
kwargs.setdefault("option_class", self.DEFAULT_OPTION_CLASS)
optparse.OptionParser.__init__(self, usage=usage, **kwargs)
self.SetupOptions()
def add_common_argument_to_group(self, group, *args, **kwargs):
"""Adds the given option defined by args and kwargs to group."""
return group.add_option(*args, **kwargs)
def add_argument_group(self, *args, **kwargs):
"""Return an option group rather than an argument group."""
return self.add_option_group(*args, **kwargs)
def parse_args(self, args=None, values=None):
# If no Values object is specified then use our custom OptionValues.
if values is None:
values = OptionValues(defaults=self.defaults)
values.parsed_args = []
opts, remaining = optparse.OptionParser.parse_args(
self, args=args, values=values
)
return self.DoPostParseSetup(opts, remaining)
def AddParsedArg(self, opt_inst, opt_str, value_str):
"""Add a parsed argument with attributes.
Args:
opt_inst: An instance of a raw optparse.Option object that
represents the option.
opt_str: The option string.
value_str: A list of string-ified values dentified by OptParse.
"""
self.values.parsed_args.append(
PassedOption(opt_inst, opt_str, value_str)
)
@staticmethod
def FilterArgs(parsed_args, filter_fn):
"""Filter the argument by passing it through a function.
Args:
parsed_args: The list of parsed argument namedtuples to filter.
Tuples are of the form (opt_inst, opt_str, value_str).
filter_fn: A function with signature f(PassedOption), and returns
True if the argument is to be passed through. False if not.
Returns:
A tuple containing two lists - one of accepted arguments and one of
removed arguments.
"""
removed = []
accepted = []
for arg in parsed_args:
target = accepted if filter_fn(arg) else removed
target.append(arg.opt_str)
target.extend(arg.value_str)
return accepted, removed
class ArgumentParser(BaseParser, argparse.ArgumentParser):
"""Custom argument parser for use by chromite.
This class additionally exposes logging control by default; if undesired,
either derive from this class setting ALLOW_LOGGING to False, or
pass in logging=False to the constructor.
"""
def __init__(self, usage=None, **kwargs):
kwargs.setdefault(
"formatter_class", argparse.RawDescriptionHelpFormatter
)
BaseParser.__init__(self, **kwargs)
self.PopUsedArgs(kwargs)
argparse.ArgumentParser.__init__(self, usage=usage, **kwargs)
self._SetupTypes()
self.SetupOptions()
self._RegisterActions()
def _SetupTypes(self):
"""Register types with ArgumentParser."""
for t, check_f in VALID_TYPES.items():
self.register("type", t, check_f)
for a, class_a in VALID_ACTIONS.items():
self.register("action", a, class_a)
def _RegisterActions(self):
"""Update the container's actions.
This method builds out a new action class to register for each action
type. The new action class allows handling the deprecated argument
without any other changes to the argument parser logic. See
_DeprecatedAction.
"""
for action in _DEPRECATE_ACTIONS:
current_class = self._registry_get("action", action, object)
# Base classes for the new class. The _DeprecatedAction must be
# first to ensure its method overrides are called first.
bases = (_DeprecatedAction, current_class)
try:
self.register(
"action", action, type("deprecated-wrapper", bases, {})
)
except TypeError:
# Method resolution order error. This occurs when the
# _DeprecatedAction class is inherited multiple times, so we've
# already registered the replacement class. The underlying
# _ActionsContainer gets passed around, so this may get
# triggered in non-obvious ways.
continue
def add_argument(self, *args, **kwargs) -> argparse.Action:
"""Override of argparse.ArgumentParser.add_argument for chromite."""
# Ban (unquoted) `type=bool` which only accepts the empty string for a
# `False` parameter value. The argparse documentation also recommends
# against it. The quoted, `type="bool"`, chromite extension is fine, but
# add_bool_argument is preferred.
if "type" in kwargs and kwargs["type"] == bool:
raise ValueError(
"Unquoted `add_argument(...type=bool)` is not recommended."
' Use `add_bool_argument()` (preferred), or use `type="bool"`.'
)
return super().add_argument(*args, **kwargs)
def add_common_argument_to_group(self, group, *args, **kwargs):
"""Adds the given argument to the group.
This argument is expected to show up across the base parser and
subparsers that might be added later on. The default argparse module
does not handle this scenario well -- it processes the base parser first
(defaults and the user arguments), then it processes the subparser
(defaults and arguments). That means defaults in the subparser will
clobber user arguments passed in to the base parser!
"""
default = kwargs.pop("default", None)
kwargs["default"] = argparse.SUPPRESS
action = group.add_argument(*args, **kwargs)
self._cros_defaults.setdefault(action.dest, default)
return action
def add_bool_argument(
self, flag: str, default: bool, enabled_desc: str, disabled_desc: str
) -> None:
"""Adds a boolean argument conforming to chromite recommendations.
This will add both --flag and --no-flag, storing into dest="flag", and
with corresponding help strings. " (DEFAULT)" is appended to the help
string of the one that is default when no flag is provided.
See
https://chromium.googlesource.com/chromiumos/chromite/+/HEAD/docs/cli-guidelines.md#Boolean-Options
Args:
flag: The name of the flag in kebab case (e.g. "--my-bool").
default: The default value when no flag is provided.
enabled_desc: The help text to use for "--my-bool".
disabled_desc: The help text to use for "--no-my-bool".
"""
if not flag.startswith("--"):
raise ValueError(f"Bool flag `{flag}` must start with `--`")
if "_" in flag:
raise ValueError(f"Bool flag `{flag}` must be kebab-case")
enabled_desc += " (DEFAULT)" if default else ""
disabled_desc += " (DEFAULT)" if not default else ""
flag = flag.lstrip("-")
dest = flag.replace("-", "_")
self.add_argument(
f"--{flag}", action="store_true", default=default, help=enabled_desc
)
self.add_argument(
f"--no-{flag}", action="store_false", dest=dest, help=disabled_desc
)
def parse_args(self, args=None, namespace=None):
"""Translates OptionParser call to equivalent ArgumentParser call."""
# If no Namespace object is specified then use our custom
# ArgumentNamespace.
if namespace is None:
namespace = ArgumentNamespace()
# Unlike OptionParser, ArgParser works only with a single namespace and
# no args. Re-use BaseParser DoPostParseSetup but only take the
# namespace.
namespace = argparse.ArgumentParser.parse_args(
self, args=args, namespace=namespace
)
return self.DoPostParseSetup(namespace, None)[0]
class _ShutDownException(SystemExit):
"""Exception raised when user hits CTRL+C."""
def __init__(self, sig_num, message):
self.signal = sig_num
# Setup a usage message primarily for any code that may intercept it
# while this exception is crashing back up the stack to us.
SystemExit.__init__(self, 128 + sig_num)
self.args = (sig_num, message)
def __str__(self):
"""Stringify this exception."""
return self.args[1]
def _DefaultHandler(signum, _frame):
# Don't double process sigterms; just trigger shutdown from the first
# exception.
signal.signal(signum, signal.SIG_IGN)
raise _ShutDownException(
signum, "Received signal %i; shutting down" % (signum,)
)
def _RestartInChroot(cmd, chroot_args, extra_env):
"""Rerun inside the chroot.
Args:
cmd: Command line to run inside the chroot as a list of strings.
chroot_args: Arguments to pass directly to cros_sdk (or None).
extra_env: Dictionary of environmental variables to set inside the
chroot (or None).
"""
return cros_build_lib.run(
cmd,
check=False,
enter_chroot=True,
chroot_args=chroot_args,
extra_env=extra_env,
cwd=constants.SOURCE_ROOT,
).returncode
def RunInsideChroot(command=None, chroot_args=None):
"""Restart the current command inside the chroot.
This method is only valid for any code that is run via ScriptWrapperMain.
It allows proper cleanup of the local context by raising an exception
handled in ScriptWrapperMain.
Args:
command: An instance of CliCommand to be restarted inside the chroot.
|command| can be None if you do not wish to modify the log_level.
chroot_args: List of command-line arguments to pass to cros_sdk, if
invoked.
"""
if cros_build_lib.IsInsideChroot():
return
# Produce the command line to execute inside the chroot.
argv = command.TranslateToChrootArgv() if command else sys.argv[:]
argv[0] = path_util.ToChrootPath(argv[0])
# Set log-level of cros_sdk to be same as log-level of command entering the
# chroot.
if chroot_args is None:
chroot_args = []
if command is not None:
chroot_args += ["--log-level", command.options.log_level]
raise ChrootRequiredError(argv, chroot_args)
def RunAsRootUser(argv: List[str], preserve_env: bool = False):
"""Run the given command as the root user.
Args:
argv: Command line arguments to run as the root user.
preserve_env: If True, preserve existing environment variables when
re-executing.
Raises:
ValueError: If a command is not provided.
"""
if not argv:
raise ValueError("Command not provided to run as the root user.")
if osutils.IsRootUser():
return
cmd = ["sudo"]
if preserve_env:
cmd.append("--preserve-env")
cmd.extend(
[f'HOME={os.environ["HOME"]}', f'PATH={os.environ["PATH"]}', "--"]
)
cmd.extend(argv)
os.execvp(cmd[0], cmd)
def ReExec():
"""Restart the current command.
This method is only valid for any code that is run via ScriptWrapperMain.
It allows proper cleanup of the local context by raising an exception
handled in ScriptWrapperMain.
"""
# The command to exec.
raise ExecRequiredError(sys.argv[:])
def ScriptWrapperMain(
find_target_func,
argv=None,
log_level=logging.DEBUG,
log_format=constants.LOGGER_FMT,
):
"""Function usable for chromite.script.* style wrapping.
Note that this function invokes sys.exit on the way out by default.
Args:
find_target_func: a function, which, when given the absolute
pathway the script was invoked via (for example,
/home/ferringb/chromiumos/chromite/bin/cros_sdk; note that any
trailing .py from the path name will be removed),
will return the main function to invoke (that functor will take
a single arg- a list of arguments, and shall return either None
or an integer, to indicate the exit code).
argv: sys.argv, or an equivalent tuple for testing. If nothing is
given, sys.argv is defaulted to.
log_level: Default logging level to start at.
log_format: Default logging format to use.
"""
if argv is None:
argv = sys.argv[:]
target = os.path.abspath(argv[0])
name = os.path.basename(target)
if target.endswith(".py"):
target = os.path.splitext(target)[0]
target = find_target_func(target)
if target is None:
print(
"Internal error detected- no main functor found in module %r."
% (name,),
file=sys.stderr,
)
sys.exit(100)
# If verbose levels, include millisecond output.
if log_level < logging.NOTICE:
log_format = log_format.replace(
"%(asctime)s:", "%(asctime)s.%(msecs)03d:"
)
# Set up basic logging information for all modules that use logging.
# Note a script target may setup default logging in its module namespace
# which will take precedence over this.
logger = logging.getLogger()
logger.setLevel(log_level)
logger_handler = ChromiteStreamHandler()
logger_handler.setFormatter(
logging.Formatter(fmt=log_format, datefmt=constants.LOGGER_TIME_FMT)
)
logger.addHandler(logger_handler)
logging.captureWarnings(True)
signal.signal(signal.SIGTERM, _DefaultHandler)
ret = 1
try:
ret = target(argv[1:])
except _ShutDownException as e:
sys.stdout.flush()
print(
"%s: Signaled to shutdown: caught %i signal." % (name, e.signal),
file=sys.stderr,
)
sys.stderr.flush()
except SystemExit:
# Right now, let this crash through - longer term, we'll update the
# scripts in question to not use sys.exit, and make this into a flagged
# error.
raise
except ChrootRequiredError as e:
ret = _RestartInChroot(e.cmd, e.chroot_args, e.extra_env)
except ExecRequiredError as e:
logging.shutdown()
# This does not return.
os.execv(e.cmd[0], e.cmd)
except Exception as e:
sys.stdout.flush()
print("%s: Unhandled exception:" % (name,), file=sys.stderr)
sys.stderr.flush()
raise
finally:
logging.shutdown()
if ret is None:
ret = 0
sys.exit(ret)