8

我正在使用简单的线程模块来执行并发作业。现在我想利用并发期货模块。有人可以给我举一个使用队列和并发库的例子吗?

我收到 TypeError: 'Queue' object is not iterable 我不知道如何迭代队列

代码片段:

 def run(item):
      self.__log.info(str(item))
      return True
<queue filled here>

with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
        furtureIteams = { executor.submit(run, item): item for item in list(queue)}
        for future in concurrent.futures.as_completed(furtureIteams):
            f = furtureIteams[future]
            print(f)
4

2 回答 2

9

我会建议这样的事情:

def run(queue):
      item = queue.get()
      self.__log.info(str(item))
      return True
<queue filled here>
workerThreadsToStart = 10
with concurrent.futures.ThreadPoolExecutor(max_workers = 100) as executor:
        furtureIteams = { executor.submit(run, queue): index for intex in range(workerThreadsToStart)}
        for future in concurrent.futures.as_completed(furtureIteams):
            f = furtureIteams[future]
            print(f)

您将遇到的问题是队列被认为是无止境的,并且作为一种媒介来解耦将某些内容放入队列的线程和将项目从队列中取出的线程。

什么时候

  1. 您有有限数量的项目或
  2. 您一次计算所有项目

然后并行处理它们,队列没有意义。在这些情况下,ThreadPoolExecutor 会使队列过时。

我查看了 ThreadPoolExecutor 源代码:

def submit(self, fn, *args, **kwargs): # line 94
    self._work_queue.put(w) # line 102

里面使用了一个队列。

于 2013-06-05T20:23:27.590 回答
1

如上所述,您可以使用该iter()函数在队列对象上执行 ThreadPool。一个非常通用的代码如下所示:

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(run, iter(queue.get, None))

run 方法在队列中的项目上执行期望的工作。

于 2021-01-27T14:45:38.360 回答