0

基本上这段代码从管道中读取并不断打印输出而不会阻塞......这是整个代码:

1)第一个脚本:

if __name__ == '__main__':
    for i in range(5):
        print str(i)+'\n',
        sys.stdout.flush()
        time.sleep(1)

2)第二个脚本:

def log_worker(stdout):

    while True:
        output = non_block_read(stdout).strip()
        if output:
            print output


def non_block_read(output):
    ''' even in a thread, a normal read with block until the buffer is full '''
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ''

if __name__ == '__main__':

    mysql_process = subprocess.Popen(['python','-u', 'flush.py'],  stdin=sys.stdin,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)


    thread = Thread(target=log_worker, args=[mysql_process.stdout])
    thread.daemon = True
    thread.start()

    mysql_process.wait()
    thread.join(timeout=1)

我想知道为什么它会这样工作:

1)如果我把 Thread 完全带走,并且只在 main 中调用 log_worker 也会一一打印所有内容,但问题是它在完成后挂起而没有完成。我在某处读到这里的线程完全用于完成打印或更正确地线程在它完成打印时死亡,那么为什么它会这样工作呢?什么线程在这里以及如何?

2)如果我保留线程但删除 mysql_process.wait() 和 thread.join 它什么也不打印......为什么?我读到 Popen.wait 是为了终止其子进程。设置并返回 returncode 属性。这里的子进程是什么以及为什么/如何是子 O_O ?

3)如果我只删除 thread.join(timeout=1) 那么它完成但有错误Exception in thread Thread-1 (most likely raised during interpreter shutdown):。为什么 ?.join 在这里扮演什么角色。

4)我阅读了 non_block_read 函数中使用的函数的文档,但仍然感到困惑。好的,很明显他们采用文件描述符并将其设置为非阻塞。我感到困惑的是,我可以在哪些功能上使用所有这些功能,我的意思是我理解在文件上,但他们为什么在标准输出 O_O 上使用它?不是文件,是流~~?

我所做的所有这些都是为了在龙卷风脚本中使用 subprocess.Popen 执行脚本,并不断向客户/我自己发送输出而不会阻塞,所以如果有人能帮助我这样做,我将非常感激,因为我无法想象如何以某种方式从线程中获取此输出,以便我可以不断地将其插入 self.send 中的 torndio2 ...

def on_message(self, message):
#        list = subprocess.Popen([r"ls", "-l"], stdout=subprocess.PIPE)
#        list_stdout = list.communicate()[0]
        for i in range(1,10):
            time.sleep(1)
            self.send(i)
4

1 回答 1

0

使用开发版(3.2),这真的很容易

from tornado.web import RequestHandler,asynchronous
from tornado.process import Subprocess

class home(RequestHandler):
  @asynchrounous
  def get(self):
    seld.sp=Subprocess('date && sleep 5 && date',shell=True,stdout=Subprocess.STREAM)
    self.sp.set_exit_callback(self.next)
  def next(self,s):
    self.sp.stdout.read_until_close(self.finish)

application = ...
于 2013-12-05T11:08:29.000 回答