4

我一直在尝试编写一个基本的终端仿真脚本,因为出于某种原因,我的 Mac 上没有终端访问权限。但是要在 Blender 中编写游戏引擎脚本,控制台(通常在您启动 Blender 的终端中打开)是至关重要的。
只是做一些简单的事情,比如删除、重命名等。我曾经使用stream = os.popen(command)then来执行命令print (stream.read())。这适用于大多数事情,但不适用于任何互动。
不久,我发现了一种新方法:
sp = subprocess.Popen(["/bin/bash", "-i"], stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE)然后print(sp.communicate(command.encode())). 那应该产生一个我可以像终端一样使用的交互式外壳,不是吗?

但无论哪种方式,我都无法保持连接打开,并且使用最后一个示例我可以调用sp.communicate一次,给我以下输出(在这种情况下为 'ls /')和一些错误:
(b'Applications\n[...]usr\nvar\n', b'bash: no job control in this shell\nbash-3.2$ ls /\nbash-3.2$ exit\n'). 第二次它给了我一个ValueError: I/O operation on closed file. 有时(比如'ls')我只得到这个错误:b'ls\nbash-3.2$ exit\n'

这意味着什么?如何使用 python 模拟终端,允许我控制交互式 shell 或运行搅拌机并与控制台通信?

4

3 回答 3

10

假设您想要一个不断要求输入的交互式 shell,您可以尝试以下操作:

import subprocess
import re

while True:
    # prevents lots of python error output
    try:
        s = raw_input('> ')
    except:
        break

    # check if you should exit
    if s.strip().lower() == 'exit':
        break

    # try to run command
    try:
        cmd = subprocess.Popen(re.split(r'\s+', s), stdout=subprocess.PIPE)
        cmd_out = cmd.stdout.read()

        # Process output
        print cmd_out

    except OSError:
        print 'Invalid command'
于 2012-07-24T16:13:05.943 回答
5

这是我为在 Windows 中做你想做的事情而做的事情。一个更困难的问题,因为 Windows 不遵循任何标准,而是遵循它们自己的标准。对这段代码稍加修改应该会给你你正在寻找的东西。

'''
Created on Mar 2, 2013

@author: rweber
'''
import subprocess
import Queue
from Queue import Empty
import threading


class Process_Communicator():

    def join(self):
        self.te.join()
        self.to.join()
        self.running = False
        self.aggregator.join()

    def enqueue_in(self):
        while self.running and self.p.stdin is not None:
            while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')
            pass

    def enqueue_output(self):
        if not self.p.stdout or self.p.stdout.closed:
            return
        out = self.p.stdout
        for line in iter(out.readline, b''):
            self.qo.put(line)

    def enqueue_err(self):
        if not self.p.stderr or self.p.stderr.closed:
            return
        err = self.p.stderr
        for line in iter(err.readline, b''):
            self.qe.put(line)

    def aggregate(self):
        while (self.running):
            self.update()
        self.update()

    def update(self):
        line = ""
        try:
            while self.qe.not_empty:
                line = self.qe.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_err += line
        except Empty:
            pass

        line = ""
        try:
            while self.qo.not_empty:
                line = self.qo.get_nowait()  # or q.get(timeout=.1)
                self.unbblocked_out += line
        except Empty:
            pass

        while not self.stdin_queue.empty():
                s = self.stdin_queue.get()
                self.p.stdin.write(str(s) + '\n\r')

    def get_stdout(self, clear=True):
        ret = self.unbblocked_out
        if clear:
            self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        ret = self.get_stdout(False)
        if ret == '':
            return None
        else:
            return ret

    def get_stderr(self, clear=True):
        ret = self.unbblocked_err
        if clear:
            self.unbblocked_err = ""
        return ret

    def has_stderr(self):
        ret = self.get_stderr(False)
        if ret == '':
            return None
        else:
            return ret

    def __init__(self, subp):
        '''This is a simple class that collects and aggregates the
        output from a subprocess so that you can more reliably use
        the class without having to block for subprocess.communicate.'''
        self.p = subp
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        self.qo = Queue.Queue()
        self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output,
                                    args=())
        self.to.daemon = True  # thread dies with the program
        self.to.start()

        self.qe = Queue.Queue()
        self.te = threading.Thread(name="err_read",
                                   target=self.enqueue_err,
                                   args=())
        self.te.daemon = True  # thread dies with the program
        self.te.start()

        self.stdin_queue = Queue.Queue()
        self.aggregator = threading.Thread(name="aggregate",
                                           target=self.aggregate,
                                           args=())
        self.aggregator.daemon = True  # thread dies with the program
        self.aggregator.start()
        pass
def write_stdin(p,c):
    while p.poll() == None:
        i = raw_input("send to process:")
        if i is not None:
            c.stdin_queue.put(i)


p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, stdin=subprocess.PIPE)
c = Process_Communicator(p)
stdin = threading.Thread(name="write_stdin",
                           target=write_stdin,
                           args=(p,c))
stdin.daemon = True  # thread dies with the program
stdin.start()
while p.poll() == None:
    if c.has_stdout():
        print c.get_stdout()
    if c.has_stderr():
        print c.get_stderr()

c.join()
print "Exit"
于 2013-05-17T18:36:57.447 回答
1

看来您应该在分配有“forkpty”的新控制终端上运行它。要抑制警告“没有作业控制...”,您需要调用“setsid”。

于 2012-07-28T06:18:37.280 回答