4

我正在尝试less从 Mac OSX 上的 Python 脚本进行控制。基本上我想要的是能够转发控制字符(上/下/左/右)但在 Python 程序中处理其他输入。我使用Popen启动lessless但从stdin. 因此,我不确定如何将任何字符发送到更少。

该程序打开较少,等待一秒钟,然后尝试q使用两个单独的通道发送退出:stdin/dev/tty(因为它在我上面链接的 SO 问题中提到)。两者都不起作用。

from subprocess import Popen, PIPE
import time
p1 = Popen("echo hello | less -K -R", stdin=PIPE, shell=True)
time.sleep(1)
p1.stdin.write(bytes('q', 'utf-8'))
with open("/dev/tty", 'w') as tty:
    tty.write('q')
p1.wait()

如何less从 Python 脚本进行控制?

4

1 回答 1

1

这有点复杂,但可以forkpty(3)用来创建一个新的 TTY,您可以在其中完全控制less,将输入和输出转发到原始 TTY,以便感觉无缝。

下面的代码使用 Python 3 及其标准库。pexpect可以做很多繁重的工作,但这并不随 Python 一起提供。另外,这种方式更有教育意义。

import contextlib
import fcntl
import io
import os
import pty
import select
import signal
import struct
import termios
import time
import tty

假设其余代码缩进以在此上下文管理器中运行。

with contextlib.ExitStack() as stack:

我们需要获取真正的 TTY 并将其设置为原始模式。这可能会混淆 TTY 的其他用户(例如,该程序退出后的 shell),因此请确保将其恢复到相同的状态。

tty_fd = os.open('/dev/tty', os.O_RDWR | os.O_CLOEXEC)
stack.callback(os.close, tty_fd)
tc = termios.tcgetattr(tty_fd)
stack.callback(termios.tcsetattr, tty_fd, termios.TCSANOW, tc)
tty.setraw(tty_fd, when=termios.TCSANOW)

然后我们可以调用Pythonforkpty中命名的 。pty.fork()这做了几件事:

  • 创建一个伪终端
  • 叉一个新的孩子。
  • 将孩子连接到 PTY 的从属端。
  • 将子进程的PID和PTY的master端返回给原来的进程。

孩子应该跑less。请注意使用,_exit(2)因为在fork.

child_pid, master_fd = pty.fork()
if child_pid == 0:
    os.execv('/bin/sh', ('/bin/sh', '-c', 'echo hello | less -K -R'))
    os._exit(0)
stack.callback(os.close, master_fd)

然后需要做一些工作来设置一些异步信号处理程序。

  • SIGCHLD当子进程改变状态(如退出)时接收。我们可以使用它来跟踪孩子是否还在跑步。
  • SIGWINCH当控制终端改变大小时接收。我们将此大小转发给 PTY(它将自动向附加的进程发送另一个窗口更改信号)。我们也应该设置 PTY 的窗口大小以匹配启动。

转发诸如 , 等信号也可能SIGINT有意义SIGTERM

child_is_running = True
def handle_chld(signum, frame):
    while True:
        pid, status = os.waitpid(-1, os.P_NOWAIT)
        if not pid:
            break
        if pid == child_pid:
            child_is_running = False
def handle_winch(signum, frame):
    tc = struct.pack('HHHH', 0, 0, 0, 0)
    tc = fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, tc)
    fcntl.ioctl(master_fd, termios.TIOCSWINSZ, tc)
handler = signal.signal(signal.SIGCHLD, handle_chld)
stack.callback(signal.signal, signal.SIGCHLD, handler)
handler = signal.signal(signal.SIGWINCH, handle_winch)
stack.callback(signal.signal, signal.SIGWINCH, handler)
handle_winch(0, None)

现在来看看真肉:在真假 TTY 之间复制数据。

target_time = time.clock_gettime(time.CLOCK_MONOTONIC_RAW) + 1
has_sent_q = False
with contextlib.suppress(OSError):
    while child_is_running:
        now = time.clock_gettime(time.CLOCK_MONOTONIC_RAW)
        if now < target_time:
            timeout = target_time - now
        else:
            timeout = None
            if not has_sent_q:
                os.write(master_fd, b'q')
                has_sent_q = True
        rfds, wfds, xfds = select.select((tty_fd, master_fd), (), (), timeout)
        if tty_fd in rfds:
            data = os.read(tty_fd, io.DEFAULT_BUFFER_SIZE)
            os.write(master_fd, data)
        if master_fd in rfds:
            data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
            os.write(tty_fd, data)

它看起来很简单,尽管我在掩饰一些事情,例如正确的短写和SIGTTIN/SIGTTOU处理(通过抑制部分隐藏OSError)。

于 2018-01-20T22:25:12.033 回答