|
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990 |
- """Routines related to PyPI, indexes"""
- from __future__ import absolute_import
-
- import cgi
- import itertools
- import logging
- import mimetypes
- import os
- import posixpath
- import re
- import sys
- from collections import namedtuple
-
- from pip._vendor import html5lib, requests, six
- from pip._vendor.distlib.compat import unescape
- from pip._vendor.packaging import specifiers
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.packaging.version import parse as parse_version
- from pip._vendor.requests.exceptions import RetryError, SSLError
- from pip._vendor.six.moves.urllib import parse as urllib_parse
- from pip._vendor.six.moves.urllib import request as urllib_request
-
- from pip._internal.download import HAS_TLS, is_url, path_to_url, url_to_path
- from pip._internal.exceptions import (
- BestVersionAlreadyInstalled, DistributionNotFound, InvalidWheelFilename,
- UnsupportedWheel,
- )
- from pip._internal.models.candidate import InstallationCandidate
- from pip._internal.models.format_control import FormatControl
- from pip._internal.models.index import PyPI
- from pip._internal.models.link import Link
- from pip._internal.pep425tags import get_supported
- from pip._internal.utils.compat import ipaddress
- from pip._internal.utils.logging import indent_log
- from pip._internal.utils.misc import (
- ARCHIVE_EXTENSIONS, SUPPORTED_EXTENSIONS, WHEEL_EXTENSION, normalize_path,
- redact_password_from_url,
- )
- from pip._internal.utils.packaging import check_requires_python
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- from pip._internal.wheel import Wheel
-
- if MYPY_CHECK_RUNNING:
- from logging import Logger # noqa: F401
- from typing import ( # noqa: F401
- Tuple, Optional, Any, List, Union, Callable, Set, Sequence,
- Iterable, MutableMapping
- )
- from pip._vendor.packaging.version import _BaseVersion # noqa: F401
- from pip._vendor.requests import Response # noqa: F401
- from pip._internal.req import InstallRequirement # noqa: F401
- from pip._internal.download import PipSession # noqa: F401
-
- SecureOrigin = Tuple[str, str, Optional[str]]
- BuildTag = Tuple[Any, ...] # either empty tuple or Tuple[int, str]
- CandidateSortingKey = Tuple[int, _BaseVersion, BuildTag, Optional[int]]
-
- __all__ = ['FormatControl', 'PackageFinder']
-
-
# Origins that are always treated as secure, written as
# (protocol, hostname, port) triples.
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
SECURE_ORIGINS = [
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    ("ssh", "*", "*"),  # ssh is always secure.
]  # type: List[SecureOrigin]


# Logger used by the module-level helpers and by PackageFinder.
logger = logging.getLogger(__name__)
-
-
def _match_vcs_scheme(url):
    # type: (str) -> Optional[str]
    """Look for VCS schemes in the URL.

    A scheme matches when the URL starts with it (compared against the
    lower-cased URL) and the character right after it is ``+`` or ``:``.

    :param url: The URL to inspect.
    :return: The matched VCS scheme, or None if there's no match.
    """
    from pip._internal.vcs import VcsSupport
    lowered = url.lower()
    for scheme in VcsSupport.schemes:
        # Slice rather than index: a URL that consists of nothing but the
        # scheme name then yields '' instead of raising IndexError.  The
        # candidates are a tuple on purpose, because ``'' in '+:'`` is True.
        next_char = url[len(scheme):len(scheme) + 1]
        if lowered.startswith(scheme) and next_char in ('+', ':'):
            return scheme
    return None
-
-
def _is_url_like_archive(url):
    # type: (str) -> bool
    """Tell whether the filename of ``url`` ends with an archive extension.

    The filename is taken from ``Link(url).filename`` and compared against
    every entry of ``ARCHIVE_EXTENSIONS``.
    """
    name = Link(url).filename
    return any(name.endswith(archive_ext) for archive_ext in ARCHIVE_EXTENSIONS)
