3

io_add_watch在 python 中使用监视器时遇到了问题(通过 gobject)。我想在每次通知后对整个缓冲区进行非阻塞读取。这是代码(稍微缩短了一点):

class SomeApp(object):

   def __init__(self):
      # some other init that does a lot of stderr debug writes
      fl = fcntl.fcntl(0, fcntl.F_GETFL, 0)
      fcntl.fcntl(0, fcntl.F_SETFL, fl | os.O_NONBLOCK)
      print "hooked", gobject.io_add_watch(0, gobject.IO_IN | gobject.IO_PRI, self.got_message, [""])
      self.app = gobject.MainLoop()

   def run(self):
      print "ready"
      self.app.run()

   def got_message(self, fd, condition, data):
      print "reading now"
      data[0] += os.read(0, 1024)
      print "got something", fd, condition, data
      return True

gobject.threads_init()
SomeApp().run()

这是诀窍 - 当我在没有激活调试输出的情况下运行程序时,我没有got_message接到电话。当我先向stderr写很多东西时,问题就消失了。如果除了这段代码中可见的打印之外我不写任何东西,我就不会收到 stdin 消息信号。另一个有趣的事情是,当我尝试在启用了 stderr 调试但通过strace(检查是否有任何我错过的 fcntl / ioctl 调用)的情况下运行相同的应用程序时,问题再次出现。

简而言之:如果我先在没有 strace 的情况下向 stderr 写了很多东西,那就io_watch可以了。如果我用 strace 写了很多,或者根本不写都io_watch行不通。

“其他一些初始化”部分需要一些时间,所以如果我在看到“钩子 2”输出之前输入一些文本,然后在“就绪”之后按“ctrl+c”,则get_message调用回调,但读取调用会抛出 EAGAIN,所以缓冲区似乎是空的。

与标准输入相关的 strace 日志:

ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
fcntl(0, F_GETFL)                       = 0xa002 (flags O_RDWR|O_ASYNC|O_LARGEFILE)
fcntl(0, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE) = 0
fcntl(0, F_GETFL)                       = 0xa802 (flags O_RDWR|O_NONBLOCK|O_ASYNC|O_LARGEFILE)

有人对这里发生的事情有一些想法吗?


编辑:另一个线索。我试图重构应用程序以在不同的线程中进行读取并通过管道将其传回。它“有点”有效:

...
      rpipe, wpipe = os.pipe()
      stopped = threading.Event()
      self.stdreader = threading.Thread(name = "reader", target = self.std_read_loop, args = (wpipe, stopped))
      self.stdreader.start()
      new_data = ""
      print "hooked", gobject.io_add_watch(rpipe, gobject.IO_IN | gobject.IO_PRI, self.got_message, [new_data])

   def std_read_loop(self, wpipe, stop_event):
      while True:
         try:
            new_data = os.read(0, 1024)
            while len(new_data) > 0:
               l = os.write(wpipe, new_data)
               new_data = new_data[l:]
         except OSError, e:
            if stop_event.isSet():
               break
            time.sleep(0.1)
...

令人惊讶的是,如果我只是将相同的文本放入新管道中,一切都会开始工作。问题是:

  • 第一行根本没有“注意到” - 我只得到第二行和以下行
  • 丑陋

也许这会给其他人一个关于为什么会发生这种情况的线索?

4

3 回答 3

2

这听起来像是一种竞争条件,其中设置回调有一些延迟,或者环境发生变化会影响您是否可以设置回调。

在你打电话之前,我会仔细看看会发生什么io_add_watch()。例如 Python fcntl 文档说:

该模块中的所有函数都将文件描述符 fd 作为其第一个参数。这可以是一个整数文件描述符,例如由 sys.stdin.fileno() 返回,也可以是一个文件对象,例如 sys.stdin 本身,它提供了一个返回真正文件描述符的 fileno()。

显然,当您假设 STDIN 将具有 FD == 0 时,这不是您正在做的事情。我会先更改它然后再试一次。

另一件事是,如果 FD 已经被阻塞,那么您的进程可能正在等待,而其他非阻塞进程正在运行,因此根据您首先执行的操作存在时间差异。如果你重构 fcntl 的东西,让它在程序启动后很快完成,甚至在导入 GTK 模块之前,会发生什么?

我不确定我是否理解为什么使用 GTK GUI 的程序首先要从标准输入中读取。如果您实际上是在尝试捕获另一个进程的输出,则应该使用 subprocess 模块来设置管道,然后io_add_watch()在管道上像这样:

proc = subprocess.Popen(command, stdout = subprocess.PIPE)
gobject.io_add_watch(proc.stdout, glib.IO_IN, self.write_to_buffer )

同样,在这个例子中,我们确保在调用io_add_watch() 之前我们有一个有效的打开的 FD。

通常,当gobject.io_add_watch()被使用时,它会在之前被调用gobject.MainLoop()。例如,这里是一些io_add_watch用于捕获 IO_IN 的工作代码。

于 2009-10-25T19:27:24.343 回答
0

如果在任何 stderr 输出之前先挂钩回调会发生什么?当您启用调试输出时,它仍然会被调用吗?

另外,我想您可能应该反复调用os.read()您的处理程序,直到它没有提供任何数据,以防在调用之间准备好 >1024 个字节。

您是否尝试过select在后台线程中使用该模块来模拟gio功能?那样有用吗?这是什么平台,你在处理什么样的FD?(文件?套接字?管道?)

于 2009-10-28T17:39:19.757 回答
0

文档说您应该从回调中返回,TRUE否则它将从事件源列表中删除。

于 2009-10-19T11:29:38.930 回答