Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ Released 2025-05-10
allows the user to search for future output of the generator when
using less and then aborting the program using ctrl-c.

- Add ``click.get_pager_file`` for file-like access to an output
pager. :pr:`1572`
- ``deprecated: bool | str`` can now be used on options and arguments. This
previously was only available for ``Command``. The message can now also be
customised by using a ``str`` instead of a ``bool``. :issue:`2263` :pr:`2271`
Expand Down
4 changes: 4 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ classes and functions.
.. autofunction:: echo_via_pager
```

```{eval-rst}
.. autofunction:: get_pager_file
```

```{eval-rst}
.. autofunction:: prompt
```
Expand Down
13 changes: 13 additions & 0 deletions docs/utils.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ you can pass a generator (or generator function) instead of a string:
click.echo_via_pager(_generate_output())
```

For more complex programs, which can't easily use a simple generator, you
can get access to a writable file-like object for the pager, and write to
that instead:

```{eval-rst}
.. click:example::
@click.command()
def less():
with click.get_pager_file() as pager:
for idx in range(50000):
print(idx, file=pager)
```

## Screen Clearing

```{versionadded} 2.0
Expand Down
1 change: 1 addition & 0 deletions src/click/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .termui import confirm as confirm
from .termui import echo_via_pager as echo_via_pager
from .termui import edit as edit
from .termui import get_pager_file as get_pager_file
from .termui import getchar as getchar
from .termui import launch as launch
from .termui import pause as pause
Expand Down
4 changes: 4 additions & 0 deletions src/click/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ def should_strip_ansi(
if color is None:
if stream is None:
stream = sys.stdin
elif hasattr(stream, "color"):
# ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
# so we don't need to strip it here
return False
return not isatty(stream) and not _is_jupyter_kernel_output(stream)
return not color

Expand Down
165 changes: 93 additions & 72 deletions src/click/_termui_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import collections.abc as cabc
import contextlib
import io
import math
import os
import shlex
Expand All @@ -23,7 +24,6 @@
from ._compat import CYGWIN
from ._compat import get_best_encoding
from ._compat import isatty
from ._compat import open_stream
from ._compat import strip_ansi
from ._compat import term_len
from ._compat import WIN
Expand Down Expand Up @@ -366,7 +366,20 @@ def generator(self) -> cabc.Iterator[V]:
self.render_progress()


def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
class MaybeStripAnsi(io.TextIOWrapper):
def __init__(self, stream: t.IO[bytes], *, color: bool, **kwargs: t.Any):
super().__init__(stream, **kwargs)
self.color = color

def write(self, text: str) -> int:
if not self.color:
text = strip_ansi(text)
return super().write(text)


def _pager_contextmanager(
color: bool | None = None,
) -> t.ContextManager[tuple[t.BinaryIO | t.TextIO, str, bool]]:
"""Decide what method to use for paging through text."""
stdout = _default_text_stdout()

Expand All @@ -376,50 +389,59 @@ def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
stdout = StringIO()

if not isatty(sys.stdin) or not isatty(stdout):
return _nullpager(stdout, generator, color)
return _nullpager(stdout, color)

# Split and normalize the pager command into parts.
pager_cmd_parts = shlex.split(os.environ.get("PAGER", ""), posix=False)
if pager_cmd_parts:
if WIN:
if _tempfilepager(generator, pager_cmd_parts, color):
return
elif _pipepager(generator, pager_cmd_parts, color):
return
return _tempfilepager(pager_cmd_parts, color)
return _pipepager(pager_cmd_parts, color)

if os.environ.get("TERM") in ("dumb", "emacs"):
return _nullpager(stdout, generator, color)
if (WIN or sys.platform.startswith("os2")) and _tempfilepager(
generator, ["more"], color
):
return
if _pipepager(generator, ["less"], color):
return

import tempfile

fd, filename = tempfile.mkstemp()
os.close(fd)
try:
if _pipepager(generator, ["more"], color):
return
return _nullpager(stdout, generator, color)
finally:
os.unlink(filename)
return _nullpager(stdout, color)
if WIN or sys.platform.startswith("os2"):
return _tempfilepager(["more"], color)
return _pipepager(["less"], color)


@contextlib.contextmanager
def get_pager_file(color: bool | None = None) -> t.Generator[t.TextIO, None, None]:
"""Context manager.
Yields a writable file-like object which can be used as an output pager.
.. versionadded:: 8.2
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
with _pager_contextmanager(color=color) as (stream, encoding, color):
if not isinstance(stream, MaybeStripAnsi):
if hasattr(stream, "buffer"):
# Real TextIO with buffer - unwrap and wrap in MaybeStripAnsi
stream = MaybeStripAnsi(stream.buffer, color=color, encoding=encoding)
elif not getattr(stream, "encoding", None):
# BinaryIO - wrap directly in MaybeStripAnsi
stream = MaybeStripAnsi(stream, color=color, encoding=encoding)
else:
# StringIO - add .color attribute only, no ANSI stripping
stream.color = color # type: ignore[attr-defined]
try:
yield t.cast(t.TextIO, stream)
finally:
stream.flush()


@contextlib.contextmanager
def _pipepager(
generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None
) -> bool:
cmd_parts: list[str], color: bool | None = None
) -> t.Iterator[tuple[t.BinaryIO | t.TextIO, str, bool]]:
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.

Returns `True` if the command was found, `False` otherwise and thus another
pager should be attempted.
"""
# Split the command into the invoked CLI and its parameters.
if not cmd_parts:
return False
stdout = _default_text_stdout() or StringIO()
yield stdout, "utf-8", False
return

import shutil

Expand All @@ -428,7 +450,9 @@ def _pipepager(

cmd_filepath = shutil.which(cmd)
if not cmd_filepath:
return False
stdout = _default_text_stdout() or StringIO()
yield stdout, "utf-8", False
return

# Produces a normalized absolute path string.
# multi-call binaries such as busybox derive their identity from the symlink
Expand All @@ -451,6 +475,9 @@ def _pipepager(
elif "r" in less_flags or "R" in less_flags:
color = True

if color is None:
color = False

c = subprocess.Popen(
[str(cmd_path)] + cmd_params,
shell=False,
Expand All @@ -459,13 +486,10 @@ def _pipepager(
errors="replace",
text=True,
)
assert c.stdin is not None
stdin = t.cast(t.BinaryIO, c.stdin)
encoding = get_best_encoding(stdin)
try:
for text in generator:
if not color:
text = strip_ansi(text)

c.stdin.write(text)
yield stdin, encoding, color
except BrokenPipeError:
# In case the pager exited unexpectedly, ignore the broken pipe error.
pass
Expand All @@ -479,7 +503,7 @@ def _pipepager(
finally:
# We must close stdin and wait for the pager to exit before we continue
try:
c.stdin.close()
stdin.close()
# Close implies flush, so it might throw a BrokenPipeError if the pager
# process exited already.
except BrokenPipeError:
Expand All @@ -501,64 +525,61 @@ def _pipepager(
else:
break

return True


@contextlib.contextmanager
def _tempfilepager(
generator: cabc.Iterable[str], cmd_parts: list[str], color: bool | None
) -> bool:
"""Page through text by invoking a program on a temporary file.

Returns `True` if the command was found, `False` otherwise and thus another
pager should be attempted.
"""
cmd_parts: list[str], color: bool | None = None
) -> t.Iterator[tuple[t.BinaryIO | t.TextIO, str, bool]]:
"""Page through text by invoking a program on a temporary file."""
# Split the command into the invoked CLI and its parameters.
if not cmd_parts:
return False
stdout = _default_text_stdout() or StringIO()
yield stdout, "utf-8", False
return

import shutil
import subprocess

cmd = cmd_parts[0]

cmd_filepath = shutil.which(cmd)
if not cmd_filepath:
return False
stdout = _default_text_stdout() or StringIO()
yield stdout, "utf-8", False
return

# Produces a normalized absolute path string.
# multi-call binaries such as busybox derive their identity from the symlink
# less -> busybox. resolve() causes them to misbehave. (eg. less becomes busybox)
cmd_path = Path(cmd_filepath).absolute()

import subprocess
import tempfile

fd, filename = tempfile.mkstemp()
# TODO: This never terminates if the passed generator never terminates.
text = "".join(generator)
if not color:
text = strip_ansi(text)
encoding = get_best_encoding(sys.stdout)
with open_stream(filename, "wb")[0] as f:
f.write(text.encode(encoding))
if color is None:
color = False
# On Windows, NamedTemporaryFile cannot be opened by another process
# while Python still has it open, so we use delete=False and clean up manually
# rather than using a contextmanager here.
f = tempfile.NamedTemporaryFile(mode="wb", delete=False)
try:
subprocess.call([str(cmd_path), filename])
except OSError:
# Command not found
pass
yield t.cast(t.BinaryIO, f), encoding, color
f.flush()
f.close()
subprocess.call([str(cmd_path), f.name])
finally:
os.close(fd)
os.unlink(filename)

return True
os.unlink(f.name)


@contextlib.contextmanager
def _nullpager(
stream: t.TextIO, generator: cabc.Iterable[str], color: bool | None
) -> None:
stream: t.TextIO, color: bool | None = None
) -> t.Iterator[tuple[t.TextIO, str, bool]]:
"""Simply print unformatted text. This is the ultimate fallback."""
for text in generator:
if not color:
text = strip_ansi(text)
stream.write(text)
encoding = get_best_encoding(stream)
if color is None:
color = False
yield stream, encoding, color


class Editor:
Expand Down
26 changes: 22 additions & 4 deletions src/click/termui.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,25 @@ def confirm(
return rv


def get_pager_file(
color: bool | None = None,
) -> t.ContextManager[t.TextIO]:
"""Context manager.

Yields a writable file-like object which can be used as an output pager.

.. versionadded:: 8.2

:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
from ._termui_impl import get_pager_file

color = resolve_color_default(color)

return get_pager_file(color=color)


def echo_via_pager(
text_or_generator: cabc.Iterable[str] | t.Callable[[], cabc.Iterable[str]] | str,
color: bool | None = None,
Expand All @@ -273,7 +292,6 @@ def echo_via_pager(
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
color = resolve_color_default(color)

if inspect.isgeneratorfunction(text_or_generator):
i = t.cast("t.Callable[[], cabc.Iterable[str]]", text_or_generator)()
Expand All @@ -285,9 +303,9 @@ def echo_via_pager(
# convert every element of i to a text type if necessary
text_generator = (el if isinstance(el, str) else str(el) for el in i)

from ._termui_impl import pager

return pager(itertools.chain(text_generator, "\n"), color)
with get_pager_file(color=color) as pager:
for text in itertools.chain(text_generator, "\n"):
pager.write(text)


@t.overload
Expand Down