diff --git a/CHANGES.rst b/CHANGES.rst index 774d59674..d5d6e1145 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -158,6 +158,8 @@ Released 2025-05-10 allows the user to search for future output of the generator when using less and then aborting the program using ctrl-c. +- Add ``click.get_pager_file`` for file-like access to an output + pager. :pr:`1572` - ``deprecated: bool | str`` can now be used on options and arguments. This previously was only available for ``Command``. The message can now also be customised by using a ``str`` instead of a ``bool``. :issue:`2263` :pr:`2271` diff --git a/docs/api.md b/docs/api.md index 5ecb48350..ea46bc905 100644 --- a/docs/api.md +++ b/docs/api.md @@ -72,6 +72,10 @@ classes and functions. .. autofunction:: echo_via_pager ``` +```{eval-rst} +.. autofunction:: get_pager_file +``` + ```{eval-rst} .. autofunction:: prompt ``` diff --git a/docs/utils.md b/docs/utils.md index c0a3f96b7..8afe39079 100644 --- a/docs/utils.md +++ b/docs/utils.md @@ -110,6 +110,19 @@ you can pass a generator (or generator function) instead of a string: click.echo_via_pager(_generate_output()) ``` +For more complex programs, which can't easily use a simple generator, you +can get access to a writable file-like object for the pager, and write to +that instead: + +```{eval-rst} +.. click:example:: + @click.command() + def less(): + with click.get_pager_file() as pager: + for idx in range(50000): + print(idx, file=pager) +``` + ## Screen Clearing ```{versionadded} 2.0 diff --git a/src/click/__init__.py b/src/click/__init__.py index 1aa547c57..3f3366523 100644 --- a/src/click/__init__.py +++ b/src/click/__init__.py @@ -41,6 +41,7 @@ from .termui import confirm as confirm from .termui import echo_via_pager as echo_via_pager from .termui import edit as edit +from .termui import get_pager_file as get_pager_file from .termui import getchar as getchar from .termui import launch as launch from .termui import pause as pause diff --git a/src/click/_compat.py b/src/click/_compat.py index f2726b93a..134c4f389 100644 --- a/src/click/_compat.py +++ b/src/click/_compat.py @@ -502,6 +502,10 @@ def should_strip_ansi( if color is None: if stream is None: stream = sys.stdin + elif hasattr(stream, "color"): + # ._termui_impl.MaybeStripAnsi handles stripping ansi itself, + # so we don't need to strip it here + return False return not isatty(stream) and not _is_jupyter_kernel_output(stream) return not color diff --git a/src/click/_termui_impl.py b/src/click/_termui_impl.py index ee8225c4c..ee1eb4110 100644 --- a/src/click/_termui_impl.py +++ b/src/click/_termui_impl.py @@ -8,6 +8,7 @@ import collections.abc as cabc import contextlib +import io import math import os import shlex @@ -23,7 +24,6 @@ from ._compat import CYGWIN from ._compat import get_best_encoding from ._compat import isatty -from ._compat import open_stream from ._compat import strip_ansi from ._compat import term_len from ._compat import WIN @@ -366,7 +366,20 @@ def generator(self) -> cabc.Iterator[V]: self.render_progress() -def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None: +class MaybeStripAnsi(io.TextIOWrapper): + def __init__(self, stream: t.IO[bytes], *, color: bool, **kwargs: t.Any): + super().__init__(stream, **kwargs) + self.color = color + + def write(self, text: str) -> int: + if not self.color: + text = strip_ansi(text) + return super().write(text) + + +def _pager_contextmanager( + color: bool | None = None, +) -> t.ContextManager[tuple[t.BinaryIO | t.TextIO, str, bool]]: """Decide what method to use for paging through text.""" stdout = _default_text_stdout() @@ -376,50 +389,59 @@ def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None: stdout = StringIO() if not isatty(sys.stdin) or not isatty(stdout): - return _nullpager(stdout, generator, color) + return _nullpager(stdout, color) # Split and normalize the pager command into parts. pager_cmd_parts = shlex.split(os.environ.get("PAGER", ""), posix=False) if pager_cmd_parts: if WIN: - if _tempfilepager(generator, pager_cmd_parts, color): - return - elif _pipepager(generator, pager_cmd_parts, color): - return + return _tempfilepager(pager_cmd_parts, color) + return _pipepager(pager_cmd_parts, color) if os.environ.get("TERM") in ("dumb", "emacs"): - return _nullpager(stdout, generator, color) - if (WIN or sys.platform.startswith("os2")) and _tempfilepager( - generator, ["more"], color - ): - return - if _pipepager(generator, ["less"], color): - return - - import tempfile - - fd, filename = tempfile.mkstemp() - os.close(fd) - try: - if _pipepager(generator, ["more"], color): - return - return _nullpager(stdout, generator, color) - finally: - os.unlink(filename) + return _nullpager(stdout, color) + if WIN or sys.platform.startswith("os2"): + return _tempfilepager(["more"], color) + return _pipepager(["less"], color) + + +@contextlib.contextmanager +def get_pager_file(color: bool | None = None) -> t.Generator[t.TextIO, None, None]: + """Context manager. + Yields a writable file-like object which can be used as an output pager. + .. versionadded:: 8.2 + :param color: controls if the pager supports ANSI colors or not. The + default is autodetection. + """ + with _pager_contextmanager(color=color) as (stream, encoding, color): + if not isinstance(stream, MaybeStripAnsi): + if hasattr(stream, "buffer"): + # Real TextIO with buffer - unwrap and wrap in MaybeStripAnsi + stream = MaybeStripAnsi(stream.buffer, color=color, encoding=encoding) + elif not getattr(stream, "encoding", None): + # BinaryIO - wrap directly in MaybeStripAnsi + stream = MaybeStripAnsi(stream, color=color, encoding=encoding) + else: + # StringIO - add .color attribute only, no ANSI stripping + stream.color = color # type: ignore[attr-defined] + try: + yield t.cast(t.TextIO, stream) + finally: + stream.flush() +@contextlib.contextmanager def _pipepager( - generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None -) -> bool: + cmd_parts: list[str], color: bool | None = None +) -> t.Iterator[tuple[t.BinaryIO | t.TextIO, str, bool]]: """Page through text by feeding it to another program. Invoking a pager through this might support colors. - - Returns `True` if the command was found, `False` otherwise and thus another - pager should be attempted. """ # Split the command into the invoked CLI and its parameters. if not cmd_parts: - return False + stdout = _default_text_stdout() or StringIO() + yield stdout, "utf-8", False + return import shutil @@ -428,7 +450,9 @@ def _pipepager( cmd_filepath = shutil.which(cmd) if not cmd_filepath: - return False + stdout = _default_text_stdout() or StringIO() + yield stdout, "utf-8", False + return # Produces a normalized absolute path string. # multi-call binaries such as busybox derive their identity from the symlink @@ -451,6 +475,9 @@ def _pipepager( elif "r" in less_flags or "R" in less_flags: color = True + if color is None: + color = False + c = subprocess.Popen( [str(cmd_path)] + cmd_params, shell=False, @@ -459,13 +486,10 @@ def _pipepager( errors="replace", text=True, ) - assert c.stdin is not None + stdin = t.cast(t.BinaryIO, c.stdin) + encoding = get_best_encoding(stdin) try: - for text in generator: - if not color: - text = strip_ansi(text) - - c.stdin.write(text) + yield stdin, encoding, color except BrokenPipeError: # In case the pager exited unexpectedly, ignore the broken pipe error. pass @@ -479,7 +503,7 @@ def _pipepager( finally: # We must close stdin and wait for the pager to exit before we continue try: - c.stdin.close() + stdin.close() # Close implies flush, so it might throw a BrokenPipeError if the pager # process exited already. except BrokenPipeError: @@ -501,64 +525,61 @@ def _pipepager( else: break - return True - +@contextlib.contextmanager def _tempfilepager( - generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None -) -> bool: - """Page through text by invoking a program on a temporary file. - - Returns `True` if the command was found, `False` otherwise and thus another - pager should be attempted. - """ + cmd_parts: list[str], color: bool | None = None +) -> t.Iterator[tuple[t.BinaryIO | t.TextIO, str, bool]]: + """Page through text by invoking a program on a temporary file.""" # Split the command into the invoked CLI and its parameters. if not cmd_parts: - return False + stdout = _default_text_stdout() or StringIO() + yield stdout, "utf-8", False + return import shutil + import subprocess cmd = cmd_parts[0] cmd_filepath = shutil.which(cmd) if not cmd_filepath: - return False + stdout = _default_text_stdout() or StringIO() + yield stdout, "utf-8", False + return + # Produces a normalized absolute path string. # multi-call binaries such as busybox derive their identity from the symlink # less -> busybox. resolve() causes them to misbehave. (eg. less becomes busybox) cmd_path = Path(cmd_filepath).absolute() - import subprocess import tempfile - fd, filename = tempfile.mkstemp() - # TODO: This never terminates if the passed generator never terminates. - text = "".join(generator) - if not color: - text = strip_ansi(text) encoding = get_best_encoding(sys.stdout) - with open_stream(filename, "wb")[0] as f: - f.write(text.encode(encoding)) + if color is None: + color = False + # On Windows, NamedTemporaryFile cannot be opened by another process + # while Python still has it open, so we use delete=False and clean up manually + # rather than using a contextmanager here. + f = tempfile.NamedTemporaryFile(mode="wb", delete=False) try: - subprocess.call([str(cmd_path), filename]) - except OSError: - # Command not found - pass + yield t.cast(t.BinaryIO, f), encoding, color + f.flush() + f.close() + subprocess.call([str(cmd_path), f.name]) finally: - os.close(fd) - os.unlink(filename) - - return True + os.unlink(f.name) +@contextlib.contextmanager def _nullpager( - stream: t.TextIO, generator: cabc.Iterable[str], color: bool | None -) -> None: + stream: t.TextIO, color: bool | None = None +) -> t.Iterator[tuple[t.TextIO, str, bool]]: """Simply print unformatted text. This is the ultimate fallback.""" - for text in generator: - if not color: - text = strip_ansi(text) - stream.write(text) + encoding = get_best_encoding(stream) + if color is None: + color = False + yield stream, encoding, color class Editor: diff --git a/src/click/termui.py b/src/click/termui.py index 2e98a0771..99014b0fa 100644 --- a/src/click/termui.py +++ b/src/click/termui.py @@ -258,6 +258,25 @@ def confirm( return rv +def get_pager_file( + color: bool | None = None, +) -> t.ContextManager[t.TextIO]: + """Context manager. + + Yields a writable file-like object which can be used as an output pager. + + .. versionadded:: 8.2 + + :param color: controls if the pager supports ANSI colors or not. The + default is autodetection. + """ + from ._termui_impl import get_pager_file + + color = resolve_color_default(color) + + return get_pager_file(color=color) + + def echo_via_pager( text_or_generator: cabc.Iterable[str] | t.Callable[[], cabc.Iterable[str]] | str, color: bool | None = None, @@ -273,7 +292,6 @@ def echo_via_pager( :param color: controls if the pager supports ANSI colors or not. The default is autodetection. """ - color = resolve_color_default(color) if inspect.isgeneratorfunction(text_or_generator): i = t.cast("t.Callable[[], cabc.Iterable[str]]", text_or_generator)() @@ -285,9 +303,9 @@ def echo_via_pager( # convert every element of i to a text type if necessary text_generator = (el if isinstance(el, str) else str(el) for el in i) - from ._termui_impl import pager - - return pager(itertools.chain(text_generator, "\n"), color) + with get_pager_file(color=color) as pager: + for text in itertools.chain(text_generator, "\n"): + pager.write(text) @t.overload