这有点复杂,但可以forkpty(3)
用来创建一个新的 TTY,您可以在其中完全控制less
,将输入和输出转发到原始 TTY,以便感觉无缝。
下面的代码使用 Python 3 及其标准库。pexpect可以做很多繁重的工作,但这并不随 Python 一起提供。另外,这种方式更有教育意义。
import contextlib
import fcntl
import io
import os
import pty
import select
import signal
import struct
import termios
import time
import tty
假设其余代码缩进以在此上下文管理器中运行。
with contextlib.ExitStack() as stack:
我们需要获取真正的 TTY 并将其设置为原始模式。这可能会混淆 TTY 的其他用户(例如,该程序退出后的 shell),因此请确保将其恢复到相同的状态。
tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
stack.callback(os.close, tty_fd)
tc = termios.tcgetattr(tty_fd)
stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
tty.setraw(tty_fd, when=termios.TCSANOW)
然后我们可以调用Pythonforkpty
中命名的 。pty.fork()
这做了几件事:
- 创建一个伪终端。
- 叉一个新的孩子。
- 将孩子连接到 PTY 的从属端。
- 将子进程的PID和PTY的master端返回给原来的进程。
孩子应该跑less
。请注意使用,_exit(2)
因为在fork
.
child_pid, master_fd = pty.fork()
if child_pid == 0:
os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
os._exit(0)
stack.callback(os.close, master_fd)
然后需要做一些工作来设置一些异步信号处理程序。
SIGCHLD
当子进程改变状态(如退出)时接收。我们可以使用它来跟踪孩子是否还在跑步。
SIGWINCH
当控制终端改变大小时接收。我们将此大小转发给 PTY(它将自动向附加的进程发送另一个窗口更改信号)。我们也应该设置 PTY 的窗口大小以匹配启动。
转发诸如 , 等信号也可能SIGINT
有意义SIGTERM
。
child_is_running = True
def handle_chld(signum, frame):
while True:
pid, status = os.waitpid(-1, os.P_NOWAIT)
if not pid:
break
if pid == child_pid:
child_is_running = False
def handle_winch(signum, frame):
tc = struct.pack('HHHH', 0, 0, 0, 0)
tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
handler = signal.signal(signal.SIGCHLD, handle_chld)
stack.callback(signal.signal, signal.SIGCHLD, handler)
handler = signal.signal(signal.SIGWINCH, handle_winch)
stack.callback(signal.signal, signal.SIGWINCH, handler)
handle_winch(0, None)
现在来看看真肉:在真假 TTY 之间复制数据。
target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
has_sent_q = False
with contextlib.suppress(OSError):
while child_is_running:
now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
if now < target_time:
timeout = target_time - now
else:
timeout = None
if not has_sent_q:
os.write(master_fd, b'q')
has_sent_q = True
rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
if tty_fd in rfds:
data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
os.write(master_fd, data)
if master_fd in rfds:
data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
os.write(tty_fd, data)
它看起来很简单,尽管我在掩饰一些事情,例如正确的短写和SIGTTIN
/SIGTTOU
处理(通过抑制部分隐藏OSError
)。