450 lines
14 KiB
Python
450 lines
14 KiB
Python
|
|
"""
|
||
|
|
Progress bar implementation on top of prompt_toolkit.
|
||
|
|
|
||
|
|
::
|
||
|
|
|
||
|
|
with ProgressBar(...) as pb:
|
||
|
|
for item in pb(data):
|
||
|
|
...
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import contextvars
|
||
|
|
import datetime
|
||
|
|
import functools
|
||
|
|
import os
|
||
|
|
import signal
|
||
|
|
import threading
|
||
|
|
import traceback
|
||
|
|
from typing import (
|
||
|
|
Callable,
|
||
|
|
Generic,
|
||
|
|
Iterable,
|
||
|
|
Iterator,
|
||
|
|
Sequence,
|
||
|
|
Sized,
|
||
|
|
TextIO,
|
||
|
|
TypeVar,
|
||
|
|
cast,
|
||
|
|
)
|
||
|
|
|
||
|
|
from prompt_toolkit.application import Application
|
||
|
|
from prompt_toolkit.application.current import get_app_session
|
||
|
|
from prompt_toolkit.filters import Condition, is_done, renderer_height_is_known
|
||
|
|
from prompt_toolkit.formatted_text import (
|
||
|
|
AnyFormattedText,
|
||
|
|
StyleAndTextTuples,
|
||
|
|
to_formatted_text,
|
||
|
|
)
|
||
|
|
from prompt_toolkit.input import Input
|
||
|
|
from prompt_toolkit.key_binding import KeyBindings
|
||
|
|
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
|
||
|
|
from prompt_toolkit.layout import (
|
||
|
|
ConditionalContainer,
|
||
|
|
FormattedTextControl,
|
||
|
|
HSplit,
|
||
|
|
Layout,
|
||
|
|
VSplit,
|
||
|
|
Window,
|
||
|
|
)
|
||
|
|
from prompt_toolkit.layout.controls import UIContent, UIControl
|
||
|
|
from prompt_toolkit.layout.dimension import AnyDimension, D
|
||
|
|
from prompt_toolkit.output import ColorDepth, Output
|
||
|
|
from prompt_toolkit.styles import BaseStyle
|
||
|
|
from prompt_toolkit.utils import in_main_thread
|
||
|
|
|
||
|
|
from .formatters import Formatter, create_default_formatters
|
||
|
|
|
||
|
|
__all__ = ["ProgressBar"]
|
||
|
|
|
||
|
|
E = KeyPressEvent
|
||
|
|
|
||
|
|
_SIGWINCH = getattr(signal, "SIGWINCH", None)
|
||
|
|
|
||
|
|
|
||
|
|
def create_key_bindings(cancel_callback: Callable[[], None] | None) -> KeyBindings:
|
||
|
|
"""
|
||
|
|
Key bindings handled by the progress bar.
|
||
|
|
(The main thread is not supposed to handle any key bindings.)
|
||
|
|
"""
|
||
|
|
kb = KeyBindings()
|
||
|
|
|
||
|
|
@kb.add("c-l")
|
||
|
|
def _clear(event: E) -> None:
|
||
|
|
event.app.renderer.clear()
|
||
|
|
|
||
|
|
if cancel_callback is not None:
|
||
|
|
|
||
|
|
@kb.add("c-c")
|
||
|
|
def _interrupt(event: E) -> None:
|
||
|
|
"Kill the 'body' of the progress bar, but only if we run from the main thread."
|
||
|
|
assert cancel_callback is not None
|
||
|
|
cancel_callback()
|
||
|
|
|
||
|
|
return kb
|
||
|
|
|
||
|
|
|
||
|
|
_T = TypeVar("_T")
|
||
|
|
|
||
|
|
|
||
|
|
class ProgressBar:
|
||
|
|
"""
|
||
|
|
Progress bar context manager.
|
||
|
|
|
||
|
|
Usage ::
|
||
|
|
|
||
|
|
with ProgressBar(...) as pb:
|
||
|
|
for item in pb(data):
|
||
|
|
...
|
||
|
|
|
||
|
|
:param title: Text to be displayed above the progress bars. This can be a
|
||
|
|
callable or formatted text as well.
|
||
|
|
:param formatters: List of :class:`.Formatter` instances.
|
||
|
|
:param bottom_toolbar: Text to be displayed in the bottom toolbar. This
|
||
|
|
can be a callable or formatted text.
|
||
|
|
:param style: :class:`prompt_toolkit.styles.BaseStyle` instance.
|
||
|
|
:param key_bindings: :class:`.KeyBindings` instance.
|
||
|
|
:param cancel_callback: Callback function that's called when control-c is
|
||
|
|
pressed by the user. This can be used for instance to start "proper"
|
||
|
|
cancellation if the wrapped code supports it.
|
||
|
|
:param file: The file object used for rendering, by default `sys.stderr` is used.
|
||
|
|
|
||
|
|
:param color_depth: `prompt_toolkit` `ColorDepth` instance.
|
||
|
|
:param output: :class:`~prompt_toolkit.output.Output` instance.
|
||
|
|
:param input: :class:`~prompt_toolkit.input.Input` instance.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
title: AnyFormattedText = None,
|
||
|
|
formatters: Sequence[Formatter] | None = None,
|
||
|
|
bottom_toolbar: AnyFormattedText = None,
|
||
|
|
style: BaseStyle | None = None,
|
||
|
|
key_bindings: KeyBindings | None = None,
|
||
|
|
cancel_callback: Callable[[], None] | None = None,
|
||
|
|
file: TextIO | None = None,
|
||
|
|
color_depth: ColorDepth | None = None,
|
||
|
|
output: Output | None = None,
|
||
|
|
input: Input | None = None,
|
||
|
|
) -> None:
|
||
|
|
self.title = title
|
||
|
|
self.formatters = formatters or create_default_formatters()
|
||
|
|
self.bottom_toolbar = bottom_toolbar
|
||
|
|
self.counters: list[ProgressBarCounter[object]] = []
|
||
|
|
self.style = style
|
||
|
|
self.key_bindings = key_bindings
|
||
|
|
self.cancel_callback = cancel_callback
|
||
|
|
|
||
|
|
# If no `cancel_callback` was given, and we're creating the progress
|
||
|
|
# bar from the main thread. Cancel by sending a `KeyboardInterrupt` to
|
||
|
|
# the main thread.
|
||
|
|
if self.cancel_callback is None and in_main_thread():
|
||
|
|
|
||
|
|
def keyboard_interrupt_to_main_thread() -> None:
|
||
|
|
os.kill(os.getpid(), signal.SIGINT)
|
||
|
|
|
||
|
|
self.cancel_callback = keyboard_interrupt_to_main_thread
|
||
|
|
|
||
|
|
# Note that we use __stderr__ as default error output, because that
|
||
|
|
# works best with `patch_stdout`.
|
||
|
|
self.color_depth = color_depth
|
||
|
|
self.output = output or get_app_session().output
|
||
|
|
self.input = input or get_app_session().input
|
||
|
|
|
||
|
|
self._thread: threading.Thread | None = None
|
||
|
|
|
||
|
|
self._has_sigwinch = False
|
||
|
|
self._app_started = threading.Event()
|
||
|
|
|
||
|
|
def __enter__(self) -> ProgressBar:
|
||
|
|
# Create UI Application.
|
||
|
|
title_toolbar = ConditionalContainer(
|
||
|
|
Window(
|
||
|
|
FormattedTextControl(lambda: self.title),
|
||
|
|
height=1,
|
||
|
|
style="class:progressbar,title",
|
||
|
|
),
|
||
|
|
filter=Condition(lambda: self.title is not None),
|
||
|
|
)
|
||
|
|
|
||
|
|
bottom_toolbar = ConditionalContainer(
|
||
|
|
Window(
|
||
|
|
FormattedTextControl(
|
||
|
|
lambda: self.bottom_toolbar, style="class:bottom-toolbar.text"
|
||
|
|
),
|
||
|
|
style="class:bottom-toolbar",
|
||
|
|
height=1,
|
||
|
|
),
|
||
|
|
filter=~is_done
|
||
|
|
& renderer_height_is_known
|
||
|
|
& Condition(lambda: self.bottom_toolbar is not None),
|
||
|
|
)
|
||
|
|
|
||
|
|
def width_for_formatter(formatter: Formatter) -> AnyDimension:
|
||
|
|
# Needs to be passed as callable (partial) to the 'width'
|
||
|
|
# parameter, because we want to call it on every resize.
|
||
|
|
return formatter.get_width(progress_bar=self)
|
||
|
|
|
||
|
|
progress_controls = [
|
||
|
|
Window(
|
||
|
|
content=_ProgressControl(self, f, self.cancel_callback),
|
||
|
|
width=functools.partial(width_for_formatter, f),
|
||
|
|
)
|
||
|
|
for f in self.formatters
|
||
|
|
]
|
||
|
|
|
||
|
|
self.app: Application[None] = Application(
|
||
|
|
min_redraw_interval=0.05,
|
||
|
|
layout=Layout(
|
||
|
|
HSplit(
|
||
|
|
[
|
||
|
|
title_toolbar,
|
||
|
|
VSplit(
|
||
|
|
progress_controls,
|
||
|
|
height=lambda: D(
|
||
|
|
preferred=len(self.counters), max=len(self.counters)
|
||
|
|
),
|
||
|
|
),
|
||
|
|
Window(),
|
||
|
|
bottom_toolbar,
|
||
|
|
]
|
||
|
|
)
|
||
|
|
),
|
||
|
|
style=self.style,
|
||
|
|
key_bindings=self.key_bindings,
|
||
|
|
refresh_interval=0.3,
|
||
|
|
color_depth=self.color_depth,
|
||
|
|
output=self.output,
|
||
|
|
input=self.input,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Run application in different thread.
|
||
|
|
def run() -> None:
|
||
|
|
try:
|
||
|
|
self.app.run(pre_run=self._app_started.set)
|
||
|
|
except BaseException as e:
|
||
|
|
traceback.print_exc()
|
||
|
|
print(e)
|
||
|
|
|
||
|
|
ctx: contextvars.Context = contextvars.copy_context()
|
||
|
|
|
||
|
|
self._thread = threading.Thread(target=ctx.run, args=(run,))
|
||
|
|
self._thread.start()
|
||
|
|
|
||
|
|
return self
|
||
|
|
|
||
|
|
def __exit__(self, *a: object) -> None:
|
||
|
|
# Wait for the app to be started. Make sure we don't quit earlier,
|
||
|
|
# otherwise `self.app.exit` won't terminate the app because
|
||
|
|
# `self.app.future` has not yet been set.
|
||
|
|
self._app_started.wait()
|
||
|
|
|
||
|
|
# Quit UI application.
|
||
|
|
if self.app.is_running and self.app.loop is not None:
|
||
|
|
self.app.loop.call_soon_threadsafe(self.app.exit)
|
||
|
|
|
||
|
|
if self._thread is not None:
|
||
|
|
self._thread.join()
|
||
|
|
|
||
|
|
def __call__(
|
||
|
|
self,
|
||
|
|
data: Iterable[_T] | None = None,
|
||
|
|
label: AnyFormattedText = "",
|
||
|
|
remove_when_done: bool = False,
|
||
|
|
total: int | None = None,
|
||
|
|
) -> ProgressBarCounter[_T]:
|
||
|
|
"""
|
||
|
|
Start a new counter.
|
||
|
|
|
||
|
|
:param label: Title text or description for this progress. (This can be
|
||
|
|
formatted text as well).
|
||
|
|
:param remove_when_done: When `True`, hide this progress bar.
|
||
|
|
:param total: Specify the maximum value if it can't be calculated by
|
||
|
|
calling ``len``.
|
||
|
|
"""
|
||
|
|
counter = ProgressBarCounter(
|
||
|
|
self, data, label=label, remove_when_done=remove_when_done, total=total
|
||
|
|
)
|
||
|
|
self.counters.append(counter)
|
||
|
|
return counter
|
||
|
|
|
||
|
|
def invalidate(self) -> None:
|
||
|
|
self.app.invalidate()
|
||
|
|
|
||
|
|
|
||
|
|
class _ProgressControl(UIControl):
|
||
|
|
"""
|
||
|
|
User control for the progress bar.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
progress_bar: ProgressBar,
|
||
|
|
formatter: Formatter,
|
||
|
|
cancel_callback: Callable[[], None] | None,
|
||
|
|
) -> None:
|
||
|
|
self.progress_bar = progress_bar
|
||
|
|
self.formatter = formatter
|
||
|
|
self._key_bindings = create_key_bindings(cancel_callback)
|
||
|
|
|
||
|
|
def create_content(self, width: int, height: int) -> UIContent:
|
||
|
|
items: list[StyleAndTextTuples] = []
|
||
|
|
|
||
|
|
for pr in self.progress_bar.counters:
|
||
|
|
try:
|
||
|
|
text = self.formatter.format(self.progress_bar, pr, width)
|
||
|
|
except BaseException:
|
||
|
|
traceback.print_exc()
|
||
|
|
text = "ERROR"
|
||
|
|
|
||
|
|
items.append(to_formatted_text(text))
|
||
|
|
|
||
|
|
def get_line(i: int) -> StyleAndTextTuples:
|
||
|
|
return items[i]
|
||
|
|
|
||
|
|
return UIContent(get_line=get_line, line_count=len(items), show_cursor=False)
|
||
|
|
|
||
|
|
def is_focusable(self) -> bool:
|
||
|
|
return True # Make sure that the key bindings work.
|
||
|
|
|
||
|
|
def get_key_bindings(self) -> KeyBindings:
|
||
|
|
return self._key_bindings
|
||
|
|
|
||
|
|
|
||
|
|
_CounterItem = TypeVar("_CounterItem", covariant=True)
|
||
|
|
|
||
|
|
|
||
|
|
class ProgressBarCounter(Generic[_CounterItem]):
|
||
|
|
"""
|
||
|
|
An individual counter (A progress bar can have multiple counters).
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
progress_bar: ProgressBar,
|
||
|
|
data: Iterable[_CounterItem] | None = None,
|
||
|
|
label: AnyFormattedText = "",
|
||
|
|
remove_when_done: bool = False,
|
||
|
|
total: int | None = None,
|
||
|
|
) -> None:
|
||
|
|
self.start_time = datetime.datetime.now()
|
||
|
|
self.stop_time: datetime.datetime | None = None
|
||
|
|
self.progress_bar = progress_bar
|
||
|
|
self.data = data
|
||
|
|
self.items_completed = 0
|
||
|
|
self.label = label
|
||
|
|
self.remove_when_done = remove_when_done
|
||
|
|
self._done = False
|
||
|
|
self.total: int | None
|
||
|
|
|
||
|
|
if total is None:
|
||
|
|
try:
|
||
|
|
self.total = len(cast(Sized, data))
|
||
|
|
except TypeError:
|
||
|
|
self.total = None # We don't know the total length.
|
||
|
|
else:
|
||
|
|
self.total = total
|
||
|
|
|
||
|
|
def __iter__(self) -> Iterator[_CounterItem]:
|
||
|
|
if self.data is not None:
|
||
|
|
try:
|
||
|
|
for item in self.data:
|
||
|
|
yield item
|
||
|
|
self.item_completed()
|
||
|
|
|
||
|
|
# Only done if we iterate to the very end.
|
||
|
|
self.done = True
|
||
|
|
finally:
|
||
|
|
# Ensure counter has stopped even if we did not iterate to the
|
||
|
|
# end (e.g. break or exceptions).
|
||
|
|
self.stopped = True
|
||
|
|
else:
|
||
|
|
raise NotImplementedError("No data defined to iterate over.")
|
||
|
|
|
||
|
|
def item_completed(self) -> None:
|
||
|
|
"""
|
||
|
|
Start handling the next item.
|
||
|
|
|
||
|
|
(Can be called manually in case we don't have a collection to loop through.)
|
||
|
|
"""
|
||
|
|
self.items_completed += 1
|
||
|
|
self.progress_bar.invalidate()
|
||
|
|
|
||
|
|
@property
|
||
|
|
def done(self) -> bool:
|
||
|
|
"""Whether a counter has been completed.
|
||
|
|
|
||
|
|
Done counter have been stopped (see stopped) and removed depending on
|
||
|
|
remove_when_done value.
|
||
|
|
|
||
|
|
Contrast this with stopped. A stopped counter may be terminated before
|
||
|
|
100% completion. A done counter has reached its 100% completion.
|
||
|
|
"""
|
||
|
|
return self._done
|
||
|
|
|
||
|
|
@done.setter
|
||
|
|
def done(self, value: bool) -> None:
|
||
|
|
self._done = value
|
||
|
|
self.stopped = value
|
||
|
|
|
||
|
|
if value and self.remove_when_done:
|
||
|
|
self.progress_bar.counters.remove(self)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def stopped(self) -> bool:
|
||
|
|
"""Whether a counter has been stopped.
|
||
|
|
|
||
|
|
Stopped counters no longer have increasing time_elapsed. This distinction is
|
||
|
|
also used to prevent the Bar formatter with unknown totals from continuing to run.
|
||
|
|
|
||
|
|
A stopped counter (but not done) can be used to signal that a given counter has
|
||
|
|
encountered an error but allows other counters to continue
|
||
|
|
(e.g. download X of Y failed). Given how only done counters are removed
|
||
|
|
(see remove_when_done) this can help aggregate failures from a large number of
|
||
|
|
successes.
|
||
|
|
|
||
|
|
Contrast this with done. A done counter has reached its 100% completion.
|
||
|
|
A stopped counter may be terminated before 100% completion.
|
||
|
|
"""
|
||
|
|
return self.stop_time is not None
|
||
|
|
|
||
|
|
@stopped.setter
|
||
|
|
def stopped(self, value: bool) -> None:
|
||
|
|
if value:
|
||
|
|
# This counter has not already been stopped.
|
||
|
|
if not self.stop_time:
|
||
|
|
self.stop_time = datetime.datetime.now()
|
||
|
|
else:
|
||
|
|
# Clearing any previously set stop_time.
|
||
|
|
self.stop_time = None
|
||
|
|
|
||
|
|
@property
|
||
|
|
def percentage(self) -> float:
|
||
|
|
if self.total is None:
|
||
|
|
return 0
|
||
|
|
else:
|
||
|
|
return self.items_completed * 100 / max(self.total, 1)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def time_elapsed(self) -> datetime.timedelta:
|
||
|
|
"""
|
||
|
|
Return how much time has been elapsed since the start.
|
||
|
|
"""
|
||
|
|
if self.stop_time is None:
|
||
|
|
return datetime.datetime.now() - self.start_time
|
||
|
|
else:
|
||
|
|
return self.stop_time - self.start_time
|
||
|
|
|
||
|
|
@property
|
||
|
|
def time_left(self) -> datetime.timedelta | None:
|
||
|
|
"""
|
||
|
|
Timedelta representing the time left.
|
||
|
|
"""
|
||
|
|
if self.total is None or not self.percentage:
|
||
|
|
return None
|
||
|
|
elif self.done or self.stopped:
|
||
|
|
return datetime.timedelta(0)
|
||
|
|
else:
|
||
|
|
return self.time_elapsed * (100 - self.percentage) / self.percentage
|