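I am trying to implement a multiprocessing approach to reading and comparing two csv files. To get started, I began with the code example from "embarassingly parallel questions", which sums the integers in a file. The problem is that the example will not run for me. (I am running Python 2.6 on Windows.)
I get the following EOF error: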
File "C:\Python26\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
On this line:
self.pin.start()
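I found some examples suggesting that the problem might be that the csv file needs to be opened with 'rb'. I tried that, but it did not work either.
I then tried to simplify the code to reproduce the error at the most basic level. I got the same error on the same line, even after reducing the parse_input_csv function so that it does not read the file at all. (I am not sure how an EOF can be triggered if the file is never read.)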
import csv
import multiprocessing

class CSVWorker(object):
    def __init__(self, infile, outfile):
        #self.infile = open(infile)
        self.infile = open(infile, 'rb') #try rb for Windows
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pin.start()
        self.pin.join()
        self.infile.close()

    def parse_input_csv(self):
        # for i, row in enumerate(self.in_csvfile):
        #     self.inq.put( (i, row) )
        # for row in self.in_csvfile:
        #     print row
        #     #self.inq.put( row )
        print 'yup'

if __name__ == '__main__':
    c = CSVWorker('random_ints.csv', 'random_ints_sums.csv')
    print 'done'
Finally, I tried pulling it all outside of a class. This works if I do not iterate over the csv, but gives the same error if I do.
def manualCSVworker(infile, outfile):
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)
    inq = multiprocessing.Queue()
    # this does not work (passes the csv reader to the child, and fails with EOFError)
    pin = multiprocessing.Process(target=manual_parse_input_csv, args=(in_csvfile,))
    # this works (no reading csv file)
    #pin = multiprocessing.Process(target=print_yup, args=())
    pin.start()
    pin.join()
    f.close()

def print_yup():
    print 'yup'

def manual_parse_input_csv(csvReader):
    for row in csvReader:
        print row

if __name__ == '__main__':
    manualCSVworker('random_ints.csv', 'random_ints_sums.csv')
    print 'done'
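Can someone help me identify the problem here?
EDIT: Just thought I would post the working code. I ended up dropping the class implementation. As Tim Peters suggested, I only pass the file name (not the open file).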
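As far as I understand it, the reason passing the file name helps is that on Windows multiprocessing starts each child in a fresh interpreter and pickles the Process target and its arguments to send them over. Open files and csv.reader objects cannot be pickled, while a plain string can. Here is a minimal sketch that checks this directly. It is written for Python 3 rather than the 2.6 I was using, is_picklable is a helper name made up for illustration, and an in-memory string stands in for the real file.

```python
import csv
import io
import pickle


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# In-memory stand-in for the CSV file, so the sketch needs no file on disk.
reader = csv.reader(io.StringIO("1,2\n3,4\n"))

print(is_picklable(reader))             # a csv.reader object
print(is_picklable("random_ints.csv"))  # a plain file name
```

The reader should be reported as not picklable and the file name as picklable, which matches the behaviour above: anything handed to a child process has to be something pickle can serialize, so the child should open the file itself.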
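On 5 million rows x 2 columns, I saw roughly a 20% time improvement with 2 processors compared to 1. I expected more, but I think the problem is the extra overhead of queueing. According to this thread, one improvement might be to queue the records in chunks of 100 or more (rather than one row at a time).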
import csv
import multiprocessing
from datetime import datetime

NUM_PROCS = multiprocessing.cpu_count()

def main(numprocsrequested, infile, outfile):
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    numprocs = min(numprocsrequested, NUM_PROCS)

    pin = multiprocessing.Process(target=parse_input_csv, args=(infile,numprocs,inq,))
    pout = multiprocessing.Process(target=write_output_csv, args=(outfile,numprocs,outq,))
    ps = [ multiprocessing.Process(target=sum_row, args=(inq,outq,)) for i in range(numprocs)]

    pin.start()
    pout.start()
    for p in ps:
        p.start()

    pin.join()
    for p in ps:
        p.join()
    pout.join()

def parse_input_csv(infile, numprocs, inq):
    """Parses the input CSV and builds tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    The data is then sent over inq for the workers to do their
    thing. At the end the input process sends a 'STOP' message for each
    worker.
    """
    f = open(infile, 'rb')
    in_csvfile = csv.reader(f)
    for i, row in enumerate(in_csvfile):
        row = [ int(entry) for entry in row ]
        inq.put( (i,row) )
    for i in range(numprocs):
        inq.put("STOP")
    f.close()

def sum_row(inq, outq):
    """
    Workers. Consume inq and produce answers on outq
    """
    for i, row in iter(inq.get, "STOP"):
        outq.put( (i, sum(row)) )
    outq.put("STOP")

def write_output_csv(outfile, numprocs, outq):
    """
    Open the outgoing csv file, then start reading outq for answers.
    Since I chose to make sure the output is synchronized to the input,
    there are some extra goodies to do that.

    Obviously your input has the original row number so this is not
    required.
    """
    cur = 0
    buffer = {}
    # For some reason csv.writer works badly across threads so open/close
    # and use it all in the same thread or else you'll have the last
    # several rows missing
    f = open(outfile, 'wb')
    out_csvfile = csv.writer(f)

    # Keep running until we see numprocs STOP messages
    for works in range(numprocs):
        for i, val in iter(outq.get, "STOP"):
            # verify rows are in order, if not save in buffer
            if i != cur:
                buffer[i] = val
            else:
                # if yes, write it out and make sure no waiting rows exist
                out_csvfile.writerow( [i, val] )
                cur += 1
                while cur in buffer:
                    out_csvfile.writerow([ cur, buffer[cur] ])
                    del buffer[cur]
                    cur += 1
    f.close()

if __name__ == '__main__':
    startTime = datetime.now()
    main(4, 'random_ints.csv', 'random_ints_sums.csv')
    print 'done'
    print(datetime.now()-startTime)
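
For the chunking idea mentioned above (queueing records in blocks of 100 or more instead of one row at a time), here is a rough sketch of the two pieces that would change. I have not timed it, so whether it actually reduces the queueing overhead is an assumption. chunked and sum_chunk are names made up for illustration and are not part of the working code above.

```python
import itertools


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from iterable."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk


def sum_chunk(chunk):
    """Sum every (index, row) pair in one chunk, keeping the row index."""
    return [(i, sum(row)) for i, row in chunk]


# Small in-memory stand-in for the enumerated csv rows.
rows = enumerate([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])
for chunk in chunked(rows, 2):
    # In the real program this is where inq.put(chunk) would go, and the
    # worker would call outq.put(sum_chunk(chunk)) on the other side.
    print(sum_chunk(chunk))
```

With this change parse_input_csv would put one chunk per inq.put call, sum_row would put one list of results per outq.put call, and write_output_csv would need an inner loop over each received list before applying the same reordering buffer.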