所以我使用 Dask 来存储大量数据。我们每天获得大约 5000 万行新数据。没有多少列宽。我目前使用 ddf.to_parquet(long_term_storage_directory) 存储数据。当我获得新数据时,我将其附加到 long_term_storage_directory 目录中。一切正常,但速度很慢。
正在使用的索引是我希望当我添加数据时它会简单地添加到 long_term_storage_directory 中长长的镶木地板文件列表中的时间。(long_term_storage_directory 也是同一时间字段的索引)我担心我采用的方法在某些方面存在缺陷。也许我需要使用火花或其他东西来存储数据?
注意:ddf_new_data 的索引与 ddf_long_term_storage_directory 中使用的索引相同。我希望由于新数据的索引与当前在 long_term_storage_directory 中的索引相同,因此将数据添加到长期数据存储会更快。
ddf_long_term_storage_directory = dd.read_parquet(path=long_term_storage_directory, engine='pyarrow')
ddf_new_data = dd.read_parquet(path=directory_to_add_to_long_term_storage, engine='pyarrow')
ddf_new_data = ddf_new_data.set_index(index_name, sorted=False, drop=True)
ddf = dd.concat([ddf_long_term_storage_directory, ddf_new_data], axis=0)
ddf = ddf.repartition(partition_size='200MB') #??? Do I need to do this every time I add new data
ddf.to_parquet(long_term_storage_directory)