- # The following comment should be removed at some point in the future.
- # mypy: strict-optional=False
- # mypy: disallow-untyped-defs=False
-
- from __future__ import absolute_import, division
-
- import contextlib
- import itertools
- import logging
- import sys
- import time
- from signal import SIGINT, default_int_handler, signal
-
- from pip._vendor import six
- from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
- from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
- from pip._vendor.progress.spinner import Spinner
-
- from pip._internal.utils.compat import WINDOWS
- from pip._internal.utils.logging import get_indentation
- from pip._internal.utils.misc import format_size
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
-
- if MYPY_CHECK_RUNNING:
-     from typing import Any, Iterator, IO
-
- try:
-     from pip._vendor import colorama
- # Lots of different errors can come from this, including SystemError and
- # ImportError.
- except Exception:
-     colorama = None
-
- logger = logging.getLogger(__name__)
-
-
- def _select_progress_class(preferred, fallback):
-     encoding = getattr(preferred.file, "encoding", None)
-
-     # If we don't know what encoding this file is in, then we'll just assume
-     # that it doesn't support unicode and use the ASCII bar.
-     if not encoding:
-         return fallback
-
-     # Collect all of the possible characters we want to use with the preferred
-     # bar.
-     characters = [
-         getattr(preferred, "empty_fill", six.text_type()),
-         getattr(preferred, "fill", six.text_type()),
-     ]
-     characters += list(getattr(preferred, "phases", []))
-
-     # Try to encode the characters we're using for the bar with the encoding
-     # of the given file. If this works, we'll assume that we can use the
-     # fancier bar; if not, we'll fall back to the plaintext bar.
-     try:
-         six.text_type().join(characters).encode(encoding)
-     except UnicodeEncodeError:
-         return fallback
-     else:
-         return preferred
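The encoding probe above can be tried in isolation. This is a minimal sketch using only the standard library; `can_render` is a hypothetical helper name that does not exist in pip, and the sample characters are chosen only for illustration.

```python
def can_render(characters, encoding):
    """Return True if every character can be encoded with `encoding`.

    Hypothetical helper: it repeats the encode/except check from
    _select_progress_class, but takes the characters and the encoding
    directly instead of reading them from a progress bar class.
    """
    if not encoding:
        # Unknown encoding: assume the stream cannot display Unicode.
        return False
    try:
        "".join(characters).encode(encoding)
    except UnicodeEncodeError:
        return False
    return True


print(can_render([" ", "\u2588"], "utf-8"))  # True: full block is encodable
print(can_render([" ", "\u2588"], "ascii"))  # False: falls back to plain bar
print(can_render([" ", "#"], "ascii"))       # True
```

Encoding the joined string once is enough, because a single unencodable character makes the whole call raise `UnicodeEncodeError`.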
-
-
- _BaseBar = _select_progress_class(IncrementalBar, Bar) # type: Any
-
-
- class InterruptibleMixin(object):
-     """
-     Helper to ensure that self.finish() gets called on keyboard interrupt.
-
-     This allows downloads to be interrupted without leaving temporary state
-     (like hidden cursors) behind.
-
-     This class is similar to the progress library's existing SigIntMixin
-     helper, but as of version 1.2, that helper has the following problems:
-
-     1. It calls sys.exit().
-     2. It discards the existing SIGINT handler completely.
-     3. It leaves its own handler in place even after an uninterrupted finish,
-        which will have unexpected delayed effects if the user triggers an
-        unrelated keyboard interrupt some time after a progress-displaying
-        download has already completed, for example.
-     """
-
-     def __init__(self, *args, **kwargs):
-         """
-         Save the original SIGINT handler for later.
-         """
-         super(InterruptibleMixin, self).__init__(*args, **kwargs)
-
-         self.original_handler = signal(SIGINT, self.handle_sigint)
-
-         # If signal() returns None, the previous handler was not installed from
-         # Python, and we cannot restore it. This probably should not happen,
-         # but if it does, we must restore something sensible instead, at least.
-         # The least bad option should be Python's default SIGINT handler, which
-         # just raises KeyboardInterrupt.
-         if self.original_handler is None:
-             self.original_handler = default_int_handler
-
-     def finish(self):
-         """
-         Restore the original SIGINT handler after finishing.
-
-         This should happen regardless of whether the progress display finishes
-         normally, or gets interrupted.
-         """
-         super(InterruptibleMixin, self).finish()
-         signal(SIGINT, self.original_handler)
-
-     def handle_sigint(self, signum, frame):
-         """
-         Call self.finish() before delegating to the original SIGINT handler.
-
-         This handler should only be in place while the progress display is
-         active.
-         """
-         self.finish()
-         self.original_handler(signum, frame)
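The save-and-restore pattern described in the docstrings can be sketched without the vendored progress library. `FakeProgress` and `InterruptibleSketch` are hypothetical stand-ins, and plain inheritance replaces the mixin ordering used in the module. The sketch assumes it runs in the main thread, since `signal.signal()` is only allowed there.

```python
import signal


class FakeProgress:
    """Stand-in for a progress display (hypothetical, for illustration)."""

    def __init__(self):
        self.finished = False

    def finish(self):
        self.finished = True


class InterruptibleSketch(FakeProgress):
    def __init__(self):
        super().__init__()
        # signal.signal() returns the handler that was installed before.
        previous = signal.signal(signal.SIGINT, self.handle_sigint)
        # None means the old handler was not installed from Python, so fall
        # back to the default handler, which raises KeyboardInterrupt.
        self.original_handler = (
            previous if previous is not None else signal.default_int_handler
        )

    def finish(self):
        super().finish()
        # Put the saved handler back, whether we finished normally or not.
        signal.signal(signal.SIGINT, self.original_handler)

    def handle_sigint(self, signum, frame):
        # Clean up first, then let the original handler decide what happens.
        self.finish()
        self.original_handler(signum, frame)


bar = InterruptibleSketch()
installed = signal.getsignal(signal.SIGINT) == bar.handle_sigint
bar.finish()
restored = signal.getsignal(signal.SIGINT) == bar.original_handler
print(installed, restored, bar.finished)
```

Because `finish()` always reinstalls the saved handler, a later Ctrl-C unrelated to the download is handled exactly as it would have been before the progress display started.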
-
-
- class SilentBar(Bar):
-
-     def update(self):
-         pass
-
-
- class BlueEmojiBar(IncrementalBar):
-
-     suffix = "%(percent)d%%"
-     bar_prefix = " "
-     bar_suffix = " "
-     phases = (u"\U0001F539", u"\U0001F537", u"\U0001F535")  # type: Any
-
-
- class DownloadProgressMixin(object):
-
-     def __init__(self, *args, **kwargs):
-         super(DownloadProgressMixin, self).__init__(*args, **kwargs)
-         self.message = (" " * (get_indentation() + 2)) + self.message
-
-     @property
-     def downloaded(self):
-         return format_size(self.index)
-
-     @property
-     def download_speed(self):
-         # Avoid zero division errors...
-         if self.avg == 0.0:
-             return "..."
-         return format_size(1 / self.avg) + "/s"
-
-     @property
-     def pretty_eta(self):
-         if self.eta:
-             return "eta %s" % self.eta_td
-         return ""
-
-     def iter(self, it, n=1):
-         for x in it:
-             yield x
-             self.next(n)
-         self.finish()
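The arithmetic behind `download_speed` can be shown on its own. The sketch assumes that `avg`, which comes from the vendored progress library, is the average number of seconds per unit of progress (here, per byte), so its reciprocal reads as bytes per second. `speed_from_avg` is a hypothetical name, and it returns a raw number rather than calling pip's `format_size`.

```python
def speed_from_avg(avg_seconds_per_byte):
    """Return throughput in bytes per second, or None when it is unknown."""
    if avg_seconds_per_byte == 0.0:
        # No samples yet: avoid dividing by zero, as the property does.
        return None
    return 1 / avg_seconds_per_byte


print(speed_from_avg(0.0))   # None
print(speed_from_avg(0.25))  # 4.0
```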
-
-
- class WindowsMixin(object):
-
-     def __init__(self, *args, **kwargs):
-         # The Windows terminal does not support the hide/show cursor ANSI codes
-         # even with colorama. So we'll ensure that hide_cursor is False on
-         # Windows.
-         # This call needs to go before the super() call, so that hide_cursor
-         # is set in time. The base progress bar class writes the "hide cursor"
-         # code to the terminal in its init, so if we don't set this soon
-         # enough, we get a "hide" with no corresponding "show"...
-         if WINDOWS and self.hide_cursor:
-             self.hide_cursor = False
-
-         super(WindowsMixin, self).__init__(*args, **kwargs)
-
-         # Check if we are running on Windows and we have the colorama module,
-         # if we do then wrap our file with it.
-         if WINDOWS and colorama:
-             self.file = colorama.AnsiToWin32(self.file)
-             # The progress code expects to be able to call self.file.isatty()
-             # but the colorama.AnsiToWin32() object doesn't have that, so we'll
-             # add it.
-             self.file.isatty = lambda: self.file.wrapped.isatty()
-             # The progress code expects to be able to call self.file.flush()
-             # but the colorama.AnsiToWin32() object doesn't have that, so we'll
-             # add it.
-             self.file.flush = lambda: self.file.wrapped.flush()
-
-
- class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin,
-                               DownloadProgressMixin):
-
-     file = sys.stdout
-     message = "%(percent)d%%"
-     suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
-
- # NOTE: The "type: ignore" comments on the following classes are there to
- # work around https://github.com/python/typing/issues/241
-
-
- class DefaultDownloadProgressBar(BaseDownloadProgressBar,
-                                  _BaseBar):
-     pass
-
-
- class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):  # type: ignore
-     pass
-
-
- class DownloadBar(BaseDownloadProgressBar,  # type: ignore
-                   Bar):
-     pass
-
-
- class DownloadFillingCirclesBar(BaseDownloadProgressBar,  # type: ignore
-                                 FillingCirclesBar):
-     pass
-
-
- class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar,  # type: ignore
-                                    BlueEmojiBar):
-     pass
-
-
- class DownloadProgressSpinner(WindowsMixin, InterruptibleMixin,
-                               DownloadProgressMixin, Spinner):
-
-     file = sys.stdout
-     suffix = "%(downloaded)s %(download_speed)s"
-
-     def next_phase(self):
-         if not hasattr(self, "_phaser"):
-             self._phaser = itertools.cycle(self.phases)
-         return next(self._phaser)
-
-     def update(self):
-         message = self.message % self
-         phase = self.next_phase()
-         suffix = self.suffix % self
-         line = ''.join([
-             message,
-             " " if message else "",
-             phase,
-             " " if suffix else "",
-             suffix,
-         ])
-
-         self.writeln(line)
-
-
- BAR_TYPES = {
-     "off": (DownloadSilentBar, DownloadSilentBar),
-     "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
-     "ascii": (DownloadBar, DownloadProgressSpinner),
-     "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
-     "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner)
- }
-
-
- def DownloadProgressProvider(progress_bar, max=None):
-     if max is None or max == 0:
-         return BAR_TYPES[progress_bar][1]().iter
-     else:
-         return BAR_TYPES[progress_bar][0](max=max).iter
-
-
- ################################################################
- # Generic "something is happening" spinners
- #
- # We don't even try using progress.spinner.Spinner here because it's actually
- # simpler to reimplement from scratch than to coerce their code into doing
- # what we need.
- ################################################################
-
- @contextlib.contextmanager
- def hidden_cursor(file):
-     # type: (IO) -> Iterator[None]
-     # The Windows terminal does not support the hide/show cursor ANSI codes,
-     # even via colorama. So don't even try.
-     if WINDOWS:
-         yield
-     # We don't want to clutter the output with control characters if we're
-     # writing to a file, or if the user is running with --quiet.
-     # See https://github.com/pypa/pip/issues/3418
-     elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
-         yield
-     else:
-         file.write(HIDE_CURSOR)
-         try:
-             yield
-         finally:
-             file.write(SHOW_CURSOR)
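The `try`/`finally` pairing in `hidden_cursor` is what guarantees a "show" for every "hide". A small standalone sketch follows. `FakeTTY` and `hidden_cursor_sketch` are hypothetical names, the Windows and log-level checks are left out, and the two escape sequences are assumed to match the library's `HIDE_CURSOR` and `SHOW_CURSOR` constants.

```python
import contextlib
import io

# Assumed values: the usual ANSI sequences for hiding and showing the cursor.
HIDE = "\x1b[?25l"
SHOW = "\x1b[?25h"


class FakeTTY(io.StringIO):
    """In-memory stream that claims to be a terminal (for illustration)."""

    def isatty(self):
        return True


@contextlib.contextmanager
def hidden_cursor_sketch(stream):
    if not stream.isatty():
        # Not a terminal: do not clutter the output with control codes.
        yield
        return
    stream.write(HIDE)
    try:
        yield
    finally:
        # Runs even if the body raises, so the cursor always comes back.
        stream.write(SHOW)


tty = FakeTTY()
try:
    with hidden_cursor_sketch(tty):
        tty.write("working")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

plain = io.StringIO()  # io.StringIO.isatty() returns False
with hidden_cursor_sketch(plain):
    plain.write("working")

print(repr(tty.getvalue()))
print(repr(plain.getvalue()))
```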
-
-
- class RateLimiter(object):
-     def __init__(self, min_update_interval_seconds):
-         # type: (float) -> None
-         self._min_update_interval_seconds = min_update_interval_seconds
-         self._last_update = 0  # type: float
-
-     def ready(self):
-         # type: () -> bool
-         now = time.time()
-         delta = now - self._last_update
-         return delta >= self._min_update_interval_seconds
-
-     def reset(self):
-         # type: () -> None
-         self._last_update = time.time()
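The `ready()`/`reset()` protocol is easier to follow with a clock that can be advanced by hand. In this sketch, `SketchRateLimiter` and its `clock` parameter are hypothetical additions for illustration; the class above always reads `time.time()`.

```python
class SketchRateLimiter:
    def __init__(self, min_update_interval_seconds, clock):
        self._min_interval = min_update_interval_seconds
        self._clock = clock  # callable returning the current time in seconds
        self._last_update = 0.0

    def ready(self):
        # True once at least the minimum interval has passed since reset().
        return self._clock() - self._last_update >= self._min_interval

    def reset(self):
        self._last_update = self._clock()


now = [1000.0]  # fake "current time", advanced manually below
limiter = SketchRateLimiter(0.125, clock=lambda: now[0])

first = limiter.ready()   # True: nothing has been written yet
limiter.reset()
second = limiter.ready()  # False: no time has passed since reset()
now[0] += 0.25
third = limiter.ready()   # True: 0.25 s >= 0.125 s
print(first, second, third)
```

Callers are expected to call `reset()` each time they actually write an update, which is what both spinner classes in this module do.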
-
-
- class SpinnerInterface(object):
-     def spin(self):
-         # type: () -> None
-         raise NotImplementedError()
-
-     def finish(self, final_status):
-         # type: (str) -> None
-         raise NotImplementedError()
-
-
- class InteractiveSpinner(SpinnerInterface):
-     def __init__(self, message, file=None, spin_chars="-\\|/",
-                  # Empirically, 8 updates/second looks nice
-                  min_update_interval_seconds=0.125):
-         self._message = message
-         if file is None:
-             file = sys.stdout
-         self._file = file
-         self._rate_limiter = RateLimiter(min_update_interval_seconds)
-         self._finished = False
-
-         self._spin_cycle = itertools.cycle(spin_chars)
-
-         self._file.write(" " * get_indentation() + self._message + " ... ")
-         self._width = 0
-
-     def _write(self, status):
-         assert not self._finished
-         # Erase what we wrote before by backspacing to the beginning, writing
-         # spaces to overwrite the old text, and then backspacing again
-         backup = "\b" * self._width
-         self._file.write(backup + " " * self._width + backup)
-         # Now we have a blank slate to add our status
-         self._file.write(status)
-         self._width = len(status)
-         self._file.flush()
-         self._rate_limiter.reset()
-
-     def spin(self):
-         # type: () -> None
-         if self._finished:
-             return
-         if not self._rate_limiter.ready():
-             return
-         self._write(next(self._spin_cycle))
-
-     def finish(self, final_status):
-         # type: (str) -> None
-         if self._finished:
-             return
-         self._write(final_status)
-         self._file.write("\n")
-         self._file.flush()
-         self._finished = True
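The backspace-based erase in `_write` can be observed by writing to an in-memory stream. `EraseSketch` is a hypothetical, stripped-down version that keeps only the width bookkeeping and omits rate limiting, flushing, and the finished flag.

```python
import io


class EraseSketch:
    def __init__(self, stream):
        self._stream = stream
        self._width = 0  # number of characters in the last status written

    def write_status(self, status):
        # Back up over the old status, blank it out with spaces, back up again.
        backup = "\b" * self._width
        self._stream.write(backup + " " * self._width + backup)
        # The cursor is now where the old status began; write the new one.
        self._stream.write(status)
        self._width = len(status)


out = io.StringIO()
spinner = EraseSketch(out)
spinner.write_status("-")
spinner.write_status("\\")
spinner.write_status("done")
print(repr(out.getvalue()))
```

Overwriting with spaces matters when the new status is shorter than the old one; without it, trailing characters of the previous status would remain on screen.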
-
-
- # Used for dumb terminals, non-interactive installs (no tty), etc.
- # We still print updates occasionally (once every 60 seconds by default) to
- # act as a keep-alive for systems like Travis-CI that take lack-of-output as
- # an indication that a task has frozen.
- class NonInteractiveSpinner(SpinnerInterface):
-     def __init__(self, message, min_update_interval_seconds=60):
-         # type: (str, float) -> None
-         self._message = message
-         self._finished = False
-         self._rate_limiter = RateLimiter(min_update_interval_seconds)
-         self._update("started")
-
-     def _update(self, status):
-         assert not self._finished
-         self._rate_limiter.reset()
-         logger.info("%s: %s", self._message, status)
-
-     def spin(self):
-         # type: () -> None
-         if self._finished:
-             return
-         if not self._rate_limiter.ready():
-             return
-         self._update("still running...")
-
-     def finish(self, final_status):
-         # type: (str) -> None
-         if self._finished:
-             return
-         self._update("finished with status '%s'" % (final_status,))
-         self._finished = True
-
-
- @contextlib.contextmanager
- def open_spinner(message):
-     # type: (str) -> Iterator[SpinnerInterface]
-     # Interactive spinner goes directly to sys.stdout rather than being routed
-     # through the logging system, but it acts like it has level INFO,
-     # i.e. it's only displayed if we're at level INFO or better.
-     # Non-interactive spinner goes through the logging system, so it is always
-     # in sync with logging configuration.
-     if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
-         spinner = InteractiveSpinner(message)  # type: SpinnerInterface
-     else:
-         spinner = NonInteractiveSpinner(message)
-     try:
-         with hidden_cursor(sys.stdout):
-             yield spinner
-     except KeyboardInterrupt:
-         spinner.finish("canceled")
-         raise
-     except Exception:
-         spinner.finish("error")
-         raise
-     else:
-         spinner.finish("done")
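How `open_spinner` maps the outcome of the `with` body to a final status can be sketched with a spinner that only records what it is told. `RecordingSpinner` and `open_spinner_sketch` are hypothetical names, and the terminal detection and cursor hiding from the real function are left out.

```python
import contextlib


class RecordingSpinner:
    """Hypothetical spinner that stores final statuses instead of printing."""

    def __init__(self):
        self.statuses = []
        self._finished = False

    def finish(self, final_status):
        if self._finished:
            return
        self.statuses.append(final_status)
        self._finished = True


@contextlib.contextmanager
def open_spinner_sketch(spinner):
    try:
        yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")


ok = RecordingSpinner()
with open_spinner_sketch(ok):
    pass  # body completes normally

failed = RecordingSpinner()
try:
    with open_spinner_sketch(failed):
        raise ValueError("simulated build failure")
except ValueError:
    pass  # the exception is re-raised after the status is recorded

print(ok.statuses, failed.statuses)  # ['done'] ['error']
```

`KeyboardInterrupt` needs its own clause because it derives from `BaseException`, so `except Exception` alone would not catch it.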