0

我有一个有 N 个消费者的生产者设置。

生产者侦听接收大量 TCP 消息(每分钟 10,000 条)的套接字,读取此数据并将其放入工作人员的队列中。

我设置的从队列中读取的工作人员如下:

iterations = 0
work_iterations = 0
while True:
  try:
    iterations += 1
    data = queue.get(block=False)
    work_iterations +=1
    do_work(data)
  except Queue.Empty:
    time.sleep(0.001) #avoid high CPU usage


  if iterations == 100:
    load = float(work_iterations/iterations)
    print load
    iterations = 0
    work_iterations = 0

这是简化的代码,但您可以看到我正在尝试查看工作负载,但查看工作人员实际上能够从队列中拉出工作的 100 次迭代中有多少次。如果负载始终为 100/100,那么我知道生产者/消费者队列正在积压。理论上这应该有效。

我在输出中看到很多 0.97、0.99 和很少的 1.0。但是队列确实会在几分钟内填满(它的大小限制为 10,000),我必须开始在 Producer 端删除数据。任何人都可以阐明为什么会这样吗?如果工作进程平均进行 97/100 次迭代,这意味着队列应该接近空,不是吗?

4

2 回答 2

0

当您调用 queue.get(block=False) 时,即使队列实际上不是空的,也可能会引发 Queue.Empty。如果您当前的进程无法获取访问队列的锁,则无论队列中实际有多少项目,都会引发 Queue.Empty。

快速查看 multiprocessing/queues.py 中的 Queue.get() 代码:

126    if not self._rlock.acquire(block, timeout):
127        raise Empty

请注意,在引发异常之前,不会检查队列实际上有多满。由于您有这么多信息被排队,我怀疑几次 Queue.Empty 被提出,它实际上是由于生产者在排队时持有锁,导致您的工作人员尝试访问队列失败。

您可以通过对代码稍作更改来检查这一点:

except Queue.Empty:
    print queue.qsize() # returns the approximate number of elements in the queue

正如文档所说,这个数字并不完全可靠。但是,由于您正在处理队列中如此大量的项目,它应该足够接近来告诉您队列是接近 0 还是接近 10,000。

于 2014-07-14T21:04:32.583 回答
-1

如果您删除 block=Flase 和 time.sleep() 怎么办?您将无法计算工人。

于 2013-07-29T16:39:23.597 回答