#!/usr/bin/env python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This script provides a suite of utility functions for the chroot.
import optparse
import os
import shutil
import subprocess
import sys
_PROG = "crdev"
_SSH_KEY_FILEPATH = os.path.expanduser("~/.ssh/id_rsa")
_PUB_KEY_FILENAME = "~/.ssh/"
_PUB_KEY_FILEPATH = os.path.expanduser(_PUB_KEY_FILENAME)
_AUTH_KEYS_FILEPATH = os.path.expanduser("~/.ssh/authorized_keys")
_CHROME_MOUNT_PATH = "/tmp/chrome"
# Note: no ~/ for sshfs:
_DEFAULT_HOST_CHROOT_DIR = "chromeos/chroot"
_HOST_CHROOT_DIR_FILENAME = os.path.expanduser("~/.chromedir")
RAN_SUDO = False
# Utility commands
def _PrintError(msg, err=""):
"""Print a message with an optional error message to stderr"""
if err:
print >> sys.stderr, "%s: %s" % (msg, err)
print >> sys.stderr, msg
def _Confirm(message, default_response="n"):
"""Request confirmation and return True/False."""
default_lc = default_response.lower()
if default_lc == "y":
input_msg = "%s [Y/n] " % message
input_msg = "%s [y/N] " % message
reply = raw_input(input_msg).lower()
if reply:
return reply == "y"
return default_lc == "y"
def _CheckOverwriteFile(filepath):
"""Check for a file and request confirmation if it exists."""
if os.path.isfile(filepath):
if not _Confirm("%s already exists. Overwrite?" % filepath):
return False
return True
def _WriteFile(filepath, contents):
"""Write string |contents| to |filepath|."""
print "Writing to: ", filepath
with open(filepath,"w") as f_1:
except EnvironmentError as err:
_PrintError("Failed to write to file '%s'" % filepath, err)
return False
return True
def _ReadFile(filepath):
"""Return contents of |filepath| as a string."""
print "Reading from: " + filepath
with open(filepath,"r") as f_1:
result =
except EnvironmentError as err:
_PrintError("Failed to read from file '%s'" % filepath, err)
return None
return result.rstrip()
def _Sudo():
"""Request sudo access with message."""
global RAN_SUDO
if not RAN_SUDO:
print "Executing: sudo -v"
try:["sudo", "-v"])
except EnvironmentError as err:
_PrintError("Failed to run sudo", err)
return False
return True
def _RunCommand(args):
"""Pass |args| to and check the result."""
cmd = ''.join([x + " " for x in args])
if args[0] == "sudo":
if _Sudo() == False:
return False
print "Executing: ", cmd
retcode =
except EnvironmentError as err:
_PrintError("Failed to run command '%s'" % cmd, err)
return False
if retcode != 0:
_PrintError("Error running command '%s'" % cmd, "%s" % retcode)
return False
return True
def _GetCommandOutput(args):
"""Pass |args| to subprocess.Popen and return the output."""
cmd = ''.join([x + " " for x in args])
proc = subprocess.Popen(args, stdout=subprocess.PIPE)
result = proc.communicate()[0]
except EnvironmentError as err:
_PrintError("Failed to run command '%s'" % cmd, err)
return None
result = result.rstrip('\n')
return result
def _MakeDirectory(dirpath, warn=True):
"""Create directory |dirpath| if it does not exist."""
if not os.path.isdir(dirpath):
print "Creating directory: ", dirpath
except EnvironmentError as err:
if warn:
_PrintError("Failed to create directory '%s'" % dirpath, err)
return False
return True
def _RemoveFile(filepath):
"""Remove file |filepath| if it exists."""
if os.path.isfile(filepath):
print "Removing file: ", filepath
except EnvironmentError as err:
_PrintError("Failed to remove file '%s'" % filepath, err)
return False
return True
def _RemoveDirectory(dirpath):
"""Recursively remove directory |dirpath| if it exists."""
if os.path.isdir(dirpath):
print "Removing directory: ", dirpath
except EnvironmentError as err:
_PrintError("Failed to remove dir '%s'" % dirpath, err)
return False
return True
def _DevUser():
"""Extract the user name from lsb-release."""
# TODO(stevenjb): Refactor this using python re.
awk_expr += """ sub(/.*Build - /,"");"""
awk_expr += """ sub(/\).*/,"");"""
awk_expr += """ print; }"""
return _GetCommandOutput(["awk", awk_expr, "/etc/lsb-release"])
def _DevHost():
"""Extract the host name from lsb-release."""
# TODO(stevenjb): Refactor this using python re.
awk_expr = """/CHROMEOS_DEVSERVER/ {"""
awk_expr += """ sub(/.*http:\/\//,"");"""
awk_expr += """ sub(/:8080.*/,"");"""
awk_expr += """ print; }"""
return _GetCommandOutput(["awk", awk_expr, "/etc/lsb-release"])
def _DevBoard():
"""Extract the board from lsb-release."""
# TODO(stevenjb): Refactor this using python re.
awk_expr = """/CHROMEOS_RELEASE_BOARD/ {"""
awk_expr += """ sub(/.*=/,"");"""
awk_expr += """ print; }"""
return _GetCommandOutput(["awk", awk_expr, "/etc/lsb-release"])
def _GetChrootDir(prompt_for_dir=False):
"""Get the name for the chrome directory on the host."""
if os.path.isfile(_HOST_CHROOT_DIR_FILENAME):
chromedir = _ReadFile(_HOST_CHROOT_DIR_FILENAME)
if prompt_for_dir:
host = _DevHost()
prompt = ("Chroot directory on %s [ %s ]: " % (host, chromedir))
inputdir = raw_input(prompt).rstrip()
if inputdir:
chromedir = inputdir
_WriteFile(_HOST_CHROOT_DIR_FILENAME, chromedir)
return chromedir
def _DaemonizeAndReRunAsRoot(cmd):
"""Double-fork to become owned by init, then re-exec this tool as root.
Double-forking is the standard way to detach yourself from a
process tree and become owned by init. By doing this, you get to
live on even if your parent goes away."""
pid = os.fork()
if pid > 0:
sys.exit(0) # Just want to go away silently.
except OSError, e:
return False
pid = os.fork()
if pid > 0:
sys.exit(0) # Just want to go away silently.
except OSError, e:
return False
# re-run self as root.
os.execlp('sudo', sys.executable, __file__, cmd)
except OSError, e:
_PrintError("Unable to exec self as root: %s", str(e))
return False
# Other Commands
def TestCommand(args):
"""Test command."""
_WriteFile("/foo/test", "test")
if len(args) == 0:
args = ["sudo", "ls", "/"]
return _RunCommand(args)
return False
def GetBoard(unused_args=0):
"""Gets the board name from /etc/lsb-release."""
print _DevBoard()
return True
def GetUser(unused_args=0):
"""Gets the user name from /etc/lsb-release."""
print _DevUser()
return True
def GetHost(unused_args=0):
"""Gets the host name from /etc/lsb-release."""
print _DevHost()
return True
def MountWriteable(unused_args=0):
"""Remounts / as rw."""
return _RunCommand(["sudo", "mount", "-o", "remount,rw", "/"])
def ShowIP(unused_args=0):
"""Shows the IP address of the device."""
proc1 = subprocess.Popen(["/sbin/ifconfig", "eth0"], stdout=subprocess.PIPE)
awk_cmd = """/inet addr/ {"""
awk_cmd += """ sub(/.*inet addr:/,""); sub(/ Bcast:.*/,"");"""
awk_cmd += """ print; }"""
proc2 = subprocess.Popen(
["awk", awk_cmd], stdin=proc1.stdout, stdout=subprocess.PIPE)
result = proc2.communicate()[0].rstrip('\n')
print "IP: ", result
return True
def KillChrome(unused_args=0):
"""Kills all chrome processes and prevents restarting of chrome."""
res = True
res &= _RunCommand(["touch", "/run/disable_chrome_restart"])
res &= _RunCommand(["sudo", "pkill", "-9", "chrome"])
return res
def ClearOwnership(unused_args=0):
"""Clears any state related to the device Ownership (NOT TPM ownership)."""
res = True
res &= _RunCommand(["sudo", "stop", "ui"])
whitelist_dir = "/var/lib/whitelist"
for file in os.listdir(whitelist_dir):
res &= _RemoveFile(os.path.join(whitelist_dir, file))
res &= _RemoveDirectory("/home/chronos/Local State")
res &= _RemoveFile("/home/chronos/Consent To Send Stats")
res &= _RunCommand(["sudo", "start", "ui"])
if not res:
_PrintError("Errors encountered while clearing ownership state.")
return False
return True
def ShowOobe(unused_args=0):
"""Removes .oobe_completed and Local State directory."""
res = True
res &= _RemoveFile("/home/chronos/.oobe_completed")
res &= ClearOwnership()
if not res:
_PrintError("Errors encountered while clearing setting up OOBE mode.")
return False
return _RunCommand(["sudo", "reboot"])
# Setup Commands
def SetBash(unused_args):
"""Sets the default shell to bash."""
if not _Sudo():
return False
if not MountWriteable():
return False
res = True
res &= _RunCommand(["chsh", "-s", "/bin/bash"])
res &= _RunCommand(["chsh", "-s", "/bin/bash", "chronos"])
return res
def SetupBashrc(unused_args):
"""Sets up .bashrc."""
filepath = os.path.expanduser("~/.bashrc")
if not _CheckOverwriteFile(filepath):
return True
print "Writing to: ", filepath
bashrc = "#!/bin/bash\n"
bashrc += "# .bashrc file set by %s\n" % _PROG
bashrc += "export DISPLAY=:0.0\n"
bashrc += "export PATH=$PATH:/sbin:/usr/sbin:/usr/local/sbin\n"
bashrc += "/sbin/ifconfig eth0 | grep 'inet addr'\n"
if not _WriteFile(filepath, bashrc):
return False
filepath = os.path.expanduser("~/.bash_profile")
print "Writing to: ", filepath
bashprofile = "#!/bin/bash\n"
bashprofile += ". $HOME/.bashrc\n"
if not _WriteFile(filepath, bashprofile):
return False
return True
def SetupDev(unused_args):
"""Developer friendly setup: skip oobe, disable suspend,
\t\tenable ssh and remote debugging, run commands from
if not _Sudo():
return False
if not MountWriteable():
return False
res = True
res &= _WriteFile("/home/chronos/.oobe_completed", "1\n")
res &= _MakeDirectory("/usr/share/power_manager")
res &= _WriteFile("/tmp/disable_idle_suspend", "1\n")
res &= _RunCommand(["sudo", "cp", "/tmp/disable_idle_suspend",
# Enable iptables and system-services for remote debugging
for filename in ["iptables", "saft"]:
res &= _RunCommand(["sudo", "sed", "-i", "-e",
"s/#for_test //", "/etc/init/%s.conf" % filename])
# Allow commands to be run from /home and /tmp
res &= _RunCommand(["sudo", "sed", "-i", "-e",
"s/#mod_for_test#//g", "/sbin/chromeos_startup"])
return res
def SetupSsh(unused_args):
"""Sets up ssh configuration so that the dev host can ssh to the device."""
if not MountWriteable():
return False
user = _DevUser()
host = _DevHost()
res = True
if _CheckOverwriteFile(_SSH_KEY_FILEPATH):
res &= _RemoveFile(_SSH_KEY_FILEPATH)
# Generate an ssh key
if _RunCommand(["ssh-keygen", "-f", _SSH_KEY_FILEPATH, "-N", "", "-q"]):
host_source_path = "%s@%s:%s" % (user, host, _PUB_KEY_FILENAME)
# Copy the ssh key to the host
res &= _RunCommand(["scp", host_source_path, _AUTH_KEYS_FILEPATH])
# Enable ssh to device
res &= _RunCommand(
["sudo", "sed", "-i", "s/#for_test //", "/etc/init/openssh-server.conf"])
return res
def AuthorizeSsh(unused_args):
"""Authorizes this netbook to connect to your dev host.
\t\t*Only use this on a device in a secure location!*"""
user = _DevUser()
host = _DevHost()
if not os.path.isdir(os.path.expanduser("~/.ssh")):
print "Run '%s ssh' to set up .ssh directory first." % _PROG
return False
if not _Confirm("This will append %s to authorized_keys on %s. "
"Are you sure?" % (_PUB_KEY_FILENAME, host)):
return False
proc1 = subprocess.Popen(["cat", _PUB_KEY_FILEPATH], stdout=subprocess.PIPE)
ssh_args = ["ssh", user+"@"+host, "cat >> ~/.ssh/authorized_keys"]
proc2 = subprocess.Popen(
ssh_args, stdin=proc1.stdout, stdout=subprocess.PIPE)
except EnvironmentError as err1:
_PrintError("Error executing '%s'" % ' '.join(ssh_args), err1)
return False
result, err2 = proc2.communicate()
except EnvironmentError:
_PrintError("Error completing ssh command '%s'" % result, err2)
return False
return True
def SetupSshfsForChrome(unused_args):
"""<chrome-drir> Sets up sshfs mount to chrome directory on host."""
user = _DevUser()
host = _DevHost()
chrootdir = _GetChrootDir(True)
target = ("%s@%s:%s/var/lib/portage/distfiles-target/chrome-src"
% (user, host, chrootdir))
print "Setting up sshfs mount to: ", target
res = True
res &= _RunCommand(["sudo", "modprobe", "fuse"])
if os.path.isdir(_CHROME_MOUNT_PATH):
res &= _RunCommand(["fusermount", "-q", "-u", _CHROME_MOUNT_PATH])
res &= _RemoveDirectory(_CHROME_MOUNT_PATH)
res &= _MakeDirectory(_CHROME_MOUNT_PATH)
res &= _RunCommand(["sshfs", target, _CHROME_MOUNT_PATH])
res &= _RunCommand(["sudo", "/sbin/iptables",
"-A", "INPUT", "-p", "tcp", "--dport", "1234",
"-j", "ACCEPT"])
return res
# Multi-commands (convenience functions)
def Setup(args):
"""Performs default developer setup (bash,bashrc,dev,ssh)."""
if not SetBash(args):
if not _Confirm("Bash setup failed. Continue?", "y"):
return False
if not SetupBashrc(args):
if not _Confirm(".bashrc setup failed. Continue?", "y"):
return False
if not SetupDev(args):
if not _Confirm("Dev setup failed. Continue?", "y"):
return False
if not SetupSsh(args):
return False
return True
'setup': Setup,
'dev': SetupDev,
'bash': SetBash,
'bashrc': SetupBashrc,
'ssh': SetupSsh,
'sshauthorize': AuthorizeSsh,
'sshfs': SetupSshfsForChrome,
'mountrw': MountWriteable,
'ip': ShowIP,
'test': TestCommand,
'board': GetBoard,
'user': GetUser,
'host': GetHost,
'show_oobe': ShowOobe,
'clear_owner': ClearOwnership,
def GetUsage(commands):
"""Get the docstring for each command."""
usage = ""
for cmd in commands.items():
usage += " "
usage += cmd[0]
usage += ":\t"
if len(cmd[0]) < 6:
usage += "\t"
doc = cmd[1].__doc__
if doc:
usage += doc
usage += "\n"
return usage
def main():
"""Main crdev function"""
usage = """usage: crdev command [options]
Note: Beta! Feature requests / changes can be sent to: (for now)
usage += "Setup Commands:\n"
usage += GetUsage(_SETUP_COMMANDS)
usage += "Other Commands:\n"
usage += GetUsage(_OTHER_COMMANDS)
usage += "Cleanup Commands:\n"
usage += GetUsage(_CLEANUP_COMMANDS)
parser = optparse.OptionParser(usage)
args = parser.parse_args()[1]
if not args:
print usage
cmd = args[0]
root_ok = cmd in _CLEANUP_COMMANDS.keys()
if not root_ok or os.geteuid() != 0:
if os.getenv('USER') != 'chronos':
_PrintError("%s must be run as chronos." % _PROG)
if cmd in _SETUP_COMMANDS.keys():
res = _SETUP_COMMANDS[cmd](args[1:])
elif cmd in _OTHER_COMMANDS.keys():
res = _OTHER_COMMANDS[cmd](args[1:])
elif cmd in _CLEANUP_COMMANDS.keys():
if os.geteuid() != 0:
_PrintError("Should never return from DaemonizeAndReRunAsRoot()")
res = _CLEANUP_COMMANDS[cmd](args[1:])
parser.error("Unknown command: " + cmd)
if not res:
_PrintError("Errors encountered when running '%s'" % ' '.join(args))
print "Success!"
if __name__ == '__main__':