| # Copyright 2015 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Logic to parse and merge account databases in overlay stacks.""" |
| |
| import collections |
| |
| from chromite.lib import json_lib |
| from chromite.lib import user_db |
| |
| |
# Top-level keys of an accounts database file.  Each one, when present,
# holds a list of per-account specification dicts.
USERS_KEY = "users"
GROUPS_KEY = "groups"

# Keys of a single user specification.
# Identity of the account.
USER_NAME_KEY = "user"
USER_ID_KEY = "uid"
USER_GROUP_KEY = "group_name"
# Remaining passwd-style fields.
USER_PASSWORD_KEY = "password"
USER_COMMENT_KEY = "gecos"
USER_HOME_KEY = "home"
USER_SHELL_KEY = "shell"
# Boolean flags.
USER_FIXED_ID_KEY = "fixed_id"
USER_DEFUNCT_KEY = "defunct"

# Keys of a single group specification.
# Identity of the group.
GROUP_NAME_KEY = "group"
GROUP_ID_KEY = "gid"
# Remaining group-file-style fields.
GROUP_PASSWORD_KEY = "password"
GROUP_USERS_KEY = "users"
# Boolean flags.
GROUP_FIXED_ID_KEY = "fixed_id"
GROUP_DEFUNCT_KEY = "defunct"
| |
# Immutable record for one validated user entry of an accounts database.
User = collections.namedtuple(
    "User",
    [
        "name",  # Login name of the account.
        "password",  # Password field string.
        "uid",  # Numeric user id.
        "group_name",  # Name (not gid) of the primary group.
        "description",  # Free-form comment (gecos) text.
        "home",  # Home directory path.
        "shell",  # Login shell path.
        "is_fixed_id",  # Whether the uid is fixed.
        "is_defunct",  # Whether the account is defunct.
    ],
)
| |
# Immutable record for one validated group entry of an accounts database.
Group = collections.namedtuple(
    "Group",
    [
        "name",  # Name of the group.
        "password",  # Password field string.
        "gid",  # Numeric group id.
        "users",  # List of member user names.
        "is_fixed_id",  # Whether the gid is fixed.
        "is_defunct",  # Whether the group is defunct.
    ],
)
| |
| |
class AccountDatabase:
    """Parses, validates, and combines account databases from overlays.

    Databases are loaded one at a time with AddAccountsFromDatabase(); an
    entry loaded later replaces any earlier entry that has the same name.

    Attributes:
        groups: dict mapping group name to the Group parsed for it.
        users: dict mapping user name to the User parsed for it.
    """

    def __init__(self) -> None:
        """Construct an empty instance."""
        self.groups = {}
        self.users = {}

    def AddAccountsFromDatabase(self, account_db_path) -> None:
        """Add accounts from the database at |account_db_path| to self.

        Overrides previously loaded accounts that share a name with an
        account in this database.  Entries are added one by one, so entries
        preceding an invalid one remain loaded when an error is raised.

        Args:
            account_db_path: path to file containing an account database.

        Raises:
            ValueError: if the database, or one of its user/group entries,
                contains a field this class does not know about.  Type
                validation is delegated to json_lib, which is expected to
                raise on a mismatch.
        """
        raw_db = json_lib.ParseJsonFileWithComments(account_db_path)
        json_lib.AssertIsInstance(raw_db, dict, "accounts database")

        # We don't mandate that an accounts database specify either field.
        raw_db.setdefault(USERS_KEY, [])
        raw_db.setdefault(GROUPS_KEY, [])
        user_list = json_lib.PopValueOfType(
            raw_db, USERS_KEY, list, "list of users in accounts database"
        )
        group_list = json_lib.PopValueOfType(
            raw_db, GROUPS_KEY, list, "list of groups in accounts database"
        )

        # We do mandate that the database contain only fields we know about.
        # Both known fields were popped above, so anything left is unknown.
        if raw_db:
            raise ValueError(
                "Accounts database include unknown fields: %r"
                % (list(raw_db),)
            )

        for user in user_list:
            json_lib.AssertIsInstance(
                user, dict, "user specification in accounts database"
            )
            self._AddUser(user)

        for group in group_list:
            json_lib.AssertIsInstance(
                group, dict, "group specification in accounts database"
            )
            self._AddGroup(group)

    def _AddUser(self, user_spec) -> None:
        """Add a user to this account database based on |user_spec|.

        Replaces any previously added user with the same name.

        Args:
            user_spec: dict of information from an accounts database.
                This fragment is expected to have been parsed from
                developer supplied JSON and will be type checked.
                It is not modified.

        Raises:
            ValueError: if |user_spec| contains keys other than the known
                user keys.
        """
        # Fields are consumed destructively below; work on a shallow copy so
        # the caller's dict is left untouched.
        user_spec = dict(user_spec)

        # By default, user accounts are locked and cannot be logged into.
        user_spec.setdefault(USER_PASSWORD_KEY, "!")
        # By default, users don't get a shell.
        user_spec.setdefault(USER_SHELL_KEY, "/bin/false")
        # By default, users don't get a home directory.
        user_spec.setdefault(USER_HOME_KEY, "/dev/null")
        # By default, users don't get a fixed UID.
        user_spec.setdefault(USER_FIXED_ID_KEY, False)
        # By default, users don't need a comment.
        user_spec.setdefault(USER_COMMENT_KEY, "")
        # By default, users are not defunct.
        user_spec.setdefault(USER_DEFUNCT_KEY, False)

        # The name is extracted first because every later description
        # mentions it.  No default is supplied for name, uid or group_name.
        name = json_lib.PopValueOfType(
            user_spec, USER_NAME_KEY, str, "username from user spec"
        )
        password = json_lib.PopValueOfType(
            user_spec, USER_PASSWORD_KEY, str, "password for user %s" % name
        )
        uid = json_lib.PopValueOfType(
            user_spec, USER_ID_KEY, int, "default uid for user %s" % name
        )
        group_name = json_lib.PopValueOfType(
            user_spec, USER_GROUP_KEY, str, "primary group for user %s" % name
        )
        description = json_lib.PopValueOfType(
            user_spec, USER_COMMENT_KEY, str, "description for user %s" % name
        )
        home = json_lib.PopValueOfType(
            user_spec, USER_HOME_KEY, str, "home directory for user %s" % name
        )
        shell = json_lib.PopValueOfType(
            user_spec, USER_SHELL_KEY, str, "shell for user %s" % name
        )
        is_fixed_id = json_lib.PopValueOfType(
            user_spec,
            USER_FIXED_ID_KEY,
            bool,
            "whether UID for user %s is fixed" % name,
        )
        is_defunct = json_lib.PopValueOfType(
            user_spec,
            USER_DEFUNCT_KEY,
            bool,
            "whether user %s is defunct." % name,
        )

        # Every known key has been popped; leftovers are unsupported.
        if user_spec:
            raise ValueError(
                "Unexpected keys in user spec for user %s: %r"
                % (name, list(user_spec))
            )

        self.users[name] = User(
            name=name,
            password=password,
            uid=uid,
            group_name=group_name,
            description=description,
            home=home,
            shell=shell,
            is_fixed_id=is_fixed_id,
            is_defunct=is_defunct,
        )

    def _AddGroup(self, group_spec) -> None:
        """Add a group to this account database based on |group_spec|.

        Replaces any previously added group with the same name.

        Args:
            group_spec: dict of information from an accounts database.
                This fragment is expected to have been parsed from
                developer supplied JSON and will be type checked.
                It is not modified.

        Raises:
            ValueError: if |group_spec| contains keys other than the known
                group keys.
        """
        # Fields are consumed destructively below; work on a shallow copy so
        # the caller's dict is left untouched.
        group_spec = dict(group_spec)

        # By default, groups don't get a fixed GID.
        group_spec.setdefault(GROUP_FIXED_ID_KEY, False)
        # By default, groups don't get a password.
        group_spec.setdefault(GROUP_PASSWORD_KEY, "!")
        # By default, groups are not defunct.
        group_spec.setdefault(GROUP_DEFUNCT_KEY, False)

        # No default is supplied for the group name, gid or member list.
        name = json_lib.PopValueOfType(
            group_spec, GROUP_NAME_KEY, str, "groupname from group spec"
        )
        password = json_lib.PopValueOfType(
            group_spec, GROUP_PASSWORD_KEY, str, "password for group %s" % name
        )
        gid = json_lib.PopValueOfType(
            group_spec, GROUP_ID_KEY, int, "gid for group %s" % name
        )
        users = json_lib.PopValueOfType(
            group_spec, GROUP_USERS_KEY, list, "users in group %s" % name
        )
        is_fixed_id = json_lib.PopValueOfType(
            group_spec,
            GROUP_FIXED_ID_KEY,
            bool,
            "whether GID for group %s is fixed" % name,
        )
        is_defunct = json_lib.PopValueOfType(
            group_spec,
            GROUP_DEFUNCT_KEY,
            bool,
            "whether group %s is defunct" % name,
        )

        # The member list was only checked to be a list; check each member.
        for username in users:
            json_lib.AssertIsInstance(username, str, "user in group %s" % name)

        # Every known key has been popped; leftovers are unsupported.
        if group_spec:
            raise ValueError(
                "Unexpected keys in group spec for group %s: %r"
                % (name, list(group_spec))
            )

        self.groups[name] = Group(
            name=name,
            password=password,
            gid=gid,
            users=users,
            is_fixed_id=is_fixed_id,
            is_defunct=is_defunct,
        )

    def InstallUser(
        self,
        username,
        sysroot_user_db,
        uid=None,
        shell=None,
        homedir=None,
        primary_group=None,
    ) -> None:
        """Install a user in |sysroot_user_db|.

        Each optional argument left as None is not checked; one that is
        given must equal the value recorded in this database.

        Args:
            username: name of user to install.
            sysroot_user_db: user_db.UserDB instance representing the
                installed users of a particular sysroot.
            uid: ebuild specified uid.
            shell: ebuild specified shell.
            homedir: ebuild specified home directory.
            primary_group: ebuild specified primary group for user.

        Raises:
            ValueError: if the user is unknown or defunct, if a requested
                value differs from the database, or if the user's primary
                group is not in this database.
        """
        if username not in self.users:
            raise ValueError('Cannot add unknown user "%s"' % username)
        user = self.users[username]

        if user.is_defunct:
            raise ValueError(
                'Refusing to install defunct user: "%s"' % username
            )

        # (requested value, database value, field name used in the error).
        requested_fields = (
            (uid, user.uid, "UID"),
            (shell, user.shell, "shell"),
            (homedir, user.home, "homedir"),
            (primary_group, user.group_name, "group"),
        )
        for user_specified, db_specified, fieldname in requested_fields:
            # None means "no preference"; falsy values such as 0 or "" are
            # real requests and must still match.
            if user_specified is not None and user_specified != db_specified:
                raise ValueError(
                    "Accounts database %s (%s) for user %s differs from "
                    "requested %s (%s)"
                    % (
                        fieldname,
                        db_specified,
                        user.name,
                        fieldname,
                        user_specified,
                    )
                )

        # The installed entry stores a numeric gid, so the primary group
        # name must be resolvable through this database.
        if user.group_name not in self.groups:
            raise ValueError(
                "Refusing to install user %s with unknown group %s"
                % (user.name, user.group_name)
            )

        installable_user = user_db.User(
            user=user.name,
            password=user.password,
            uid=user.uid,
            gid=self.groups[user.group_name].gid,
            gecos=user.description,
            home=user.home,
            shell=user.shell,
        )
        sysroot_user_db.AddUser(installable_user)

    def InstallGroup(self, groupname, sysroot_user_db, gid=None) -> None:
        """Install a group in |sysroot_user_db|.

        Args:
            groupname: name of group to install.
            sysroot_user_db: user_db.UserDB instance representing the
                installed groups.
            gid: ebuild specified gid.  None means no particular gid is
                requested; any other value, including 0, must equal the
                gid recorded in this database.

        Raises:
            ValueError: if the group is unknown or defunct, or if the
                requested gid differs from the database.
        """
        if groupname not in self.groups:
            raise ValueError('Cannot add unknown group "%s"' % groupname)
        group = self.groups[groupname]

        if group.is_defunct:
            raise ValueError(
                'Refusing to install defunct group: "%s"' % groupname
            )

        # Compare against None rather than truthiness: gid 0 is a valid
        # request and must not bypass the consistency check.  %s is used so
        # a non-numeric request still yields this ValueError.
        if gid is not None and gid != group.gid:
            raise ValueError(
                "Refusing to install group %s with gid=%s while account "
                "database indicates gid=%s" % (groupname, gid, group.gid)
            )

        installable_group = user_db.Group(
            group=group.name,
            password=group.password,
            gid=group.gid,
            users=group.users,
        )
        sysroot_user_db.AddGroup(installable_group)