You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

archive_util.py 6.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. """Utilities for extracting common archive formats"""
  2. import zipfile
  3. import tarfile
  4. import os
  5. import shutil
  6. import posixpath
  7. import contextlib
  8. from distutils.errors import DistutilsError
  9. from pkg_resources import ensure_directory
  10. __all__ = [
  11. "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
  12. "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
  13. ]
  14. class UnrecognizedFormat(DistutilsError):
  15. """Couldn't recognize the archive type"""
  16. def default_filter(src, dst):
  17. """The default progress/filter callback; returns True for all files"""
  18. return dst
  19. def unpack_archive(filename, extract_dir, progress_filter=default_filter,
  20. drivers=None):
  21. """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
  22. `progress_filter` is a function taking two arguments: a source path
  23. internal to the archive ('/'-separated), and a filesystem path where it
  24. will be extracted. The callback must return the desired extract path
  25. (which may be the same as the one passed in), or else ``None`` to skip
  26. that file or directory. The callback can thus be used to report on the
  27. progress of the extraction, as well as to filter the items extracted or
  28. alter their extraction paths.
  29. `drivers`, if supplied, must be a non-empty sequence of functions with the
  30. same signature as this function (minus the `drivers` argument), that raise
  31. ``UnrecognizedFormat`` if they do not support extracting the designated
  32. archive type. The `drivers` are tried in sequence until one is found that
  33. does not raise an error, or until all are exhausted (in which case
  34. ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
  35. drivers, the module's ``extraction_drivers`` constant will be used, which
  36. means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
  37. order.
  38. """
  39. for driver in drivers or extraction_drivers:
  40. try:
  41. driver(filename, extract_dir, progress_filter)
  42. except UnrecognizedFormat:
  43. continue
  44. else:
  45. return
  46. else:
  47. raise UnrecognizedFormat(
  48. "Not a recognized archive type: %s" % filename
  49. )
  50. def unpack_directory(filename, extract_dir, progress_filter=default_filter):
  51. """"Unpack" a directory, using the same interface as for archives
  52. Raises ``UnrecognizedFormat`` if `filename` is not a directory
  53. """
  54. if not os.path.isdir(filename):
  55. raise UnrecognizedFormat("%s is not a directory" % filename)
  56. paths = {
  57. filename: ('', extract_dir),
  58. }
  59. for base, dirs, files in os.walk(filename):
  60. src, dst = paths[base]
  61. for d in dirs:
  62. paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
  63. for f in files:
  64. target = os.path.join(dst, f)
  65. target = progress_filter(src + f, target)
  66. if not target:
  67. # skip non-files
  68. continue
  69. ensure_directory(target)
  70. f = os.path.join(base, f)
  71. shutil.copyfile(f, target)
  72. shutil.copystat(f, target)
  73. def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
  74. """Unpack zip `filename` to `extract_dir`
  75. Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
  76. by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
  77. of the `progress_filter` argument.
  78. """
  79. if not zipfile.is_zipfile(filename):
  80. raise UnrecognizedFormat("%s is not a zip file" % (filename,))
  81. with zipfile.ZipFile(filename) as z:
  82. for info in z.infolist():
  83. name = info.filename
  84. # don't extract absolute paths or ones with .. in them
  85. if name.startswith('/') or '..' in name.split('/'):
  86. continue
  87. target = os.path.join(extract_dir, *name.split('/'))
  88. target = progress_filter(name, target)
  89. if not target:
  90. continue
  91. if name.endswith('/'):
  92. # directory
  93. ensure_directory(target)
  94. else:
  95. # file
  96. ensure_directory(target)
  97. data = z.read(info.filename)
  98. with open(target, 'wb') as f:
  99. f.write(data)
  100. unix_attributes = info.external_attr >> 16
  101. if unix_attributes:
  102. os.chmod(target, unix_attributes)
  103. def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
  104. """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
  105. Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
  106. by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
  107. of the `progress_filter` argument.
  108. """
  109. try:
  110. tarobj = tarfile.open(filename)
  111. except tarfile.TarError:
  112. raise UnrecognizedFormat(
  113. "%s is not a compressed or uncompressed tar file" % (filename,)
  114. )
  115. with contextlib.closing(tarobj):
  116. # don't do any chowning!
  117. tarobj.chown = lambda *args: None
  118. for member in tarobj:
  119. name = member.name
  120. # don't extract absolute paths or ones with .. in them
  121. if not name.startswith('/') and '..' not in name.split('/'):
  122. prelim_dst = os.path.join(extract_dir, *name.split('/'))
  123. # resolve any links and to extract the link targets as normal
  124. # files
  125. while member is not None and (member.islnk() or member.issym()):
  126. linkpath = member.linkname
  127. if member.issym():
  128. base = posixpath.dirname(member.name)
  129. linkpath = posixpath.join(base, linkpath)
  130. linkpath = posixpath.normpath(linkpath)
  131. member = tarobj._getmember(linkpath)
  132. if member is not None and (member.isfile() or member.isdir()):
  133. final_dst = progress_filter(name, prelim_dst)
  134. if final_dst:
  135. if final_dst.endswith(os.sep):
  136. final_dst = final_dst[:-1]
  137. try:
  138. # XXX Ugh
  139. tarobj._extract_member(member, final_dst)
  140. except tarfile.ExtractError:
  141. # chown/chmod/mkfifo/mknode/makedev failed
  142. pass
  143. return True
  144. extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile