我是数据分析和 python 的新手,几乎没有 numpy、pytable、pandas 等方面的经验。
我正在将 csv 文件逐块读取到数据帧中,并将其附加到 HDFStore,因为整个数据无法放入内存中。在我的追加操作中,我通过 data_columns 参数传递了一个数据列列表,以便我可以通过 HDFStore.select 将条件传递给 where 参数来执行核外过滤。
附加表后是否可以指定数据列?
原因是每次写入时,逐块附加到 HDFStore 表的时间呈指数增长。我猜每次使用新数据对 HDFStore 进行追加时,都会重新计算数据列中的索引。如果数据完全放在存储中并且在创建后指定数据列以便只计算一次索引,则效率会高得多。
感谢您的想法和帮助。
更新 1
以下大致是我正在运行的代码。请不要我必须清理代码以避免不必要的披露,并且没有在以下 3 行代码中提供整个列列表,因为有 > 100 列:
我已经确定要从 csv 读取的 dtypes 并像这样声明它们:
csv_dt = {'COL1': dtype('O'), 'TAX': dtype('float64'), 'COL2': dtype('O'), ... }
以下是要写入的表的 dtype 声明。请注意,我添加了不在原始 csv 中的计算字段“REVENUE”。
t_dt = [('COL1', 'S20'), ('COL2', 'S3'), ('COL3', 'f8'), ('TAX', 'f8'), ('REVENUE','f8') ... ]
确定要传递给的每列的最大字符串长度min_itemsize
。这仅适用于具有dtype('O')
str_len = {'COL1': 3, 'COL2': 3, ...}
这是分块的代码:
f = HDFStore('FLN2.h5')
for num, chunk in enumerate(read_csv('FLN.csv',dtype=csv_dt, iterator=True, chunksize=250000, parse_dates=True)):
print str(now()),str(num), "Started new chunk"
chunk['MTH'] = to_datetime(chunk.MTH)
chunk.TAX = chunk.TAX.fillna(0)
chunk['REVENUE'] = chunk.NET - chunk.TAX
print str(now()),str(num), "Completed Revenue Calculation"
if num == 0:
f.append('df', chunk, dtype=t_dt, min_itemsize=str_len, dtype=t_dt, data_columns = ['DC1', 'DC2'], expectedrows=42000000)
else:
f.append('df', chunk)
print str(now()),str(num), "Completed writing chunk"
f.flush()
f.flush()
f.close()
昨天,当chunksize
读取 csv 的设置为 50000 时,此代码运行良好。但是,今天当我尝试运行代码时,出现以下错误:
Exception: tables cannot write this data -> rows parameter cannot be converted into a recarray object compliant with table '/df/table (Table(0,))
对于我的一生,我无法弄清楚发生了什么。
信息()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 250000 entries, 0 to 249999
Columns: 122 entries, COL1 to REVENUE
dtypes: datetime64[ns](1), float64(40), int64(13), object(68)
请注意,以上内容仅来自写入磁盘的一个块。