| # Copyright 1999-2020 Gentoo Authors |
| # Distributed under the terms of the GNU General Public License v2 |
| |
| from portage.dbapi import dbapi |
| from portage.dbapi.dep_expand import dep_expand |
| |
| |
class PackageVirtualDbapi(dbapi):
    """
    A dbapi-like interface class that represents the state of the installed
    package database as new packages are installed, replacing any packages
    that previously existed in the same slot. The main difference between
    this class and fakedbapi is that this one uses Package instances
    internally (passed in via cpv_inject() and cpv_remove() calls).

    Invariant maintained by cpv_inject() and cpv_remove(): every list stored
    in the per-cp map is non-empty, so cp_all() only reports package names
    that still have at least one package.
    """

    def __init__(self, settings):
        """
        Create an empty database.

        @param settings: configuration object; it is stored and later passed
            to dep_expand() by match().
        """
        dbapi.__init__(self)
        self.settings = settings
        # Shared cache for match() and cp_list() results; see cp_list() for
        # why the two key spaces can share one dict.
        self._match_cache = {}
        # cp -> list of package instances having that cp (never empty).
        self._cp_map = {}
        # cpv -> package instance.
        self._cpv_map = {}

    def clear(self):
        """
        Remove all packages.
        """
        # Nothing to do (and no cache to invalidate) when already empty.
        if self._cpv_map or self._cp_map:
            self._clear_cache()
            self._cp_map.clear()
            self._cpv_map.clear()

    def copy(self):
        """
        Return a new PackageVirtualDbapi holding the same packages.

        The maps and the per-cp lists are copied, so injecting into or
        removing from the copy does not affect this instance. The package
        instances themselves are shared, as are the cached result lists
        (which match() and cp_list() only ever hand out as copies).
        """
        obj = PackageVirtualDbapi(self.settings)
        obj._match_cache = self._match_cache.copy()
        obj._cp_map = {cp: pkgs[:] for cp, pkgs in self._cp_map.items()}
        obj._cpv_map = self._cpv_map.copy()
        return obj

    def __bool__(self):
        """Return True if at least one package is present."""
        return bool(self._cpv_map)

    def __iter__(self):
        """Iterate over the stored package instances."""
        return iter(self._cpv_map.values())

    def __contains__(self, item):
        """
        Return True if a package equal to item is stored under item.cpv.

        @param item: object with a cpv attribute
        """
        existing = self._cpv_map.get(item.cpv)
        return existing is not None and bool(existing == item)

    def get(self, item, default=None):
        """
        Return the stored package that compares equal to item.

        @param item: either an object with a cpv attribute, or a sequence
            of exactly five elements laid out as
            (type_name, root, cpv, operation, repo_key)
        @param default: value returned when there is no equal package, or
            when item has no cpv attribute and is not of length five
        """
        cpv = getattr(item, "cpv", None)
        if cpv is None:
            if len(item) != 5:
                return default
            # Only the cpv field is needed for the lookup; the equality
            # check below still compares against the whole item.
            _, _, cpv, _, _ = item

        existing = self._cpv_map.get(cpv)
        if existing is not None and existing == item:
            return existing
        return default

    def match_pkgs(self, atom):
        """Like match(), but return package instances instead of cpvs."""
        return [self._cpv_map[cpv] for cpv in self.match(atom)]

    def _clear_cache(self):
        """
        Invalidate cached results after the package set or metadata changed.
        """
        # NOTE(review): _categories is not assigned in this class; it is
        # presumably a cache owned by the dbapi base class - confirm there.
        if self._categories is not None:
            self._categories = None
        if self._match_cache:
            # Rebind rather than clear() so that a dict shared with another
            # instance via copy() is left untouched.
            self._match_cache = {}

    def match(self, origdep, use_cache=1):
        """
        Return the list of cpvs matching origdep.

        @param origdep: dependency to match; it is expanded with
            dep_expand() against this database
        @param use_cache: accepted for interface compatibility; this
            implementation does not consult it and always uses its cache
        @return: a new list on every call, safe for the caller to modify
        """
        atom = dep_expand(origdep, mydb=self, settings=self.settings)
        cache_key = (atom, atom.unevaluated_atom)
        result = self._match_cache.get(cache_key)
        if result is not None:
            return result[:]
        # NOTE(review): _iter_match is not defined in this class; presumably
        # inherited from dbapi.
        result = list(self._iter_match(atom, self.cp_list(atom.cp)))
        self._match_cache[cache_key] = result
        return result[:]

    def cpv_exists(self, cpv, myrepo=None):
        """
        Return True if a package with the given cpv is present.

        @param myrepo: accepted but not used by this implementation
        """
        return cpv in self._cpv_map

    def cp_list(self, mycp, use_cache=1):
        """
        Return the cpvs of all packages whose cp is mycp, after ordering
        them with _cpv_sort_ascending().

        @param use_cache: accepted for interface compatibility; this
            implementation does not consult it and always uses its cache
        @return: a new list on every call (empty when mycp is unknown)
        """
        # NOTE: Cache can be safely shared with the match cache, since the
        # match cache uses the result from dep_expand for the cache_key.
        cache_key = (mycp, mycp)
        cachelist = self._match_cache.get(cache_key)
        if cachelist is not None:
            return cachelist[:]
        cpv_list = [pkg.cpv for pkg in self._cp_map.get(mycp, ())]
        # NOTE(review): _cpv_sort_ascending is not defined in this class;
        # presumably inherited from dbapi and sorts the list in place.
        self._cpv_sort_ascending(cpv_list)
        self._match_cache[cache_key] = cpv_list
        return cpv_list[:]

    def cp_all(self, sort=False):
        """
        Return a list of every cp that currently has at least one package.

        @param sort: when true, the list is returned sorted
        """
        return sorted(self._cp_map) if sort else list(self._cp_map)

    def cpv_all(self):
        """Return a list of the cpvs of all stored packages."""
        return list(self._cpv_map)

    def cpv_inject(self, pkg):
        """
        Add pkg, replacing any stored package that has the same cpv or the
        same slot_atom. Does nothing if an equal package is already stored.

        @param pkg: package instance providing cp, cpv and slot_atom
        """
        e_pkg = self._cpv_map.get(pkg.cpv)
        if e_pkg is not None:
            if e_pkg == pkg:
                return
            self.cpv_remove(e_pkg)
        # Look the per-cp list up only after the removal above: cpv_remove()
        # drops the list from the map once it becomes empty.
        for e_pkg in self._cp_map.get(pkg.cp, ()):
            if e_pkg.slot_atom == pkg.slot_atom:
                if e_pkg == pkg:
                    return
                self.cpv_remove(e_pkg)
                # The list was just modified; stop iterating over it.
                break
        self._cp_map.setdefault(pkg.cp, []).append(pkg)
        self._cpv_map[pkg.cpv] = pkg
        self._clear_cache()

    def cpv_remove(self, pkg):
        """
        Remove pkg from the database.

        @param pkg: package instance providing cp and cpv
        @raise KeyError: if the package stored under pkg.cpv is missing or
            does not compare equal to pkg
        """
        old_pkg = self._cpv_map.get(pkg.cpv)
        if old_pkg != pkg:
            raise KeyError(pkg)
        cp_list = self._cp_map[pkg.cp]
        cp_list.remove(pkg)
        if not cp_list:
            # Drop the now-empty entry so that cp_all() stops reporting a
            # package name that no longer has any packages.
            del self._cp_map[pkg.cp]
        del self._cpv_map[pkg.cpv]
        self._clear_cache()

    def aux_get(self, cpv, wants, myrepo=None):
        """
        Return the metadata values of the package stored under cpv.

        @param wants: iterable of metadata keys
        @param myrepo: accepted but not used by this implementation
        @return: list of values in the order of wants, with "" for keys
            that are not present
        @raise KeyError: if no package is stored under cpv
        """
        metadata = self._cpv_map[cpv]._metadata
        return [metadata.get(x, "") for x in wants]

    def aux_update(self, cpv, values):
        """
        Update the metadata of the package stored under cpv.

        @param values: mapping of metadata keys to new values
        @raise KeyError: if no package is stored under cpv
        """
        self._cpv_map[cpv]._metadata.update(values)
        # Cached match results may depend on the metadata just changed.
        self._clear_cache()