0

我正在处理数据(> 40MB)的大型文本文件,并且连续执行需要很多时间。我决定使用 python 3.5 多处理器包。当它起作用时,它会明显更快,但我得到的结果好坏参半。

我在两个系统上运行以下代码,第一个是非常规范的双 xeon 服务器,第二个是我的 X1 Carbon thinkpad。两者都在运行 ubuntu16.04 和普通的 python 3.5。

queue = Queue()
cpus = cpu_count()

def get_gauging_data_from_files(self,fileName,id):
    global cpus
    print("I:", id)
    data_lines = getFileData(fileName)
    pool = Pool(cpus)
    block = getBlock(data_lines)
    for i in range(len(data_lines)):
        if data_lines[i].startswith("Axial Slice at"):
            block.append([data_lines[i:(i+54)]])
            i += 53
    print("Line index got to:",i, "CpuCount:",cpus, "block size:", len(block))
    pool.map(getBlockData,block)
    block_data = []
    i=0
    while not queue.empty():
        block_data.append(queue.get())
        i+=1
    print("Queue Count:",i)
    raw_data = np.asarray(block_data,dtype=np.float64)
    raw_data = raw_data[raw_data[:,0].argsort()]
    data_axial = raw_data[:,0]
    data_radial = raw_data[:,1:361]
    print("data_axial size:", len(data_axial)," data_radial size:", len(data_radial))
    print("O:", id)
    return data_axial, data_radial

双至强提供以下输出。

I: True
Line index got to: 450586 CpuCount: 24 block size: 16686
Queue Count: 16686
data_axial size: 16686  data_radial size: 16686
O: True

16686 的块大小与 166886 的队列计数相匹配,这表明工作函数的每个参数都已被处理,并且相应的数据点已添加到队列中。

笔记本电脑在完全相同的文件/数据上运行,程序快速终止并给出以下输出:

I: True
Line index got to: 450586 CpuCount: 24 block size: 16686
Queue Count: 525
data_axial size: 525  data_radial size: 525
O: True

这表明由于某种原因,传递给 getBLockData 的 16686 个参数几乎没有被处理。

我希望笔记本电脑的运行速度比 Xeon 慢得多,但我也希望它已经处理了传递到池中的所有数据。我很困惑……有人能从经验中汲取经验吗?

worker 函数如下所示:

def getBlockData(B):
i=0
#print(len(B[0]))
queue_item = [[B[0][i].rstrip().split()[3]],
    B[0][i+4].rstrip().split()[1:10],
    B[0][i+5].rstrip().split()[1:10],
    B[0][i+6].rstrip().split()[1:10],
    B[0][i+7].rstrip().split()[1:10],
    B[0][i+8].rstrip().split()[1:10],
    B[0][i+9].rstrip().split()[1:10],
    B[0][i+10].rstrip().split()[1:10],
    B[0][i+11].rstrip().split()[1:10],
    B[0][i+12].rstrip().split()[1:10],
    B[0][i+13].rstrip().split()[1:10],

    B[0][i+17].rstrip().split()[1:10],
    B[0][i+18].rstrip().split()[1:10],
    B[0][i+19].rstrip().split()[1:10],
    B[0][i+20].rstrip().split()[1:10],
    B[0][i+21].rstrip().split()[1:10],
    B[0][i+22].rstrip().split()[1:10],
    B[0][i+23].rstrip().split()[1:10],
    B[0][i+24].rstrip().split()[1:10],
    B[0][i+25].rstrip().split()[1:10],
    B[0][i+26].rstrip().split()[1:10],

    B[0][i+30].rstrip().split()[1:10],
    B[0][i+31].rstrip().split()[1:10],
    B[0][i+32].rstrip().split()[1:10],
    B[0][i+33].rstrip().split()[1:10],
    B[0][i+34].rstrip().split()[1:10],
    B[0][i+35].rstrip().split()[1:10],
    B[0][i+36].rstrip().split()[1:10],
    B[0][i+37].rstrip().split()[1:10],
    B[0][i+38].rstrip().split()[1:10],
    B[0][i+39].rstrip().split()[1:10],

    B[0][i+43].rstrip().split()[1:10],
    B[0][i+44].rstrip().split()[1:10],
    B[0][i+45].rstrip().split()[1:10],
    B[0][i+46].rstrip().split()[1:10],
    B[0][i+47].rstrip().split()[1:10],
    B[0][i+48].rstrip().split()[1:10],
    B[0][i+49].rstrip().split()[1:10],
    B[0][i+50].rstrip().split()[1:10],
    B[0][i+51].rstrip().split()[1:10],
    B[0][i+52].rstrip().split()[1:10] 
]
queue_item = [val for sublist in queue_item for val in sublist]
queue.put( queue_item )
4

