You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

compat.py 9.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. """Stuff that differs in different Python versions and platform
  2. distributions."""
  3. # The following comment should be removed at some point in the future.
  4. # mypy: disallow-untyped-defs=False
  5. from __future__ import absolute_import, division
  6. import codecs
  7. import locale
  8. import logging
  9. import os
  10. import shutil
  11. import sys
  12. from pip._vendor.six import PY2, text_type
  13. from pip._vendor.urllib3.util import IS_PYOPENSSL
  14. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  15. if MYPY_CHECK_RUNNING:
  16. from typing import Optional, Text, Tuple, Union
  17. try:
  18. import _ssl # noqa
  19. except ImportError:
  20. ssl = None
  21. else:
  22. # This additional assignment was needed to prevent a mypy error.
  23. ssl = _ssl
  24. try:
  25. import ipaddress
  26. except ImportError:
  27. try:
  28. from pip._vendor import ipaddress # type: ignore
  29. except ImportError:
  30. import ipaddr as ipaddress # type: ignore
  31. ipaddress.ip_address = ipaddress.IPAddress # type: ignore
  32. ipaddress.ip_network = ipaddress.IPNetwork # type: ignore
  33. __all__ = [
  34. "ipaddress", "uses_pycache", "console_to_str", "native_str",
  35. "get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size",
  36. "get_extension_suffixes",
  37. ]
  38. logger = logging.getLogger(__name__)
  39. HAS_TLS = (ssl is not None) or IS_PYOPENSSL
  40. if PY2:
  41. import imp
  42. try:
  43. cache_from_source = imp.cache_from_source # type: ignore
  44. except AttributeError:
  45. # does not use __pycache__
  46. cache_from_source = None
  47. uses_pycache = cache_from_source is not None
  48. else:
  49. uses_pycache = True
  50. from importlib.util import cache_from_source
  51. if PY2:
  52. # In Python 2.7, backslashreplace exists
  53. # but does not support use for decoding.
  54. # We implement our own replace handler for this
  55. # situation, so that we can consistently use
  56. # backslash replacement for all versions.
  57. def backslashreplace_decode_fn(err):
  58. raw_bytes = (err.object[i] for i in range(err.start, err.end))
  59. # Python 2 gave us characters - convert to numeric bytes
  60. raw_bytes = (ord(b) for b in raw_bytes)
  61. return u"".join(u"\\x%x" % c for c in raw_bytes), err.end
  62. codecs.register_error(
  63. "backslashreplace_decode",
  64. backslashreplace_decode_fn,
  65. )
  66. backslashreplace_decode = "backslashreplace_decode"
  67. else:
  68. backslashreplace_decode = "backslashreplace"
  69. def str_to_display(data, desc=None):
  70. # type: (Union[bytes, Text], Optional[str]) -> Text
  71. """
  72. For display or logging purposes, convert a bytes object (or text) to
  73. text (e.g. unicode in Python 2) safe for output.
  74. :param desc: An optional phrase describing the input data, for use in
  75. the log message if a warning is logged. Defaults to "Bytes object".
  76. This function should never error out and so can take a best effort
  77. approach. It is okay to be lossy if needed since the return value is
  78. just for display.
  79. We assume the data is in the locale preferred encoding. If it won't
  80. decode properly, we warn the user but decode as best we can.
  81. We also ensure that the output can be safely written to standard output
  82. without encoding errors.
  83. """
  84. if isinstance(data, text_type):
  85. return data
  86. # Otherwise, data is a bytes object (str in Python 2).
  87. # First, get the encoding we assume. This is the preferred
  88. # encoding for the locale, unless that is not found, or
  89. # it is ASCII, in which case assume UTF-8
  90. encoding = locale.getpreferredencoding()
  91. if (not encoding) or codecs.lookup(encoding).name == "ascii":
  92. encoding = "utf-8"
  93. # Now try to decode the data - if we fail, warn the user and
  94. # decode with replacement.
  95. try:
  96. decoded_data = data.decode(encoding)
  97. except UnicodeDecodeError:
  98. if desc is None:
  99. desc = 'Bytes object'
  100. msg_format = '{} does not appear to be encoded as %s'.format(desc)
  101. logger.warning(msg_format, encoding)
  102. decoded_data = data.decode(encoding, errors=backslashreplace_decode)
  103. # Make sure we can print the output, by encoding it to the output
  104. # encoding with replacement of unencodable characters, and then
  105. # decoding again.
  106. # We use stderr's encoding because it's less likely to be
  107. # redirected and if we don't find an encoding we skip this
  108. # step (on the assumption that output is wrapped by something
  109. # that won't fail).
  110. # The double getattr is to deal with the possibility that we're
  111. # being called in a situation where sys.__stderr__ doesn't exist,
  112. # or doesn't have an encoding attribute. Neither of these cases
  113. # should occur in normal pip use, but there's no harm in checking
  114. # in case people use pip in (unsupported) unusual situations.
  115. output_encoding = getattr(getattr(sys, "__stderr__", None),
  116. "encoding", None)
  117. if output_encoding:
  118. output_encoded = decoded_data.encode(
  119. output_encoding,
  120. errors="backslashreplace"
  121. )
  122. decoded_data = output_encoded.decode(output_encoding)
  123. return decoded_data
  124. def console_to_str(data):
  125. # type: (bytes) -> Text
  126. """Return a string, safe for output, of subprocess output.
  127. """
  128. return str_to_display(data, desc='Subprocess output')
  129. if PY2:
  130. def native_str(s, replace=False):
  131. # type: (str, bool) -> str
  132. # Replace is ignored -- unicode to UTF-8 can't fail
  133. if isinstance(s, text_type):
  134. return s.encode('utf-8')
  135. return s
  136. else:
  137. def native_str(s, replace=False):
  138. # type: (str, bool) -> str
  139. if isinstance(s, bytes):
  140. return s.decode('utf-8', 'replace' if replace else 'strict')
  141. return s
  142. def get_path_uid(path):
  143. # type: (str) -> int
  144. """
  145. Return path's uid.
  146. Does not follow symlinks:
  147. https://github.com/pypa/pip/pull/935#discussion_r5307003
  148. Placed this function in compat due to differences on AIX and
  149. Jython, that should eventually go away.
  150. :raises OSError: When path is a symlink or can't be read.
  151. """
  152. if hasattr(os, 'O_NOFOLLOW'):
  153. fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
  154. file_uid = os.fstat(fd).st_uid
  155. os.close(fd)
  156. else: # AIX and Jython
  157. # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
  158. if not os.path.islink(path):
  159. # older versions of Jython don't have `os.fstat`
  160. file_uid = os.stat(path).st_uid
  161. else:
  162. # raise OSError for parity with os.O_NOFOLLOW above
  163. raise OSError(
  164. "%s is a symlink; Will not return uid for symlinks" % path
  165. )
  166. return file_uid
  167. if PY2:
  168. from imp import get_suffixes
  169. def get_extension_suffixes():
  170. return [suffix[0] for suffix in get_suffixes()]
  171. else:
  172. from importlib.machinery import EXTENSION_SUFFIXES
  173. def get_extension_suffixes():
  174. return EXTENSION_SUFFIXES
  175. def expanduser(path):
  176. # type: (str) -> str
  177. """
  178. Expand ~ and ~user constructions.
  179. Includes a workaround for https://bugs.python.org/issue14768
  180. """
  181. expanded = os.path.expanduser(path)
  182. if path.startswith('~/') and expanded.startswith('//'):
  183. expanded = expanded[1:]
  184. return expanded
  185. # packages in the stdlib that may have installation metadata, but should not be
  186. # considered 'installed'. this theoretically could be determined based on
  187. # dist.location (py27:`sysconfig.get_paths()['stdlib']`,
  188. # py26:sysconfig.get_config_vars('LIBDEST')), but fear platform variation may
  189. # make this ineffective, so hard-coding
  190. stdlib_pkgs = {"python", "wsgiref", "argparse"}
  191. # windows detection, covers cpython and ironpython
  192. WINDOWS = (sys.platform.startswith("win") or
  193. (sys.platform == 'cli' and os.name == 'nt'))
  194. def samefile(file1, file2):
  195. # type: (str, str) -> bool
  196. """Provide an alternative for os.path.samefile on Windows/Python2"""
  197. if hasattr(os.path, 'samefile'):
  198. return os.path.samefile(file1, file2)
  199. else:
  200. path1 = os.path.normcase(os.path.abspath(file1))
  201. path2 = os.path.normcase(os.path.abspath(file2))
  202. return path1 == path2
  203. if hasattr(shutil, 'get_terminal_size'):
  204. def get_terminal_size():
  205. # type: () -> Tuple[int, int]
  206. """
  207. Returns a tuple (x, y) representing the width(x) and the height(y)
  208. in characters of the terminal window.
  209. """
  210. return tuple(shutil.get_terminal_size()) # type: ignore
  211. else:
  212. def get_terminal_size():
  213. # type: () -> Tuple[int, int]
  214. """
  215. Returns a tuple (x, y) representing the width(x) and the height(y)
  216. in characters of the terminal window.
  217. """
  218. def ioctl_GWINSZ(fd):
  219. try:
  220. import fcntl
  221. import termios
  222. import struct
  223. cr = struct.unpack_from(
  224. 'hh',
  225. fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
  226. )
  227. except Exception:
  228. return None
  229. if cr == (0, 0):
  230. return None
  231. return cr
  232. cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
  233. if not cr:
  234. try:
  235. fd = os.open(os.ctermid(), os.O_RDONLY)
  236. cr = ioctl_GWINSZ(fd)
  237. os.close(fd)
  238. except Exception:
  239. pass
  240. if not cr:
  241. cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
  242. return int(cr[1]), int(cr[0])