有没有办法并行调用 read_table()?在我的情况下,由于日期解析,它受 CPU 限制。我看不出有任何方法可以通过阅读文档来实现这一目标。唯一想到的是拆分输入文件,并行调用 read_table,然后连接数据帧。
问问题
1448 次
1 回答
1
这将并行读取 CSV 文件并将它们连接起来。恼人的一点是它不能处理numpy
类型,所以它不能解析日期。我一直在努力解决同样的问题,但到目前为止,诸如此类的库似乎execnet
无法处理非内置类型。这就是为什么我在发送之前将 DataFrames 转换为json
。它将类型剥离为基本的 Python 类型。
编辑:如果您需要解析日期,也许更明智的方法是远程读取CSV
文件,解析日期并将它们保存pickle
到硬盘驱动器。然后你可以在主进程中读取泡菜文件并将它们连接起来。我还没有尝试过,看看它是否会导致性能提升。
remote_read_csv.py
import cPickle as pickle
if __name__ == '__channelexec__':
reader = pickle.loads(channel.receive())
for filename in channel:
channel.send(reader(filename).to_json())
下面这个使用了上面的模块。我在 IPython 中对其进行了测试。
from pandas import DataFrame, concat, read_csv, read_json
from numpy import random
import execnet
import remote_read_csv
import cPickle as pickle
import itertools
import psutil
### Create dummy data and save to CSV
def rdf():
return DataFrame((random.rand(4, 3) * 100).astype(int))
d1 = rdf()
d2 = rdf()
d3 = rdf()
dfsl = [d1, d2, d3]
names = 'd1.csv d2.csv d3.csv'.split()
for i in range(3):
dfsl[i].to_csv(names[i])
### Read CSV files in separate threads then concatenate
reader = pickle.dumps(read_csv)
def set_gateways(remote_module, *channel_sends):
gateways = []
channels = []
for i in range(psutil.NUM_CPUS):
gateways.append(execnet.makegateway())
channels.append(gateways[i].remote_exec(remote_module))
for send in channel_sends:
channels[i].send(send)
return (gateways, channels)
def para_read(names):
gateways, channels = set_gateways(remote_read_csv, reader)
mch = execnet.MultiChannel(channels)
queue = mch.make_receive_queue()
channel_ring = itertools.cycle(mch)
for f in names:
channel = channel_ring.next()
channel.send(f)
dfs = []
for i in range(len(names)):
channel, df = queue.get()
dfs.append(df)
[gw.exit() for gw in gateways]
return concat([read_json(i) for i in dfs], keys=names)
para_read(names)
于 2014-05-02T04:56:21.127 回答