1 回答 1

0

下面的代码似乎对我有用,浏览了网上的帖子,我发现我缺少的是 pool.map 想要返回值的事实。我还添加了 pool.terminate() 因为要在代码终止后清理 python 进程。

笔记本电脑和至强都运行得很愉快......仍然不确定之前发生了什么......

def getBlockData(B):
    i=0
    queue_item = [[B[0][i].rstrip().split()[3]],
    B[0][i+4].rstrip().split()[1:10],
    B[0][i+5].rstrip().split()[1:10],
    B[0][i+6].rstrip().split()[1:10],
    B[0][i+7].rstrip().split()[1:10],
    B[0][i+8].rstrip().split()[1:10],
    B[0][i+9].rstrip().split()[1:10],
    B[0][i+10].rstrip().split()[1:10],
    B[0][i+11].rstrip().split()[1:10],
    B[0][i+12].rstrip().split()[1:10],
    B[0][i+13].rstrip().split()[1:10],

    B[0][i+17].rstrip().split()[1:10],
    B[0][i+18].rstrip().split()[1:10],
    B[0][i+19].rstrip().split()[1:10],
    B[0][i+20].rstrip().split()[1:10],
    B[0][i+21].rstrip().split()[1:10],
    B[0][i+22].rstrip().split()[1:10],
    B[0][i+23].rstrip().split()[1:10],
    B[0][i+24].rstrip().split()[1:10],
    B[0][i+25].rstrip().split()[1:10],
    B[0][i+26].rstrip().split()[1:10],

    B[0][i+30].rstrip().split()[1:10],
    B[0][i+31].rstrip().split()[1:10],
    B[0][i+32].rstrip().split()[1:10],
    B[0][i+33].rstrip().split()[1:10],
    B[0][i+34].rstrip().split()[1:10],
    B[0][i+35].rstrip().split()[1:10],
    B[0][i+36].rstrip().split()[1:10],
    B[0][i+37].rstrip().split()[1:10],
    B[0][i+38].rstrip().split()[1:10],
    B[0][i+39].rstrip().split()[1:10],

    B[0][i+43].rstrip().split()[1:10],
    B[0][i+44].rstrip().split()[1:10],
    B[0][i+45].rstrip().split()[1:10],
    B[0][i+46].rstrip().split()[1:10],
    B[0][i+47].rstrip().split()[1:10],
    B[0][i+48].rstrip().split()[1:10],
    B[0][i+49].rstrip().split()[1:10],
    B[0][i+50].rstrip().split()[1:10],
    B[0][i+51].rstrip().split()[1:10],
    B[0][i+52].rstrip().split()[1:10] 
    ]
    queue_item = [val for sublist in queue_item for val in sublist]
    return queue_item

def get_gauging_data_from_files(self,fileName):
    data_lines = getFileData(fileName)
    pool = Pool()
    block = getBlock(data_lines)
    block_data = []
    for i in range(len(data_lines)):
        if data_lines[i].startswith("Axial Slice at"):
            block.append([data_lines[i:(i+54)]])
            i += 53
    print("Line index got to:",i, "block size:", len(block))            
    block_data.append(pool.map(getBlockData,block))
    pool.terminate()
    raw_data = np.asarray(block_data[0], dtype=np.float64)
    raw_data = raw_data[raw_data[:,0].argsort()]
    data_axial = raw_data[:,0]
    data_radial = raw_data[:,1:361]
    print("data_axial size:", len(data_axial)," data_radial size:", len(data_radial))
    return data_axial, data_radial
于 2017-01-19T23:54:14.043 回答