-
-
- class _NotHTML(Exception):
- def __init__(self, content_type, request_desc):
- # type: (str, str) -> None
- super(_NotHTML, self).__init__(content_type, request_desc)
- self.content_type = content_type
- self.request_desc = request_desc
-
-
- def _ensure_html_header(response):
- # type: (Response) -> None
- """Check the Content-Type header to ensure the response contains HTML.
-
- Raises `_NotHTML` if the content type is not text/html.
- """
- content_type = response.headers.get("Content-Type", "")
- if not content_type.lower().startswith("text/html"):
- raise _NotHTML(content_type, response.request.method)
-
-
- class _NotHTTP(Exception):
- pass
-
-
def _ensure_html_response(url, session):
    # type: (str, PipSession) -> None
    """Probe the URL with a HEAD request and require an HTML response.

    Raises `_NotHTTP` if the URL's scheme is neither http nor https (so no
    HEAD request can be made), or `_NotHTML` if the content type of the
    HEAD response is not text/html.  HTTP error statuses are raised by
    ``raise_for_status()``.
    """
    url_scheme = urllib_parse.urlsplit(url)[0]
    if url_scheme not in {'http', 'https'}:
        raise _NotHTTP()

    head_response = session.head(url, allow_redirects=True)
    head_response.raise_for_status()

    _ensure_html_header(head_response)
-
-
def _get_html_response(url, session):
    # type: (str, PipSession) -> Response
    """Fetch an HTML page with GET and hand back the response.

    The work happens in three steps:

    1. When the URL looks like an archive, a HEAD request goes out first so
       that a large file is not downloaded just to find out it is not HTML.
       This step raises `_NotHTTP` if the content type cannot be
       determined, or `_NotHTML` if it is not HTML.
    2. The GET request itself.  HTTP error statuses are raised through
       ``raise_for_status()``.
    3. A final Content-Type check on the GET response, raising `_NotHTML`
       when it is not HTML.
    """
    if _is_url_like_archive(url):
        _ensure_html_response(url, session=session)

    logger.debug('Getting page %s', url)

    # Cached data for /simple/ must not be returned blindly: authors
    # generally expect "twine upload && pip install" to work, and it would
    # not if they had run pip install within the last ~10 minutes.
    # max-age=0 is used instead of no-cache because it still allows
    # conditional requests, so traffic stays minimal when the page has not
    # changed; the cost is one round trip for the conditional GET on every
    # call rather than once per 10 minutes.
    # For more information, please see pypa/pip#5670.
    request_headers = {
        "Accept": "text/html",
        "Cache-Control": "max-age=0",
    }
    response = session.get(url, headers=request_headers)
    response.raise_for_status()

    # The archive check above only helps when the URL ends in something
    # that looks like an archive, which a URL is not required to do.
    # Short of sending HEAD for every URL there is no way to know up front
    # whether the body is HTML, but it can be verified once downloaded.
    _ensure_html_header(response)

    return response
