|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- """Utilities for extracting common archive formats"""
-
- import zipfile
- import tarfile
- import os
- import shutil
- import posixpath
- import contextlib
- from distutils.errors import DistutilsError
-
- from pkg_resources import ensure_directory
-
- __all__ = [
- "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
- "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
- ]
-
-
- class UnrecognizedFormat(DistutilsError):
- """Couldn't recognize the archive type"""
-
-
- def default_filter(src, dst):
- """The default progress/filter callback; returns True for all files"""
- return dst
-
-
- def unpack_archive(
- filename, extract_dir, progress_filter=default_filter,
- drivers=None):
- """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
-
- `progress_filter` is a function taking two arguments: a source path
- internal to the archive ('/'-separated), and a filesystem path where it
- will be extracted. The callback must return the desired extract path
- (which may be the same as the one passed in), or else ``None`` to skip
- that file or directory. The callback can thus be used to report on the
- progress of the extraction, as well as to filter the items extracted or
- alter their extraction paths.
-
- `drivers`, if supplied, must be a non-empty sequence of functions with the
- same signature as this function (minus the `drivers` argument), that raise
- ``UnrecognizedFormat`` if they do not support extracting the designated
- archive type. The `drivers` are tried in sequence until one is found that
- does not raise an error, or until all are exhausted (in which case
- ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
- drivers, the module's ``extraction_drivers`` constant will be used, which
- means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
- order.
- """
- for driver in drivers or extraction_drivers:
- try:
- driver(filename, extract_dir, progress_filter)
- except UnrecognizedFormat:
- continue
- else:
- return
- else:
- raise UnrecognizedFormat(
- "Not a recognized archive type: %s" % filename
- )
-
-
- def unpack_directory(filename, extract_dir, progress_filter=default_filter):
- """"Unpack" a directory, using the same interface as for archives
-
- Raises ``UnrecognizedFormat`` if `filename` is not a directory
- """
- if not os.path.isdir(filename):
- raise UnrecognizedFormat("%s is not a directory" % filename)
-
- paths = {
- filename: ('', extract_dir),
- }
- for base, dirs, files in os.walk(filename):
- src, dst = paths[base]
- for d in dirs:
- paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
- for f in files:
- target = os.path.join(dst, f)
- target = progress_filter(src + f, target)
- if not target:
- # skip non-files
- continue
- ensure_directory(target)
- f = os.path.join(base, f)
- shutil.copyfile(f, target)
- shutil.copystat(f, target)
-
-
- def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
- """Unpack zip `filename` to `extract_dir`
-
- Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
- by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
- of the `progress_filter` argument.
- """
-
- if not zipfile.is_zipfile(filename):
- raise UnrecognizedFormat("%s is not a zip file" % (filename,))
-
- with zipfile.ZipFile(filename) as z:
- for info in z.infolist():
- name = info.filename
-
- # don't extract absolute paths or ones with .. in them
- if name.startswith('/') or '..' in name.split('/'):
- continue
-
- target = os.path.join(extract_dir, *name.split('/'))
- target = progress_filter(name, target)
- if not target:
- continue
- if name.endswith('/'):
- # directory
- ensure_directory(target)
- else:
- # file
- ensure_directory(target)
- data = z.read(info.filename)
- with open(target, 'wb') as f:
- f.write(data)
- unix_attributes = info.external_attr >> 16
- if unix_attributes:
- os.chmod(target, unix_attributes)
-
-
- def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
- """Resolve any links and extract link targets as normal files."""
- while tar_member_obj is not None and (
- tar_member_obj.islnk() or tar_member_obj.issym()):
- linkpath = tar_member_obj.linkname
- if tar_member_obj.issym():
- base = posixpath.dirname(tar_member_obj.name)
- linkpath = posixpath.join(base, linkpath)
- linkpath = posixpath.normpath(linkpath)
- tar_member_obj = tar_obj._getmember(linkpath)
-
- is_file_or_dir = (
- tar_member_obj is not None and
- (tar_member_obj.isfile() or tar_member_obj.isdir())
- )
- if is_file_or_dir:
- return tar_member_obj
-
- raise LookupError('Got unknown file type')
-
-
- def _iter_open_tar(tar_obj, extract_dir, progress_filter):
- """Emit member-destination pairs from a tar archive."""
- # don't do any chowning!
- tar_obj.chown = lambda *args: None
-
- with contextlib.closing(tar_obj):
- for member in tar_obj:
- name = member.name
- # don't extract absolute paths or ones with .. in them
- if name.startswith('/') or '..' in name.split('/'):
- continue
-
- prelim_dst = os.path.join(extract_dir, *name.split('/'))
-
- try:
- member = _resolve_tar_file_or_dir(tar_obj, member)
- except LookupError:
- continue
-
- final_dst = progress_filter(name, prelim_dst)
- if not final_dst:
- continue
-
- if final_dst.endswith(os.sep):
- final_dst = final_dst[:-1]
-
- yield member, final_dst
-
-
- def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
- """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
-
- Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
- by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
- of the `progress_filter` argument.
- """
- try:
- tarobj = tarfile.open(filename)
- except tarfile.TarError as e:
- raise UnrecognizedFormat(
- "%s is not a compressed or uncompressed tar file" % (filename,)
- ) from e
-
- for member, final_dst in _iter_open_tar(
- tarobj, extract_dir, progress_filter,
- ):
- try:
- # XXX Ugh
- tarobj._extract_member(member, final_dst)
- except tarfile.ExtractError:
- # chown/chmod/mkfifo/mknode/makedev failed
- pass
-
- return True
-
-
- extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile
|