1

我有一个大文件,每行都有一条 JSON 记录。我正在编写一个脚本来通过 API 将这些记录的一个子集上传到 CouchDB,并尝试使用不同的方法来查看最快的方法。这是我发现工作速度最快到最慢的(在我的本地主机上的 CouchDB 实例上):

  1. 将每个需要的记录读入内存。在所有记录都在内存中之后,为每条记录生成一个上传协程,并一次收集/运行所有协程

  2. 同步读取文件,遇到需要的记录,同步上传

  3. 用于aiofiles读取文件,当遇到需要的记录时,异步更新

方法 #1 比其他两个快得多(大约快两倍)。我很困惑为什么方法 #2 比 #3 快,尤其是与这里的示例相比,异步运行所需的时间是同步运行的一半(未提供同步代码,必须自己重写)。是否是从文件 i/o 到 HTTP i/o 的上下文切换,尤其是文件读取比 API 上传更频繁的情况下?

为了进一步说明,这里有一些代表每种方法的 Python 伪代码:

方法 1 - 同步文件 IO、异步 HTTP IO

import json
import asyncio
import aiohttp

records = []
with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            records.append(record)

async def batch_upload(records):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for record in records:
            task = async_upload(record, session)
            tasks.append(task)  
        await asyncio.gather(*tasks)

asyncio.run(batch_upload(properties))

方法 2 - 同步文件 IO,同步 HTTP IO

import json

with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            sync_upload(record)

方法 3 - 异步文件 IO、异步 HTTP IO

import json
import asyncio
import aiohttp
import aiofiles

async def batch_upload()
    async with aiohttp.ClientSession() as session:
        async with open('records.txt', 'r') as record_file:
            line = await record_file.readline()
            while line:
                record = json.loads(line)
                if valid(record):
                    await async_upload(record, session)
                line = await record_file.readline()

asyncio.run(batch_upload())

我正在开发的文件大约是 1.3 GB,总共有 100000 条记录,其中我上传了 691 条。每次上传都以 GET 请求开始,以查看记录是否已存在于 CouchDB 中。如果是,则执行 PUT 以使用任何新信息更新 CouchDB 记录;如果没有,则将记录发布到数据库。因此,每次上传都包含两个 API 请求。出于开发目的,我只创建记录,所以我运行 GET 和 POST 请求,总共 1382 个 API 调用。

方法#1 大约需要17 秒,方法#2 大约需要33 秒,方法#3 大约需要42 秒。

4

1 回答 1

1

您的代码使用异步,但它同步完成工作,在这种情况下,它会比同步方法慢。如果没有有效地构建/使用,Asyc 将不会加快执行速度。

您可以创建 2 个协程并让它们并行运行。也许这样可以加快操作速度。

例子:

#!/usr/bin/env python3

import asyncio


async def upload(event, queue):
    # This logic is not so correct when it comes to shutdown,
    # but gives the idea
    while not event.is_set():
        record = await queue.get()
        print(f'uploading record : {record}')
    return


async def read(event, queue):
    # dummy logic : instead read here and populate the queue.
    for i in range(1, 10):
        await queue.put(i)
    # Initiate shutdown..
    event.set()


async def main():
    event = asyncio.Event()
    queue = asyncio.Queue()

    uploader = asyncio.create_task(upload(event, queue))
    reader = asyncio.create_task(read(event, queue))
    tasks = [uploader, reader]

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
于 2019-08-20T18:10:01.013 回答