# blob: 24990e4dbaf026b072ea464c9c722d4a4f2349a9 [file] [log] [blame]
# Copyright 2019-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import functools
import io
import tempfile
import types
import portage
from portage import shutil, os
from portage.checksum import checksum_str
from portage.const import BASH_BINARY, MANIFEST2_HASH_DEFAULTS, PORTAGE_PYM_PATH
from portage.tests import TestCase
from portage.tests.resolver.ResolverPlayground import ResolverPlayground
from portage.tests.util.test_socks5 import AsyncHTTPServer
from portage.util.configparser import ConfigParserError
from portage.util.futures import asyncio
from portage.util.futures.executor.fork import ForkExecutor
from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.package.ebuild.config import config
from portage.package.ebuild.digestgen import digestgen
from portage.package.ebuild.fetch import (
ContentHashLayout,
DistfileName,
_download_suffix,
fetch,
FilenameHashLayout,
FlatLayout,
MirrorLayoutConfig,
)
from portage._emirrordist.Config import Config as EmirrordistConfig
from _emerge.EbuildFetcher import EbuildFetcher
from _emerge.Package import Package
class EbuildFetchTestCase(TestCase):

    def testEbuildFetch(self):
        """Start a local HTTP distfile server and drive the fetch scenarios.

        Builds template ebuild/user-config data, substitutes the live
        server's address into them, creates a ResolverPlayground plus a
        scratch read-only distdir, and delegates the actual assertions to
        _testEbuildFetch(), cleaning everything up afterwards.
        """
        user_config = {
            "make.conf": ('GENTOO_MIRRORS="{scheme}://{host}:{port}"',),
        }
        distfiles = {
            'bar': b'bar\n',
            'foo': b'foo\n',
        }
        ebuilds = {
            'dev-libs/A-1': {
                'EAPI': '7',
                'SRC_URI': '''{scheme}://{host}:{port}/distfiles/bar.txt -> bar
{scheme}://{host}:{port}/distfiles/foo.txt -> foo''',
            },
        }

        loop = SchedulerInterface(global_event_loop())
        scheme = 'http'
        host = '127.0.0.1'
        content = {}

        with AsyncHTTPServer(host, content, loop) as server:
            # Substitute the live server's address into the templates above.
            fmt_args = {'scheme': scheme, 'host': host, 'port': server.server_port}
            ebuilds_subst = {}
            for cpv, metadata in ebuilds.items():
                metadata = metadata.copy()
                metadata['SRC_URI'] = metadata['SRC_URI'].format(**fmt_args)
                ebuilds_subst[cpv] = metadata
            user_config_subst = {
                configname: tuple(line.format(**fmt_args) for line in configdata)
                for configname, configdata in user_config.items()
            }

            playground = ResolverPlayground(
                ebuilds=ebuilds_subst,
                distfiles=distfiles,
                user_config=user_config_subst,
            )
            ro_distdir = tempfile.mkdtemp()
            try:
                self._testEbuildFetch(
                    loop,
                    scheme,
                    host,
                    distfiles,
                    ebuilds,
                    content,
                    server,
                    playground,
                    ro_distdir,
                )
            finally:
                shutil.rmtree(ro_distdir)
                playground.cleanup()
    def _testEbuildFetch(
        self,
        loop,
        scheme,
        host,
        orig_distfiles,
        ebuilds,
        content,
        server,
        playground,
        ro_distdir,
    ):
        """Run the full battery of fetch scenarios against the local mirror.

        For each mirror layout configuration this repopulates the HTTP
        server's content map and DISTDIR, then exercises fetch(),
        emirrordist, digestgen and EbuildFetcher under a range of DISTDIR
        states: stale files, missing files, empty files, corrupt files,
        read-only DISTDIR (with and without skiprocheck), RO overlay dirs,
        local-filesystem mirrors, parallel-fetch and resume.

        Parameters are what testEbuildFetch() builds: the scheduler
        interface, the URI scheme/host of the AsyncHTTPServer, the pristine
        distfile contents, the ebuild metadata templates, the server content
        dict, the server itself, the ResolverPlayground, and a scratch
        directory used as a read-only distdir.
        """
        # Each tuple below is the content of a layout.conf served by the
        # mirror. The last one selects a content-hash layout, which also
        # enables the emirrordist ContentDB code path further down.
        mirror_layouts = (
            (
                "[structure]",
                "0=filename-hash BLAKE2B 8",
                "1=flat",
            ),
            (
                "[structure]",
                "1=filename-hash BLAKE2B 8",
                "0=flat",
            ),
            (
                "[structure]",
                "0=content-hash SHA512 8:8:8",
                "1=flat",
            ),
        )

        # Skip the whole test when the configured fetch/resume tools are not
        # installed on the host.
        fetchcommand = portage.util.shlex_split(playground.settings["FETCHCOMMAND"])
        fetch_bin = portage.process.find_binary(fetchcommand[0])
        if fetch_bin is None:
            self.skipTest(
                "FETCHCOMMAND not found: {}".format(playground.settings["FETCHCOMMAND"])
            )
        eubin = os.path.join(playground.eprefix, "usr", "bin")
        # Expose the fetch tool inside the playground prefix's PATH.
        os.symlink(fetch_bin, os.path.join(eubin, os.path.basename(fetch_bin)))
        resumecommand = portage.util.shlex_split(playground.settings["RESUMECOMMAND"])
        resume_bin = portage.process.find_binary(resumecommand[0])
        if resume_bin is None:
            self.skipTest(
                "RESUMECOMMAND not found: {}".format(
                    playground.settings["RESUMECOMMAND"]
                )
            )
        if resume_bin != fetch_bin:
            os.symlink(resume_bin, os.path.join(eubin, os.path.basename(resume_bin)))
        root_config = playground.trees[playground.eroot]["root_config"]
        portdb = root_config.trees["porttree"].dbapi

        def run_async(func, *args, **kwargs):
            # Execute a blocking callable in a forked child process so it
            # cannot interfere with the event loop running in this process.
            with ForkExecutor(loop=loop) as executor:
                return loop.run_until_complete(
                    loop.run_in_executor(
                        executor, functools.partial(func, *args, **kwargs)
                    )
                )

        for layout_lines in mirror_layouts:
            # Fresh settings per layout so mutations below don't leak
            # between iterations.
            settings = config(clone=playground.settings)
            layout_data = "".join("{}\n".format(line) for line in layout_lines)
            mirror_conf = MirrorLayoutConfig()
            mirror_conf.read_from_file(io.StringIO(layout_data))
            layouts = mirror_conf.get_all_layouts()
            # Serve layout.conf so fetch() discovers the mirror structure.
            content["/distfiles/layout.conf"] = layout_data.encode("utf8")
            distfiles = {}
            for k, v in orig_distfiles.items():
                # Keys become DistfileName instances carrying the full
                # digest set, as required by content-hash layouts.
                filename = DistfileName(
                    k,
                    digests=dict((algo, checksum_str(v, hashname=algo)) for algo in MANIFEST2_HASH_DEFAULTS),
                )
                distfiles[filename] = v

                # mirror path
                for layout in layouts:
                    content["/distfiles/" + layout.get_path(filename)] = v
                # upstream path
                content["/distfiles/{}.txt".format(k)] = v

            # Start each layout iteration from an empty DISTDIR containing
            # only the layout.conf under test.
            shutil.rmtree(settings["DISTDIR"])
            os.makedirs(settings["DISTDIR"])
            with open(os.path.join(settings['DISTDIR'], 'layout.conf'), 'wt') as f:
                f.write(layout_data)

            if any(isinstance(layout, ContentHashLayout) for layout in layouts):
                # content-hash layouts need emirrordist's content.db; drop
                # any stale db left over from a previous iteration.
                content_db = os.path.join(playground.eprefix, 'var/db/emirrordist/content.db')
                os.makedirs(os.path.dirname(content_db), exist_ok=True)
                try:
                    os.unlink(content_db)
                except OSError:
                    pass
            else:
                content_db = None

            # Demonstrate that fetch preserves a stale file in DISTDIR when no digests are given.
            foo_uri = {'foo': ('{scheme}://{host}:{port}/distfiles/foo'.format(scheme=scheme, host=host, port=server.server_port),)}
            foo_path = os.path.join(settings['DISTDIR'], 'foo')
            foo_stale_content = b'stale content\n'
            with open(foo_path, 'wb') as f:
                f.write(b'stale content\n')

            # Without digests fetch() must leave the existing file alone.
            self.assertTrue(bool(run_async(fetch, foo_uri, settings, try_mirrors=False)))
            with open(foo_path, 'rb') as f:
                self.assertEqual(f.read(), foo_stale_content)
            with open(foo_path, 'rb') as f:
                self.assertNotEqual(f.read(), distfiles['foo'])

            # Use force=True to update the stale file.
            self.assertTrue(bool(run_async(fetch, foo_uri, settings, try_mirrors=False, force=True)))
            with open(foo_path, 'rb') as f:
                self.assertEqual(f.read(), distfiles['foo'])

            # Test force=True with FEATURES=skiprocheck, using read-only DISTDIR.
            # FETCHCOMMAND is set to temporarily chmod +w DISTDIR. Note that
            # FETCHCOMMAND must perform atomic rename itself due to read-only
            # DISTDIR.
            with open(foo_path, 'wb') as f:
                f.write(b'stale content\n')
            orig_fetchcommand = settings['FETCHCOMMAND']
            orig_distdir_mode = os.stat(settings['DISTDIR']).st_mode
            temp_fetchcommand = os.path.join(eubin, 'fetchcommand')
            with open(temp_fetchcommand, 'w') as f:
                # Wrapper script: make DISTDIR writable for the duration of
                # the download, fetch to a .__download__ temp name, then
                # atomically rename into place; EXIT trap restores a-w.
                f.write("""
set -e
URI=$1
DISTDIR=$2
FILE=$3
trap 'chmod a-w "${DISTDIR}"' EXIT
chmod ug+w "${DISTDIR}"
%s
mv -f "${DISTDIR}/${FILE}.__download__" "${DISTDIR}/${FILE}"
""" % orig_fetchcommand.replace('${FILE}', '${FILE}.__download__'))
            settings['FETCHCOMMAND'] = '"%s" "%s" "${URI}" "${DISTDIR}" "${FILE}"' % (BASH_BINARY, temp_fetchcommand)
            settings.features.add('skiprocheck')
            settings.features.remove('distlocks')
            os.chmod(settings['DISTDIR'], 0o555)
            try:
                self.assertTrue(bool(run_async(fetch, foo_uri, settings, try_mirrors=False, force=True)))
            finally:
                # Restore every piece of state mutated for this scenario.
                settings['FETCHCOMMAND'] = orig_fetchcommand
                os.chmod(settings['DISTDIR'], orig_distdir_mode)
                settings.features.remove('skiprocheck')
                settings.features.add('distlocks')
                os.unlink(temp_fetchcommand)

            with open(foo_path, 'rb') as f:
                self.assertEqual(f.read(), distfiles['foo'])

            # Test emirrordist invocation.
            emirrordist_cmd = (portage._python_interpreter, '-b', '-Wd',
                os.path.join(self.bindir, 'emirrordist'),
                '--distfiles', settings['DISTDIR'],
                '--config-root', settings['EPREFIX'],
                '--delete',
                '--repositories-configuration', settings.repositories.config_string(),
                '--repo', 'test_repo', '--mirror')

            if content_db is not None:
                emirrordist_cmd = emirrordist_cmd + ('--content-db', content_db,)

            env = settings.environ()
            # Make sure the subprocess imports this checkout's portage.
            env['PYTHONPATH'] = ':'.join(
                filter(None, [PORTAGE_PYM_PATH] + os.environ.get('PYTHONPATH', '').split(':')))

            # Remove the distfiles so emirrordist has to fetch them.
            for k in distfiles:
                try:
                    os.unlink(os.path.join(settings['DISTDIR'], k))
                except OSError:
                    pass

            proc = loop.run_until_complete(asyncio.create_subprocess_exec(*emirrordist_cmd, env=env))
            self.assertEqual(loop.run_until_complete(proc.wait()), 0)

            # emirrordist stores files under the primary (first) layout.
            for k in distfiles:
                with open(os.path.join(settings['DISTDIR'], layouts[0].get_path(k)), 'rb') as f:
                    self.assertEqual(f.read(), distfiles[k])

            if content_db is not None:
                loop.run_until_complete(
                    self._test_content_db(
                        emirrordist_cmd,
                        env,
                        layouts,
                        content_db,
                        distfiles,
                        settings,
                        portdb,
                    )
                )

            # Tests only work with one ebuild at a time, so the config
            # pool only needs a single config instance.
            class config_pool:
                @staticmethod
                def allocate():
                    return settings
                @staticmethod
                def deallocate(settings):
                    pass

            def async_fetch(pkg, ebuild_path):
                # Start an EbuildFetcher on the scheduler and return an
                # awaitable yielding its exit status.
                fetcher = EbuildFetcher(config_pool=config_pool, ebuild_path=ebuild_path,
                    fetchonly=False, fetchall=True, pkg=pkg, scheduler=loop)
                fetcher.start()
                return fetcher.async_wait()

            for cpv in ebuilds:
                metadata = dict(zip(Package.metadata_keys,
                    portdb.aux_get(cpv, Package.metadata_keys)))

                pkg = Package(built=False, cpv=cpv, installed=False,
                    metadata=metadata, root_config=root_config,
                    type_name='ebuild')

                settings.setcpv(pkg)
                ebuild_path = portdb.findname(pkg.cpv)
                portage.doebuild_environment(ebuild_path, 'fetch', settings=settings, db=portdb)

                # Test good files in DISTDIR
                for k in settings['AA'].split():
                    os.stat(os.path.join(settings['DISTDIR'], k))
                self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                for k in settings['AA'].split():
                    with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                        self.assertEqual(f.read(), distfiles[k])

                # Test digestgen with fetch
                os.unlink(os.path.join(os.path.dirname(ebuild_path), 'Manifest'))
                for k in settings['AA'].split():
                    os.unlink(os.path.join(settings['DISTDIR'], k))
                with ForkExecutor(loop=loop) as executor:
                    self.assertTrue(bool(loop.run_until_complete(
                        loop.run_in_executor(executor, functools.partial(
                            digestgen, mysettings=settings, myportdb=portdb)))))
                for k in settings['AA'].split():
                    with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                        self.assertEqual(f.read(), distfiles[k])

                # Test missing files in DISTDIR
                for k in settings['AA'].split():
                    os.unlink(os.path.join(settings['DISTDIR'], k))
                self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                for k in settings['AA'].split():
                    with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                        self.assertEqual(f.read(), distfiles[k])

                # Test empty files in DISTDIR
                for k in settings['AA'].split():
                    file_path = os.path.join(settings['DISTDIR'], k)
                    with open(file_path, 'wb') as f:
                        pass
                    self.assertEqual(os.stat(file_path).st_size, 0)
                self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                for k in settings['AA'].split():
                    with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                        self.assertEqual(f.read(), distfiles[k])

                # Test non-empty files containing null bytes in DISTDIR
                # (same size as the real file, so size checks alone can't
                # detect the corruption -- digests must).
                for k in settings['AA'].split():
                    file_path = os.path.join(settings['DISTDIR'], k)
                    with open(file_path, 'wb') as f:
                        f.write(len(distfiles[k]) * b'\0')
                    self.assertEqual(os.stat(file_path).st_size, len(distfiles[k]))
                self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                for k in settings['AA'].split():
                    with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                        self.assertEqual(f.read(), distfiles[k])

                # Test PORTAGE_RO_DISTDIRS: files found in a read-only
                # distdir should be exposed via symlinks in DISTDIR.
                settings['PORTAGE_RO_DISTDIRS'] = '"{}"'.format(ro_distdir)
                orig_fetchcommand = settings['FETCHCOMMAND']
                orig_resumecommand = settings['RESUMECOMMAND']
                try:
                    # Empty commands guarantee nothing is actually fetched.
                    settings['FETCHCOMMAND'] = settings['RESUMECOMMAND'] = ''
                    for k in settings['AA'].split():
                        file_path = os.path.join(settings['DISTDIR'], k)
                        os.rename(file_path, os.path.join(ro_distdir, k))
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        file_path = os.path.join(settings['DISTDIR'], k)
                        self.assertTrue(os.path.islink(file_path))
                        with open(file_path, 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                        os.unlink(file_path)
                finally:
                    settings.pop('PORTAGE_RO_DISTDIRS')
                    settings['FETCHCOMMAND'] = orig_fetchcommand
                    settings['RESUMECOMMAND'] = orig_resumecommand

                # Test local filesystem in GENTOO_MIRRORS
                orig_mirrors = settings['GENTOO_MIRRORS']
                orig_fetchcommand = settings['FETCHCOMMAND']
                try:
                    settings['GENTOO_MIRRORS'] = ro_distdir
                    settings['FETCHCOMMAND'] = settings['RESUMECOMMAND'] = ''
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                finally:
                    settings['GENTOO_MIRRORS'] = orig_mirrors
                    settings['FETCHCOMMAND'] = orig_fetchcommand
                    settings['RESUMECOMMAND'] = orig_resumecommand

                # Test readonly DISTDIR
                orig_distdir_mode = os.stat(settings['DISTDIR']).st_mode
                try:
                    os.chmod(settings['DISTDIR'], 0o555)
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                finally:
                    os.chmod(settings['DISTDIR'], orig_distdir_mode)

                # Test parallel-fetch mode (twice: with files present, then
                # after deleting them so a real fetch occurs).
                settings['PORTAGE_PARALLEL_FETCHONLY'] = '1'
                try:
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                    for k in settings['AA'].split():
                        os.unlink(os.path.join(settings['DISTDIR'], k))
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                finally:
                    settings.pop('PORTAGE_PARALLEL_FETCHONLY')

                # Test RESUMECOMMAND
                orig_resume_min_size = settings['PORTAGE_FETCH_RESUME_MIN_SIZE']
                try:
                    # Lower the threshold so a 2-byte partial qualifies.
                    settings['PORTAGE_FETCH_RESUME_MIN_SIZE'] = '2'
                    for k in settings['AA'].split():
                        file_path = os.path.join(settings['DISTDIR'], k)
                        os.unlink(file_path)
                        # Leave a partial download in place to be resumed.
                        with open(file_path + _download_suffix, 'wb') as f:
                            f.write(distfiles[k][:2])
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                    for k in settings['AA'].split():
                        with open(os.path.join(settings['DISTDIR'], k), 'rb') as f:
                            self.assertEqual(f.read(), distfiles[k])
                finally:
                    settings['PORTAGE_FETCH_RESUME_MIN_SIZE'] = orig_resume_min_size

                # Test readonly DISTDIR + skiprocheck, with FETCHCOMMAND set to temporarily chmod DISTDIR
                orig_fetchcommand = settings['FETCHCOMMAND']
                orig_distdir_mode = os.stat(settings['DISTDIR']).st_mode
                for k in settings['AA'].split():
                    os.unlink(os.path.join(settings['DISTDIR'], k))
                try:
                    os.chmod(settings['DISTDIR'], 0o555)
                    # Inline bash wrapper: chmod +w, run the real command,
                    # then restore a-w while preserving the exit status.
                    settings['FETCHCOMMAND'] = '"%s" -c "chmod ug+w \\"${DISTDIR}\\"; %s; status=\\$?; chmod a-w \\"${DISTDIR}\\"; exit \\$status"' % (BASH_BINARY, orig_fetchcommand.replace('"', '\\"'))
                    settings.features.add('skiprocheck')
                    settings.features.remove('distlocks')
                    self.assertEqual(loop.run_until_complete(async_fetch(pkg, ebuild_path)), 0)
                finally:
                    settings['FETCHCOMMAND'] = orig_fetchcommand
                    os.chmod(settings['DISTDIR'], orig_distdir_mode)
                    settings.features.remove('skiprocheck')
                    settings.features.add('distlocks')
    async def _test_content_db(
        self, emirrordist_cmd, env, layouts, content_db, distfiles, settings, portdb
    ):
        """Verify emirrordist ContentDB revision tracking and removal.

        Cross-links the 'foo' and 'bar' filename entries so each carries two
        content revisions, re-runs emirrordist to prove the db state is
        persistent, then checks that ContentDB.remove() drops only the
        revision matching the removed file's digests.

        :param emirrordist_cmd: argv tuple for the emirrordist subprocess
        :param env: environment for the subprocess (PYTHONPATH prepared)
        :param layouts: mirror layouts; layouts[0] determines storage paths
        :param content_db: path to the content.db file
        :param distfiles: DistfileName -> content mapping
        :param settings: portage config whose DISTDIR is exercised
        :param portdb: portdbapi instance for EmirrordistConfig
        """
        # Simulate distfile digest change for ContentDB.
        emdisopts = types.SimpleNamespace(
            content_db=content_db, distfiles=settings["DISTDIR"]
        )
        with EmirrordistConfig(
            emdisopts, portdb, asyncio.get_event_loop()
        ) as emdisconf:
            # Copy revisions from bar to foo.
            for revision_key in emdisconf.content_db["filename:{}".format("bar")]:
                emdisconf.content_db.add(
                    DistfileName("foo", digests=dict(revision_key))
                )

            # Copy revisions from foo to bar.
            for revision_key in emdisconf.content_db["filename:{}".format("foo")]:
                emdisconf.content_db.add(
                    DistfileName("bar", digests=dict(revision_key))
                )

            content_db_state = dict(emdisconf.content_db.items())
            # The snapshot must be reproducible by a second items() pass.
            self.assertEqual(content_db_state, dict(emdisconf.content_db.items()))
            self.assertEqual(
                [
                    k[len("filename:") :]
                    for k in content_db_state
                    if k.startswith("filename:")
                ],
                ["bar", "foo"],
            )
            # After cross-linking, both names share the same revision set.
            self.assertEqual(
                content_db_state["filename:foo"], content_db_state["filename:bar"]
            )
            self.assertEqual(len(content_db_state["filename:foo"]), 2)

        # Delete the distfiles and re-run emirrordist so it has to
        # repopulate DISTDIR while reusing the existing content.db.
        for k in distfiles:
            try:
                os.unlink(os.path.join(settings["DISTDIR"], k))
            except OSError:
                pass

        proc = await asyncio.create_subprocess_exec(*emirrordist_cmd, env=env)
        self.assertEqual(await proc.wait(), 0)

        for k in distfiles:
            with open(
                os.path.join(settings["DISTDIR"], layouts[0].get_path(k)), "rb"
            ) as f:
                self.assertEqual(f.read(), distfiles[k])

        with EmirrordistConfig(
            emdisopts, portdb, asyncio.get_event_loop()
        ) as emdisconf:
            # The emirrordist run must not have changed the db state.
            self.assertEqual(content_db_state, dict(emdisconf.content_db.items()))

            # Verify that remove works as expected
            filename = [filename for filename in distfiles if filename == "foo"][0]
            self.assertTrue(bool(filename.digests))
            emdisconf.content_db.remove(filename)
            # foo should still have a content revision corresponding to bar's content.
            self.assertEqual(
                [
                    k[len("filename:") :]
                    for k in emdisconf.content_db
                    if k.startswith("filename:")
                ],
                ["bar", "foo"],
            )
            self.assertEqual(len(emdisconf.content_db["filename:foo"]), 1)
            # The surviving foo revision must NOT match foo's own digests.
            self.assertEqual(
                len(
                    [
                        revision_key
                        for revision_key in emdisconf.content_db["filename:foo"]
                        if not filename.digests_equal(
                            DistfileName(
                                "foo",
                                digests=dict(revision_key),
                            )
                        )
                    ]
                ),
                1,
            )
            # bar should still have a content revision corresponding to foo's content.
            self.assertEqual(len(emdisconf.content_db["filename:bar"]), 2)
            self.assertEqual(
                len(
                    [
                        revision_key
                        for revision_key in emdisconf.content_db["filename:bar"]
                        if filename.digests_equal(
                            DistfileName(
                                "bar",
                                digests=dict(revision_key),
                            )
                        )
                    ]
                ),
                1,
            )
            # remove the foo which refers to bar's content
            bar = [filename for filename in distfiles if filename == "bar"][0]
            foo_remaining = DistfileName("foo", digests=bar.digests)
            emdisconf.content_db.remove(foo_remaining)
            # With its last revision gone, the foo entry disappears entirely.
            self.assertEqual(
                [
                    k[len("filename:") :]
                    for k in emdisconf.content_db
                    if k.startswith("filename:")
                ],
                ["bar"],
            )
            self.assertRaises(KeyError, emdisconf.content_db.__getitem__, "filename:foo")
            # bar should still have a content revision corresponding to foo's content.
            self.assertEqual(len(emdisconf.content_db["filename:bar"]), 2)
def test_flat_layout(self):
self.assertTrue(FlatLayout.verify_args(('flat',)))
self.assertFalse(FlatLayout.verify_args(('flat', 'extraneous-arg')))
self.assertEqual(FlatLayout().get_path('foo-1.tar.gz'), 'foo-1.tar.gz')
def test_filename_hash_layout(self):
self.assertFalse(FilenameHashLayout.verify_args(('filename-hash',)))
self.assertTrue(FilenameHashLayout.verify_args(('filename-hash', 'SHA1', '8')))
self.assertFalse(FilenameHashLayout.verify_args(('filename-hash', 'INVALID-HASH', '8')))
self.assertTrue(FilenameHashLayout.verify_args(('filename-hash', 'SHA1', '4:8:12')))
self.assertFalse(FilenameHashLayout.verify_args(('filename-hash', 'SHA1', '3')))
self.assertFalse(FilenameHashLayout.verify_args(('filename-hash', 'SHA1', 'junk')))
self.assertFalse(FilenameHashLayout.verify_args(('filename-hash', 'SHA1', '4:8:junk')))
self.assertEqual(FilenameHashLayout('SHA1', '4').get_path('foo-1.tar.gz'),
'1/foo-1.tar.gz')
self.assertEqual(FilenameHashLayout('SHA1', '8').get_path('foo-1.tar.gz'),
'19/foo-1.tar.gz')
self.assertEqual(FilenameHashLayout('SHA1', '8:16').get_path('foo-1.tar.gz'),
'19/c3b6/foo-1.tar.gz')
self.assertEqual(FilenameHashLayout('SHA1', '8:16:24').get_path('foo-1.tar.gz'),
'19/c3b6/37a94b/foo-1.tar.gz')
def test_content_hash_layout(self):
self.assertFalse(ContentHashLayout.verify_args(('content-hash',)))
self.assertTrue(ContentHashLayout.verify_args(('content-hash', 'SHA1', '8')))
self.assertFalse(ContentHashLayout.verify_args(('content-hash', 'INVALID-HASH', '8')))
self.assertTrue(ContentHashLayout.verify_args(('content-hash', 'SHA1', '4:8:12')))
self.assertFalse(ContentHashLayout.verify_args(('content-hash', 'SHA1', '3')))
self.assertFalse(ContentHashLayout.verify_args(('content-hash', 'SHA1', 'junk')))
self.assertFalse(ContentHashLayout.verify_args(('content-hash', 'SHA1', '4:8:junk')))
filename = DistfileName(
'foo-1.tar.gz',
digests=dict((algo, checksum_str(b'', hashname=algo)) for algo in MANIFEST2_HASH_DEFAULTS),
)
# Raise KeyError for a hash algorithm SHA1 which is not in MANIFEST2_HASH_DEFAULTS.
self.assertRaises(KeyError, ContentHashLayout('SHA1', '4').get_path, filename)
# Raise AttributeError for a plain string argument.
self.assertRaises(AttributeError, ContentHashLayout('SHA512', '4').get_path, str(filename))
self.assertEqual(ContentHashLayout('SHA512', '4').get_path(filename),
'c/cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
self.assertEqual(ContentHashLayout('SHA512', '8').get_path(filename),
'cf/cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
self.assertEqual(ContentHashLayout('SHA512', '8:16').get_path(filename),
'cf/83e1/cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
self.assertEqual(ContentHashLayout('SHA512', '8:16:24').get_path(filename),
'cf/83e1/357eef/cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e')
def test_mirror_layout_config(self):
mlc = MirrorLayoutConfig()
self.assertEqual(mlc.serialize(), ())
self.assertIsInstance(mlc.get_best_supported_layout(), FlatLayout)
conf = '''
[structure]
0=flat
'''
mlc.read_from_file(io.StringIO(conf))
self.assertEqual(mlc.serialize(), (('flat',),))
self.assertIsInstance(mlc.get_best_supported_layout(), FlatLayout)
self.assertEqual(mlc.get_best_supported_layout().get_path('foo-1.tar.gz'),
'foo-1.tar.gz')
conf = '''
[structure]
0=filename-hash SHA1 8:16
1=flat
'''
mlc.read_from_file(io.StringIO(conf))
self.assertEqual(mlc.serialize(), (
('filename-hash', 'SHA1', '8:16'),
('flat',)
))
self.assertIsInstance(mlc.get_best_supported_layout(), FilenameHashLayout)
self.assertEqual(mlc.get_best_supported_layout().get_path('foo-1.tar.gz'),
'19/c3b6/foo-1.tar.gz')
serialized = mlc.serialize()
# test fallback
conf = '''
[structure]
0=filename-hash INVALID-HASH 8:16
1=filename-hash SHA1 32
2=flat
'''
mlc.read_from_file(io.StringIO(conf))
self.assertEqual(mlc.serialize(), (
('filename-hash', 'INVALID-HASH', '8:16'),
('filename-hash', 'SHA1', '32'),
('flat',)
))
self.assertIsInstance(mlc.get_best_supported_layout(), FilenameHashLayout)
self.assertEqual(mlc.get_best_supported_layout().get_path('foo-1.tar.gz'),
'19c3b637/foo-1.tar.gz')
# test deserialization
mlc.deserialize(serialized)
self.assertEqual(mlc.serialize(), (
('filename-hash', 'SHA1', '8:16'),
('flat',)
))
self.assertIsInstance(mlc.get_best_supported_layout(), FilenameHashLayout)
self.assertEqual(mlc.get_best_supported_layout().get_path('foo-1.tar.gz'),
'19/c3b6/foo-1.tar.gz')
# test erraneous input
conf = '''
[#(*DA*&*F
[structure]
0=filename-hash SHA1 32
'''
self.assertRaises(ConfigParserError, mlc.read_from_file,
io.StringIO(conf))
def test_filename_hash_layout_get_filenames(self):
filename = DistfileName(
'foo-1.tar.gz',
digests=dict((algo, checksum_str(b'', hashname=algo)) for algo in MANIFEST2_HASH_DEFAULTS),
)
layouts = (
FlatLayout(),
FilenameHashLayout('SHA1', '4'),
FilenameHashLayout('SHA1', '8'),
FilenameHashLayout('SHA1', '8:16'),
FilenameHashLayout('SHA1', '8:16:24'),
ContentHashLayout('SHA512', '8:8:8'),
)
for layout in layouts:
distdir = tempfile.mkdtemp()
try:
path = os.path.join(distdir, layout.get_path(filename))
try:
os.makedirs(os.path.dirname(path))
except OSError:
pass
with open(path, 'wb') as f:
pass
file_list = list(layout.get_filenames(distdir))
self.assertTrue(len(file_list) > 0)
for filename_result in file_list:
if isinstance(filename_result, DistfileName):
self.assertTrue(filename_result.digests_equal(filename))
else:
self.assertEqual(filename_result, str(filename))
finally:
shutil.rmtree(distdir)