0

我需要加快 Python 脚本的执行速度,该脚本以块的形式读取大型 CSV 文件,进行一些处理,然后将处理后的行保存到数据库中。处理 10,000 行然后持久化它们需要相当的时间(1.5 秒)。时代确实有一些波动,当然,有时处理速度更快,有时持续。

不幸的是,处理记录不容易并行化,因为处理是历史的(记录是股票交易,并且有基于先前活动的计算)。这是可能的,但对于这个问题,可以做的事情是并行处理一个块并保留前一个块的结果。这应该使总时间减半。

for chunk in pd.read_csv(filename, chunksize=chunksize):
    # the following two tasks in parallel
    persist (rows_from_previous_chunk) # this is I/O waiting, mostly
    rows_to_save = process(chunk)      # this is Python, not C
    # wait for the above to finish
    rows_from_previous_chunk = rows_to_save

我的问题是关于执行上述操作的推荐方法。我能想到几个:

  1. 鉴于一项任务主要是 I/O 等待,我有机会使用多线程而不会遇到 GIL 争用。

  2. 第二种选择是使用 Dask,特别是Delayed。但是,鉴于每项任务使用的时间很短(不到 2 秒),我不确定这是不是最好的方法。

  3. 第三种选择是让一个进程读取并处理行,然后通过有界队列将它们发送到一个单独的进程,该进程将保存到数据库。使用 JMS 队列有点矫枉过正,我在想multiprocessing.Queue()

任何建议表示赞赏。我是一名资深的 Java 程序员,最近改用 Python 并学习使用 GIL,因此提出了这个问题。

4

3 回答 3

1

Dask 确实增加了开销,但与典型的 2 秒任务时间相比非常小。为了保持顺序,您可以让每个任务依赖于前一个任务。这是一个尝试

@dask.delayed
def process_save(rows_from_previous_chunk, chunk):
    if rows_from_previous_chunk:
        persist(rows_from_previous_chunk)
    return process(chunk)

parts = dd.read_csv(filename, chunksize=chunksize).to_delayed()

prev = None
for chunk in parts:
    prev = process_save(prev, chunk)
out = dask.delayed(persist)(prev)
dask.compute(out)

out.visualize()  # should look interesting
于 2019-05-03T03:44:54.580 回答
0

我最终采用了以下方法。有趣的是,使用多线程并没有按预期工作。将数据帧传递到另一个队列进行保存仍然阻止主线程继续工作。不是 100% 确定发生了什么,但为了节省时间,我转而使用流程并且它有效。为了清楚起见,下面的代码稍微简化了一点,实际上我使用了多个数据库工作进程。

import multiprocessing

# this function will run into a separate process, saving the df asynchronously
def save(queue):
    db_engine = create_engine(...)
    while True:
        df  = queue.get()
        if df is None:
            break
        df.to_sql(schema="...", name="...", con=db_engine, if_exists="append", chunksize=1000, index=False)
        queue.task_done()

if __name__ == '__main__':

    queue = multiprocessing.JoinableQueue(maxsize=2) 
    worker = multiprocessing.Process(name="db_worker", target=save, args=(queue,))
    worker.daemon = True
    workers.start()

    # inside the main loop to process the df
        queue.put(df_to_save)

    # at the end 
    worker.join()  # wait for the last save job to finish before terminating the main process
于 2019-05-08T20:53:22.257 回答
0

这可能取决于您的数据库,但如果存在,最简单的方法可能是使用异步库(如aiomysqlasyncpg)来允许您在后台执行插入查询。

I/O 绑定部分可以在不需要 GIL 锁的情况下执行,因此您的 Python 部分代码将能够继续。

于 2019-05-03T02:37:12.890 回答