-1

这是我第一次尝试在 python 中使用 asyncio。目标是将 40000+ htmls 转换为 jsons。使用同步 for 循环大约需要 3.5 分钟。我有兴趣看到使用 asyncio 的性能提升。我正在使用以下代码:

import glob
import json
from parsel import Selector
import asyncio
import aiofiles

async def read_html(path):
    async with aiofiles.open(path, 'r') as f:
        html = await f.read()
    return html


async def parse_job(path):
    html = await read_html(path)
    sel_obj = Selector(html)
    jobs = dict()
    jobs['some_var'] = sel_obj.xpath('some-xpath').get()
    return jobs


async def write_json(path):
    job = await parse_job(path)
    async with aiofiles.open(file_name.replace("html","json"), "w") as f:
        await f.write(job)


async def bulk_read_and_write(files):
    # this function is from realpython tutorial. 
    # I have little understanding of whats going on with gather()
    tasks = list()
    for file in files:
        tasks.append(write_json(file))
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    files = glob.glob("some_folder_path/*.html")
    asyncio.run(bulk_read_and_write(files))

运行几秒钟后,我收到以下错误。

Traceback (most recent call last):
  File "06_extract_jobs_async.py", line 84, in <module>
    asyncio.run(bulk_read_and_write(files))
  File "/anaconda3/envs/py37/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/anaconda3/envs/py37/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "06_extract_jobs_async.py", line 78, in bulk_read_and_write
    await asyncio.gather(*tasks)
  File "06_extract_jobs_async.py", line 68, in write_json
    job = await parse_job(path)
  File "06_extract_jobs_async.py", line 35, in parse_job
    html = await read_html(path)
  File "06_extract_jobs_async.py", line 29, in read_html
    async with aiofiles.open(path, 'r') as f:
  File "/anaconda3/envs/py37/lib/python3.7/site-packages/aiofiles/base.py", line 78, in __aenter__
    self._obj = yield from self._coro
  File "/anaconda3/envs/py37/lib/python3.7/site-packages/aiofiles/threadpool/__init__.py", line 35, in _open
    f = yield from loop.run_in_executor(executor, cb)
  File "/anaconda3/envs/py37/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
OSError: [Errno 24] Too many open files: '../html_output/jobs/6706538_478752_job.html'

这里发生了什么?提前致谢

4

2 回答 2

2

您正在尽可能快地进行异步调用,但是将文件写入磁盘的过程仍然是一项有效的同步任务。您的操作系统可以尝试一次执行多个写入,但有一个限制。通过尽可能快地生成异步任务,您可以一次获得大量结果,这意味着同时打开大量文件进行写入。正如您的错误所暗示的那样,有一个限制。

这里有很多关于使用 asyncio 限制并发的好线程,但最简单的解决方案可能是具有合理大小的asyncio-pool 。

于 2019-09-30T21:51:18.403 回答
2

尝试为并行任务的数量添加限制:

# ...rest of code unchanged

async def write_json(path, limiter):
    with limiter:
        job = await parse_job(path)
        async with aiofiles.open(file_name.replace("html","json"), "w") as f:
            await f.write(job)

async def bulk_read_and_write(files):
    limiter = asyncio.Semaphore(1000)
    tasks = []
    for file in files:
        tasks.append(write_json(file, limiter))
    await asyncio.gather(*tasks)
于 2019-10-01T13:14:52.580 回答