1

eventlet 和 gevent 的文档都有几个关于如何异步生成 IO 任务并在后面获得结果的示例。但到目前为止,所有应该从异步调用返回值的示例,我总是在所有对spawn(). 无论join()是 , joinall(), wait(), waitall()。这假设调用使用 IO 的函数是立即的,我们可以直接跳到我们正在等待结果的地方。

但在我的情况下,我想从一个可能很慢和/或任意大甚至无限的生成器中获得工作。

我显然不能这样做

pile = eventlet.GreenPile(pool)
for url in mybiggenerator():
    pile.spawn(fetch_title, url)
titles = '\n'.join(pile)

因为mybiggenerator()可能需要很长时间才能用完。因此,我必须在仍在产生异步调用的同时开始使用结果。

这可能通常是通过队列资源完成的,但我不太确定如何。假设我创建了一个队列来保存作业,从一个名为 P 的 greenlet 推送一堆作业,然后从另一个 greenlet C 弹出它们。在 C 中,如果我发现队列是空的,我怎么知道 P 是否推送了每个作业它必须推动还是只是在迭代的中间?

或者,Eventlet 允许我循环 apile以获取返回值,但是我可以在不产生所有必须产生的作业的情况下开始这样做吗?如何?这将是一个更简单的选择。

4

4 回答 4

0

默认情况下,您不需要任何池或桩。它们只是实现特定策略的便捷包装器。首先,您应该了解您的代码在所有情况下必须如何准确地工作,即:何时以及为何启动另一个绿色线程,何时以及为何等待某事。

当您对这些问题有一些答案而对其他问题有疑问时,请离开。同时,这是一个处理无限“生成器”(实际上是一个队列)的原型。

queue = eventlet.queue.Queue(10000)
wait = eventlet.semaphore.CappedSemaphore(1000)


def fetch(url):
  # httplib2.Http().request
  # or requests.get
  # or urllib.urlopen
  # or whatever API you like
  return response


def crawl(url):
  with wait:
    response = fetch(url)
    links = parse(response)
    for url in link:
      queue.put(url)


def spawn_crawl_next():
  try:
    url = queue.get(block=False)
  except eventlet.queue.Empty:
    return False
  # use another CappedSemaphore here to limit number of outstanding connections
  eventlet.spawn(crawl, url)
  return True


def crawler():
  while True:
    if spawn_crawl_next():
      continue

    while wait.balance != 0:
      eventlet.sleep(1)

    # if last spawned `crawl` enqueued more links -- process them
    if not spawn_crawl_next():
      break


def main():
  queue.put('http://initial-url')
  crawler()
于 2014-01-06T10:13:37.543 回答
0

回复:“来自 Python3 的 concurrent.futures 并不真正适用于“eventlet 或 gevent”部分。”

其实可以结合eventlet来将concurrent.futures ThreadPoolExecutor部署为GreenThread executor。

见:https ://github.com/zopefiend/green-concurrent.futures-with-eventlet/commit/aed3b9f17ac27eeaf8c56210e0c8e4aff2ecbdb5

于 2019-10-28T20:18:37.423 回答
0

我遇到了同样的问题,而且很难找到任何答案。

我想我设法通过让消费者在单独的线程上运行并Event用于同步来使某些东西正常工作。似乎工作正常。

唯一需要注意的是,你必须小心猴子修补。如果您使用猴子补丁线程设施,这可能无法正常工作。

import gevent
import gevent.queue
import threading
import time


q = gevent.queue.JoinableQueue()
queue_not_empty = threading.Event()


def run_task(task):
    print(f"Started task {task} @ {time.time()}")
    # Use whatever has been monkey-patched with gevent here
    gevent.sleep(1)
    print(f"Finished task {task} @ {time.time()}")


def consumer():
    while True:
        print("Waiting for item in queue")
        queue_not_empty.wait()
        try:
            task = q.get()
            print(f"Dequed task {task} for consumption @ {time.time()}")
        except gevent.exceptions.LoopExit:
            queue_not_empty.clear()
            continue
        try:
            gevent.spawn(run_task, task)
        finally:
            q.task_done()
        gevent.sleep(0)  # Kickstart task


def enqueue(item):
    q.put(item)
    queue_not_empty.set()


# Run consumer on separate thread
consumer_thread = threading.Thread(target=consumer, daemon=True)
consumer_thread.start()

# Add some tasks
for i in range(5):
    enqueue(i)
time.sleep(2)

输出:

Waiting for item in queue
Dequed task 0 for consumption @ 1643232632.0220542
Started task 0 @ 1643232632.0222237
Waiting for item in queue
Dequed task 1 for consumption @ 1643232632.0222733
Started task 1 @ 1643232632.0222948
Waiting for item in queue
Dequed task 2 for consumption @ 1643232632.022315
Started task 2 @ 1643232632.02233
Waiting for item in queue
Dequed task 3 for consumption @ 1643232632.0223525
Started task 3 @ 1643232632.0223687
Waiting for item in queue
Dequed task 4 for consumption @ 1643232632.022386
Started task 4 @ 1643232632.0224123
Waiting for item in queue
Finished task 0 @ 1643232633.0235817
Finished task 1 @ 1643232633.0236874
Finished task 2 @ 1643232633.0237293
Finished task 3 @ 1643232633.0237558
Finished task 4 @ 1643232633.0237799
Waiting for item in queue
于 2022-01-26T21:34:42.097 回答
-1

使用 Py3k 中的新concurrent.futures模块,我想说(假设您想要做的处理实际上比 更复杂join):

with concurrent.futures.ThreadPoolExecutor(max_workers=foo) as wp:
    res = [wp.submit(fetchtitle, url) for url in mybiggenerator()]
ans = '\n'.join([a for a in concurrent.futures.as_completed(res)]

这将允许您在所有fetchtitle调用完成之前开始处理结果。但是,它需要您mybiggenerator在继续之前筋疲力尽——目前尚不清楚您想如何解决这个问题,除非您想设置一些max_urls参数或类似的参数。不过,这仍然是您可以使用原始实现执行的操作。

于 2014-01-04T16:01:06.170 回答