| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Logic to parse and merge account databases in overlay stacks.""" |
| |
| from __future__ import print_function |
| |
| import collections |
| |
| from chromite.lib import json_lib |
| from chromite.lib import user_db |
| |
| |
| GROUPS_KEY = 'groups' |
| USERS_KEY = 'users' |
| |
| USER_COMMENT_KEY = 'gecos' |
| USER_DEFUNCT_KEY = 'defunct' |
| USER_FIXED_ID_KEY = 'fixed_id' |
| USER_GROUP_KEY = 'group_name' |
| USER_HOME_KEY = 'home' |
| USER_ID_KEY = 'uid' |
| USER_NAME_KEY = 'user' |
| USER_PASSWORD_KEY = 'password' |
| USER_SHELL_KEY = 'shell' |
| |
| GROUP_DEFUNCT_KEY = 'defunct' |
| GROUP_FIXED_ID_KEY = 'fixed_id' |
| GROUP_ID_KEY = 'gid' |
| GROUP_NAME_KEY = 'group' |
| GROUP_PASSWORD_KEY = 'password' |
| GROUP_USERS_KEY = 'users' |
| |
| User = collections.namedtuple( |
| 'User', ('name', 'password', 'uid', 'group_name', 'description', 'home', |
| 'shell', 'is_fixed_id', 'is_defunct')) |
| |
| Group = collections.namedtuple( |
| 'Group', ('name', 'password', 'gid', 'users', 'is_fixed_id', 'is_defunct')) |
| |
| |
| class AccountDatabase(object): |
| """Parses, validates, and combines account databases from overlays.""" |
| |
| def __init__(self): |
| """Construct an an empty instance.""" |
| self.groups = {} |
| self.users = {} |
| |
| def AddAccountsFromDatabase(self, account_db_path): |
| """Add accounts from the database at |account_db_path| to self. |
| |
| Overrides previously loaded accounts. |
| |
| Args: |
| account_db_path: path to file containing an account database. |
| """ |
| raw_db = json_lib.ParseJsonFileWithComments(account_db_path) |
| json_lib.AssertIsInstance(raw_db, dict, 'accounts database') |
| |
| # We don't mandate that an accounts database specify either field. |
| raw_db.setdefault(USERS_KEY, []) |
| raw_db.setdefault(GROUPS_KEY, []) |
| user_list = json_lib.PopValueOfType(raw_db, USERS_KEY, list, |
| 'list of users in accounts database') |
| group_list = json_lib.PopValueOfType(raw_db, GROUPS_KEY, list, |
| 'list of groups in accounts database') |
| |
| # We do mandate that the database contain only fields we know about. |
| if raw_db: |
| raise ValueError('Accounts database include unknown fields: %r' % |
| raw_db.keys()) |
| |
| for user in user_list: |
| json_lib.AssertIsInstance( |
| user, dict, 'user specification in accounts database') |
| self._AddUser(user) |
| |
| for group in group_list: |
| json_lib.AssertIsInstance( |
| group, dict, 'group specification in accounts database') |
| self._AddGroup(group) |
| |
| def _AddUser(self, user_spec): |
| """Add a user to this account database based on |user_spec|. |
| |
| Args: |
| user_spec: dict of information from an accounts database. |
| This fragment is expected to have been parsed from |
| developer supplied JSON and will be type checked. |
| """ |
| # By default, user accounts are locked and cannot be logged into. |
| user_spec.setdefault(USER_PASSWORD_KEY, u'!') |
| # By default, users don't get a shell. |
| user_spec.setdefault(USER_SHELL_KEY, u'/bin/false') |
| # By default, users don't get a home directory. |
| user_spec.setdefault(USER_HOME_KEY, u'/dev/null') |
| # By default, users don't get a fixed UID. |
| user_spec.setdefault(USER_FIXED_ID_KEY, False) |
| # By default, users don't need a comment. |
| user_spec.setdefault(USER_COMMENT_KEY, u'') |
| # By default, users are not defunct. |
| user_spec.setdefault(USER_DEFUNCT_KEY, False) |
| |
| name = json_lib.PopValueOfType(user_spec, USER_NAME_KEY, unicode, |
| 'username from user spec') |
| password = json_lib.PopValueOfType(user_spec, USER_PASSWORD_KEY, unicode, |
| 'password for user %s' % name) |
| uid = json_lib.PopValueOfType(user_spec, USER_ID_KEY, int, |
| 'default uid for user %s' % name) |
| group_name = json_lib.PopValueOfType(user_spec, USER_GROUP_KEY, unicode, |
| 'primary group for user %s' % name) |
| description = json_lib.PopValueOfType(user_spec, USER_COMMENT_KEY, unicode, |
| 'description for user %s' % name) |
| home = json_lib.PopValueOfType(user_spec, USER_HOME_KEY, unicode, |
| 'home directory for user %s' % name) |
| shell = json_lib.PopValueOfType(user_spec, USER_SHELL_KEY, unicode, |
| 'shell for user %s' % name) |
| is_fixed_id = json_lib.PopValueOfType(user_spec, USER_FIXED_ID_KEY, bool, |
| 'whether UID for user %s is fixed' % |
| name) |
| is_defunct = json_lib.PopValueOfType(user_spec, USER_DEFUNCT_KEY, bool, |
| 'whether user %s is defunct.' % name) |
| |
| if user_spec: |
| raise ValueError('Unexpected keys in user spec for user %s: %r' % |
| (name, user_spec.keys())) |
| |
| self.users[name] = User(name=name, password=password, uid=uid, |
| group_name=group_name, description=description, |
| home=home, shell=shell, is_fixed_id=is_fixed_id, |
| is_defunct=is_defunct) |
| |
| def _AddGroup(self, group_spec): |
| """Add a group to this account database based on |group_spec|. |
| |
| Args: |
| group_spec: dict of information from an accounts database. |
| This fragment is expected to have been parsed from |
| developer supplied JSON and will be type checked. |
| """ |
| # By default, groups don't get a fixed GID. |
| group_spec.setdefault(GROUP_FIXED_ID_KEY, False) |
| # By default, groups don't get a password. |
| group_spec.setdefault(GROUP_PASSWORD_KEY, u'!') |
| # By default, groups are not defunct. |
| group_spec.setdefault(GROUP_DEFUNCT_KEY, False) |
| |
| name = json_lib.PopValueOfType(group_spec, GROUP_NAME_KEY, unicode, |
| 'groupname from group spec') |
| password = json_lib.PopValueOfType(group_spec, GROUP_PASSWORD_KEY, unicode, |
| 'password for group %s' % name) |
| gid = json_lib.PopValueOfType(group_spec, GROUP_ID_KEY, int, |
| 'gid for group %s' % name) |
| users = json_lib.PopValueOfType(group_spec, GROUP_USERS_KEY, list, |
| 'users in group %s' % name) |
| is_fixed_id = json_lib.PopValueOfType(group_spec, GROUP_FIXED_ID_KEY, bool, |
| 'whether GID for group %s is fixed' % |
| name) |
| is_defunct = json_lib.PopValueOfType(group_spec, GROUP_DEFUNCT_KEY, bool, |
| 'whether group %s is defunct' % name) |
| |
| for username in users: |
| json_lib.AssertIsInstance(username, unicode, 'user in group %s' % name) |
| |
| if group_spec: |
| raise ValueError('Unexpected keys in group spec for group %s: %r' % |
| (name, group_spec.keys())) |
| |
| self.groups[name] = Group(name=name, password=password, gid=gid, |
| users=users, is_fixed_id=is_fixed_id, |
| is_defunct=is_defunct) |
| |
| def InstallUser(self, username, sysroot_user_db, |
| uid=None, shell=None, homedir=None, primary_group=None): |
| """Install a user in |sysroot_user_db|. |
| |
| Args: |
| username: name of user to install. |
| sysroot_user_db: user_db.UserDB instance representing the installed users |
| of a particular sysroot. |
| uid: ebuild specified uid. |
| shell: ebuild specified shell. |
| homedir: ebuild specified home directory. |
| primary_group: ebuild specified primary group for user. |
| """ |
| if not username in self.users: |
| raise ValueError('Cannot add unknown user "%s"' % username) |
| user = self.users[username] |
| |
| if user.is_defunct: |
| raise ValueError('Refusing to install defunct user: "%s"' % username) |
| |
| def RaiseIfNotCompatible(user_specified, db_specified, fieldname): |
| if user_specified is not None and user_specified != db_specified: |
| raise ValueError('Accounts database %s (%s) for user %s differs from ' |
| 'requested %s (%s)' % |
| (fieldname, db_specified, user.name, fieldname, |
| user_specified)) |
| |
| RaiseIfNotCompatible(uid, user.uid, 'UID') |
| RaiseIfNotCompatible(shell, user.shell, 'shell') |
| RaiseIfNotCompatible(homedir, user.home, 'homedir') |
| RaiseIfNotCompatible(primary_group, user.group_name, 'group') |
| |
| if not user.group_name in self.groups: |
| raise ValueError('Refusing to install user %s with unknown group %s' % |
| (user.name, user.group_name)) |
| |
| installable_user = user_db.User( |
| user=user.name, password=user.password, uid=user.uid, |
| gid=self.groups[user.group_name].gid, gecos=user.description, |
| home=user.home, shell=user.shell) |
| sysroot_user_db.AddUser(installable_user) |
| |
| def InstallGroup(self, groupname, sysroot_user_db, gid=None): |
| """Install a group in |sysroot_user_db|. |
| |
| Args: |
| groupname: name of group to install. |
| sysroot_user_db: user_db.UserDB instance representing the installed |
| groups. |
| gid: ebuild specified gid. |
| """ |
| if not groupname in self.groups: |
| raise ValueError('Cannot add unknown group "%s"' % groupname) |
| group = self.groups[groupname] |
| |
| if group.is_defunct: |
| raise ValueError('Refusing to install defunct group: "%s"' % groupname) |
| |
| if gid and gid != group.gid: |
| raise ValueError('Refusing to install group %s with gid=%d while account ' |
| 'database indicates gid=%d' % |
| (groupname, gid, group.gid)) |
| |
| installable_group = user_db.Group( |
| group=group.name, password=group.password, |
| gid=group.gid, users=group.users) |
| sysroot_user_db.AddGroup(installable_group) |