我正在处理数据(> 40MB)的大型文本文件,并且连续执行需要很多时间。我决定使用 python 3.5 多处理器包。当它起作用时,它会明显更快,但我得到的结果好坏参半。
我在两个系统上运行以下代码,第一个是非常规范的双 xeon 服务器,第二个是我的 X1 Carbon thinkpad。两者都在运行 ubuntu16.04 和普通的 python 3.5。
queue = Queue()
cpus = cpu_count()
def get_gauging_data_from_files(self,fileName,id):
global cpus
print("I:", id)
data_lines = getFileData(fileName)
pool = Pool(cpus)
block = getBlock(data_lines)
for i in range(len(data_lines)):
if data_lines[i].startswith("Axial Slice at"):
block.append([data_lines[i:(i+54)]])
i += 53
print("Line index got to:",i, "CpuCount:",cpus, "block size:", len(block))
pool.map(getBlockData,block)
block_data = []
i=0
while not queue.empty():
block_data.append(queue.get())
i+=1
print("Queue Count:",i)
raw_data = np.asarray(block_data,dtype=np.float64)
raw_data = raw_data[raw_data[:,0].argsort()]
data_axial = raw_data[:,0]
data_radial = raw_data[:,1:361]
print("data_axial size:", len(data_axial)," data_radial size:", len(data_radial))
print("O:", id)
return data_axial, data_radial
双至强提供以下输出。
I: True
Line index got to: 450586 CpuCount: 24 block size: 16686
Queue Count: 16686
data_axial size: 16686 data_radial size: 16686
O: True
16686 的块大小与 166886 的队列计数相匹配,这表明工作函数的每个参数都已被处理,并且相应的数据点已添加到队列中。
笔记本电脑在完全相同的文件/数据上运行,程序快速终止并给出以下输出:
I: True
Line index got to: 450586 CpuCount: 24 block size: 16686
Queue Count: 525
data_axial size: 525 data_radial size: 525
O: True
这表明由于某种原因,传递给 getBLockData 的 16686 个参数几乎没有被处理。
我希望笔记本电脑的运行速度比 Xeon 慢得多,但我也希望它已经处理了传递到池中的所有数据。我很困惑……有人能从经验中汲取经验吗?
worker 函数如下所示:
def getBlockData(B):
i=0
#print(len(B[0]))
queue_item = [[B[0][i].rstrip().split()[3]],
B[0][i+4].rstrip().split()[1:10],
B[0][i+5].rstrip().split()[1:10],
B[0][i+6].rstrip().split()[1:10],
B[0][i+7].rstrip().split()[1:10],
B[0][i+8].rstrip().split()[1:10],
B[0][i+9].rstrip().split()[1:10],
B[0][i+10].rstrip().split()[1:10],
B[0][i+11].rstrip().split()[1:10],
B[0][i+12].rstrip().split()[1:10],
B[0][i+13].rstrip().split()[1:10],
B[0][i+17].rstrip().split()[1:10],
B[0][i+18].rstrip().split()[1:10],
B[0][i+19].rstrip().split()[1:10],
B[0][i+20].rstrip().split()[1:10],
B[0][i+21].rstrip().split()[1:10],
B[0][i+22].rstrip().split()[1:10],
B[0][i+23].rstrip().split()[1:10],
B[0][i+24].rstrip().split()[1:10],
B[0][i+25].rstrip().split()[1:10],
B[0][i+26].rstrip().split()[1:10],
B[0][i+30].rstrip().split()[1:10],
B[0][i+31].rstrip().split()[1:10],
B[0][i+32].rstrip().split()[1:10],
B[0][i+33].rstrip().split()[1:10],
B[0][i+34].rstrip().split()[1:10],
B[0][i+35].rstrip().split()[1:10],
B[0][i+36].rstrip().split()[1:10],
B[0][i+37].rstrip().split()[1:10],
B[0][i+38].rstrip().split()[1:10],
B[0][i+39].rstrip().split()[1:10],
B[0][i+43].rstrip().split()[1:10],
B[0][i+44].rstrip().split()[1:10],
B[0][i+45].rstrip().split()[1:10],
B[0][i+46].rstrip().split()[1:10],
B[0][i+47].rstrip().split()[1:10],
B[0][i+48].rstrip().split()[1:10],
B[0][i+49].rstrip().split()[1:10],
B[0][i+50].rstrip().split()[1:10],
B[0][i+51].rstrip().split()[1:10],
B[0][i+52].rstrip().split()[1:10]
]
queue_item = [val for sublist in queue_item for val in sublist]
queue.put( queue_item )