-
-
- def _handle_get_page_fail(
- link, # type: Link
- reason, # type: Union[str, Exception]
- meth=None # type: Optional[Callable[..., None]]
- ):
- # type: (...) -> None
- if meth is None:
- meth = logger.debug
- meth("Could not fetch URL %s: %s - skipping", link, reason)
-
-
- def _get_html_page(link, session=None):
- # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
- if session is None:
- raise TypeError(
- "_get_html_page() missing 1 required keyword argument: 'session'"
- )
-
- url = link.url.split('#', 1)[0]
-
- # Check for VCS schemes that do not support lookup as web pages.
- vcs_scheme = _match_vcs_scheme(url)
- if vcs_scheme:
- logger.debug('Cannot look at %s URL %s', vcs_scheme, link)
- return None
-
- # Tack index.html onto file:// URLs that point to directories
- scheme, _, path, _, _, _ = urllib_parse.urlparse(url)
- if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))):
- # add trailing slash if not present so urljoin doesn't trim
- # final segment
- if not url.endswith('/'):
- url += '/'
- url = urllib_parse.urljoin(url, 'index.html')
- logger.debug(' file: URL is directory, getting %s', url)
-
- try:
- resp = _get_html_response(url, session=session)
- except _NotHTTP as exc:
- logger.debug(
- 'Skipping page %s because it looks like an archive, and cannot '
- 'be checked by HEAD.', link,
- )
- except _NotHTML as exc:
- logger.debug(
- 'Skipping page %s because the %s request got Content-Type: %s',
- link, exc.request_desc, exc.content_type,
- )
- except requests.HTTPError as exc:
- _handle_get_page_fail(link, exc)
- except RetryError as exc:
- _handle_get_page_fail(link, exc)
- except SSLError as exc:
- reason = "There was a problem confirming the ssl certificate: "
- reason += str(exc)
- _handle_get_page_fail(link, reason, meth=logger.info)
- except requests.ConnectionError as exc:
- _handle_get_page_fail(link, "connection error: %s" % exc)
- except requests.Timeout:
- _handle_get_page_fail(link, "timed out")
- else:
- return HTMLPage(resp.content, resp.url, resp.headers)
- return None
-
-
class PackageFinder(object):
    """This finds packages.

    This is meant to match easy_install's technique for looking for
    packages, by reading pages and looking for appropriate links.
    """

    def __init__(
        self,
        find_links,  # type: List[str]
        index_urls,  # type: List[str]
        allow_all_prereleases=False,  # type: bool
        trusted_hosts=None,  # type: Optional[Iterable[str]]
        session=None,  # type: Optional[PipSession]
        format_control=None,  # type: Optional[FormatControl]
        platform=None,  # type: Optional[str]
        versions=None,  # type: Optional[List[str]]
        abi=None,  # type: Optional[str]
        implementation=None,  # type: Optional[str]
        prefer_binary=False  # type: bool
    ):
        # type: (...) -> None
        """Create a PackageFinder.

        :param find_links: Extra locations (paths or URLs) to look in.
            Entries starting with ``~`` are normalized when the normalized
            path exists.
        :param index_urls: Base URLs of the package indexes to consult.
        :param allow_all_prereleases: Whether pre-releases are acceptable
            for every requirement.
        :param trusted_hosts: Host names that are treated as secure origins
            regardless of protocol and port.
        :param session: The session used for all requests. Required,
            despite having a default value.
        :param format_control: A FormatControl object or None. Used to control
            the selection of source packages / binary packages when consulting
            the index and links.
        :param platform: A string or None. If None, searches for packages
            that are supported by the current system. Otherwise, will find
            packages that can be built on the platform passed in. These
            packages will only be downloaded for distribution: they will
            not be built locally.
        :param versions: A list of strings or None. This is passed directly
            to pep425tags.py in the get_supported() method.
        :param abi: A string or None. This is passed directly
            to pep425tags.py in the get_supported() method.
        :param implementation: A string or None. This is passed directly
            to pep425tags.py in the get_supported() method.
        :param prefer_binary: Whether wheels are sorted above source
            distributions even when the source distribution is newer.
        :raises TypeError: If no session is given.
        """
        if session is None:
            raise TypeError(
                "PackageFinder() missing 1 required keyword argument: "
                "'session'"
            )

        # Build find_links. If an argument starts with ~, it may be
        # a local file relative to a home directory. So try normalizing
        # it and if it exists, use the normalized version.
        # This is deliberately conservative - it might be fine just to
        # blindly normalize anything starting with a ~...
        self.find_links = []  # type: List[str]
        for link in find_links:
            if link.startswith('~'):
                new_link = normalize_path(link)
                if os.path.exists(new_link):
                    link = new_link
            self.find_links.append(link)

        self.index_urls = index_urls

        # These are boring links that have already been logged somehow:
        self.logged_links = set()  # type: Set[Link]

        self.format_control = format_control or FormatControl(set(), set())

        # Domains that we won't emit warnings for when not using HTTPS
        self.secure_origins = [
            ("*", host, "*")
            for host in (trusted_hosts if trusted_hosts else [])
        ]  # type: List[SecureOrigin]

        # Do we want to allow _all_ pre-releases?
        self.allow_all_prereleases = allow_all_prereleases

        # The Session we'll use to make requests
        self.session = session

        # The valid tags to check potential found wheel candidates against
        self.valid_tags = get_supported(
            versions=versions,
            platform=platform,
            abi=abi,
            impl=implementation,
        )

        # Do we prefer old, but valid, binary dist over new source dist
        self.prefer_binary = prefer_binary

        # If we don't have TLS enabled, then WARN if anyplace we're looking
        # relies on TLS.
        if not HAS_TLS:
            for link in itertools.chain(self.index_urls, self.find_links):
                parsed = urllib_parse.urlparse(link)
                if parsed.scheme == "https":
                    logger.warning(
                        "pip is configured with locations that require "
                        "TLS/SSL, however the ssl module in Python is not "
                        "available."
                    )
                    break

    def get_formatted_locations(self):
        # type: () -> str
        """Describe the configured search locations for display.

        The index line is omitted when the only index is the default PyPI
        one.  Passwords embedded in URLs are redacted.

        :return: Zero, one or two lines joined with a newline.
        """
        lines = []
        if self.index_urls and self.index_urls != [PyPI.simple_url]:
            lines.append(
                "Looking in indexes: {}".format(", ".join(
                    redact_password_from_url(url) for url in self.index_urls))
            )
        if self.find_links:
            # find-links entries may be URLs with credentials too, so they
            # get the same redaction as index URLs.  Plain filesystem paths
            # are passed through untouched so they are shown verbatim.
            lines.append(
                "Looking in links: {}".format(", ".join(
                    redact_password_from_url(link) if '://' in link else link
                    for link in self.find_links))
            )
        return "\n".join(lines)

    @staticmethod
    def _sort_locations(locations, expand_dir=False):
        # type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
        """
        Sort locations into "files" (archives) and "urls", and return
        a pair of lists (files,urls)

        :param locations: Paths and/or URLs to classify.
        :param expand_dir: When true, a local directory is replaced by the
            entries it contains; otherwise a ``file:`` directory URL is
            kept as a URL and a plain directory path is ignored with a
            warning.
        """
        files = []
        urls = []

        # puts the url for the given file path into the appropriate list
        def sort_path(path):
            url = path_to_url(path)
            if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
                urls.append(url)
            else:
                files.append(url)

        for url in locations:

            is_local_path = os.path.exists(url)
            is_file_url = url.startswith('file:')

            if is_local_path or is_file_url:
                if is_local_path:
                    path = url
                else:
                    path = url_to_path(url)
                if os.path.isdir(path):
                    if expand_dir:
                        path = os.path.realpath(path)
                        for item in os.listdir(path):
                            sort_path(os.path.join(path, item))
                    elif is_file_url:
                        urls.append(url)
                    else:
                        logger.warning(
                            "Path '%s' is ignored: it is a directory.", path,
                        )
                elif os.path.isfile(path):
                    sort_path(path)
                else:
                    logger.warning(
                        "Url '%s' is ignored: it is neither a file "
                        "nor a directory.", url,
                    )
            elif is_url(url):
                # Only add url with clear scheme
                urls.append(url)
            else:
                logger.warning(
                    "Url '%s' is ignored. It is either a non-existing "
                    "path or lacks a specific scheme.", url,
                )

        return files, urls

    def _candidate_sort_key(self, candidate):
        # type: (InstallationCandidate) -> CandidateSortingKey
        """
        Function used to generate link sort key for link tuples.
        The greater the return value, the more preferred it is.
        If not finding wheels, then sorted by version only.
        If finding wheels, then the sort order is by version, then:
          1. existing installs
          2. wheels ordered via Wheel.support_index_min(self.valid_tags)
          3. source archives
        If prefer_binary was set, then all wheels are sorted above sources.
        Note: it was considered to embed this logic into the Link
              comparison operators, but then different sdist links
              with the same version, would have to be considered equal

        :raises UnsupportedWheel: If the candidate is a wheel that does not
            match any of ``self.valid_tags``.
        """
        support_num = len(self.valid_tags)
        build_tag = tuple()  # type: BuildTag
        binary_preference = 0
        if candidate.location.is_wheel:
            # can raise InvalidWheelFilename
            wheel = Wheel(candidate.location.filename)
            if not wheel.supported(self.valid_tags):
                raise UnsupportedWheel(
                    "%s is not a supported wheel for this platform. It "
                    "can't be sorted." % wheel.filename
                )
            if self.prefer_binary:
                binary_preference = 1
            pri = -(wheel.support_index_min(self.valid_tags))
            if wheel.build_tag is not None:
                # Split the build tag into its leading number and the rest
                # so that tags compare numerically first.
                match = re.match(r'^(\d+)(.*)$', wheel.build_tag)
                build_tag_groups = match.groups()
                build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
        else:  # sdist
            pri = -(support_num)
        return (binary_preference, candidate.version, build_tag, pri)

    def _validate_secure_origin(self, logger, location):
        # type: (Logger, Link) -> bool
        """Tell whether ``location`` is served from a secure origin.

        The origin is checked against ``SECURE_ORIGINS`` plus the origins
        built from the trusted hosts of this finder.

        :param logger: Logger that receives the warning for a rejected
            location.
        :param location: The link to check.
        :return: True if the origin matches a secure origin; False (after
            logging a warning) otherwise.
        """
        # Determine if this url used a secure transport mechanism
        parsed = urllib_parse.urlparse(str(location))
        origin = (parsed.scheme, parsed.hostname, parsed.port)

        # The protocol to use to see if the protocol matches.
        # Don't count the repository type as part of the protocol: in
        # cases such as "git+ssh", only use "ssh". (I.e., Only verify against
        # the last scheme.)
        protocol = origin[0].rsplit('+', 1)[-1]

        # Determine if our origin is a secure origin by looking through our
        # hardcoded list of secure origins, as well as any additional ones
        # configured on this PackageFinder instance.
        for secure_origin in (SECURE_ORIGINS + self.secure_origins):
            if protocol != secure_origin[0] and secure_origin[0] != "*":
                continue

            try:
                # We need to do this decode dance to ensure that we have a
                # unicode object, even on Python 2.x.
                addr = ipaddress.ip_address(
                    origin[1]
                    if (
                        isinstance(origin[1], six.text_type) or
                        origin[1] is None
                    )
                    else origin[1].decode("utf8")
                )
                network = ipaddress.ip_network(
                    secure_origin[1]
                    if isinstance(secure_origin[1], six.text_type)
                    # setting secure_origin[1] to proper Union[bytes, str]
                    # creates problems in other places
                    else secure_origin[1].decode("utf8")  # type: ignore
                )
            except ValueError:
                # We don't have both a valid address or a valid network, so
                # we'll check this origin against hostnames.
                if (origin[1] and
                        origin[1].lower() != secure_origin[1].lower() and
                        secure_origin[1] != "*"):
                    continue
            else:
                # We have a valid address and network, so see if the address
                # is contained within the network.
                if addr not in network:
                    continue

            # Check to see if the port matches
            if (origin[2] != secure_origin[2] and
                    secure_origin[2] != "*" and
                    secure_origin[2] is not None):
                continue

            # If we've gotten here, then this origin matches the current
            # secure origin and we should return True
            return True

        # If we've gotten to this point, then the origin isn't secure and we
        # will not accept it as a valid location to search. We will however
        # log a warning that we are ignoring it.
        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            parsed.hostname,
            parsed.hostname,
        )

        return False

    def _get_index_urls_locations(self, project_name):
        # type: (str) -> List[str]
        """Returns the locations found via self.index_urls

        Each index URL is joined with the quoted canonical form of
        ``project_name`` and given a trailing slash.
        """

        def mkurl_pypi_url(url):
            loc = posixpath.join(
                url,
                urllib_parse.quote(canonicalize_name(project_name)))
            # For maximum compatibility with easy_install, ensure the path
            # ends in a trailing slash.  Although this isn't in the spec
            # (and PyPI can handle it without the slash) some other index
            # implementations might break if they relied on easy_install's
            # behavior.
            if not loc.endswith('/'):
                loc = loc + '/'
            return loc

        return [mkurl_pypi_url(url) for url in self.index_urls]

    def find_all_candidates(self, project_name):
        # type: (str) -> List[Optional[InstallationCandidate]]
        """Find all available InstallationCandidate for project_name

        This checks index_urls and find_links.
        All versions found are returned as an InstallationCandidate list,
        local files first, then find-links candidates, then candidates
        found on pages.

        See _link_package_versions for details on which files are accepted
        """
        index_locations = self._get_index_urls_locations(project_name)
        index_file_loc, index_url_loc = self._sort_locations(index_locations)
        fl_file_loc, fl_url_loc = self._sort_locations(
            self.find_links, expand_dir=True,
        )

        file_locations = (Link(url) for url in itertools.chain(
            index_file_loc, fl_file_loc,
        ))

        # We trust every url that the user has given us whether it was given
        #   via --index-url or --find-links.
        # We want to filter out any thing which does not have a secure origin.
        url_locations = [
            link for link in itertools.chain(
                (Link(url) for url in index_url_loc),
                (Link(url) for url in fl_url_loc),
            )
            if self._validate_secure_origin(logger, link)
        ]

        logger.debug('%d location(s) to search for versions of %s:',
                     len(url_locations), project_name)

        for location in url_locations:
            logger.debug('* %s', location)

        canonical_name = canonicalize_name(project_name)
        formats = self.format_control.get_allowed_formats(canonical_name)
        search = Search(project_name, canonical_name, formats)
        find_links_versions = self._package_versions(
            # We trust every directly linked archive in find_links
            (Link(url, '-f') for url in self.find_links),
            search
        )

        page_versions = []
        for page in self._get_pages(url_locations, project_name):
            logger.debug('Analyzing links from page %s', page.url)
            with indent_log():
                page_versions.extend(
                    self._package_versions(page.iter_links(), search)
                )

        file_versions = self._package_versions(file_locations, search)
        if file_versions:
            file_versions.sort(reverse=True)
            logger.debug(
                'Local files found: %s',
                ', '.join([
                    url_to_path(candidate.location.url)
                    for candidate in file_versions
                ])
            )

        # This is an intentional priority ordering
        return file_versions + find_links_versions + page_versions

    def find_requirement(self, req, upgrade):
        # type: (InstallRequirement, bool) -> Optional[Link]
        """Try to find a Link matching req

        Expects req, an InstallRequirement and upgrade, a boolean
        Returns a Link if found; returns None when an installed version
        already satisfies the requirement and upgrade is false.
        Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
        """
        all_candidates = self.find_all_candidates(req.name)

        # Filter out anything which doesn't match our specifier
        compatible_versions = set(
            req.specifier.filter(
                # We turn the version object into a str here because otherwise
                # when we're debundled but setuptools isn't, Python will see
                # packaging.version.Version and
                # pkg_resources._vendor.packaging.version.Version as different
                # types. This way we'll use a str as a common data interchange
                # format. If we stop using the pkg_resources provided specifier
                # and start using our own, we can drop the cast to str().
                [str(c.version) for c in all_candidates],
                # True forces pre-releases on; None leaves the decision to
                # the specifier.
                prereleases=self.allow_all_prereleases or None,
            )
        )
        applicable_candidates = [
            # Again, converting to str to deal with debundling.
            c for c in all_candidates if str(c.version) in compatible_versions
        ]

        if applicable_candidates:
            best_candidate = max(applicable_candidates,
                                 key=self._candidate_sort_key)
        else:
            best_candidate = None

        if req.satisfied_by is not None:
            installed_version = parse_version(req.satisfied_by.version)
        else:
            installed_version = None

        if installed_version is None and best_candidate is None:
            logger.critical(
                'Could not find a version that satisfies the requirement %s '
                '(from versions: %s)',
                req,
                ', '.join(
                    sorted(
                        {str(c.version) for c in all_candidates},
                        key=parse_version,
                    )
                )
            )

            raise DistributionNotFound(
                'No matching distribution found for %s' % req
            )

        best_installed = False
        if installed_version and (
                best_candidate is None or
                best_candidate.version <= installed_version):
            best_installed = True

        if not upgrade and installed_version is not None:
            if best_installed:
                logger.debug(
                    'Existing installed version (%s) is most up-to-date and '
                    'satisfies requirement',
                    installed_version,
                )
            else:
                logger.debug(
                    'Existing installed version (%s) satisfies requirement '
                    '(most up-to-date version is %s)',
                    installed_version,
                    best_candidate.version,
                )
            return None

        if best_installed:
            # We have an existing version, and its the best version
            logger.debug(
                'Installed version (%s) is most up-to-date (past versions: '
                '%s)',
                installed_version,
                ', '.join(sorted(compatible_versions, key=parse_version)) or
                "none",
            )
            raise BestVersionAlreadyInstalled

        logger.debug(
            'Using version %s (newest of versions: %s)',
            best_candidate.version,
            ', '.join(sorted(compatible_versions, key=parse_version))
        )
        return best_candidate.location

    def _get_pages(self, locations, project_name):
        # type: (Iterable[Link], str) -> Iterable[HTMLPage]
        """
        Yields an HTMLPage for each of the given locations, skipping
        duplicate locations and locations that could not be fetched.

        ``project_name`` is accepted but not used here.
        """
        seen = set()  # type: Set[Link]
        for location in locations:
            if location in seen:
                continue
            seen.add(location)

            page = _get_html_page(location, session=self.session)
            if page is None:
                continue

            yield page

    # Matches a trailing "-pyX", "-pyXY..." or "-pyX.Y..." suffix on a
    # version string.  The minor part accepts several digits so that e.g.
    # "-py3.10" is recognized as well.
    _py_version_re = re.compile(r'-py([123]\.?[0-9]*)$')

    def _sort_links(self, links):
        # type: (Iterable[Link]) -> List[Link]
        """
        Returns elements of links in order, non-egg links first, egg links
        second, while eliminating duplicates
        """
        eggs, no_eggs = [], []
        seen = set()  # type: Set[Link]
        for link in links:
            if link not in seen:
                seen.add(link)
                if link.egg_fragment:
                    eggs.append(link)
                else:
                    no_eggs.append(link)
        return no_eggs + eggs

    def _package_versions(
        self,
        links,  # type: Iterable[Link]
        search  # type: Search
    ):
        # type: (...) -> List[Optional[InstallationCandidate]]
        """Return the candidates obtained from ``links`` for ``search``.

        Links are de-duplicated and ordered by _sort_links; links that
        _link_package_versions rejects are left out.
        """
        result = []
        for link in self._sort_links(links):
            v = self._link_package_versions(link, search)
            if v is not None:
                result.append(v)
        return result

    def _log_skipped_link(self, link, reason):
        # type: (Link, str) -> None
        """Log, once per link, that ``link`` is skipped because of ``reason``.
        """
        if link not in self.logged_links:
            logger.debug('Skipping link %s; %s', link, reason)
            self.logged_links.add(link)

    def _link_package_versions(self, link, search):
        # type: (Link, Search) -> Optional[InstallationCandidate]
        """Return an InstallationCandidate or None

        None is returned (and the reason logged) when the link is not an
        acceptable file for ``search``: unsupported or disallowed format,
        wrong project, incompatible wheel, missing version, or a Python
        version mismatch.
        """
        version = None
        if link.egg_fragment:
            egg_info = link.egg_fragment
            ext = link.ext
        else:
            egg_info, ext = link.splitext()
            if not ext:
                self._log_skipped_link(link, 'not a file')
                return None
            if ext not in SUPPORTED_EXTENSIONS:
                self._log_skipped_link(
                    link, 'unsupported archive format: %s' % ext,
                )
                return None
            if "binary" not in search.formats and ext == WHEEL_EXTENSION:
                self._log_skipped_link(
                    link, 'No binaries permitted for %s' % search.supplied,
                )
                return None
            if "macosx10" in link.path and ext == '.zip':
                self._log_skipped_link(link, 'macosx10 one')
                return None
            if ext == WHEEL_EXTENSION:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    self._log_skipped_link(link, 'invalid wheel filename')
                    return None
                if canonicalize_name(wheel.name) != search.canonical:
                    self._log_skipped_link(
                        link, 'wrong project name (not %s)' % search.supplied)
                    return None

                if not wheel.supported(self.valid_tags):
                    self._log_skipped_link(
                        link, 'it is not compatible with this Python')
                    return None

                version = wheel.version

        # This should be up by the search.ok_binary check, but see issue 2700.
        if "source" not in search.formats and ext != WHEEL_EXTENSION:
            self._log_skipped_link(
                link, 'No sources permitted for %s' % search.supplied,
            )
            return None

        if not version:
            version = _egg_info_matches(egg_info, search.canonical)
        if not version:
            self._log_skipped_link(
                link, 'Missing project version for %s' % search.supplied)
            return None

        match = self._py_version_re.search(version)
        if match:
            version = version[:match.start()]
            py_version = match.group(1)
            # Build "major.minor" from version_info: slicing sys.version
            # to three characters yields "3.1" on Python 3.10 and later.
            running_python = '{}.{}'.format(*sys.version_info[:2])
            if py_version != running_python:
                self._log_skipped_link(
                    link, 'Python version is incorrect')
                return None
        try:
            support_this_python = check_requires_python(link.requires_python)
        except specifiers.InvalidSpecifier:
            # An unparsable Requires-Python entry does not rule the link out.
            logger.debug("Package %s has an invalid Requires-Python entry: %s",
                         link.filename, link.requires_python)
            support_this_python = True

        if not support_this_python:
            logger.debug("The package %s is incompatible with the python "
                         "version in use. Acceptable python versions are: %s",
                         link, link.requires_python)
            return None
        logger.debug('Found link %s, version: %s', link, version)

        return InstallationCandidate(search.supplied, version, link)
