我需要加快 Python 脚本的执行速度,该脚本以块的形式读取大型 CSV 文件,进行一些处理,然后将处理后的行保存到数据库中。处理 10,000 行然后持久化它们需要相当的时间(1.5 秒)。时代确实有一些波动,当然,有时处理速度更快,有时持续。
不幸的是,处理记录不容易并行化,因为处理是历史的(记录是股票交易,并且有基于先前活动的计算)。这是可能的,但对于这个问题,可以做的事情是并行处理一个块并保留前一个块的结果。这应该使总时间减半。
for chunk in pd.read_csv(filename, chunksize=chunksize):
# the following two tasks in parallel
persist (rows_from_previous_chunk) # this is I/O waiting, mostly
rows_to_save = process(chunk) # this is Python, not C
# wait for the above to finish
rows_from_previous_chunk = rows_to_save
我的问题是关于执行上述操作的推荐方法。我能想到几个:
鉴于一项任务主要是 I/O 等待,我有机会使用多线程而不会遇到 GIL 争用。
第二种选择是使用 Dask,特别是Delayed。但是,鉴于每项任务使用的时间很短(不到 2 秒),我不确定这是不是最好的方法。
第三种选择是让一个进程读取并处理行,然后通过有界队列将它们发送到一个单独的进程,该进程将保存到数据库。使用 JMS 队列有点矫枉过正,我在想
multiprocessing.Queue()
任何建议表示赞赏。我是一名资深的 Java 程序员,最近改用 Python 并学习使用 GIL,因此提出了这个问题。