1

许多(> 1000)工人(流程)做了一些工作并希望将他们的工作结果保存在数据库中。工作结果是 JSON 对象。工人每秒产生 1-5 个 JSON 对象。数据库保护程序是独立的进程。用于将 JSON 对象从 worker 传输到 saver 的单向连接是 multiprocessing.Pipe。管道数量等于工人数量。

在保护程序过程中定期调用:

def recv_data(self):
    data = []
    for pipe in self.data_pipe_pool:
        if pipe.poll():
            data.append(pipe.recv())
    return data

self.data_pipe_pool - 来自工人的管道列表。

如果我运行约 100 名工人,一切正常。如果我运行 >1000 个工人,我会得到异常:

2013-02-13T15:17:40.731429
Traceback (most recent call last):
  File "saver.py", line 44, in run
    profile = self.poll_data()
  File "saver.py", line 116, in poll_data
    ret = self.recv_data()
  File "saver_unit.py", line 127, in recv_data
    if pipe.poll():
IOError: handle out of range in select()

我知道这是由于select()电话引起的,并且:

FD_SETSIZE 在 GNU/Linux 系统中通常定义为 1024

但是在哪里调用select?如果在pipe.poll(),为什么我超过 FD_SETSIZE 限制,我pipe.poll()单独调用 1 个管道?我在哪里可以通过此调用观看 python 语言源select

什么解决方法不超过FD_SETSIZE限制或不使用select

4

4 回答 4

2

如果您查看select手册页,您将看到:

使用 fd 为负数或等于或大于 FD_SETSIZE 的值执行 FD_CLR() 或 FD_SET() 将导致未定义的行为。

这意味着 ifselect在您的调用中在幕后使用poll(这似乎很可能),并且您的文件描述符大于FD_SETSIZE(如果您有超过 1000 个管道,则很可能)那么结果可以是任何东西。

于 2013-02-13T12:43:32.353 回答
0

你有没有想过使用beanstalkd之类的东西,因为听起来你有一个从工作人员到服务器的管道,而通常你会有 1000 个工作人员,然后有 10 个服务器从结果队列中读取以将它们存储在数据库中。

好处是,您的工作人员可能花费更多的时间来完成他们的工作,而不是与服务器交谈,但您已经为该服务器打开了一条管道。

你可以有一个“工作队列”,让你的 1000 名工人从中获取,还有一个他们存储结果的“结果队列”。此后,您可以让许多服务器从“结果队列”中获取存储在数据库中。

这是一个很长的说法“至少你不会用完文件句柄”。

于 2013-02-13T14:17:40.500 回答
0

我使用 epoll 解决了这个问题。解决方法很简单:

def set_data_pipe_poll(self, data_pipe_poll):
    self.epoll = select.epoll()
    for p in data_pipe_poll:
        self.epoll.register(p, select.EPOLLIN)
    self.data_pipe_poll = data_pipe_poll

def recv_data(self):
    data = []
    events = self.epoll.poll(timeout = 0)
    for fileno, _ in events:
        p = filter(lambda x: x.fileno() == fileno, self.data_pipe_poll)[0]
        data.append(p.recv())
    return data

当我打电话时,epoll.poll()我不打电话select

于 2013-03-20T13:58:05.117 回答
0

我有类似的问题。我为每个工作人员使用了 multiprocessing.Queue,当工作人员数量约为 600 时,队列方法以IOError: handle out of range in select().

我在官方错误跟踪器中发现了导致问题的问题它在 2.7.4 中修复(我有 2.7.3)。更新python包解决了这个问题。

于 2016-08-05T17:30:37.340 回答