4

我有一个 Python 2.7 程序,它从网站中提取数据并将结果转储到数据库中。它遵循消费者生产者模型,并使用线程模块编写。

只是为了好玩,我想使用新的asyncio模块(从 3.4 开始)重写这个程序,但我不知道如何正确地做到这一点。

最关键的要求是程序必须按顺序从同一网站获取数据。例如,对于 url ' http://a-restaurant.com ',它应该首先得到' http://a-restaurant.com/menu/0 ',然后是 ' http://a-restaurant.com/menu/ 1 ',然后' http://a-restaurant.com/menu/2 ',...如果它们没有被提取以使网站完全停止提供页面,那么您必须从 0 开始。

然而,另一个网站( ' http://another-restaurant.com ' )的另一个获取可以(并且应该)同时运行(其他网站也有连续限制)。

线程模块非常适合这个,因为我可以为每个网站创建单独的线程,并且在每个线程中它可以等待一个页面完成加载,然后再获取另一个页面。

这是线程版本(Python 2.7)的一个非常简化的代码片段:

class FetchThread(threading.Threading)
    def __init__(self, queue, url)
        self.queue = queue
        self.baseurl = url
    ...
    def run(self)
        # Get 10 menu pages in a sequantial order
        for food in range(10):
            url = self.baseurl + '/' + str(food)
            text = urllib2.urlopen(url).read()
            self.queue.put(text)
            ...
def main()
    queue = Queue.Queue()
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
    for url in urls:
        fetcher = FetchThread(queue, url)
        fetcher.start()
        ...

以下是我尝试使用 asyncio 的方法(在 3.4.1 中):

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)


l = []
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
for url in urls:
    for food in range(10):
        menu_url = url + '/' + str(food)
        l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))

它以非顺序的顺序获取和打印所有内容。好吧,我想这就是这些协程的全部想法。我不应该使用 aiohttp 而只使用 urllib 获取吗?但是第一家餐厅的抓取是否会阻止其他餐厅的抓取?我只是在想这完全错误吗?(这只是一个尝试按顺序获取东西的测试。还没有到队列部分。)

4

2 回答 2

5

您当前的代码适用于不关心请求顺序排序的餐厅。菜单的所有十个请求将同时运行,并在完成后立即打印到标准输出。

显然,这不适用于需要顺序请求的餐厅。您需要进行一些重构才能使其正常工作:

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)

@syncio.coroutine
def print_pages_sequential(url, num_pages):
    for food in range(num_pages):
        menu_url = url + '/' + str(food)
        yield from print_page(menu_url)

l = [print_pages_sequential('http://a-restaurant.com/menu', 10)]

conc_url = 'http://another-restaurant.com/menu'
for food in range(10):
    menu_url = conc_url + '/' + str(food)
    l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))

我们没有将连续餐厅的所有 10 个请求添加到列表中,而是将一个协程添加到列表中,它将依次遍历所有 10 个页面。它的工作方式是在请求完成之前yield from print_page停止执行,但它不会阻塞任何其他同时运行的协程(就像您附加到的所有调用一样)。print_pages_sequentialprint_pageprint_pagel

通过这种方式,您的所有“another-restaurant”请求可以完全同时运行,就像您想要的那样,并且您的“a-restaurant”请求将按顺序运行,但不会阻塞任何“another-restaurant”请求。

编辑:

如果所有站点都具有相同的顺序获取要求,则可以进一步简化逻辑:

l = []
urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"]
for url in urls:
    menu_url = url + '/' + str(food)
    l.append(print_page_sequential(menu_url, 10))

loop.run_until_complete(asyncio.wait(l))
于 2014-06-16T15:19:19.990 回答
2

asyncio.Taskasyncio世界 threading.Thread中的替代品。还会创建新任务。asyncio.async

asyncio.gather等待几个协程是非常方便的方式,我更喜欢它而不是asyncio.wait.

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)

@asyncio.coroutine
def process_restaurant(url):
    for food in range(10):
        menu_url = url + '/' + str(food)
        yield from print_page(menu_url)

urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
coros = []
for url in urls:
    coros.append(asyncio.Task(process_restaurant(url)))

loop.run_until_complete(asyncio.gather(*coros))
于 2014-06-17T08:46:24.897 回答