# blob: d69c8a3c22d62e89d7395f3fb42e4e30c823c3f5 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Logic to parse and merge account databases in overlay stacks."""
from __future__ import print_function
import collections
import sys
import six
from chromite.lib import json_lib
from chromite.lib import user_db
assert sys.version_info >= (3, 6), 'This module requires Python 3.6+'

# Top-level keys of an accounts database.
GROUPS_KEY = 'groups'
USERS_KEY = 'users'

# NOTE(review): USER_COMMENT_KEY, GROUP_DEFUNCT_KEY, GROUP_PASSWORD_KEY and
# GROUP_USERS_KEY are referenced by AccountDatabase but had no definition in
# this file, which made every _AddUser()/_AddGroup() call fail with NameError.
# Their values below mirror the sibling user/group keys and the `gecos` field
# the description is installed into; confirm them against the accounts
# database schema.

# Keys of a single user specification.
USER_COMMENT_KEY = 'gecos'
USER_DEFUNCT_KEY = 'defunct'
USER_FIXED_ID_KEY = 'fixed_id'
USER_GROUP_KEY = 'group_name'
USER_HOME_KEY = 'home'
USER_ID_KEY = 'uid'
USER_NAME_KEY = 'user'
USER_PASSWORD_KEY = 'password'
USER_SHELL_KEY = 'shell'

# Keys of a single group specification.
GROUP_DEFUNCT_KEY = 'defunct'
GROUP_FIXED_ID_KEY = 'fixed_id'
GROUP_ID_KEY = 'gid'
GROUP_NAME_KEY = 'group'
GROUP_PASSWORD_KEY = 'password'
GROUP_USERS_KEY = 'users'
# Record for one user entry of an accounts database.  Field order is part of
# the type's interface, so it is spelled out one field per line.
User = collections.namedtuple('User', (
    'name',
    'password',
    'uid',
    'group_name',
    'description',
    'home',
    'shell',
    'is_fixed_id',
    'is_defunct',
))