-
-
- def _find_name_version_sep(egg_info, canonical_name):
- # type: (str, str) -> int
- """Find the separator's index based on the package's canonical name.
-
- `egg_info` must be an egg info string for the given package, and
- `canonical_name` must be the package's canonical name.
-
- This function is needed since the canonicalized name does not necessarily
- have the same length as the egg info's name part. An example::
-
- >>> egg_info = 'foo__bar-1.0'
- >>> canonical_name = 'foo-bar'
- >>> _find_name_version_sep(egg_info, canonical_name)
- 8
- """
- # Project name and version must be separated by one single dash. Find all
- # occurrences of dashes; if the string in front of it matches the canonical
- # name, this is the one separating the name and version parts.
- for i, c in enumerate(egg_info):
- if c != "-":
- continue
- if canonicalize_name(egg_info[:i]) == canonical_name:
- return i
- raise ValueError("{} does not match {}".format(egg_info, canonical_name))
-
-
def _egg_info_matches(egg_info, canonical_name):
    # type: (str, str) -> Optional[str]
    """Extract the version part of an egg info string.

    :param egg_info: The string to parse. E.g. foo-2.1
    :param canonical_name: The canonicalized name of the package this
        belongs to.
    :return: Everything after the name/version separator, or None when the
        string does not match the name or the version part is empty.
    """
    try:
        separator = _find_name_version_sep(egg_info, canonical_name)
    except ValueError:
        return None
    return egg_info[separator + 1:] or None
