我有一个大文件,每行都有一条 JSON 记录。我正在编写一个脚本来通过 API 将这些记录的一个子集上传到 CouchDB,并尝试使用不同的方法来查看最快的方法。这是我发现工作速度最快到最慢的(在我的本地主机上的 CouchDB 实例上):
将每个需要的记录读入内存。在所有记录都在内存中之后,为每条记录生成一个上传协程,并一次收集/运行所有协程
同步读取文件,遇到需要的记录,同步上传
用于
aiofiles
读取文件,当遇到需要的记录时,异步更新
方法 #1 比其他两个快得多(大约快两倍)。我很困惑为什么方法 #2 比 #3 快,尤其是与这里的示例相比,异步运行所需的时间是同步运行的一半(未提供同步代码,必须自己重写)。是否是从文件 i/o 到 HTTP i/o 的上下文切换,尤其是文件读取比 API 上传更频繁的情况下?
为了进一步说明,这里有一些代表每种方法的 Python 伪代码:
方法 1 - 同步文件 IO、异步 HTTP IO
import json
import asyncio
import aiohttp
records = []
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
records.append(record)
async def batch_upload(records):
async with aiohttp.ClientSession() as session:
tasks = []
for record in records:
task = async_upload(record, session)
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(batch_upload(properties))
方法 2 - 同步文件 IO,同步 HTTP IO
import json
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
sync_upload(record)
方法 3 - 异步文件 IO、异步 HTTP IO
import json
import asyncio
import aiohttp
import aiofiles
async def batch_upload()
async with aiohttp.ClientSession() as session:
async with open('records.txt', 'r') as record_file:
line = await record_file.readline()
while line:
record = json.loads(line)
if valid(record):
await async_upload(record, session)
line = await record_file.readline()
asyncio.run(batch_upload())
我正在开发的文件大约是 1.3 GB,总共有 100000 条记录,其中我上传了 691 条。每次上传都以 GET 请求开始,以查看记录是否已存在于 CouchDB 中。如果是,则执行 PUT 以使用任何新信息更新 CouchDB 记录;如果没有,则将记录发布到数据库。因此,每次上传都包含两个 API 请求。出于开发目的,我只创建记录,所以我运行 GET 和 POST 请求,总共 1382 个 API 调用。
方法#1 大约需要17 秒,方法#2 大约需要33 秒,方法#3 大约需要42 秒。