# Record for one group entry of an accounts database.
Group = collections.namedtuple('Group', (
    'name',
    'password',
    'gid',
    'users',
    'is_fixed_id',
    'is_defunct',
))
class AccountDatabase(object):
  """Parses, validates, and combines account databases from overlays.

  Attributes:
    groups: dict mapping group name to the Group loaded for it.
    users: dict mapping user name to the User loaded for it.
  """

  def __init__(self):
    """Construct an empty instance."""
    self.groups = {}
    self.users = {}

  def AddAccountsFromDatabase(self, account_db_path):
    """Add accounts from the database at |account_db_path| to self.

    Overrides previously loaded accounts.

    Args:
      account_db_path: path to file containing an account database.

    Raises:
      ValueError: if the database has top-level fields other than the user
        and group lists.
    """
    raw_db = json_lib.ParseJsonFileWithComments(account_db_path)
    json_lib.AssertIsInstance(raw_db, dict, 'accounts database')

    # We don't mandate that an accounts database specify either field.
    raw_db.setdefault(USERS_KEY, [])
    raw_db.setdefault(GROUPS_KEY, [])
    user_list = json_lib.PopValueOfType(raw_db, USERS_KEY, list,
                                        'list of users in accounts database')
    group_list = json_lib.PopValueOfType(raw_db, GROUPS_KEY, list,
                                         'list of groups in accounts database')

    # We do mandate that the database contain only fields we know about.
    # Both known fields were popped above, so anything left is unknown.
    if raw_db:
      raise ValueError('Accounts database include unknown fields: %r' %
                       raw_db.keys())

    for user in user_list:
      json_lib.AssertIsInstance(
          user, dict, 'user specification in accounts database')
      self._AddUser(user)

    for group in group_list:
      json_lib.AssertIsInstance(
          group, dict, 'group specification in accounts database')
      self._AddGroup(group)

  def _AddUser(self, user_spec):
    """Add a user to this account database based on |user_spec|.

    Any previously loaded user with the same name is replaced.

    Args:
      user_spec: dict of information from an accounts database.
        This fragment is expected to have been parsed from
        developer supplied JSON and will be type checked.
        The dict is consumed: defaults are filled in and every known key is
        popped from it.

    Raises:
      ValueError: if |user_spec| contains keys that are not recognized.
    """
    # By default, user accounts are locked and cannot be logged into.
    user_spec.setdefault(USER_PASSWORD_KEY, u'!')
    # By default, users don't get a shell.
    user_spec.setdefault(USER_SHELL_KEY, u'/bin/false')
    # By default, users don't get a home directory.
    user_spec.setdefault(USER_HOME_KEY, u'/dev/null')
    # By default, users don't get a fixed UID.
    user_spec.setdefault(USER_FIXED_ID_KEY, False)
    # By default, users don't need a comment.
    user_spec.setdefault(USER_COMMENT_KEY, u'')
    # By default, users are not defunct.
    user_spec.setdefault(USER_DEFUNCT_KEY, False)

    # The name is popped first so later error descriptions can mention it.
    name = json_lib.PopValueOfType(user_spec, USER_NAME_KEY, six.text_type,
                                   'username from user spec')
    password = json_lib.PopValueOfType(
        user_spec, USER_PASSWORD_KEY, six.string_types,
        'password for user %s' % name)
    uid = json_lib.PopValueOfType(user_spec, USER_ID_KEY, int,
                                  'default uid for user %s' % name)
    group_name = json_lib.PopValueOfType(
        user_spec, USER_GROUP_KEY, six.text_type,
        'primary group for user %s' % name)
    description = json_lib.PopValueOfType(
        user_spec, USER_COMMENT_KEY, six.text_type,
        'description for user %s' % name)
    home = json_lib.PopValueOfType(user_spec, USER_HOME_KEY, six.text_type,
                                   'home directory for user %s' % name)
    shell = json_lib.PopValueOfType(user_spec, USER_SHELL_KEY, six.text_type,
                                    'shell for user %s' % name)
    is_fixed_id = json_lib.PopValueOfType(user_spec, USER_FIXED_ID_KEY, bool,
                                          'whether UID for user %s is fixed' %
                                          name)
    is_defunct = json_lib.PopValueOfType(user_spec, USER_DEFUNCT_KEY, bool,
                                         'whether user %s is defunct.' % name)

    # Every known key has been popped; leftovers are typos or unsupported.
    if user_spec:
      raise ValueError('Unexpected keys in user spec for user %s: %r' %
                       (name, user_spec.keys()))

    self.users[name] = User(name=name, password=password, uid=uid,
                            group_name=group_name, description=description,
                            home=home, shell=shell, is_fixed_id=is_fixed_id,
                            is_defunct=is_defunct)

  def _AddGroup(self, group_spec):
    """Add a group to this account database based on |group_spec|.

    Any previously loaded group with the same name is replaced.

    Args:
      group_spec: dict of information from an accounts database.
        This fragment is expected to have been parsed from
        developer supplied JSON and will be type checked.
        The dict is consumed: defaults are filled in and every known key is
        popped from it.

    Raises:
      ValueError: if |group_spec| contains keys that are not recognized.
    """
    # By default, groups don't get a fixed GID.
    group_spec.setdefault(GROUP_FIXED_ID_KEY, False)
    # By default, groups don't get a password.
    group_spec.setdefault(GROUP_PASSWORD_KEY, u'!')
    # By default, groups are not defunct.
    group_spec.setdefault(GROUP_DEFUNCT_KEY, False)

    # The name is popped first so later error descriptions can mention it.
    name = json_lib.PopValueOfType(group_spec, GROUP_NAME_KEY, six.text_type,
                                   'groupname from group spec')
    password = json_lib.PopValueOfType(
        group_spec, GROUP_PASSWORD_KEY, six.text_type,
        'password for group %s' % name)
    gid = json_lib.PopValueOfType(group_spec, GROUP_ID_KEY, int,
                                  'gid for group %s' % name)
    # No default is set for the member list, unlike the keys above.
    users = json_lib.PopValueOfType(group_spec, GROUP_USERS_KEY, list,
                                    'users in group %s' % name)
    is_fixed_id = json_lib.PopValueOfType(group_spec, GROUP_FIXED_ID_KEY, bool,
                                          'whether GID for group %s is fixed' %
                                          name)
    is_defunct = json_lib.PopValueOfType(group_spec, GROUP_DEFUNCT_KEY, bool,
                                         'whether group %s is defunct' % name)

    for username in users:
      json_lib.AssertIsInstance(
          username, six.text_type, 'user in group %s' % name)

    # Every known key has been popped; leftovers are typos or unsupported.
    if group_spec:
      raise ValueError('Unexpected keys in group spec for group %s: %r' %
                       (name, group_spec.keys()))

    self.groups[name] = Group(name=name, password=password, gid=gid,
                              users=users, is_fixed_id=is_fixed_id,
                              is_defunct=is_defunct)

  def InstallUser(self, username, sysroot_user_db,
                  uid=None, shell=None, homedir=None, primary_group=None):
    """Install a user in |sysroot_user_db|.

    Args:
      username: name of user to install.
      sysroot_user_db: user_db.UserDB instance representing the installed users
        of a particular sysroot.
      uid: ebuild specified uid.
      shell: ebuild specified shell.
      homedir: ebuild specified home directory.
      primary_group: ebuild specified primary group for user.

    Raises:
      ValueError: if the user is unknown or defunct, if any ebuild specified
        value differs from the accounts database, or if the user's primary
        group is not in the accounts database.
    """
    if username not in self.users:
      raise ValueError('Cannot add unknown user "%s"' % username)
    user = self.users[username]

    if user.is_defunct:
      raise ValueError('Refusing to install defunct user: "%s"' % username)

    def RaiseIfNotCompatible(user_specified, db_specified, fieldname):
      """Raise ValueError if a requested value contradicts the database."""
      # None means the ebuild did not specify this field at all.
      if user_specified is not None and user_specified != db_specified:
        raise ValueError('Accounts database %s (%s) for user %s differs from '
                         'requested %s (%s)' %
                         (fieldname, db_specified, user.name, fieldname,
                          user_specified))

    RaiseIfNotCompatible(uid, user.uid, 'UID')
    RaiseIfNotCompatible(shell, user.shell, 'shell')
    RaiseIfNotCompatible(homedir, user.home, 'homedir')
    RaiseIfNotCompatible(primary_group, user.group_name, 'group')

    # The numeric gid is looked up from the group entry, so it must exist.
    if user.group_name not in self.groups:
      raise ValueError('Refusing to install user %s with unknown group %s' %
                       (user.name, user.group_name))

    # NOTE(review): the `user=`, `home=` and `shell=` keywords and the AddUser()
    # call were reconstructed; confirm them against user_db.User/UserDB.
    installable_user = user_db.User(
        user=user.name, password=user.password, uid=user.uid,
        gid=self.groups[user.group_name].gid, gecos=user.description,
        home=user.home, shell=user.shell)
    sysroot_user_db.AddUser(installable_user)

  def InstallGroup(self, groupname, sysroot_user_db, gid=None):
    """Install a group in |sysroot_user_db|.

    Args:
      groupname: name of group to install.
      sysroot_user_db: user_db.UserDB instance representing the installed
        groups of a particular sysroot.
      gid: ebuild specified gid.

    Raises:
      ValueError: if the group is unknown or defunct, or if |gid| is given and
        differs from the accounts database.
    """
    if groupname not in self.groups:
      raise ValueError('Cannot add unknown group "%s"' % groupname)
    group = self.groups[groupname]

    if group.is_defunct:
      raise ValueError('Refusing to install defunct group: "%s"' % groupname)

    # NOTE(review): this is a truthiness test, so a requested gid of 0 skips
    # the check (InstallUser compares against None instead). Left unchanged in
    # case callers pass 0 to mean "unspecified".
    if gid and gid != group.gid:
      raise ValueError('Refusing to install group %s with gid=%d while account '
                       'database indicates gid=%d' %
                       (groupname, gid, group.gid))

    # NOTE(review): the `group=` keyword and the AddGroup() call were
    # reconstructed; confirm them against user_db.Group/UserDB.
    installable_group = user_db.Group(
        group=group.name, password=group.password,
        gid=group.gid, users=group.users)
    sysroot_user_db.AddGroup(installable_group)