-
-
- def _determine_base_url(document, page_url):
- """Determine the HTML document's base URL.
-
- This looks for a ``<base>`` tag in the HTML document. If present, its href
- attribute denotes the base URL of anchor tags in the document. If there is
- no such tag (or if it does not have a valid href attribute), the HTML
- file's URL is used as the base URL.
-
- :param document: An HTML document representation. The current
- implementation expects the result of ``html5lib.parse()``.
- :param page_url: The URL of the HTML document.
- """
- for base in document.findall(".//base"):
- href = base.get("href")
- if href is not None:
- return href
- return page_url
-
-
- def _get_encoding_from_headers(headers):
- """Determine if we have any encoding information in our headers.
- """
- if headers and "Content-Type" in headers:
- content_type, params = cgi.parse_header(headers["Content-Type"])
- if "charset" in params:
- return params['charset']
- return None
-
-
- _CLEAN_LINK_RE = re.compile(r'[^a-z0-9$&+,/:;=?@.#%_\\|-]', re.I)
-
-
- def _clean_link(url):
- # type: (str) -> str
- """Makes sure a link is fully encoded. That is, if a ' ' shows up in
- the link, it will be rewritten to %20 (while not over-quoting
- % or other characters)."""
- return _CLEAN_LINK_RE.sub(lambda match: '%%%2x' % ord(match.group(0)), url)
-
-
class HTMLPage(object):
    """A fetched page: its content, its URL and its response headers."""

    def __init__(self, content, url, headers=None):
        # type: (bytes, str, MutableMapping[str, str]) -> None
        self.content, self.url, self.headers = content, url, headers

    def __str__(self):
        # The URL is shown with any password redacted.
        return redact_password_from_url(self.url)

    def iter_links(self):
        # type: () -> Iterable[Link]
        """Yield a Link for every anchor in the page that has an href.

        Each href is resolved against the document's base URL and cleaned;
        the anchor's ``data-requires-python`` attribute, when present and
        non-empty, is unescaped and attached to the link.
        """
        parsed = html5lib.parse(
            self.content,
            transport_encoding=_get_encoding_from_headers(self.headers),
            namespaceHTMLElements=False,
        )
        base_url = _determine_base_url(parsed, self.url)
        for anchor in parsed.findall(".//a"):
            href = anchor.get("href")
            if not href:
                continue
            target = _clean_link(urllib_parse.urljoin(base_url, href))
            requires = anchor.get('data-requires-python')
            if requires:
                requires = unescape(requires)
            else:
                requires = None
            yield Link(target, self.url, requires_python=requires)
-
-
Search = namedtuple('Search', ['supplied', 'canonical', 'formats'])
"""Bundle of the values that describe one package search.

:attribute supplied: The package name as the user supplied it.
:attribute canonical: The canonical package name.
:attribute formats: The formats allowed for this package. Should be a set
    with 'binary' or 'source' or both in it.
"""
|