blob: 90880b1cd240cc0a39a781c784d59db22c31f61a [file] [log] [blame]
# Copyright 2015 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import errno
import re
import sys
if sys.hexversion >= 0x3000000:
basestring = str
from portage import _encodings, _unicode_encode
from portage.exception import FileNotFound, PermissionDenied
_compressors = {
"bzip2": {
"compress": "${PORTAGE_BZIP2_COMMAND} ${BINPKG_COMPRESS_FLAGS}",
"decompress": "${PORTAGE_BUNZIP2_COMMAND}",
"decompress_alt": "${PORTAGE_BZIP2_COMMAND} -d",
"package": "app-arch/bzip2",
},
"gzip": {
"compress": "gzip ${BINPKG_COMPRESS_FLAGS}",
"decompress": "gzip -d",
"package": "app-arch/gzip",
},
"lz4": {
"compress": "lz4 ${BINPKG_COMPRESS_FLAGS}",
"decompress": "lz4 -d",
"package": "app-arch/lz4",
},
"lzip": {
"compress": "lzip ${BINPKG_COMPRESS_FLAGS}",
"decompress": "lzip -d",
"package": "app-arch/lzip",
},
"lzop": {
"compress": "lzop ${BINPKG_COMPRESS_FLAGS}",
"decompress": "lzop -d",
"package": "app-arch/lzop",
},
"xz": {
"compress": "xz ${BINPKG_COMPRESS_FLAGS}",
"decompress": "xz -d",
"package": "app-arch/xz-utils",
},
"zstd": {
"compress": "zstd ${BINPKG_COMPRESS_FLAGS}",
"decompress": "zstd -d --long=31",
"package": "app-arch/zstd",
},
}
_compression_re = re.compile(b'^(' +
b'(?P<bzip2>\x42\x5a\x68\x39)|' +
b'(?P<gzip>\x1f\x8b)|' +
b'(?P<lz4>(?:\x04\x22\x4d\x18|\x02\x21\x4c\x18))|' +
b'(?P<lzip>LZIP)|' +
b'(?P<lzop>\x89LZO\x00\x0d\x0a\x1a\x0a)|' +
b'(?P<xz>\xfd\x37\x7a\x58\x5a\x00)|' +
b'(?P<zstd>([\x22-\x28]\xb5\x2f\xfd)))')
_max_compression_re_len = 9
def compression_probe(f):
"""
Identify the compression type of a file. Returns one of the
following identifier strings:
bzip2
gzip
lz4
lzip
lzop
xz
zstd
@param f: a file path, or file-like object
@type f: str or file
@return: a string identifying the compression type, or None if the
compression type is unrecognized
@rtype str or None
"""
open_file = isinstance(f, basestring)
if open_file:
try:
f = open(_unicode_encode(f,
encoding=_encodings['fs'], errors='strict'), mode='rb')
except IOError as e:
if e.errno == PermissionDenied.errno:
raise PermissionDenied(f)
elif e.errno in (errno.ENOENT, errno.ESTALE):
raise FileNotFound(f)
else:
raise
try:
return _compression_probe_file(f)
finally:
if open_file:
f.close()
def _compression_probe_file(f):
m = _compression_re.match(f.read(_max_compression_re_len))
if m is not None:
for k, v in m.groupdict().items():
if v is not None:
return k
return None