问题 - ProcessPoolExecutor 没有提高速度。tqdm 确认
对 python 有足够的了解,可以复制和/或编写一个有效的程序。每个文件需要大约 40 秒来加载->过滤->写入。我有大约 6,800 个文件要处理,并且想要一个更好的版本,它可以使用我所有的处理能力(6 核),我尝试编写该版本(如下)。所述版本产生,但比我原来的功能稍慢:
from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer
decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers
def load_decode_filter(file):
'''`
Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
'''
output_message = []
with open(file, 'rb') as ins:
for bufr_message in generate_bufr_message(
decoder,ins.read()):
input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
mask = [obj for obj in input_list if ((PHI_MAX > obj[
12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
output_message.extend(mask)
return output_message
def main(files_in):
'''
attempt to intiate all cores in loading and filter bufr files
'''
with ProcessPoolExecutor(max_workers=6) as executor:
with tqdm(range(len(files_in)), desc='files loaded',
position=0) as progress:
futures = []
for file in files_in:
future = executor.submit(load_decode_filter(file), file)
future.add_done_callback(lambda p: progress.update())
futures.append(future)
results = []
for future in futures:
result = future.result()
results.append(result)
with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
dump(results, f_o)
if __name__ == '__main__':
main(files)
我希望至少减少每个文件的处理时间。
更新,结束:
首先,我要感谢所有评论的人和回答者(我太新了,无法投票)。似乎有意义地提高效率的唯一方法是从不解码并从原位缓冲区数据中获取我想要的东西,这完全超出了我目前的能力(这是我第一次接触任何类型的代码)。
我计划(目前)尽可能地运行我的初始版本(f.bufr in,f.bufr_.txt out),我将在每次“运行”后将处理过的文件移动到子目录。一线希望是我在这方面学到了足够多的知识,我将能够制作一个程序来将所有文本输出合并到一个文件中。再次感谢。