我在 Sun Grid Engine 超级计算集群上运行 Python 脚本,它读取文件 id 列表,将每个文件发送到工作进程进行分析,并将每个输入文件的一个输出写入磁盘。
问题是我在工作函数内部的某个地方收到 IOError(110, 'Connection timed out') ,我不知道为什么。我过去在发出严重延迟的网络请求时收到此错误,但在这种情况下,工作人员只是试图从磁盘读取数据。
我的问题是:从磁盘读取时会导致连接超时错误,如何解决此错误?其他人可以提供的任何帮助将不胜感激。
完整脚本(IOError 出现在 中minhash_text()
):
from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config
cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4
def minhash_text(args):
'''Return a list of hashband strings for an input doc'''
try:
file_id, path = args
with codecs.open(path, 'r', 'utf8') as f:
f = f.read()
all_hashbands = []
for window_idx, window in enumerate(ngrams(f.split(), window_len)):
window_hashbands = []
if window_idx % step != 0:
continue
minhash = MinHash(num_perm=permutations, seed=1)
for ngram in set(ngrams(' '.join(window), 3)):
minhash.update( ''.join(ngram).encode('utf8') )
hashband_vals = []
for i in minhash.hashvalues:
hashband_vals.append(i)
if len(hashband_vals) == hashband_len:
window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
hashband_vals = []
all_hashbands.append(window_hashbands)
return {'file_id': file_id, 'hashbands': all_hashbands}
except Exception as exc:
print(' ! error occurred while processing', file_id, exc)
return {'file_id': file_id, 'hashbands': []}
if __name__ == '__main__':
file_ids = json.load(open('file_ids.json'))
file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]
worker_id = int(sys.argv[1])
worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]
hashband_to_ids = defaultdict(list)
pool = Pool(cores)
for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
print(' * processed', idx, 'results')
file_id = result['file_id']
hashbands = result['hashbands']
for window_idx, window_hashbands in enumerate(hashbands):
for hashband in window_hashbands:
hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))
with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
json.dump(dict(hashband_to_ids), out)