概述:
我正在尝试为财务数据分析项目创建大量变量(宽数据集)。我有一个包含 140,000 行的 pandas 数据框“position_history”,每行包含一只股票和买卖日期/价格。
我有一个函数 create_domain 接受输入(股票、买入日期、卖出日期)和:
- 对我的 SQLite3 数据库进行查询,以在给定这些日期的情况下提取该股票的时间序列。
- 使用时间序列构造我的变量
我使用 df.apply 将函数 create_domain 应用于 position_history
当我按顺序运行我的代码时,构建变量需要大约 4 个小时,我想使用多个进程来加快这个速度,因为我必须这样做很多次并且可能需要更广泛的数据集。
对于多个进程,我将 position_history 垂直拆分为多个块,创建一个数据帧列表。我将此列表传递给 joblib(多处理)。我的代码几乎总是无限期挂起而不会引发任何错误(但有时会在小样本上运行)。
我的怀疑是我的工作进程试图同时读取同一个 SQL 表时出现了问题。
我尝试了以下补救措施:
在函数 create_domain 中打开了一个新连接(sqlalchemy.create_engine),因此每个工作进程都有自己的引擎/连接
遵循 sqlalchemy 文档(http://docs.sqlalchemy.org/en/latest/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork )
- 将 poolclass 更改为 NullPool,这会禁用一个引擎内的多个连接,强制引擎为每个事务打开和关闭一个新连接
- 以防万一,为每个孩子使用 engine.dispose() 以确保刷新所有引擎连接并获取新连接
- 注意 - 这些解决方案旨在强制 sqlite3 不跨线程共享任何连接,这是行不通的(SQLAlchemy 是否可以在多个 Python 进程之间共享 SQLite 数据库?)
在 joblib 下(https://pythonhosted.org/joblib/parallel.html)
- 尝试使用“线程”后端而不是“多处理”。这有效,但并没有大大加快代码速度,从阅读此处的线程(Multiprocessing vs Threading Python)来看这是有道理的,因为线程实际上不允许您使用多个cpu。
- 尝试使用 memmaping 技术,因为 pandas 中有一些 numpy;我不认为这对我来说是一个相关的修复
相关的stackoverflow条目:
- 我想使用多个进程,而不是讨论的多线程:
python multiple threaded processes for running executables;
Python sqlite3 和并发;
SQLite3 和多处理;*我不明白这个的最后一个答案
- 我只是想读,而不是写,所以我不确定是否需要管理 SQLite 锁定机制,如所讨论的: SQLite 适合并发读取?; 用 sqlite3 并发写
- 这篇文章说它理论上应该可以工作:sqlite3 concurrent access;注意——我认为 WAL 模式对我没有帮助,因为我只是在阅读。
(伪)代码片段
我对joblib的调用:
x = Parallel(n_jobs =4)(delayed(create_domain)(chunk, other inputs) for chunk in chunks)
# where each chunk is a portion of the position_history df
我的 create_domain 函数:
def create_domain (df=position_history, inputs):
# create vars using row x of position_history
f = lambda x: sql_query_and_create_vars(inputs, x['column'])
result = df.apply(f, axis=1)
return result
摘要:我的代码永远挂起,使内核崩溃,并且没有引发任何错误。我将不胜感激任何见解:
- 为什么会这样?
- 我该如何解决?
- 有没有更好的方法来做我想做的事情?我尽我所能对我的 SQL 查询进行矢量化和优化。
- 这是 SQLite3 的问题吗?像 MySQL 这样的东西会更好吗?
- 有小费吗; 我是编码/python/数据科学的新手。
详细信息:我在一个超级计算机集群上,运行 Linux,并在 Python 3.4.3 上使用 iPython。
这是我的第一个 stackoverflow 问题 - 提前为失礼道歉并感谢您的帮助!